ping-pong-timer.cpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336
  1. //
  2. // Copyright (c) 2019-2021 Ivan Baidakou (basiliscos) (the dot dmol at gmail dot com)
  3. //
  4. // Distributed under the MIT Software License
  5. //
  6. #include "rotor.hpp"
  7. #include "rotor/asio.hpp"
  8. #include <boost/asio.hpp>
  9. #include <boost/lexical_cast.hpp>
  10. #include <chrono>
  11. #include <functional>
  12. #include <iomanip>
  13. #include <iostream>
  14. #include <memory>
  15. #include <random>
  16. #include <type_traits>
  17. #include <utility>
  18. #include <optional>
  19. #include <unordered_map>
  20. namespace asio = boost::asio;
  21. namespace pt = boost::posix_time;
  22. namespace ra = rotor::asio;
  23. namespace constants {
  24. static float failure_probability = 0.70f;
  25. static pt::time_duration ping_timeout = pt::milliseconds{100};
  26. static pt::time_duration ping_reply_base = pt::milliseconds{50};
  27. static pt::time_duration check_interval = pt::milliseconds{3000};
  28. static std::uint32_t ping_reply_scale = 70;
  29. } // namespace constants
  30. namespace resource {
  31. rotor::plugin::resource_id_t timer = 0;
  32. rotor::plugin::resource_id_t ping = 1;
  33. } // namespace resource
  34. namespace payload {
  35. struct pong_t {};
  36. struct ping_t {
  37. using response_t = pong_t;
  38. };
  39. } // namespace payload
  40. namespace message {
  41. using ping_t = rotor::request_traits_t<payload::ping_t>::request::message_t;
  42. using pong_t = rotor::request_traits_t<payload::ping_t>::response::message_t;
  43. using cancel_t = rotor::request_traits_t<payload::ping_t>::cancel::message_t;
  44. } // namespace message
  45. struct pinger_t : public rotor::actor_base_t {
  46. using rotor::actor_base_t::actor_base_t;
  47. void configure(rotor::plugin::plugin_base_t &plugin) noexcept override {
  48. rotor::actor_base_t::configure(plugin);
  49. plugin.with_casted<rotor::plugin::starter_plugin_t>([](auto &p) { p.subscribe_actor(&pinger_t::on_pong); });
  50. plugin.with_casted<rotor::plugin::registry_plugin_t>(
  51. [&](auto &p) { p.discover_name("ponger", ponger_addr, true).link(); });
  52. }
  53. void on_start() noexcept override {
  54. rotor::actor_base_t::on_start();
  55. do_ping();
  56. timer_id = start_timer(constants::check_interval, *this, &pinger_t::on_custom_timeout);
  57. resources->acquire(resource::timer);
  58. }
  59. void do_ping() noexcept {
  60. resources->acquire(resource::ping);
  61. request_id = request<payload::ping_t>(ponger_addr).send(constants::ping_timeout);
  62. ++attempts;
  63. }
  64. void on_custom_timeout(rotor::request_id_t, bool cancelled) {
  65. resources->release(resource::timer);
  66. timer_id.reset();
  67. std::cout << "pinger_t, (" << (void *)this << "), on_custom_timeout, cancelled: " << cancelled << "\n";
  68. if (!cancelled) {
  69. do_shutdown();
  70. }
  71. }
  72. void shutdown_start() noexcept override {
  73. std::cout << "pinger_t, (" << (void *)this << ") shutdown_start() \n";
  74. if (request_id)
  75. send<message::cancel_t>(ponger_addr, get_address());
  76. if (timer_id) {
  77. cancel_timer(*timer_id);
  78. timer_id.reset();
  79. }
  80. rotor::actor_base_t::shutdown_start();
  81. }
  82. void shutdown_finish() noexcept override {
  83. std::cout << "pinger_t, (" << (void *)this << ") finished attempts done " << attempts << "\n";
  84. rotor::actor_base_t::shutdown_finish();
  85. }
  86. void on_pong(message::pong_t &msg) noexcept {
  87. resources->release(resource::ping);
  88. request_id.reset();
  89. auto &ec = msg.payload.ee;
  90. if (!ec) {
  91. std::cout << "pinger_t, (" << (void *)this << ") success!, pong received, attemps : " << attempts << "\n";
  92. do_shutdown();
  93. } else {
  94. std::cout << "pinger_t, (" << (void *)this << ") pong failed (" << attempts << ")\n";
  95. if (timer_id) {
  96. do_ping();
  97. }
  98. }
  99. }
  100. std::optional<rotor::request_id_t> timer_id;
  101. std::optional<rotor::request_id_t> request_id;
  102. std::uint32_t attempts = 0;
  103. rotor::address_ptr_t ponger_addr;
  104. };
  105. struct ponger_t : public rotor::actor_base_t {
  106. using generator_t = std::mt19937;
  107. using distrbution_t = std::uniform_real_distribution<double>;
  108. using message_ptr_t = rotor::intrusive_ptr_t<message::ping_t>;
  109. using requests_map_t = std::unordered_map<rotor::request_id_t, message_ptr_t>;
  110. std::random_device rd;
  111. generator_t gen;
  112. distrbution_t dist;
  113. requests_map_t requests;
  114. explicit ponger_t(config_t &cfg) : rotor::actor_base_t(cfg), gen(rd()) {}
  115. void configure(rotor::plugin::plugin_base_t &plugin) noexcept override {
  116. rotor::actor_base_t::configure(plugin);
  117. plugin.with_casted<rotor::plugin::starter_plugin_t>([](auto &p) {
  118. p.subscribe_actor(&ponger_t::on_ping);
  119. p.subscribe_actor(&ponger_t::on_cancel);
  120. });
  121. plugin.with_casted<rotor::plugin::registry_plugin_t>(
  122. [&](auto &p) { p.register_name("ponger", get_address()); });
  123. }
  124. void on_ping(message::ping_t &req) noexcept {
  125. if (state != rotor::state_t::OPERATIONAL) {
  126. auto ec = rotor::make_error_code(rotor::error_code_t::cancelled);
  127. reply_with_error(req, make_error(ec));
  128. return;
  129. }
  130. auto dice = constants::ping_reply_scale * dist(gen);
  131. pt::time_duration reply_after = constants::ping_reply_base + pt::millisec{(int)dice};
  132. auto timer_id = start_timer(reply_after, *this, &ponger_t::on_ping_timer);
  133. resources->acquire(resource::timer);
  134. requests.emplace(timer_id, message_ptr_t(&req));
  135. }
  136. void on_cancel(message::cancel_t &notify) noexcept {
  137. auto request_id = notify.payload.id;
  138. auto &source = notify.payload.source;
  139. std::cout << "cancellation notify\n";
  140. auto predicate = [&](auto &it) {
  141. return it.second->payload.id == request_id && it.second->payload.origin == source;
  142. };
  143. auto it = std::find_if(requests.begin(), requests.end(), predicate);
  144. if (it != requests.end()) {
  145. cancel_timer(it->first);
  146. }
  147. }
  148. void on_ping_timer(rotor::request_id_t timer_id, bool cancelled) noexcept {
  149. resources->release(resource::timer);
  150. auto it = requests.find(timer_id);
  151. if (!cancelled) {
  152. auto dice = dist(gen);
  153. if (dice > constants::failure_probability) {
  154. auto &msg = it->second;
  155. reply_to(*msg);
  156. }
  157. } else {
  158. auto ec = rotor::make_error_code(rotor::error_code_t::cancelled);
  159. reply_with_error(*it->second, make_error(ec));
  160. }
  161. requests.erase(it);
  162. }
  163. void shutdown_start() noexcept override {
  164. while (!requests.empty()) {
  165. auto &timer_id = requests.begin()->first;
  166. cancel_timer(timer_id);
  167. }
  168. rotor::actor_base_t::shutdown_start();
  169. }
  170. void shutdown_finish() noexcept override {
  171. std::cout << "ponger_t, shutdown_finish\n";
  172. rotor::actor_base_t::shutdown_finish();
  173. }
  174. };
  175. struct custom_supervisor_t : ra::supervisor_asio_t {
  176. using ra::supervisor_asio_t::supervisor_asio_t;
  177. void on_child_shutdown(actor_base_t *) noexcept override {
  178. if (state < rotor::state_t::SHUTTING_DOWN) {
  179. do_shutdown();
  180. }
  181. }
  182. void shutdown_finish() noexcept override {
  183. ra::supervisor_asio_t::shutdown_finish();
  184. strand->context().stop();
  185. }
  186. };
  187. std::atomic_bool shutdown_flag = false;
  188. int main() {
  189. asio::io_context io_context;
  190. auto system_context = rotor::asio::system_context_asio_t::ptr_t{new rotor::asio::system_context_asio_t(io_context)};
  191. auto strand = std::make_shared<asio::io_context::strand>(io_context);
  192. auto timeout = pt::milliseconds{50};
  193. auto sup = system_context->create_supervisor<custom_supervisor_t>()
  194. .strand(strand)
  195. .create_registry()
  196. .timeout(timeout)
  197. .guard_context(false)
  198. .finish();
  199. // sup->create_actor<pinger_t>().timeout(timeout).finish();
  200. sup->create_actor<pinger_t>().timeout(timeout).finish();
  201. sup->create_actor<ponger_t>().timeout(timeout).finish();
  202. sup->start();
  203. #ifndef _WIN32
  204. struct sigaction act;
  205. act.sa_handler = [](int) { shutdown_flag = true; };
  206. if (sigaction(SIGINT, &act, nullptr) != 0) {
  207. std::cout << "critical :: cannot set signal handler\n";
  208. return -1;
  209. }
  210. auto console_thread = std::thread([&] {
  211. while (!shutdown_flag) {
  212. std::this_thread::sleep_for(std::chrono::milliseconds(100));
  213. }
  214. sup->shutdown();
  215. });
  216. #endif
  217. io_context.run();
  218. #ifndef _WIN32
  219. shutdown_flag = true;
  220. console_thread.join();
  221. #endif
  222. return 0;
  223. }
  224. /* output samples:
  225. (all ping failed)
  226. ./examples/boost-asio/ping-pong-timer
  227. pinger_t, (0x556d13bbd8a0) pong failed (1)
  228. pinger_t, (0x556d13bbd8a0) pong failed (2)
  229. pinger_t, (0x556d13bbd8a0) pong failed (3)
  230. pinger_t, (0x556d13bbd8a0) pong failed (4)
  231. pinger_t, (0x556d13bbd8a0) pong failed (5)
  232. pinger_t, (0x556d13bbd8a0) pong failed (6)
  233. pinger_t, (0x556d13bbd8a0) pong failed (7)
  234. pinger_t, (0x556d13bbd8a0) pong failed (8)
  235. pinger_t, (0x556d13bbd8a0) pong failed (9)
  236. pinger_t, (0x556d13bbd8a0) pong failed (10)
  237. pinger_t, (0x556d13bbd8a0) pong failed (11)
  238. pinger_t, (0x556d13bbd8a0) pong failed (12)
  239. pinger_t, (0x556d13bbd8a0) pong failed (13)
  240. pinger_t, (0x556d13bbd8a0) pong failed (14)
  241. pinger_t, (0x556d13bbd8a0) pong failed (15)
  242. pinger_t, (0x556d13bbd8a0) pong failed (16)
  243. pinger_t, (0x556d13bbd8a0) pong failed (17)
  244. pinger_t, (0x556d13bbd8a0) pong failed (18)
  245. pinger_t, (0x556d13bbd8a0) pong failed (19)
  246. pinger_t, (0x556d13bbd8a0) pong failed (20)
  247. pinger_t, (0x556d13bbd8a0) pong failed (21)
  248. pinger_t, (0x556d13bbd8a0) pong failed (22)
  249. pinger_t, (0x556d13bbd8a0) pong failed (23)
  250. pinger_t, (0x556d13bbd8a0) pong failed (24)
  251. pinger_t, (0x556d13bbd8a0) pong failed (25)
  252. pinger_t, (0x556d13bbd8a0) pong failed (26)
  253. pinger_t, (0x556d13bbd8a0) pong failed (27)
  254. pinger_t, (0x556d13bbd8a0) pong failed (28)
  255. pinger_t, (0x556d13bbd8a0) pong failed (29)
  256. pinger_t, (0x556d13bbd8a0), on_custom_timeout, cancelled: 0
  257. pinger_t, (0x556d13bbd8a0) shutdown_start()
  258. pinger_t, (0x556d13bbd8a0) pong failed (30)
  259. pinger_t, (0x556d13bbd8a0) finished attempts done 30
  260. ponger_t, shutdown_finish
  261. (11-th ping was successful)
  262. ./examples/boost-asio/ping-pong-timer
  263. pinger_t, (0x55f9f90048a0) pong failed (1)
  264. pinger_t, (0x55f9f90048a0) pong failed (2)
  265. pinger_t, (0x55f9f90048a0) pong failed (3)
  266. pinger_t, (0x55f9f90048a0) pong failed (4)
  267. pinger_t, (0x55f9f90048a0) pong failed (5)
  268. pinger_t, (0x55f9f90048a0) pong failed (6)
  269. pinger_t, (0x55f9f90048a0) pong failed (7)
  270. pinger_t, (0x55f9f90048a0) pong failed (8)
  271. pinger_t, (0x55f9f90048a0) pong failed (9)
  272. pinger_t, (0x55f9f90048a0) pong failed (10)
  273. pinger_t, (0x55f9f90048a0) success!, pong received, attemps : 11
  274. pinger_t, (0x55f9f90048a0) shutdown_start()
  275. pinger_t, (0x55f9f90048a0), on_custom_timeout, cancelled: 1
  276. pinger_t, (0x55f9f90048a0) finished attempts done 11
  277. ponger_t, shutdown_finish
  278. (premature termination via CTRL+C pressing)
  279. ./examples/boost-asio/ping-pong-timer
  280. pinger_t, (0x55d5d95d98a0) pong failed (1)
  281. pinger_t, (0x55d5d95d98a0) pong failed (2)
  282. pinger_t, (0x55d5d95d98a0) pong failed (3)
  283. pinger_t, (0x55d5d95d98a0) pong failed (4)
  284. pinger_t, (0x55d5d95d98a0) pong failed (5)
  285. pinger_t, (0x55d5d95d98a0) pong failed (6)
  286. ^Cpinger_t, (0x55d5d95d98a0) shutdown_start()
  287. pinger_t, (0x55d5d95d98a0), on_custom_timeout, cancelled: 1
  288. pinger_t, (0x55d5d95d98a0) pong failed (7)
  289. pinger_t, (0x55d5d95d98a0) finished attempts done 7
  290. ponger_t, shutdown_finish
  291. */