sha512.cpp 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252
  1. //
  2. // Copyright (c) 2019-2021 Ivan Baidakou (basiliscos) (the dot dmol at gmail dot com)
  3. //
  4. // Distributed under the MIT Software License
  5. //
  6. /*
  7. * This is an example how to implement interruptible blocking operations using
  8. * std::thread backend. Here, as blocking I/O operation the the reading disk
  9. * file and calculating its sha512 digest is used.
  10. *
  11. * The whole work is split into pieces, and once a piece is complete the
  12. * continuation message (with the whole job state) is send to the next
  13. * piece to be processed and so on. Between continuation messages other
  14. * messages might appear (in the case, the shutdown message) or timers
  15. * might be triggered.
  16. *
  17. * This is an example of blocking messages multiplexing pattern.
  18. *
  19. * The "ctrl+c" can be anytime pressed on the terminal, and the program
  20. * will correctly shutdown (including sanitizer build). Try it!
  21. *
  22. */
  23. #include "rotor.hpp"
  24. #include "rotor/thread.hpp"
  25. #include <cstdint>
  26. #include <string>
  27. #include <iostream>
  28. #include <fstream>
  29. #include <memory>
  30. #include <atomic>
  31. #include <openssl/sha.h>
  32. #ifndef _WIN32
  33. #include <signal.h>
  34. #endif
  35. namespace r = rotor;
  36. namespace rth = rotor::thread;
  37. using buffer_t = std::vector<std::byte>;
  38. enum class work_result_t { done, completed, errored };
  39. struct work_t {
  40. work_t(std::ifstream &&in_, size_t file_size_, size_t buff_sz_)
  41. : in(std::move(in_)), file_size{file_size_}, buff(buff_sz_) {
  42. if (SHA512_Init(&sha_ctx) != 1) {
  43. error = "fail to init sha";
  44. }
  45. }
  46. std::string get_error() const noexcept {
  47. assert(error.size() && "has error");
  48. return error;
  49. }
  50. std::string get_result() const noexcept {
  51. assert(error.size() == 0 && "has no error");
  52. assert(result.size() != 0 && "has result");
  53. return result;
  54. }
  55. work_result_t io() noexcept {
  56. if (error.size()) {
  57. return work_result_t::errored;
  58. }
  59. auto bytes_left = file_size - bytes_read;
  60. auto final = bytes_left < buff.size();
  61. auto bytes_to_read = final ? bytes_left : buff.size();
  62. in.read(reinterpret_cast<char *>(buff.data()), bytes_to_read);
  63. if (!in) {
  64. error = "reading file error";
  65. return work_result_t::errored;
  66. }
  67. // printf("read %llu bytes\n", bytes_to_read);
  68. bytes_read += bytes_to_read;
  69. auto r = SHA512_Update(&sha_ctx, buff.data(), bytes_to_read);
  70. if (r != 1) {
  71. error = "sha update failed";
  72. return work_result_t::errored;
  73. }
  74. if (!final) {
  75. return work_result_t::done;
  76. }
  77. unsigned char digest[SHA512_DIGEST_LENGTH];
  78. r = SHA512_Final(digest, &sha_ctx);
  79. if (r != 1) {
  80. error = "sha final failed";
  81. return work_result_t::errored;
  82. }
  83. result = std::string((char *)digest, SHA512_DIGEST_LENGTH);
  84. return work_result_t::completed;
  85. }
  86. private:
  87. std::ifstream in;
  88. size_t file_size;
  89. buffer_t buff;
  90. size_t bytes_read = 0;
  91. SHA512_CTX sha_ctx;
  92. std::string error;
  93. std::string result;
  94. };
  95. namespace payload {
  96. struct work_progress_t {
  97. std::unique_ptr<work_t> work;
  98. };
  99. } // namespace payload
  100. namespace message {
  101. using work_progress_t = r::message_t<payload::work_progress_t>;
  102. }
  103. struct sah_actor_config : r::actor_config_t {
  104. std::string path = "";
  105. std::size_t block_size = 0;
  106. };
  107. template <typename Actor> struct sah_actor_config_builder_t : r::actor_config_builder_t<Actor> {
  108. using builder_t = typename Actor::template config_builder_t<Actor>;
  109. using parent_t = r::actor_config_builder_t<Actor>;
  110. using parent_t::parent_t;
  111. builder_t &&path(const std::string &value) &&noexcept {
  112. parent_t::config.path = value;
  113. return std::move(*static_cast<typename parent_t::builder_t *>(this));
  114. }
  115. builder_t &&block_size(std::size_t value) &&noexcept {
  116. parent_t::config.block_size = value;
  117. return std::move(*static_cast<typename parent_t::builder_t *>(this));
  118. }
  119. };
  120. struct sha_actor_t : public r::actor_base_t {
  121. using config_t = sah_actor_config;
  122. template <typename Actor> using config_builder_t = sah_actor_config_builder_t<Actor>;
  123. explicit sha_actor_t(config_t &cfg) : r::actor_base_t{cfg}, path{cfg.path}, block_size{cfg.block_size} {}
  124. void configure(r::plugin::plugin_base_t &plugin) noexcept override {
  125. r::actor_base_t::configure(plugin);
  126. plugin.with_casted<r::plugin::starter_plugin_t>([&](auto &p) {
  127. p.subscribe_actor(&sha_actor_t::on_process)->tag_io(); // important
  128. });
  129. }
  130. void on_start() noexcept override {
  131. rotor::actor_base_t::on_start();
  132. std::ifstream in(path, std::ifstream::ate | std::ifstream::binary);
  133. if (!in.is_open()) {
  134. std::cout << "failed to open " << path << '\n';
  135. return supervisor->do_shutdown();
  136. }
  137. auto sz = in.tellg();
  138. in = std::ifstream(path, std::ifstream::binary);
  139. auto work = std::make_unique<work_t>(std::move(in), sz, block_size);
  140. send<payload::work_progress_t>(address, payload::work_progress_t{std::move(work)});
  141. }
  142. private:
  143. std::string path;
  144. std::size_t block_size;
  145. void print_result(const work_t &work) noexcept {
  146. auto r = work.get_result();
  147. const std::byte *buff = reinterpret_cast<const std::byte *>(r.data());
  148. for (size_t i = 0; i < r.size(); ++i) {
  149. std::cout << std::hex << std::setfill('0') << std::setw(2) << (unsigned)buff[i];
  150. }
  151. std::cout << "\n";
  152. }
  153. void on_process(message::work_progress_t &msg) noexcept {
  154. auto &work = msg.payload.work;
  155. auto result = msg.payload.work->io();
  156. switch (result) {
  157. case work_result_t::done:
  158. send<payload::work_progress_t>(address, payload::work_progress_t{std::move(work)});
  159. break;
  160. case work_result_t::errored:
  161. std::cout << "error: " << work->get_error() << "\n";
  162. supervisor->do_shutdown();
  163. break;
  164. case work_result_t::completed:
  165. print_result(*work);
  166. supervisor->do_shutdown();
  167. break;
  168. }
  169. }
  170. };
  171. #ifndef _WIN32
  172. std::atomic_bool shutdown_flag = false;
  173. #endif
  174. int main(int argc, char **argv) {
  175. std::string path = argv[0];
  176. if (argc < 2) {
  177. std::cout << "usage:: " << argv[0] << " /path/to/file [block_size = 1048576]\n";
  178. std::cout << "will calculate for " << argv[0] << "\n";
  179. } else {
  180. path = argv[1];
  181. }
  182. size_t block_size = 1048576;
  183. if (argc == 3) {
  184. try {
  185. block_size = static_cast<size_t>(std::stoll(argv[2]));
  186. } catch (...) {
  187. std::cout << "can't convert '" << argv[2] << "', using default one\n";
  188. }
  189. }
  190. rth::system_context_thread_t ctx;
  191. auto timeout = boost::posix_time::milliseconds{100};
  192. auto sup = ctx.create_supervisor<rth::supervisor_thread_t>().timeout(timeout).finish();
  193. auto act = sup->create_actor<sha_actor_t>().block_size(block_size).path(path).timeout(timeout).finish();
  194. #ifndef _WIN32
  195. struct sigaction action;
  196. action.sa_handler = [](int) { shutdown_flag = true; };
  197. if (sigaction(SIGINT, &action, nullptr) != 0) {
  198. std::cout << "critical :: cannot set signal handler\n";
  199. return -1;
  200. }
  201. auto console_thread = std::thread([&] {
  202. while (!shutdown_flag) {
  203. std::this_thread::sleep_for(std::chrono::milliseconds(100));
  204. }
  205. std::cout << "going to terminate...\n";
  206. sup->shutdown();
  207. });
  208. #endif
  209. ctx.run();
  210. #ifndef _WIN32
  211. shutdown_flag = true;
  212. console_thread.join();
  213. #endif
  214. std::cout << "normal exit\n";
  215. return 0;
  216. }