pong-registry.cpp 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. //
  2. // Copyright (c) 2019 Ivan Baidakou (basiliscos) (the dot dmol at gmail dot com)
  3. //
  4. // Distributed under the MIT Software License
  5. //
  6. #include <rotor/ev.hpp>
  7. #include <rotor/registry.h>
  8. #include <iostream>
  9. #include <iomanip>
  10. #include <chrono>
  11. #include <cstdlib>
  // Plain payload types exchanged between the pinger and the ponger.
  namespace payload {

  // Empty reply payload; it carries no data of its own.
  struct pong_t {};

  // Empty request payload. The nested `response_t` alias names the payload
  // type that answers this request.
  struct ping_t {
      using response_t = pong_t;
  };

  } // namespace payload
  18. namespace message {
  19. using ping_t = rotor::request_traits_t<payload::ping_t>::request::message_t;
  20. using pong_t = rotor::request_traits_t<payload::ping_t>::response::message_t;
  21. } // namespace message
  22. static const char ponger_name[] = "service:ponger";
  23. static const auto timeout = boost::posix_time::milliseconds{5};
  24. struct pinger_t : public rotor::actor_base_t {
  25. using rotor::actor_base_t::actor_base_t;
  26. void set_registry_addr(const rotor::address_ptr_t &addr) { registry_addr = addr; }
  27. void init_start() noexcept override {
  28. subscribe(&pinger_t::on_discovery);
  29. subscribe(&pinger_t::on_status);
  30. subscribe(&pinger_t::on_pong);
  31. request<rotor::payload::discovery_request_t>(registry_addr, ponger_name).send(timeout);
  32. }
  33. void on_discovery(rotor::message::discovery_response_t &msg) noexcept {
  34. auto &ec = msg.payload.ec;
  35. if (ec) {
  36. std::cout << "ponger address wasn't found: " << ec.message() << "\n";
  37. if (attempts) {
  38. --attempts;
  39. std::cout << "lets try to discover ponger address again (" << attempts << " attempts left)\n";
  40. request<rotor::payload::discovery_request_t>(registry_addr, ponger_name).send(timeout);
  41. } else {
  42. supervisor.do_shutdown();
  43. }
  44. return;
  45. }
  46. unsubscribe(&pinger_t::on_discovery); // optional
  47. std::cout << "ponger address was succesfully discovered\n";
  48. ponger_addr = msg.payload.res.service_addr;
  49. auto ponger_sup_addr = ponger_addr->supervisor.get_address();
  50. request<rotor::payload::state_request_t>(ponger_sup_addr, ponger_addr).send(timeout);
  51. }
  52. void on_status(rotor::message::state_response_t &msg) noexcept {
  53. auto &ec = msg.payload.ec;
  54. if (ec) {
  55. std::cout << "ponger state cannot be determined: " << ec.message() << "\n";
  56. supervisor.do_shutdown();
  57. return;
  58. }
  59. auto state = msg.payload.res.state;
  60. if (state == rotor::state_t::OPERATIONAL) {
  61. std::cout << "ponger state is operational, continue pinger init\n";
  62. rotor::actor_base_t::init_start();
  63. } else {
  64. std::cout << "ponger state " << static_cast<int>(state) << " isnt operational\n";
  65. if (attempts) {
  66. --attempts;
  67. std::cout << "lets try to check the state again (" << attempts << " attempts left)\n";
  68. auto ponger_sup_addr = ponger_addr->supervisor.get_address();
  69. request<rotor::payload::state_request_t>(ponger_sup_addr, ponger_addr).send(timeout);
  70. } else {
  71. supervisor.do_shutdown();
  72. }
  73. }
  74. }
  75. void on_start(rotor::message_t<rotor::payload::start_actor_t> &msg) noexcept override {
  76. std::cout << "lets send ping\n";
  77. rotor::actor_base_t::on_start(msg);
  78. request<payload::ping_t>(ponger_addr).send(timeout);
  79. }
  80. void on_pong(message::pong_t &) noexcept {
  81. std::cout << "pong received, going to shutdown\n";
  82. supervisor.do_shutdown();
  83. }
  84. std::uint32_t attempts = 3;
  85. rotor::address_ptr_t registry_addr;
  86. rotor::address_ptr_t ponger_addr;
  87. };
  88. struct ponger_t : public rotor::actor_base_t {
  89. using rotor::actor_base_t::actor_base_t;
  90. void set_registry_addr(const rotor::address_ptr_t &addr) { registry_addr = addr; }
  91. void init_start() noexcept override {
  92. subscribe(&ponger_t::on_ping);
  93. subscribe(&ponger_t::on_registration);
  94. request<rotor::payload::registration_request_t>(registry_addr, ponger_name, address).send(timeout);
  95. }
  96. void shutdown_start() noexcept override {
  97. send<rotor::payload::deregistration_notify_t>(registry_addr, address);
  98. rotor::actor_base_t::shutdown_start();
  99. }
  100. void on_registration(rotor::message::registration_response_t &msg) noexcept {
  101. auto &ec = msg.payload.ec;
  102. if (ec) {
  103. std::cout << "ponger registration failure: " << ec.message() << "\n";
  104. return;
  105. }
  106. std::cout << "ponger has been registered, resume initialization \n";
  107. rotor::actor_base_t::init_start();
  108. }
  109. void on_ping(message::ping_t &req) noexcept {
  110. std::cout << "ponger recevied ping request\n";
  111. reply_to(req);
  112. }
  113. rotor::address_ptr_t registry_addr;
  114. };
  115. int main() {
  116. try {
  117. auto *loop = ev_loop_new(0);
  118. auto system_context = rotor::ev::system_context_ev_t::ptr_t{new rotor::ev::system_context_ev_t()};
  119. auto timeout = boost::posix_time::milliseconds{10};
  120. auto conf = rotor::ev::supervisor_config_ev_t{
  121. timeout, loop, true, /* let supervisor takes ownership on the loop */
  122. };
  123. auto sup = system_context->create_supervisor<rotor::ev::supervisor_ev_t>(conf);
  124. auto registry = sup->create_actor<rotor::registry_t>(timeout);
  125. auto pinger = sup->create_actor<pinger_t>(timeout);
  126. auto ponger = sup->create_actor<ponger_t>(timeout);
  127. ponger->set_registry_addr(registry->get_address());
  128. pinger->set_registry_addr(registry->get_address());
  129. sup->start();
  130. ev_run(loop);
  131. } catch (const std::exception &ex) {
  132. std::cout << "exception : " << ex.what();
  133. }
  134. std::cout << "exiting...\n";
  135. return 0;
  136. }
  137. /*
  138. sample output:
  139. ponger address wasn't found: the requested service name is not registered
  140. lets try to discover ponger address again (2 attempts left)
  141. ponger has been registered, resume initialization
  142. ponger address was succesfully discovered
  143. ponger state is operational, continue pinger init
  144. lets send ping
  145. ponger recevied ping request
  146. pong received, going to shutdown
  147. exiting...
  148. */