ping-pong-comp.cpp 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. //
  2. // Copyright (c) 2019 Ivan Baidakou (basiliscos) (the dot dmol at gmail dot com)
  3. //
  4. // Distributed under the MIT Software License
  5. //
  6. #include <rotor/ev.hpp>
  7. #include <iostream>
  8. #include <random>
  9. #include <unordered_map>
  10. namespace payload {
  11. struct pong_t {};
  12. struct ping_t {
  13. using response_t = pong_t;
  14. };
  15. } // namespace payload
  16. namespace message {
  17. using ping_t = rotor::request_traits_t<payload::ping_t>::request::message_t;
  18. using pong_t = rotor::request_traits_t<payload::ping_t>::response::message_t;
  19. } // namespace message
  20. struct shared_context_t {
  21. std::size_t pings_left;
  22. std::size_t pings_success = 0;
  23. std::size_t pings_error = 0;
  24. };
  25. struct pinger_t : public rotor::actor_base_t {
  26. using map_t = std::unordered_map<rotor::address_ptr_t, shared_context_t>;
  27. using rotor::actor_base_t::actor_base_t;
  28. void set_ponger_addr1(const rotor::address_ptr_t &addr) { ponger_addr1 = addr; }
  29. void set_ponger_addr2(const rotor::address_ptr_t &addr) { ponger_addr2 = addr; }
  30. void on_start(rotor::message_t<rotor::payload::start_actor_t> &) noexcept override {
  31. reply_addr = create_address();
  32. subscribe(&pinger_t::on_pong, reply_addr);
  33. request_via<payload::ping_t>(ponger_addr1, reply_addr).send(rotor::pt::seconds(1));
  34. request_via<payload::ping_t>(ponger_addr2, reply_addr).send(rotor::pt::seconds(1));
  35. request_map.emplace(reply_addr, shared_context_t{2});
  36. }
  37. void on_pong(message::pong_t &msg) noexcept {
  38. auto &ctx = request_map[msg.address];
  39. --ctx.pings_left;
  40. auto &ec = msg.payload.ec;
  41. if (ec) {
  42. ++ctx.pings_error;
  43. } else {
  44. ++ctx.pings_success;
  45. }
  46. if (!ctx.pings_left) {
  47. std::cout << "success: " << ctx.pings_success << ", errors: " << ctx.pings_error << "\n";
  48. // optional cleanup
  49. unsubscribe(&pinger_t::on_pong, reply_addr);
  50. request_map.erase(msg.address);
  51. supervisor.do_shutdown();
  52. }
  53. }
  54. map_t request_map;
  55. rotor::address_ptr_t ponger_addr1;
  56. rotor::address_ptr_t ponger_addr2;
  57. rotor::address_ptr_t reply_addr;
  58. };
  59. struct ponger_t : public rotor::actor_base_t {
  60. using generator_t = std::mt19937;
  61. using distrbution_t = std::uniform_real_distribution<double>;
  62. std::random_device rd;
  63. generator_t gen;
  64. distrbution_t dist;
  65. ponger_t(rotor::supervisor_t &sup) : rotor::actor_base_t{sup}, gen(rd()) {}
  66. void on_initialize(rotor::message::init_request_t &msg) noexcept override {
  67. rotor::actor_base_t::on_initialize(msg);
  68. subscribe(&ponger_t::on_ping);
  69. }
  70. void on_ping(message::ping_t &req) noexcept {
  71. auto dice = dist(gen);
  72. std::cout << "pong, dice = " << dice << std::endl;
  73. if (dice > 0.5) {
  74. reply_to(req);
  75. }
  76. }
  77. };
  78. int main() {
  79. try {
  80. auto *loop = ev_loop_new(0);
  81. auto system_context = rotor::ev::system_context_ev_t::ptr_t{new rotor::ev::system_context_ev_t()};
  82. auto timeout = boost::posix_time::milliseconds{10};
  83. auto conf = rotor::ev::supervisor_config_ev_t{
  84. timeout, loop, true, /* let supervisor takes ownership on the loop */
  85. };
  86. auto sup = system_context->create_supervisor<rotor::ev::supervisor_ev_t>(conf);
  87. auto pinger = sup->create_actor<pinger_t>(timeout);
  88. auto ponger1 = sup->create_actor<ponger_t>(timeout);
  89. auto ponger2 = sup->create_actor<ponger_t>(timeout);
  90. pinger->set_ponger_addr1(ponger1->get_address());
  91. pinger->set_ponger_addr2(ponger2->get_address());
  92. sup->start();
  93. ev_run(loop);
  94. } catch (const std::exception &ex) {
  95. std::cout << "exception : " << ex.what();
  96. }
  97. std::cout << "exiting...\n";
  98. return 0;
  99. }