// ping-pong-comp.cpp (4.1 KB)
  1. //
  2. // Copyright (c) 2019-2020 Ivan Baidakou (basiliscos) (the dot dmol at gmail dot com)
  3. //
  4. // Distributed under the MIT Software License
  5. //
  6. #include <rotor/ev.hpp>
  7. #include <iostream>
  8. #include <random>
  9. #include <unordered_map>
  /// Payload types for the ping/pong request-response pair used in this example.
  namespace payload {

  /// Response payload; carries no data.
  struct pong_t {};

  /// Request payload; carries no data.
  struct ping_t {
      /// Payload type of the reply paired with this request.
      using response_t = pong_t;
  };

  } // namespace payload
  16. namespace message {
  17. using ping_t = rotor::request_traits_t<payload::ping_t>::request::message_t;
  18. using pong_t = rotor::request_traits_t<payload::ping_t>::response::message_t;
  19. } // namespace message
  /// Counters describing the progress of one batch of ping requests.
  struct shared_context_t {
      /// Replies still awaited. Defaults to zero so that a default-initialised
      /// context never holds an indeterminate value.
      std::size_t pings_left = 0;
      /// Replies counted as successful.
      std::size_t pings_success = 0;
      /// Replies counted as failed.
      std::size_t pings_error = 0;
  };
  25. struct pinger_t : public rotor::actor_base_t {
  26. using map_t = std::unordered_map<rotor::address_ptr_t, shared_context_t>;
  27. using rotor::actor_base_t::actor_base_t;
  28. void set_ponger_addr1(const rotor::address_ptr_t &addr) { ponger_addr1 = addr; }
  29. void set_ponger_addr2(const rotor::address_ptr_t &addr) { ponger_addr2 = addr; }
  30. void configure(rotor::plugin::plugin_base_t &plugin) noexcept override {
  31. rotor::actor_base_t::configure(plugin);
  32. plugin.with_casted<rotor::plugin::starter_plugin_t>([&](auto &p) {
  33. if (!reply_addr)
  34. reply_addr = create_address();
  35. p.subscribe_actor(&pinger_t::on_pong, reply_addr);
  36. });
  37. }
  38. void on_start() noexcept override {
  39. rotor::actor_base_t::on_start();
  40. request_via<payload::ping_t>(ponger_addr1, reply_addr).send(rotor::pt::seconds(1));
  41. request_via<payload::ping_t>(ponger_addr2, reply_addr).send(rotor::pt::seconds(1));
  42. request_map.emplace(reply_addr, shared_context_t{2});
  43. }
  44. void on_pong(message::pong_t &msg) noexcept {
  45. auto &ctx = request_map[msg.address];
  46. --ctx.pings_left;
  47. auto &ec = msg.payload.ee;
  48. if (ec) {
  49. ++ctx.pings_error;
  50. } else {
  51. ++ctx.pings_success;
  52. }
  53. if (!ctx.pings_left) {
  54. std::cout << "success: " << ctx.pings_success << ", errors: " << ctx.pings_error << "\n";
  55. // optional cleanup
  56. request_map.erase(msg.address);
  57. supervisor->do_shutdown();
  58. }
  59. }
  60. map_t request_map;
  61. rotor::address_ptr_t ponger_addr1;
  62. rotor::address_ptr_t ponger_addr2;
  63. rotor::address_ptr_t reply_addr;
  64. };
  65. struct ponger_t : public rotor::actor_base_t {
  66. using generator_t = std::mt19937;
  67. using distrbution_t = std::uniform_real_distribution<double>;
  68. std::random_device rd;
  69. generator_t gen;
  70. distrbution_t dist;
  71. explicit ponger_t(config_t &cfg) : rotor::actor_base_t(cfg), gen(rd()) {}
  72. void configure(rotor::plugin::plugin_base_t &plugin) noexcept override {
  73. rotor::actor_base_t::configure(plugin);
  74. plugin.with_casted<rotor::plugin::starter_plugin_t>([](auto &p) { p.subscribe_actor(&ponger_t::on_ping); });
  75. }
  76. void on_ping(message::ping_t &req) noexcept {
  77. auto dice = dist(gen);
  78. std::cout << "pong, dice = " << dice << std::endl;
  79. if (dice > 0.5) {
  80. reply_to(req);
  81. }
  82. }
  83. };
  84. int main() {
  85. try {
  86. auto *loop = ev_loop_new(0);
  87. auto system_context = rotor::ev::system_context_ptr_t{new rotor::ev::system_context_ev_t()};
  88. auto timeout = boost::posix_time::milliseconds{10};
  89. auto sup = system_context->create_supervisor<rotor::ev::supervisor_ev_t>()
  90. .loop(loop)
  91. .loop_ownership(true) /* let supervisor takes ownership on the loop */
  92. .timeout(timeout)
  93. .finish();
  94. auto pinger = sup->create_actor<pinger_t>().timeout(timeout).finish();
  95. auto ponger1 = sup->create_actor<ponger_t>().timeout(timeout).finish();
  96. auto ponger2 = sup->create_actor<ponger_t>().timeout(timeout).finish();
  97. pinger->set_ponger_addr1(ponger1->get_address());
  98. pinger->set_ponger_addr2(ponger2->get_address());
  99. sup->start();
  100. ev_run(loop);
  101. } catch (const std::exception &ex) {
  102. std::cout << "exception : " << ex.what();
  103. }
  104. std::cout << "exiting...\n";
  105. return 0;
  106. }