ping-pong-ev_and_asio.cpp 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. //
  2. // Copyright (c) 2019 Ivan Baidakou (basiliscos) (the dot dmol at gmail dot com)
  3. //
  4. // Distributed under the MIT Software License
  5. //
  6. /*
  7. * This is a little bit simplified example, as we don't supervise actor's synchonization.
  8. * In the real-world application the proper actions will be:
  9. * 1. pinger should wait ponger start, and only after that, start pinging it.
  10. * 2. after finishing it's job, pinger should trigger ponger shutdown
  11. * 3. pinger should wait ponger shutdown confirmation
  12. * 4. pinger should release(reset) ponger's address and initiate it's own supervisor shutdown.
  13. *
  14. * If (1) is ommitted, then a few messages(pings) might be lost. If (2)..(4) are omitted, then
  15. * it might lead to memory leak.
  16. *
  17. * The example below works because the lifetimes of supervisor's is known apriory, i.e.
  18. * sup_ev is shutted down while sup_asio is still operational. As the last don't have
  19. * more job to do, we initiate it's shutdown. In other words, shutdown is partly
  20. * controlled outside of actors the the sake of simplification.
  21. *
  22. */
  23. #include <boost/asio.hpp>
  24. #include <rotor/asio.hpp>
  25. #include <rotor/ev.hpp>
  26. #include <iostream>
  27. #include <iomanip>
  28. #include <chrono>
  29. #include <cstdlib>
  30. #include <thread>
  31. namespace asio = boost::asio;
  32. struct ping_t {};
  33. struct pong_t {};
  34. struct pinger_t : public rotor::actor_base_t {
  35. using timepoint_t = std::chrono::time_point<std::chrono::high_resolution_clock>;
  36. pinger_t(rotor::supervisor_t &sup, std::size_t pings)
  37. : rotor::actor_base_t{sup}, pings_left{pings}, pings_count{pings} {}
  38. void set_ponger_addr(const rotor::address_ptr_t &addr) { ponger_addr = addr; }
  39. void init_start() noexcept override {
  40. std::cout << "pinger_t::on_initialize\n";
  41. subscribe(&pinger_t::on_pong);
  42. rotor::actor_base_t::init_start();
  43. }
  44. void on_start(rotor::message_t<rotor::payload::start_actor_t> &) noexcept override {
  45. std::cout << "pings start (" << pings_left << ")\n";
  46. start = std::chrono::high_resolution_clock::now();
  47. send_ping();
  48. }
  49. void on_pong(rotor::message_t<pong_t> &) noexcept {
  50. // std::cout << "pinger_t::on_pong\n";
  51. send_ping();
  52. }
  53. private:
  54. void send_ping() {
  55. if (pings_left) {
  56. send<ping_t>(ponger_addr);
  57. --pings_left;
  58. } else {
  59. using namespace std::chrono;
  60. auto end = high_resolution_clock::now();
  61. std::chrono::duration<double> diff = end - start;
  62. double freq = ((double)pings_count) / diff.count();
  63. std::cout << "pings finishes (" << pings_left << ") in " << diff.count() << "s"
  64. << ", freq = " << std::fixed << std::setprecision(10) << freq << ", real freq = " << std::fixed
  65. << std::setprecision(10) << freq * 2 << "\n";
  66. supervisor.shutdown();
  67. ponger_addr.reset(); // do not hold reference to to ponger's supervisor
  68. // send<rotor::payload::shutdown_request_t>(ponger_addr->supervisor.get_address(), address);
  69. }
  70. }
  71. timepoint_t start;
  72. rotor::address_ptr_t ponger_addr;
  73. std::size_t pings_left;
  74. std::size_t pings_count;
  75. };
  76. struct ponger_t : public rotor::actor_base_t {
  77. ponger_t(rotor::supervisor_t &sup) : rotor::actor_base_t{sup} {}
  78. void set_pinger_addr(const rotor::address_ptr_t &addr) { pinger_addr = addr; }
  79. void init_start() noexcept override {
  80. std::cout << "ponger_t::on_initialize\n";
  81. subscribe(&ponger_t::on_ping);
  82. rotor::actor_base_t::init_start();
  83. }
  84. void on_ping(rotor::message_t<ping_t> &) noexcept { send<pong_t>(pinger_addr); }
  85. private:
  86. rotor::address_ptr_t pinger_addr;
  87. };
  88. int main(int argc, char **argv) {
  89. try {
  90. std::uint32_t count = 10000;
  91. if (argc > 1) {
  92. count = static_cast<std::uint32_t>(std::atoi(argv[1]));
  93. }
  94. asio::io_context io_ctx;
  95. auto timeout = boost::posix_time::milliseconds{500};
  96. auto asio_guard = asio::make_work_guard(io_ctx);
  97. auto sys_ctx_asio = rotor::asio::system_context_asio_t::ptr_t{new rotor::asio::system_context_asio_t(io_ctx)};
  98. auto stand = std::make_shared<asio::io_context::strand>(io_ctx);
  99. rotor::asio::supervisor_config_asio_t conf_asio{timeout, std::move(stand)};
  100. auto *loop = ev_loop_new(0);
  101. auto sys_ctx_ev = rotor::ev::system_context_ev_t::ptr_t{new rotor::ev::system_context_ev_t()};
  102. auto conf_ev = rotor::ev::supervisor_config_ev_t{
  103. timeout, loop, true, /* let supervisor takes ownership on the loop */
  104. };
  105. auto sup_ev = sys_ctx_ev->create_supervisor<rotor::ev::supervisor_ev_t>(conf_ev);
  106. auto sup_asio = sys_ctx_asio->create_supervisor<rotor::asio::supervisor_asio_t>(conf_asio);
  107. auto pinger = sup_ev->create_actor<pinger_t>(timeout, count);
  108. auto ponger = sup_asio->create_actor<ponger_t>(timeout);
  109. pinger->set_ponger_addr(ponger->get_address());
  110. ponger->set_pinger_addr(pinger->get_address());
  111. sup_asio->start();
  112. sup_ev->start();
  113. auto thread_asio = std::thread([&] { io_ctx.run(); });
  114. ev_run(loop);
  115. sup_asio->shutdown();
  116. asio_guard.reset();
  117. thread_asio.join();
  118. std::cout << "main execution complete\n";
  119. sup_asio->do_process();
  120. sup_ev->do_process();
  121. std::cout << "finalization complete\n";
  122. } catch (const std::exception &ex) {
  123. std::cout << "exception : " << ex.what();
  124. }
  125. std::cout << "exiting...\n";
  126. return 0;
  127. }