022-supervisor-tree.cpp 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. //
  2. // Copyright (c) 2019-2020 Ivan Baidakou (basiliscos) (the dot dmol at gmail dot com)
  3. //
  4. // Distributed under the MIT Software License
  5. //
  6. #include "catch.hpp"
  7. #include "rotor.hpp"
  8. #include "supervisor_test.h"
  9. #include "access.h"
  10. namespace r = rotor;
  11. namespace rt = r::test;
  12. static std::uint32_t ping_received = 0;
  13. static std::uint32_t ping_sent = 0;
  14. struct ping_t {};
  15. struct pinger_t : public r::actor_base_t {
  16. using r::actor_base_t::actor_base_t;
  17. void set_ponger_addr(const r::address_ptr_t &addr) { ponger_addr = addr; }
  18. void configure(r::plugin::plugin_base_t &plugin) noexcept override {
  19. plugin.with_casted<r::plugin::starter_plugin_t>([](auto &p) { p.subscribe_actor(&pinger_t::on_state); });
  20. }
  21. void on_start() noexcept override {
  22. r::actor_base_t::on_start();
  23. request_status();
  24. }
  25. void request_status() noexcept {
  26. auto &sup_addr = static_cast<r::actor_base_t &>(ponger_addr->supervisor).get_address();
  27. request<r::payload::state_request_t>(sup_addr, ponger_addr).send(r::pt::seconds{1});
  28. ++attempts;
  29. }
  30. void on_state(r::message::state_response_t &msg) noexcept {
  31. auto &state = msg.payload.res.state;
  32. if (state == r::state_t::OPERATIONAL) {
  33. send<ping_t>(ponger_addr);
  34. ponger_addr.reset();
  35. ping_sent++;
  36. } else if (attempts > 10) {
  37. do_shutdown();
  38. } else {
  39. request_status();
  40. }
  41. }
  42. std::uint32_t attempts = 0;
  43. r::address_ptr_t ponger_addr;
  44. };
  45. struct ponger_t : public r::actor_base_t {
  46. using r::actor_base_t::actor_base_t;
  47. void configure(r::plugin::plugin_base_t &plugin) noexcept override {
  48. plugin.with_casted<r::plugin::starter_plugin_t>([](auto &p) { p.subscribe_actor(&ponger_t::on_ping); });
  49. }
  50. void on_ping(r::message_t<ping_t> &) noexcept {
  51. ping_received++;
  52. do_shutdown();
  53. }
  54. };
  55. struct custom_sup : rt::supervisor_test_t {
  56. using rt::supervisor_test_t::supervisor_test_t;
  57. void on_child_init(actor_base_t *, const std::error_code &ec) noexcept override { error_code = ec; }
  58. std::error_code error_code;
  59. };
  60. /*
  61. * Let's have the following tree of supervisors
  62. *
  63. * S_root
  64. * | |
  65. * S_A1 S_B1
  66. * | |
  67. * S_A2 S_B2
  68. * / \
  69. * pinger ponger
  70. *
  71. * 1. Pinger should be able to send ping message to ponger. The message should
  72. * be processed by S_1, still it have to be delivered to ponger
  73. *
  74. * 2. Ponger should receive the message, and initiate it's own shutdown procedure
  75. *
  76. * 3. As all supervisors have the same localitiy, the S_2 supervisor should
  77. * receive ponger shutdown request and spawn a new ponger.
  78. *
  79. * 4. All messaging (except initialization) should happen in single do_process
  80. * pass
  81. *
  82. */
  83. TEST_CASE("supervisor/locality tree ", "[supervisor]") {
  84. r::system_context_t system_context;
  85. const void *locality = &system_context;
  86. auto sup_root = system_context.create_supervisor<rt::supervisor_test_t>()
  87. .locality(locality)
  88. .timeout(rt::default_timeout)
  89. .finish();
  90. auto sup_A1 =
  91. sup_root->create_actor<rt::supervisor_test_t>().locality(locality).timeout(rt::default_timeout).finish();
  92. auto sup_A2 =
  93. sup_A1->create_actor<rt::supervisor_test_t>().locality(locality).timeout(rt::default_timeout).finish();
  94. auto sup_B1 =
  95. sup_root->create_actor<rt::supervisor_test_t>().locality(locality).timeout(rt::default_timeout).finish();
  96. auto sup_B2 =
  97. sup_B1->create_actor<rt::supervisor_test_t>().locality(locality).timeout(rt::default_timeout).finish();
  98. auto pinger = sup_A2->create_actor<pinger_t>().timeout(rt::default_timeout).finish();
  99. auto ponger = sup_B2->create_actor<ponger_t>().timeout(rt::default_timeout).finish();
  100. pinger->set_ponger_addr(ponger->get_address());
  101. sup_A2->do_process();
  102. REQUIRE(sup_A2->get_children_count() == 1 + 1);
  103. REQUIRE(sup_B2->get_children_count() == 1);
  104. REQUIRE(ping_sent == 1);
  105. REQUIRE(ping_received == 1);
  106. sup_root->do_shutdown();
  107. sup_root->do_process();
  108. REQUIRE(sup_A2->get_state() == r::state_t::SHUT_DOWN);
  109. REQUIRE(sup_B2->get_state() == r::state_t::SHUT_DOWN);
  110. REQUIRE(sup_A1->get_state() == r::state_t::SHUT_DOWN);
  111. REQUIRE(sup_B1->get_state() == r::state_t::SHUT_DOWN);
  112. REQUIRE(sup_root->get_state() == r::state_t::SHUT_DOWN);
  113. }
  114. TEST_CASE("failure escalation") {
  115. r::system_context_t system_context;
  116. auto sup_root =
  117. system_context.create_supervisor<custom_sup>().timeout(rt::default_timeout).create_registry().finish();
  118. auto sup_child = sup_root->create_actor<rt::supervisor_test_t>().timeout(rt::default_timeout).finish();
  119. r::address_ptr_t dummy_addr;
  120. auto act = sup_child->create_actor<rt::actor_test_t>().timeout(rt::default_timeout).finish();
  121. act->configurer = [&](auto &, r::plugin::plugin_base_t &plugin) {
  122. plugin.with_casted<r::plugin::registry_plugin_t>([&](auto &p) { p.discover_name("service-name", dummy_addr); });
  123. };
  124. sup_root->do_process();
  125. CHECK(act->get_state() == r::state_t::SHUT_DOWN);
  126. CHECK(sup_child->get_state() == r::state_t::SHUT_DOWN);
  127. CHECK(sup_root->get_state() == r::state_t::SHUT_DOWN);
  128. CHECK(sup_root->error_code.message() == "failure escalation (child actor died)");
  129. }