ping-pong-ev_and_asio.cpp 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. //
  2. // Copyright (c) 2019-2022 Ivan Baidakou (basiliscos) (the dot dmol at gmail dot com)
  3. //
  4. // Distributed under the MIT Software License
  5. //
  6. /*
  7. *
  8. * The example below works because the lifetimes of supervisor's is known apriory, i.e.
  9. * sup_ev is shutted down while sup_asio is still operational. As the last don't have
  10. * more job to do, we initiate it's shutdown. In other words, shutdown is partly
  11. * controlled outside of actors the the sake of simplification.
  12. *
  13. */
  14. #include <boost/asio.hpp>
  15. #include <rotor/asio.hpp>
  16. #include <rotor/ev.hpp>
  17. #include <iostream>
  18. #include <iomanip>
  19. #include <chrono>
  20. #include <cstdlib>
  21. #include <thread>
  22. namespace asio = boost::asio;
  23. namespace payload {
  24. struct pong_t {};
  25. struct ping_t {
  26. using response_t = pong_t;
  27. };
  28. } // namespace payload
  29. namespace message {
  30. using ping_t = rotor::request_traits_t<payload::ping_t>::request::message_t;
  31. using pong_t = rotor::request_traits_t<payload::ping_t>::response::message_t;
  32. } // namespace message
  33. static const char ponger_name[] = "service:ponger";
  34. static const auto timeout = boost::posix_time::milliseconds{500};
  35. struct pinger_t : public rotor::actor_base_t {
  36. using timepoint_t = std::chrono::time_point<std::chrono::high_resolution_clock>;
  37. using rotor::actor_base_t::actor_base_t;
  38. void set_pings(std::size_t pings) {
  39. pings_left = pings;
  40. pings_count = pings;
  41. }
  42. void configure(rotor::plugin::plugin_base_t &plugin) noexcept override {
  43. rotor::actor_base_t::configure(plugin);
  44. plugin.with_casted<rotor::plugin::starter_plugin_t>([](auto &p) {
  45. std::cout << "pinger_t::configure, subscribing on_pong\n";
  46. p.subscribe_actor(&pinger_t::on_pong);
  47. });
  48. plugin.with_casted<rotor::plugin::registry_plugin_t>([&](auto &p) {
  49. p.discover_name(ponger_name, ponger_addr).link(true).callback([](auto phase, auto &ec) {
  50. auto p = (phase == rotor::plugin::registry_plugin_t::phase_t::linking) ? "link" : "discovery";
  51. std::cout << "executing " << p << " in accordance with ponger : " << (!ec ? "yes" : "no") << "\n";
  52. });
  53. });
  54. }
  55. void on_start() noexcept override {
  56. rotor::actor_base_t::on_start();
  57. std::cout << "pings start (" << pings_left << ")\n";
  58. start = std::chrono::high_resolution_clock::now();
  59. send_ping();
  60. }
  61. void shutdown_finish() noexcept override {
  62. rotor::actor_base_t::shutdown_finish();
  63. std::cout << "pinger_t::shutdown_finish\n";
  64. do_shutdown();
  65. ponger_addr.reset(); // do not hold reference to to ponger's supervisor
  66. }
  67. void on_pong(message::pong_t &reply) noexcept {
  68. auto &ee = reply.payload.ee;
  69. if (ee) {
  70. std::cout << "pong error: " << ee->message() << "\n";
  71. }
  72. // std::cout << "pinger_t::on_pong\n";
  73. send_ping();
  74. }
  75. private:
  76. void send_ping() {
  77. if (pings_left) {
  78. request<payload::ping_t>(ponger_addr).send(timeout);
  79. --pings_left;
  80. } else {
  81. using namespace std::chrono;
  82. auto end = high_resolution_clock::now();
  83. std::chrono::duration<double> diff = end - start;
  84. double freq = ((double)pings_count) / diff.count();
  85. std::cout << "pings complete (" << pings_left << ") in " << diff.count() << "s"
  86. << ", freq = " << std::fixed << std::setprecision(10) << freq << ", real freq = " << std::fixed
  87. << std::setprecision(10) << freq * 2 << "\n";
  88. do_shutdown();
  89. }
  90. }
  91. timepoint_t start;
  92. rotor::address_ptr_t ponger_addr;
  93. std::size_t pings_left;
  94. std::size_t pings_count;
  95. };
  96. struct ponger_t : public rotor::actor_base_t {
  97. using rotor::actor_base_t::actor_base_t;
  98. void configure(rotor::plugin::plugin_base_t &plugin) noexcept override {
  99. rotor::actor_base_t::configure(plugin);
  100. plugin.with_casted<rotor::plugin::starter_plugin_t>([](auto &p) {
  101. std::cout << "ponger_t::configure, subscribing on_ping\n";
  102. p.subscribe_actor(&ponger_t::on_ping);
  103. });
  104. plugin.with_casted<rotor::plugin::registry_plugin_t>([&](auto &p) {
  105. std::cout << "ponger_t::configure, registering name\n";
  106. p.register_name(ponger_name, address);
  107. });
  108. }
  109. void on_ping(message::ping_t &ping) noexcept { reply_to(ping); }
  110. void shutdown_finish() noexcept override {
  111. rotor::actor_base_t::shutdown_finish();
  112. std::cout << "ponger_t::shutdown_finish\n";
  113. }
  114. };
  115. int main(int argc, char **argv) {
  116. try {
  117. std::uint32_t count = 10000;
  118. if (argc > 1) {
  119. count = static_cast<std::uint32_t>(std::atoi(argv[1]));
  120. }
  121. asio::io_context io_ctx;
  122. auto sys_ctx_asio = rotor::asio::system_context_asio_t::ptr_t{new rotor::asio::system_context_asio_t(io_ctx)};
  123. auto strand = std::make_shared<asio::io_context::strand>(io_ctx);
  124. auto *loop = ev_loop_new(0);
  125. auto sys_ctx_ev = rotor::ev::system_context_ptr_t{new rotor::ev::system_context_ev_t()};
  126. auto sup_asio = sys_ctx_asio->create_supervisor<rotor::asio::supervisor_asio_t>()
  127. .strand(strand)
  128. .timeout(timeout)
  129. .create_registry(true)
  130. .guard_context(true)
  131. .finish();
  132. auto sup_ev = sys_ctx_ev->create_supervisor<rotor::ev::supervisor_ev_t>()
  133. .loop(loop)
  134. .loop_ownership(true) /* let supervisor takes ownership on the loop */
  135. .timeout(timeout)
  136. .registry_address(sup_asio->get_registry_address())
  137. .finish();
  138. auto pinger = sup_ev->create_actor<pinger_t>().autoshutdown_supervisor().timeout(timeout).finish();
  139. auto ponger = sup_asio->create_actor<ponger_t>().timeout(timeout).finish();
  140. pinger->set_pings(count);
  141. sup_asio->start();
  142. sup_ev->start();
  143. // explain why
  144. auto thread_asio = std::thread([&] { io_ctx.run(); });
  145. ev_run(loop);
  146. sup_asio->shutdown();
  147. thread_asio.join();
  148. sup_asio->do_process();
  149. sup_ev->do_process();
  150. std::cout << "finalization complete\n";
  151. } catch (const std::exception &ex) {
  152. std::cout << "exception : " << ex.what();
  153. }
  154. std::cout << "exiting...\n";
  155. return 0;
  156. }