010-sup-start_stop.cpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342
  1. //
  2. // Copyright (c) 2019-2020 Ivan Baidakou (basiliscos) (the dot dmol at gmail dot com)
  3. //
  4. // Distributed under the MIT Software License
  5. //
  6. #include "catch.hpp"
  7. #include "rotor.hpp"
  8. #include "supervisor_test.h"
  9. #include "actor_test.h"
  10. #include "access.h"
  11. namespace r = rotor;
  12. namespace rt = rotor::test;
  13. static std::uint32_t destroyed = 0;
  14. struct init_shutdown_plugin_t;
  15. namespace payload {
  16. struct sample_payload_t {};
  17. } // namespace payload
  18. namespace message {
  19. using sample_payload_t = r::message_t<payload::sample_payload_t>;
  20. }
  21. struct sample_sup_t : public rt::supervisor_test_t {
  22. using sup_base_t = rt::supervisor_test_t;
  23. using plugins_list_t = std::tuple<r::plugin::address_maker_plugin_t, r::plugin::locality_plugin_t,
  24. r::plugin::delivery_plugin_t<r::plugin::local_delivery_t>,
  25. r::plugin::lifetime_plugin_t, init_shutdown_plugin_t, /* use custom */
  26. r::plugin::foreigners_support_plugin_t, r::plugin::child_manager_plugin_t,
  27. r::plugin::starter_plugin_t>;
  28. std::uint32_t initialized = 0;
  29. std::uint32_t init_invoked = 0;
  30. std::uint32_t shutdown_started = 0;
  31. std::uint32_t shutdown_finished = 0;
  32. std::uint32_t shutdown_conf_invoked = 0;
  33. r::address_ptr_t shutdown_addr;
  34. using rt::supervisor_test_t::supervisor_test_t;
  35. ~sample_sup_t() override { ++destroyed; }
  36. void do_initialize(r::system_context_t *ctx) noexcept override {
  37. ++initialized;
  38. sup_base_t::do_initialize(ctx);
  39. }
  40. void shutdown_finish() noexcept override {
  41. ++shutdown_finished;
  42. rt::supervisor_test_t::shutdown_finish();
  43. }
  44. };
  45. struct init_shutdown_plugin_t : r::plugin::init_shutdown_plugin_t {
  46. using parent_t = r::plugin::init_shutdown_plugin_t;
  47. void deactivate() noexcept override { parent_t::deactivate(); }
  48. bool handle_shutdown(r::message::shutdown_request_t *message) noexcept override {
  49. auto sup = static_cast<sample_sup_t *>(actor);
  50. sup->shutdown_started++;
  51. return parent_t::handle_shutdown(message);
  52. }
  53. bool handle_init(r::message::init_request_t *message) noexcept override {
  54. auto sup = static_cast<sample_sup_t *>(actor);
  55. sup->init_invoked++;
  56. return parent_t::handle_init(message);
  57. }
  58. };
  59. struct sample_sup2_t : public rt::supervisor_test_t {
  60. using sup_base_t = rt::supervisor_test_t;
  61. std::uint32_t initialized = 0;
  62. std::uint32_t init_invoked = 0;
  63. std::uint32_t shutdown_finished = 0;
  64. std::uint32_t shutdown_conf_invoked = 0;
  65. r::address_ptr_t shutdown_addr;
  66. actor_base_t *init_child = nullptr;
  67. actor_base_t *shutdown_child = nullptr;
  68. std::error_code init_ec;
  69. std::error_code shutdown_ec;
  70. using rt::supervisor_test_t::supervisor_test_t;
  71. ~sample_sup2_t() override { ++destroyed; }
  72. void do_initialize(r::system_context_t *ctx) noexcept override {
  73. ++initialized;
  74. sup_base_t::do_initialize(ctx);
  75. }
  76. void init_finish() noexcept override {
  77. ++init_invoked;
  78. sup_base_t::init_finish();
  79. }
  80. virtual void shutdown_finish() noexcept override {
  81. ++shutdown_finished;
  82. rt::supervisor_test_t::shutdown_finish();
  83. }
  84. void on_child_init(actor_base_t *actor, const std::error_code &ec) noexcept override {
  85. init_child = actor;
  86. init_ec = ec;
  87. }
  88. void on_child_shutdown(actor_base_t *actor, const std::error_code &ec) noexcept override {
  89. shutdown_child = actor;
  90. shutdown_ec = ec;
  91. }
  92. };
  93. struct sample_sup3_t : public rt::supervisor_test_t {
  94. using sup_base_t = rt::supervisor_test_t;
  95. using rt::supervisor_test_t::supervisor_test_t;
  96. std::uint32_t received = 0;
  97. void make_subscription() noexcept {
  98. subscribe(&sample_sup3_t::on_sample);
  99. send<payload::sample_payload_t>(address);
  100. }
  101. void on_sample(message::sample_payload_t &) noexcept { ++received; }
  102. };
  103. struct unsubscriber_sup_t : public rt::supervisor_test_t {
  104. using sup_base_t = rt::supervisor_test_t;
  105. using rt::supervisor_test_t::supervisor_test_t;
  106. void configure(r::plugin::plugin_base_t &plugin) noexcept override {
  107. plugin.with_casted<r::plugin::starter_plugin_t>(
  108. [](auto &p) { p.subscribe_actor(&unsubscriber_sup_t::on_sample); });
  109. }
  110. void on_start() noexcept override {
  111. rt::supervisor_test_t::on_start();
  112. unsubscribe(&unsubscriber_sup_t::on_sample);
  113. }
  114. void on_sample(message::sample_payload_t &) noexcept {}
  115. };
  116. struct sample_actor_t : public r::actor_base_t {
  117. using r::actor_base_t::actor_base_t;
  118. };
  119. struct sample_actor2_t : public rt::actor_test_t {
  120. using rt::actor_test_t::actor_test_t;
  121. void configure(r::plugin::plugin_base_t &plugin) noexcept override {
  122. plugin.with_casted<r::plugin::address_maker_plugin_t>([&](auto &p) { alternative = p.create_address(); });
  123. plugin.with_casted<r::plugin::starter_plugin_t>([&](auto &p) {
  124. p.subscribe_actor(&sample_actor2_t::on_link, alternative);
  125. send<payload::sample_payload_t>(alternative);
  126. });
  127. }
  128. void on_link(message::sample_payload_t &) noexcept { ++received; }
  129. r::address_ptr_t alternative;
  130. int received = 0;
  131. };
  132. struct sample_actor3_t : public rt::actor_test_t {
  133. using rt::actor_test_t::actor_test_t;
  134. void shutdown_start() noexcept override {
  135. rt::actor_test_t::shutdown_start();
  136. resources->acquire();
  137. }
  138. };
  139. TEST_CASE("on_initialize, on_start, simple on_shutdown (handled by plugin)", "[supervisor]") {
  140. destroyed = 0;
  141. r::system_context_t *system_context = new r::system_context_t{};
  142. auto sup = system_context->create_supervisor<sample_sup_t>().timeout(rt::default_timeout).finish();
  143. REQUIRE(&sup->get_supervisor() == sup.get());
  144. REQUIRE(sup->initialized == 1);
  145. sup->do_process();
  146. CHECK(sup->init_invoked == 1);
  147. CHECK(sup->shutdown_started == 0);
  148. CHECK(sup->shutdown_conf_invoked == 0);
  149. CHECK(sup->active_timers.size() == 0);
  150. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  151. sup->do_shutdown();
  152. sup->do_process();
  153. REQUIRE(sup->shutdown_started == 1);
  154. REQUIRE(sup->shutdown_finished == 1);
  155. REQUIRE(sup->active_timers.size() == 0);
  156. REQUIRE(sup->get_state() == r::state_t::SHUT_DOWN);
  157. REQUIRE(sup->get_leader_queue().size() == 0);
  158. REQUIRE(sup->get_points().size() == 0);
  159. CHECK(rt::empty(sup->get_subscription()));
  160. REQUIRE(destroyed == 0);
  161. delete system_context;
  162. sup->shutdown_addr.reset();
  163. sup.reset();
  164. REQUIRE(destroyed == 1);
  165. }
  166. TEST_CASE("on_initialize, on_start, simple on_shutdown", "[supervisor]") {
  167. destroyed = 0;
  168. r::system_context_t *system_context = new r::system_context_t{};
  169. auto sup = system_context->create_supervisor<sample_sup2_t>().timeout(rt::default_timeout).finish();
  170. REQUIRE(&sup->get_supervisor() == sup.get());
  171. REQUIRE(sup->initialized == 1);
  172. REQUIRE(sup->init_child == nullptr);
  173. sup->do_process();
  174. REQUIRE(sup->init_invoked == 1);
  175. REQUIRE(sup->shutdown_conf_invoked == 0);
  176. REQUIRE(sup->active_timers.size() == 0);
  177. REQUIRE(sup->get_state() == r::state_t::OPERATIONAL);
  178. sup->do_shutdown();
  179. sup->do_process();
  180. REQUIRE(sup->shutdown_finished == 1);
  181. REQUIRE(sup->active_timers.size() == 0);
  182. REQUIRE(sup->get_state() == r::state_t::SHUT_DOWN);
  183. REQUIRE(sup->get_leader_queue().size() == 0);
  184. REQUIRE(sup->get_points().size() == 0);
  185. CHECK(rt::empty(sup->get_subscription()));
  186. REQUIRE(sup->shutdown_child == nullptr);
  187. REQUIRE(destroyed == 0);
  188. delete system_context;
  189. sup->shutdown_addr.reset();
  190. sup.reset();
  191. REQUIRE(destroyed == 1);
  192. }
  193. TEST_CASE("start/shutdown 1 child & 1 supervisor", "[supervisor]") {
  194. r::system_context_ptr_t system_context = new r::system_context_t();
  195. auto sup = system_context->create_supervisor<sample_sup2_t>().timeout(rt::default_timeout).finish();
  196. auto act = sup->create_actor<sample_actor_t>().timeout(rt::default_timeout).finish();
  197. /* for better coverage */
  198. auto last = sup->access<rt::to::last_req_id>();
  199. auto &request_map = sup->access<rt::to::request_map>();
  200. request_map[last + 1] = r::request_curry_t();
  201. sup->do_process();
  202. request_map.clear();
  203. CHECK(sup->access<rt::to::last_req_id>() > 1);
  204. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  205. CHECK(act->access<rt::to::state>() == r::state_t::OPERATIONAL);
  206. CHECK(act->access<rt::to::resources>()->has() == 0);
  207. CHECK(sup->init_child == act.get());
  208. CHECK(!sup->init_ec);
  209. CHECK(sup->shutdown_child == nullptr);
  210. sup->do_shutdown();
  211. sup->do_process();
  212. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  213. CHECK(act->access<rt::to::state>() == r::state_t::SHUT_DOWN);
  214. CHECK(sup->shutdown_child == act.get());
  215. CHECK(!sup->shutdown_ec);
  216. }
  217. TEST_CASE("custom subscription", "[supervisor]") {
  218. r::system_context_ptr_t system_context = new r::system_context_t();
  219. auto sup = system_context->create_supervisor<sample_sup3_t>().timeout(rt::default_timeout).finish();
  220. sup->do_process();
  221. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  222. sup->make_subscription();
  223. sup->do_process();
  224. CHECK(sup->received == 1);
  225. sup->do_shutdown();
  226. sup->do_process();
  227. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  228. }
  229. TEST_CASE("shutdown immediately", "[supervisor]") {
  230. r::system_context_ptr_t system_context = new r::system_context_t();
  231. auto sup = system_context->create_supervisor<sample_sup3_t>().timeout(rt::default_timeout).finish();
  232. sup->do_shutdown();
  233. sup->do_process();
  234. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  235. }
  236. TEST_CASE("self unsubscriber", "[actor]") {
  237. r::system_context_ptr_t system_context = new r::system_context_t();
  238. auto sup = system_context->create_supervisor<unsubscriber_sup_t>().timeout(rt::default_timeout).finish();
  239. sup->do_process();
  240. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  241. sup->do_shutdown();
  242. sup->do_process();
  243. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  244. }
  245. TEST_CASE("alternative address subscriber", "[actor]") {
  246. r::system_context_ptr_t system_context = new r::system_context_t();
  247. auto sup = system_context->create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).finish();
  248. auto act = sup->create_actor<sample_actor2_t>().timeout(rt::default_timeout).finish();
  249. sup->do_process();
  250. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  251. CHECK(act->get_state() == r::state_t::OPERATIONAL);
  252. CHECK(act->received == 1);
  253. sup->do_shutdown();
  254. sup->do_process();
  255. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  256. CHECK(act->get_state() == r::state_t::SHUT_DOWN);
  257. }
  258. TEST_CASE("acquire resources on shutdown start", "[actor]") {
  259. r::system_context_ptr_t system_context = new r::system_context_t();
  260. auto sup = system_context->create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).finish();
  261. auto act = sup->create_actor<sample_actor3_t>().timeout(rt::default_timeout).finish();
  262. sup->do_process();
  263. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  264. sup->do_shutdown();
  265. sup->do_process();
  266. CHECK(act->get_state() == r::state_t::SHUTTING_DOWN);
  267. act->access<rt::to::resources>()->release();
  268. sup->do_process();
  269. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  270. CHECK(act->get_state() == r::state_t::SHUT_DOWN);
  271. }