010-sup-start_stop.cpp 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484
  1. //
  2. // Copyright (c) 2019-2020 Ivan Baidakou (basiliscos) (the dot dmol at gmail dot com)
  3. //
  4. // Distributed under the MIT Software License
  5. //
  6. #include "catch.hpp"
  7. #include "rotor.hpp"
  8. #include "supervisor_test.h"
  9. #include "actor_test.h"
  10. #include "access.h"
  11. namespace r = rotor;
  12. namespace rt = rotor::test;
  13. static std::uint32_t destroyed = 0;
  14. struct init_shutdown_plugin_t;
  15. namespace payload {
  16. struct sample_payload_t {};
  17. } // namespace payload
  18. namespace message {
  19. using sample_payload_t = r::message_t<payload::sample_payload_t>;
  20. }
  21. struct sample_sup_t : public rt::supervisor_test_t {
  22. using sup_base_t = rt::supervisor_test_t;
  23. using plugins_list_t = std::tuple<r::plugin::address_maker_plugin_t, r::plugin::locality_plugin_t,
  24. r::plugin::delivery_plugin_t<r::plugin::local_delivery_t>,
  25. r::plugin::lifetime_plugin_t, init_shutdown_plugin_t, /* use custom */
  26. r::plugin::foreigners_support_plugin_t, r::plugin::child_manager_plugin_t,
  27. r::plugin::starter_plugin_t>;
  28. std::uint32_t initialized = 0;
  29. std::uint32_t init_invoked = 0;
  30. std::uint32_t shutdown_started = 0;
  31. std::uint32_t shutdown_finished = 0;
  32. std::uint32_t shutdown_conf_invoked = 0;
  33. r::address_ptr_t shutdown_addr;
  34. using rt::supervisor_test_t::supervisor_test_t;
  35. ~sample_sup_t() override { ++destroyed; }
  36. void do_initialize(r::system_context_t *ctx) noexcept override {
  37. ++initialized;
  38. sup_base_t::do_initialize(ctx);
  39. }
  40. void shutdown_finish() noexcept override {
  41. ++shutdown_finished;
  42. rt::supervisor_test_t::shutdown_finish();
  43. }
  44. };
  45. struct init_shutdown_plugin_t : r::plugin::init_shutdown_plugin_t {
  46. using parent_t = r::plugin::init_shutdown_plugin_t;
  47. void deactivate() noexcept override { parent_t::deactivate(); }
  48. bool handle_shutdown(r::message::shutdown_request_t *message) noexcept override {
  49. auto sup = static_cast<sample_sup_t *>(actor);
  50. sup->shutdown_started++;
  51. return parent_t::handle_shutdown(message);
  52. }
  53. bool handle_init(r::message::init_request_t *message) noexcept override {
  54. auto sup = static_cast<sample_sup_t *>(actor);
  55. sup->init_invoked++;
  56. return parent_t::handle_init(message);
  57. }
  58. };
  59. struct sample_plugin_t : r::plugin::plugin_base_t {
  60. using parent_t = r::plugin::plugin_base_t;
  61. static const void *class_identity;
  62. const void *identity() const noexcept override { return class_identity; }
  63. void activate(r::actor_base_t *actor_) noexcept override {
  64. parent_t::activate(actor_);
  65. subscribe(&sample_plugin_t::on_message)->tag_io();
  66. }
  67. void deactivate() noexcept override { parent_t::deactivate(); }
  68. void on_message(message::sample_payload_t &) noexcept { message_received = true; }
  69. bool message_received = false;
  70. };
  71. const void *sample_plugin_t::class_identity = &sample_plugin_t::class_identity;
  72. struct sample_sup2_t : public rt::supervisor_test_t {
  73. using sup_base_t = rt::supervisor_test_t;
  74. std::uint32_t initialized = 0;
  75. std::uint32_t init_invoked = 0;
  76. std::uint32_t shutdown_finished = 0;
  77. std::uint32_t shutdown_conf_invoked = 0;
  78. r::address_ptr_t shutdown_addr;
  79. actor_base_t *init_child = nullptr;
  80. actor_base_t *shutdown_child = nullptr;
  81. std::error_code init_ec;
  82. std::error_code shutdown_ec;
  83. using rt::supervisor_test_t::supervisor_test_t;
  84. ~sample_sup2_t() override { ++destroyed; }
  85. void do_initialize(r::system_context_t *ctx) noexcept override {
  86. ++initialized;
  87. sup_base_t::do_initialize(ctx);
  88. }
  89. void init_finish() noexcept override {
  90. ++init_invoked;
  91. sup_base_t::init_finish();
  92. }
  93. virtual void shutdown_finish() noexcept override {
  94. ++shutdown_finished;
  95. rt::supervisor_test_t::shutdown_finish();
  96. }
  97. void on_child_init(actor_base_t *actor, const std::error_code &ec) noexcept override {
  98. init_child = actor;
  99. init_ec = ec;
  100. }
  101. void on_child_shutdown(actor_base_t *actor, const std::error_code &ec) noexcept override {
  102. shutdown_child = actor;
  103. shutdown_ec = ec;
  104. }
  105. };
  106. struct sample_sup3_t : public rt::supervisor_test_t {
  107. using sup_base_t = rt::supervisor_test_t;
  108. using rt::supervisor_test_t::supervisor_test_t;
  109. std::uint32_t received = 0;
  110. void make_subscription() noexcept {
  111. subscribe(&sample_sup3_t::on_sample);
  112. send<payload::sample_payload_t>(address);
  113. }
  114. void on_sample(message::sample_payload_t &) noexcept { ++received; }
  115. };
  116. struct sample_sup4_t : public rt::supervisor_test_t {
  117. using sup_base_t = rt::supervisor_test_t;
  118. using rt::supervisor_test_t::supervisor_test_t;
  119. std::uint32_t counter = 0;
  120. void intercept(r::message_ptr_t &, const void *tag, const r::continuation_t &continuation) noexcept override {
  121. CHECK(tag == rotor::tags::io);
  122. if (++counter % 2) {
  123. continuation();
  124. }
  125. }
  126. };
  127. struct unsubscriber_sup_t : public rt::supervisor_test_t {
  128. using sup_base_t = rt::supervisor_test_t;
  129. using rt::supervisor_test_t::supervisor_test_t;
  130. void configure(r::plugin::plugin_base_t &plugin) noexcept override {
  131. plugin.with_casted<r::plugin::starter_plugin_t>(
  132. [](auto &p) { p.subscribe_actor(&unsubscriber_sup_t::on_sample); });
  133. }
  134. void on_start() noexcept override {
  135. rt::supervisor_test_t::on_start();
  136. unsubscribe(&unsubscriber_sup_t::on_sample);
  137. }
  138. void on_sample(message::sample_payload_t &) noexcept {}
  139. };
  140. struct sample_actor_t : public r::actor_base_t {
  141. using r::actor_base_t::actor_base_t;
  142. };
  143. struct sample_actor2_t : public rt::actor_test_t {
  144. using rt::actor_test_t::actor_test_t;
  145. void configure(r::plugin::plugin_base_t &plugin) noexcept override {
  146. plugin.with_casted<r::plugin::address_maker_plugin_t>([&](auto &p) { alternative = p.create_address(); });
  147. plugin.with_casted<r::plugin::starter_plugin_t>([&](auto &p) {
  148. p.subscribe_actor(&sample_actor2_t::on_link, alternative);
  149. send<payload::sample_payload_t>(alternative);
  150. });
  151. }
  152. void on_link(message::sample_payload_t &) noexcept { ++received; }
  153. r::address_ptr_t alternative;
  154. int received = 0;
  155. };
  156. struct sample_actor3_t : public rt::actor_test_t {
  157. using rt::actor_test_t::actor_test_t;
  158. void shutdown_start() noexcept override {
  159. rt::actor_test_t::shutdown_start();
  160. resources->acquire();
  161. }
  162. };
  163. struct sample_actor4_t : public rt::actor_test_t {
  164. using rt::actor_test_t::actor_test_t;
  165. void configure(r::plugin::plugin_base_t &plugin) noexcept override {
  166. rt::actor_test_t::configure(plugin);
  167. plugin.with_casted<r::plugin::starter_plugin_t>(
  168. [&](auto &p) { p.subscribe_actor(&sample_actor4_t::on_message)->tag_io(); });
  169. }
  170. void on_start() noexcept override {
  171. rt::actor_test_t::on_start();
  172. send<payload::sample_payload_t>(get_address());
  173. send<payload::sample_payload_t>(get_address());
  174. }
  175. void on_message(message::sample_payload_t &) noexcept { ++received; }
  176. std::size_t received = 0;
  177. };
  178. struct sample_actor5_t : public rt::actor_test_t {
  179. using rt::actor_test_t::actor_test_t;
  180. // clang-format off
  181. using plugins_list_t = std::tuple<
  182. r::plugin::address_maker_plugin_t,
  183. r::plugin::lifetime_plugin_t,
  184. r::plugin::init_shutdown_plugin_t,
  185. r::plugin::link_server_plugin_t,
  186. r::plugin::link_client_plugin_t,
  187. r::plugin::registry_plugin_t,
  188. r::plugin::resources_plugin_t,
  189. r::plugin::starter_plugin_t,
  190. sample_plugin_t
  191. >;
  192. // clang-format on
  193. void on_start() noexcept override {
  194. rt::actor_test_t::on_start();
  195. send<payload::sample_payload_t>(get_address());
  196. send<payload::sample_payload_t>(get_address());
  197. }
  198. };
  199. struct sample_actor6_t : public rt::actor_test_t {
  200. using rt::actor_test_t::actor_test_t;
  201. void on_start() noexcept override {
  202. rt::actor_test_t::on_start();
  203. start_timer(r::pt::minutes(1), *this, &sample_actor6_t::on_timer);
  204. }
  205. void on_timer(r::request_id_t, bool cancelled) noexcept { this->cancelled = cancelled; }
  206. bool cancelled = false;
  207. };
  208. TEST_CASE("on_initialize, on_start, simple on_shutdown (handled by plugin)", "[supervisor]") {
  209. destroyed = 0;
  210. r::system_context_t *system_context = new r::system_context_t{};
  211. auto sup = system_context->create_supervisor<sample_sup_t>().timeout(rt::default_timeout).finish();
  212. REQUIRE(&sup->get_supervisor() == sup.get());
  213. REQUIRE(sup->initialized == 1);
  214. sup->do_process();
  215. CHECK(sup->init_invoked == 1);
  216. CHECK(sup->shutdown_started == 0);
  217. CHECK(sup->shutdown_conf_invoked == 0);
  218. CHECK(sup->active_timers.size() == 0);
  219. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  220. sup->do_shutdown();
  221. sup->do_process();
  222. REQUIRE(sup->shutdown_started == 1);
  223. REQUIRE(sup->shutdown_finished == 1);
  224. REQUIRE(sup->active_timers.size() == 0);
  225. REQUIRE(sup->get_state() == r::state_t::SHUT_DOWN);
  226. REQUIRE(sup->get_leader_queue().size() == 0);
  227. REQUIRE(sup->get_points().size() == 0);
  228. CHECK(rt::empty(sup->get_subscription()));
  229. REQUIRE(destroyed == 0);
  230. delete system_context;
  231. sup->shutdown_addr.reset();
  232. sup.reset();
  233. REQUIRE(destroyed == 1);
  234. }
  235. TEST_CASE("on_initialize, on_start, simple on_shutdown", "[supervisor]") {
  236. destroyed = 0;
  237. r::system_context_t *system_context = new r::system_context_t{};
  238. auto sup = system_context->create_supervisor<sample_sup2_t>().timeout(rt::default_timeout).finish();
  239. REQUIRE(&sup->get_supervisor() == sup.get());
  240. REQUIRE(sup->initialized == 1);
  241. REQUIRE(sup->init_child == nullptr);
  242. sup->do_process();
  243. REQUIRE(sup->init_invoked == 1);
  244. REQUIRE(sup->shutdown_conf_invoked == 0);
  245. REQUIRE(sup->active_timers.size() == 0);
  246. REQUIRE(sup->get_state() == r::state_t::OPERATIONAL);
  247. sup->do_shutdown();
  248. sup->do_process();
  249. REQUIRE(sup->shutdown_finished == 1);
  250. REQUIRE(sup->active_timers.size() == 0);
  251. REQUIRE(sup->get_state() == r::state_t::SHUT_DOWN);
  252. REQUIRE(sup->get_leader_queue().size() == 0);
  253. REQUIRE(sup->get_points().size() == 0);
  254. CHECK(rt::empty(sup->get_subscription()));
  255. REQUIRE(sup->shutdown_child == nullptr);
  256. REQUIRE(destroyed == 0);
  257. delete system_context;
  258. sup->shutdown_addr.reset();
  259. sup.reset();
  260. REQUIRE(destroyed == 1);
  261. }
  262. TEST_CASE("start/shutdown 1 child & 1 supervisor", "[supervisor]") {
  263. r::system_context_ptr_t system_context = new r::system_context_t();
  264. auto sup = system_context->create_supervisor<sample_sup2_t>().timeout(rt::default_timeout).finish();
  265. auto act = sup->create_actor<sample_actor_t>().timeout(rt::default_timeout).finish();
  266. /* for better coverage */
  267. auto last = sup->access<rt::to::last_req_id>();
  268. auto &request_map = sup->access<rt::to::request_map>();
  269. request_map[last + 1] = r::request_curry_t();
  270. sup->do_process();
  271. request_map.clear();
  272. CHECK(sup->access<rt::to::last_req_id>() > 1);
  273. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  274. CHECK(act->access<rt::to::state>() == r::state_t::OPERATIONAL);
  275. CHECK(act->access<rt::to::resources>()->has() == 0);
  276. CHECK(sup->init_child == act.get());
  277. CHECK(!sup->init_ec);
  278. CHECK(sup->shutdown_child == nullptr);
  279. sup->do_shutdown();
  280. sup->do_process();
  281. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  282. CHECK(act->access<rt::to::state>() == r::state_t::SHUT_DOWN);
  283. CHECK(sup->shutdown_child == act.get());
  284. CHECK(!sup->shutdown_ec);
  285. }
  286. TEST_CASE("custom subscription", "[supervisor]") {
  287. r::system_context_ptr_t system_context = new r::system_context_t();
  288. auto sup = system_context->create_supervisor<sample_sup3_t>().timeout(rt::default_timeout).finish();
  289. sup->do_process();
  290. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  291. sup->make_subscription();
  292. sup->do_process();
  293. CHECK(sup->received == 1);
  294. sup->do_shutdown();
  295. sup->do_process();
  296. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  297. }
  298. TEST_CASE("shutdown immediately", "[supervisor]") {
  299. r::system_context_ptr_t system_context = new r::system_context_t();
  300. auto sup = system_context->create_supervisor<sample_sup3_t>().timeout(rt::default_timeout).finish();
  301. sup->do_shutdown();
  302. sup->do_process();
  303. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  304. }
  305. TEST_CASE("self unsubscriber", "[actor]") {
  306. r::system_context_ptr_t system_context = new r::system_context_t();
  307. auto sup = system_context->create_supervisor<unsubscriber_sup_t>().timeout(rt::default_timeout).finish();
  308. sup->do_process();
  309. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  310. sup->do_shutdown();
  311. sup->do_process();
  312. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  313. }
  314. TEST_CASE("alternative address subscriber", "[actor]") {
  315. r::system_context_ptr_t system_context = new r::system_context_t();
  316. auto sup = system_context->create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).finish();
  317. auto act = sup->create_actor<sample_actor2_t>().timeout(rt::default_timeout).finish();
  318. sup->do_process();
  319. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  320. CHECK(act->get_state() == r::state_t::OPERATIONAL);
  321. CHECK(act->received == 1);
  322. sup->do_shutdown();
  323. sup->do_process();
  324. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  325. CHECK(act->get_state() == r::state_t::SHUT_DOWN);
  326. }
  327. TEST_CASE("acquire resources on shutdown start", "[actor]") {
  328. r::system_context_ptr_t system_context = new r::system_context_t();
  329. auto sup = system_context->create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).finish();
  330. auto act = sup->create_actor<sample_actor3_t>().timeout(rt::default_timeout).finish();
  331. sup->do_process();
  332. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  333. sup->do_shutdown();
  334. sup->do_process();
  335. CHECK(act->get_state() == r::state_t::SHUTTING_DOWN);
  336. act->access<rt::to::resources>()->release();
  337. sup->do_process();
  338. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  339. CHECK(act->get_state() == r::state_t::SHUT_DOWN);
  340. }
  341. TEST_CASE("io tagging & intercepting", "[actor]") {
  342. r::system_context_ptr_t system_context = new r::system_context_t();
  343. auto sup = system_context->create_supervisor<sample_sup4_t>().timeout(rt::default_timeout).finish();
  344. auto act = sup->create_actor<sample_actor4_t>().timeout(rt::default_timeout).finish();
  345. sup->do_process();
  346. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  347. CHECK(act->received == 1);
  348. CHECK(sup->counter == 2);
  349. sup->do_shutdown();
  350. sup->do_process();
  351. CHECK(act->get_state() == r::state_t::SHUT_DOWN);
  352. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  353. }
  354. TEST_CASE("io tagging (in plugin) & intercepting", "[actor]") {
  355. r::system_context_ptr_t system_context = new r::system_context_t();
  356. auto sup = system_context->create_supervisor<sample_sup4_t>().timeout(rt::default_timeout).finish();
  357. auto act = sup->create_actor<sample_actor5_t>().timeout(rt::default_timeout).finish();
  358. sup->do_process();
  359. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  360. CHECK(sup->counter == 2);
  361. auto plugin = act->access<rt::to::get_plugin>(sample_plugin_t::class_identity);
  362. CHECK(plugin);
  363. CHECK(static_cast<sample_plugin_t *>(plugin)->message_received);
  364. sup->do_shutdown();
  365. sup->do_process();
  366. CHECK(act->get_state() == r::state_t::SHUT_DOWN);
  367. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  368. }
  369. TEST_CASE("timers cancellation", "[actor]") {
  370. r::system_context_ptr_t system_context = new r::system_context_t();
  371. auto sup = system_context->create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).finish();
  372. auto act = sup->create_actor<sample_actor6_t>().timeout(rt::default_timeout).finish();
  373. sup->do_process();
  374. CHECK(act->get_state() == r::state_t::OPERATIONAL);
  375. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  376. CHECK(!act->access<rt::to::timers_map>().empty());
  377. sup->do_shutdown();
  378. sup->do_process();
  379. CHECK(act->get_state() == r::state_t::SHUT_DOWN);
  380. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  381. CHECK(act->access<rt::to::timers_map>().empty());
  382. }