ping-pong-ev_and_asio.cpp 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. //
  2. // Copyright (c) 2019-2022 Ivan Baidakou (basiliscos) (the dot dmol at gmail dot com)
  3. //
  4. // Distributed under the MIT Software License
  5. //
  6. /*
  7. *
 * The example below works because the lifetimes of the supervisors are known a priori, i.e.
 * sup_ev is shut down while sup_asio is still operational. As the latter doesn't have
 * any more work to do, we initiate its shutdown. In other words, shutdown is partly
 * controlled outside of the actors for the sake of simplification.
  12. *
  13. */
  14. #include <boost/asio.hpp>
  15. #include <rotor/asio.hpp>
  16. #include <rotor/ev.hpp>
  17. #include <iostream>
  18. #include <iomanip>
  19. #include <chrono>
  20. #include <cstdlib>
  21. #include <thread>
  22. #include <boost/asio/detail/winsock_init.hpp> // for calling WSAStartup on Windows
  23. namespace asio = boost::asio;
  24. namespace payload {
  25. struct pong_t {};
  26. struct ping_t {
  27. using response_t = pong_t;
  28. };
  29. } // namespace payload
  30. namespace message {
  31. using ping_t = rotor::request_traits_t<payload::ping_t>::request::message_t;
  32. using pong_t = rotor::request_traits_t<payload::ping_t>::response::message_t;
  33. } // namespace message
  34. static const char ponger_name[] = "service:ponger";
  35. static const auto timeout = boost::posix_time::milliseconds{500};
  36. struct pinger_t : public rotor::actor_base_t {
  37. using timepoint_t = std::chrono::time_point<std::chrono::high_resolution_clock>;
  38. using rotor::actor_base_t::actor_base_t;
  39. void set_pings(std::size_t pings) {
  40. pings_left = pings;
  41. pings_count = pings;
  42. }
  43. void configure(rotor::plugin::plugin_base_t &plugin) noexcept override {
  44. rotor::actor_base_t::configure(plugin);
  45. plugin.with_casted<rotor::plugin::starter_plugin_t>([](auto &p) {
  46. std::cout << "pinger_t::configure, subscribing on_pong\n";
  47. p.subscribe_actor(&pinger_t::on_pong);
  48. });
  49. plugin.with_casted<rotor::plugin::registry_plugin_t>([&](auto &p) {
  50. p.discover_name(ponger_name, ponger_addr).link(true).callback([](auto phase, auto &ec) {
  51. auto p = (phase == rotor::plugin::registry_plugin_t::phase_t::linking) ? "link" : "discovery";
  52. std::cout << "executing " << p << " in accordance with ponger : " << (!ec ? "yes" : "no") << "\n";
  53. });
  54. });
  55. }
  56. void on_start() noexcept override {
  57. rotor::actor_base_t::on_start();
  58. std::cout << "pings start (" << pings_left << ")\n";
  59. start = std::chrono::high_resolution_clock::now();
  60. send_ping();
  61. }
  62. void shutdown_finish() noexcept override {
  63. rotor::actor_base_t::shutdown_finish();
  64. std::cout << "pinger_t::shutdown_finish\n";
  65. do_shutdown();
  66. ponger_addr.reset(); // do not hold reference to to ponger's supervisor
  67. }
  68. void on_pong(message::pong_t &reply) noexcept {
  69. auto &ee = reply.payload.ee;
  70. if (ee) {
  71. std::cout << "pong error: " << ee->message() << "\n";
  72. }
  73. // std::cout << "pinger_t::on_pong\n";
  74. send_ping();
  75. }
  76. private:
  77. void send_ping() {
  78. if (pings_left) {
  79. request<payload::ping_t>(ponger_addr).send(timeout);
  80. --pings_left;
  81. } else {
  82. using namespace std::chrono;
  83. auto end = high_resolution_clock::now();
  84. std::chrono::duration<double> diff = end - start;
  85. double freq = ((double)pings_count) / diff.count();
  86. std::cout << "pings complete (" << pings_left << ") in " << diff.count() << "s"
  87. << ", freq = " << std::fixed << std::setprecision(10) << freq << ", real freq = " << std::fixed
  88. << std::setprecision(10) << freq * 2 << "\n";
  89. do_shutdown();
  90. }
  91. }
  92. timepoint_t start;
  93. rotor::address_ptr_t ponger_addr;
  94. std::size_t pings_left;
  95. std::size_t pings_count;
  96. };
  97. struct ponger_t : public rotor::actor_base_t {
  98. using rotor::actor_base_t::actor_base_t;
  99. void configure(rotor::plugin::plugin_base_t &plugin) noexcept override {
  100. rotor::actor_base_t::configure(plugin);
  101. plugin.with_casted<rotor::plugin::starter_plugin_t>([](auto &p) {
  102. std::cout << "ponger_t::configure, subscribing on_ping\n";
  103. p.subscribe_actor(&ponger_t::on_ping);
  104. });
  105. plugin.with_casted<rotor::plugin::registry_plugin_t>([&](auto &p) {
  106. std::cout << "ponger_t::configure, registering name\n";
  107. p.register_name(ponger_name, address);
  108. });
  109. }
  110. void on_ping(message::ping_t &ping) noexcept { reply_to(ping); }
  111. void shutdown_finish() noexcept override {
  112. rotor::actor_base_t::shutdown_finish();
  113. std::cout << "ponger_t::shutdown_finish\n";
  114. }
  115. };
  116. int main(int argc, char **argv) {
  117. try {
  118. std::uint32_t count = 10000;
  119. if (argc > 1) {
  120. count = static_cast<std::uint32_t>(std::atoi(argv[1]));
  121. }
  122. asio::io_context io_ctx;
  123. auto sys_ctx_asio = rotor::asio::system_context_asio_t::ptr_t{new rotor::asio::system_context_asio_t(io_ctx)};
  124. auto strand = std::make_shared<asio::io_context::strand>(io_ctx);
  125. auto *loop = ev_loop_new(0);
  126. auto sys_ctx_ev = rotor::ev::system_context_ptr_t{new rotor::ev::system_context_ev_t()};
  127. auto sup_asio = sys_ctx_asio->create_supervisor<rotor::asio::supervisor_asio_t>()
  128. .strand(strand)
  129. .timeout(timeout)
  130. .create_registry(true)
  131. .guard_context(true)
  132. .finish();
  133. auto sup_ev = sys_ctx_ev->create_supervisor<rotor::ev::supervisor_ev_t>()
  134. .loop(loop)
  135. .loop_ownership(true) /* let supervisor takes ownership on the loop */
  136. .timeout(timeout)
  137. .registry_address(sup_asio->get_registry_address())
  138. .finish();
  139. auto pinger = sup_ev->create_actor<pinger_t>().autoshutdown_supervisor().timeout(timeout).finish();
  140. auto ponger = sup_asio->create_actor<ponger_t>().timeout(timeout).finish();
  141. pinger->set_pings(count);
  142. sup_asio->start();
  143. sup_ev->start();
  144. // explain why
  145. auto thread_asio = std::thread([&] { io_ctx.run(); });
  146. ev_run(loop);
  147. sup_asio->shutdown();
  148. thread_asio.join();
  149. sup_asio->do_process();
  150. sup_ev->do_process();
  151. std::cout << "finalization complete\n";
  152. } catch (const std::exception &ex) {
  153. std::cout << "exception : " << ex.what();
  154. }
  155. std::cout << "exiting...\n";
  156. return 0;
  157. }