ping-pong-comp.cpp 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. //
  2. // Copyright (c) 2019-2022 Ivan Baidakou (basiliscos) (the dot dmol at gmail dot com)
  3. //
  4. // Distributed under the MIT Software License
  5. //
  6. #include <rotor/ev.hpp>
  7. #include <iostream>
  8. #include <random>
  9. #include <unordered_map>
  10. #include <boost/asio/detail/winsock_init.hpp> // for calling WSAStartup on Windows
  // Payload types exchanged between the pinger and the pongers.
  namespace payload {

  // Response payload; it carries no data.
  struct pong_t {};

  // Request payload; it carries no data either. The nested `response_t` alias
  // names the payload type used to answer this request.
  struct ping_t {
      using response_t = ::payload::pong_t;
  };

  } // namespace payload
  17. namespace message {
  18. using ping_t = rotor::request_traits_t<payload::ping_t>::request::message_t;
  19. using pong_t = rotor::request_traits_t<payload::ping_t>::response::message_t;
  20. } // namespace message
  // Counters describing the progress of a group of ping requests.
  //
  // The struct stays an aggregate, so `shared_context_t{n}` still sets
  // `pings_left` to `n` and leaves the other counters at zero.
  struct shared_context_t {
      // Number of responses still awaited. Defaults to zero so that a
      // default-initialized context never holds an indeterminate value.
      std::size_t pings_left = 0;
      // Number of responses received without an error.
      std::size_t pings_success = 0;
      // Number of responses received with an error.
      std::size_t pings_error = 0;
  };
  26. struct pinger_t : public rotor::actor_base_t {
  27. using map_t = std::unordered_map<rotor::address_ptr_t, shared_context_t>;
  28. using rotor::actor_base_t::actor_base_t;
  29. void set_ponger_addr1(const rotor::address_ptr_t &addr) { ponger_addr1 = addr; }
  30. void set_ponger_addr2(const rotor::address_ptr_t &addr) { ponger_addr2 = addr; }
  31. void configure(rotor::plugin::plugin_base_t &plugin) noexcept override {
  32. rotor::actor_base_t::configure(plugin);
  33. plugin.with_casted<rotor::plugin::starter_plugin_t>([&](auto &p) {
  34. if (!reply_addr)
  35. reply_addr = create_address();
  36. p.subscribe_actor(&pinger_t::on_pong, reply_addr);
  37. });
  38. }
  39. void on_start() noexcept override {
  40. rotor::actor_base_t::on_start();
  41. request_via<payload::ping_t>(ponger_addr1, reply_addr).send(rotor::pt::seconds(1));
  42. request_via<payload::ping_t>(ponger_addr2, reply_addr).send(rotor::pt::seconds(1));
  43. request_map.emplace(reply_addr, shared_context_t{2});
  44. }
  45. void on_pong(message::pong_t &msg) noexcept {
  46. auto &ctx = request_map[msg.address];
  47. --ctx.pings_left;
  48. auto &ec = msg.payload.ee;
  49. if (ec) {
  50. ++ctx.pings_error;
  51. } else {
  52. ++ctx.pings_success;
  53. }
  54. if (!ctx.pings_left) {
  55. std::cout << "success: " << ctx.pings_success << ", errors: " << ctx.pings_error << "\n";
  56. // optional cleanup
  57. request_map.erase(msg.address);
  58. do_shutdown();
  59. }
  60. }
  61. map_t request_map;
  62. rotor::address_ptr_t ponger_addr1;
  63. rotor::address_ptr_t ponger_addr2;
  64. rotor::address_ptr_t reply_addr;
  65. };
  66. struct ponger_t : public rotor::actor_base_t {
  67. using generator_t = std::mt19937;
  68. using distribution_t = std::uniform_real_distribution<double>;
  69. std::random_device rd;
  70. generator_t gen;
  71. distribution_t dist;
  72. explicit ponger_t(config_t &cfg) : rotor::actor_base_t(cfg), gen(rd()) {}
  73. void configure(rotor::plugin::plugin_base_t &plugin) noexcept override {
  74. rotor::actor_base_t::configure(plugin);
  75. plugin.with_casted<rotor::plugin::starter_plugin_t>([](auto &p) { p.subscribe_actor(&ponger_t::on_ping); });
  76. }
  77. void on_ping(message::ping_t &req) noexcept {
  78. auto dice = dist(gen);
  79. std::cout << "pong, dice = " << dice << std::endl;
  80. if (dice > 0.5) {
  81. reply_to(req);
  82. }
  83. }
  84. };
  85. int main() {
  86. try {
  87. auto *loop = ev_loop_new(0);
  88. auto system_context = rotor::ev::system_context_ptr_t{new rotor::ev::system_context_ev_t()};
  89. auto timeout = boost::posix_time::milliseconds{10};
  90. auto sup = system_context->create_supervisor<rotor::ev::supervisor_ev_t>()
  91. .loop(loop)
  92. .loop_ownership(true) /* let supervisor takes ownership on the loop */
  93. .timeout(timeout)
  94. .finish();
  95. auto pinger = sup->create_actor<pinger_t>().timeout(timeout).autoshutdown_supervisor().finish();
  96. auto ponger1 = sup->create_actor<ponger_t>().timeout(timeout).finish();
  97. auto ponger2 = sup->create_actor<ponger_t>().timeout(timeout).finish();
  98. pinger->set_ponger_addr1(ponger1->get_address());
  99. pinger->set_ponger_addr2(ponger2->get_address());
  100. sup->start();
  101. ev_run(loop);
  102. } catch (const std::exception &ex) {
  103. std::cout << "exception : " << ex.what();
  104. }
  105. std::cout << "exiting...\n";
  106. return 0;
  107. }