019-link-unlink.cpp 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614
  1. //
  2. // Copyright (c) 2019-2021 Ivan Baidakou (basiliscos) (the dot dmol at gmail dot com)
  3. //
  4. // Distributed under the MIT Software License
  5. //
  6. #include "catch.hpp"
  7. #include "rotor.hpp"
  8. #include "actor_test.h"
  9. #include "supervisor_test.h"
  10. #include "system_context_test.h"
  11. #include "access.h"
  12. namespace r = rotor;
  13. namespace rt = r::test;
  14. struct double_linked_actor_t : r::actor_base_t {
  15. using r::actor_base_t::actor_base_t;
  16. using message_ptr_t = r::intrusive_ptr_t<r::message::link_response_t>;
  17. struct resource {
  18. static const constexpr r::plugin::resource_id_t linking = 0;
  19. static const constexpr r::plugin::resource_id_t unlinking = 1;
  20. };
  21. void configure(r::plugin::plugin_base_t &plugin) noexcept override {
  22. plugin.with_casted<r::plugin::address_maker_plugin_t>([&](auto &p) { alternative = p.create_address(); });
  23. plugin.with_casted<r::plugin::starter_plugin_t>([&](auto &p) {
  24. p.subscribe_actor(&double_linked_actor_t::on_link_res, alternative);
  25. p.subscribe_actor(&double_linked_actor_t::on_unlink_req, alternative);
  26. for (auto i = 0; i < 2; ++i) {
  27. resources->acquire(resource::linking);
  28. request_via<r::payload::link_request_t>(target, alternative, false).send(rt::default_timeout);
  29. }
  30. });
  31. }
  32. void on_link_res(r::message::link_response_t &res) noexcept {
  33. resources->release(resource::linking);
  34. if (!message1)
  35. message1 = &res;
  36. else if (!message2)
  37. message2 = &res;
  38. }
  39. virtual void on_start() noexcept override {
  40. r::actor_base_t::on_start();
  41. resources->acquire(resource::unlinking);
  42. }
  43. void on_unlink_req(r::message::unlink_request_t &message) noexcept {
  44. reply_to(message, alternative);
  45. if (resources->has(resource::unlinking))
  46. resources->release(resource::unlinking);
  47. }
  48. r::address_ptr_t target;
  49. message_ptr_t message1, message2;
  50. r::address_ptr_t alternative;
  51. };
  52. struct tracked_actor_t : rt::actor_test_t {
  53. using rt::actor_test_t::actor_test_t;
  54. std::uint32_t shutdown_event = 0;
  55. };
  56. struct ignore_unlink_actor_t : rt::actor_test_t {
  57. using rt::actor_test_t::actor_test_t;
  58. r::address_ptr_t server_addr;
  59. bool on_unlink(const r::address_ptr_t &addr) noexcept override {
  60. server_addr = addr;
  61. return false;
  62. }
  63. };
  64. TEST_CASE("client/server, common workflow", "[actor]") {
  65. r::system_context_t system_context;
  66. auto sup = system_context.create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).finish();
  67. auto act_s = sup->create_actor<rt::actor_test_t>().timeout(rt::default_timeout).finish();
  68. auto act_c = sup->create_actor<rt::actor_test_t>().timeout(rt::default_timeout).finish();
  69. auto &addr_s = act_s->get_address();
  70. bool invoked = false;
  71. act_c->configurer = [&](auto &, r::plugin::plugin_base_t &plugin) {
  72. plugin.with_casted<r::plugin::link_client_plugin_t>([&](auto &p) {
  73. p.link(addr_s, false, [&](auto &ee) mutable {
  74. REQUIRE(!ee);
  75. invoked = true;
  76. });
  77. });
  78. };
  79. sup->do_process();
  80. REQUIRE(sup->get_state() == r::state_t::OPERATIONAL);
  81. REQUIRE(invoked);
  82. SECTION("simultaneous shutdown") {
  83. sup->do_shutdown();
  84. sup->do_process();
  85. }
  86. SECTION("controlled shutdown") {
  87. SECTION("indirect client-initiated unlink via client-shutdown") {
  88. act_c->do_shutdown();
  89. sup->do_process();
  90. CHECK(act_c->get_state() == r::state_t::SHUT_DOWN);
  91. }
  92. SECTION("indirect client-initiated unlink via server-shutdown") {
  93. act_s->do_shutdown();
  94. sup->do_process();
  95. CHECK(act_s->get_state() == r::state_t::SHUT_DOWN);
  96. CHECK(act_c->get_state() == r::state_t::SHUT_DOWN);
  97. }
  98. sup->do_shutdown();
  99. sup->do_process();
  100. }
  101. }
  102. TEST_CASE("link not possible (timeout) => shutdown", "[actor]") {
  103. r::system_context_t system_context;
  104. auto sup = system_context.create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).finish();
  105. auto act_c = sup->create_actor<rt::actor_test_t>().timeout(rt::default_timeout).finish();
  106. auto some_addr = sup->make_address();
  107. bool invoked = false;
  108. act_c->configurer = [&](auto &, r::plugin::plugin_base_t &plugin) {
  109. plugin.with_casted<r::plugin::link_client_plugin_t>([&](auto &p) {
  110. p.link(some_addr, false, [&](auto &ec) mutable {
  111. REQUIRE(ec);
  112. invoked = true;
  113. });
  114. });
  115. };
  116. sup->do_process();
  117. REQUIRE(sup->get_state() == r::state_t::INITIALIZING);
  118. REQUIRE(sup->active_timers.size() == 3);
  119. auto timer_it = *(sup->active_timers.rbegin());
  120. sup->do_invoke_timer(timer_it->request_id);
  121. sup->do_process();
  122. REQUIRE(invoked);
  123. REQUIRE(sup->get_state() == r::state_t::SHUT_DOWN);
  124. }
  #if 0
  // Disabled test: excluded from compilation by the surrounding `#if 0`.
  TEST_CASE("link not possible => supervisor is shutted down", "[actor]") {
      r::system_context_t system_context;

      auto supervisor = system_context.create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).finish();
      auto server = supervisor->create_actor<rt::actor_test_t>().timeout(rt::default_timeout).finish();
      auto client = supervisor->create_actor<rt::actor_test_t>().timeout(rt::default_timeout).finish();

      auto server_addr = server->get_address();
      client->link_request(server_addr, rt::default_timeout);
      supervisor->do_process();

      REQUIRE(client->get_state() == r::state_t::SHUT_DOWN);
      REQUIRE(server->get_state() == r::state_t::SHUT_DOWN);
      REQUIRE(supervisor->get_state() == r::state_t::SHUT_DOWN);
  }
  #endif
  139. TEST_CASE("unlink", "[actor]") {
  140. rt::system_context_test_t system_context;
  141. const char l1[] = "abc";
  142. const char l2[] = "def";
  143. auto sup1 =
  144. system_context.create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).locality(l1).finish();
  145. auto sup2 = sup1->create_actor<rt::supervisor_test_t>().timeout(rt::default_timeout).locality(l2).finish();
  146. auto act_s = sup1->create_actor<rt::actor_test_t>().timeout(rt::default_timeout).finish();
  147. auto act_c = sup2->create_actor<rt::actor_test_t>().timeout(rt::default_timeout).finish();
  148. auto &addr_s = act_s->get_address();
  149. act_c->configurer = [&](auto &, r::plugin::plugin_base_t &plugin) {
  150. plugin.with_casted<r::plugin::link_client_plugin_t>([&](auto &p) { p.link(addr_s, false, [&](auto &) {}); });
  151. };
  152. while (!sup1->get_leader_queue().empty() || !sup2->get_leader_queue().empty()) {
  153. sup1->do_process();
  154. sup2->do_process();
  155. }
  156. REQUIRE(sup1->get_state() == r::state_t::OPERATIONAL);
  157. SECTION("unlink failure") {
  158. act_s->do_shutdown();
  159. sup1->do_process();
  160. REQUIRE(sup1->active_timers.size() == 2);
  161. auto unlink_req = sup1->get_timer(1);
  162. sup1->do_invoke_timer(unlink_req);
  163. sup1->do_process();
  164. REQUIRE(system_context.reason->ec == r::error_code_t::request_timeout);
  165. REQUIRE(act_s->get_state() == r::state_t::SHUTTING_DOWN);
  166. act_s->force_cleanup();
  167. }
  168. SECTION("unlink-notify on unlink-request") {
  169. SECTION("client, then server") {
  170. act_s->do_shutdown();
  171. act_c->do_shutdown();
  172. sup2->do_process();
  173. sup1->do_process();
  174. sup2->do_process();
  175. sup1->do_process();
  176. }
  177. SECTION("server, then client") {
  178. act_s->do_shutdown();
  179. act_c->do_shutdown();
  180. sup1->do_process();
  181. sup2->do_process();
  182. sup1->do_process();
  183. sup2->do_process();
  184. }
  185. }
  186. sup1->do_shutdown();
  187. while (!sup1->get_leader_queue().empty() || !sup2->get_leader_queue().empty()) {
  188. sup1->do_process();
  189. sup2->do_process();
  190. }
  191. REQUIRE(sup1->get_state() == r::state_t::SHUT_DOWN);
  192. }
  193. TEST_CASE("unlink reaction", "[actor]") {
  194. using request_ptr_t = r::intrusive_ptr_t<r::message::unlink_request_t>;
  195. rt::system_context_test_t system_context;
  196. auto sup = system_context.create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).finish();
  197. auto act_s = sup->create_actor<rt::actor_test_t>().timeout(rt::default_timeout).finish();
  198. auto act_c = sup->create_actor<rt::actor_test_t>().timeout(rt::default_timeout).finish();
  199. auto &addr_s = act_s->get_address();
  200. request_ptr_t unlink_req;
  201. act_c->configurer = [&](auto &, r::plugin::plugin_base_t &plugin) {
  202. plugin.with_casted<r::plugin::link_client_plugin_t>([&](auto &p) {
  203. p.link(addr_s, false, [&](auto &) {});
  204. p.on_unlink([&](auto &req) {
  205. unlink_req = &req;
  206. p.forget_link(req);
  207. return true;
  208. });
  209. });
  210. };
  211. sup->do_process();
  212. act_s->do_shutdown();
  213. sup->do_process();
  214. REQUIRE(unlink_req);
  215. REQUIRE(unlink_req->message_type == r::message::unlink_request_t::message_type);
  216. sup->do_shutdown();
  217. sup->do_process();
  218. REQUIRE(sup->get_state() == r::state_t::SHUT_DOWN);
  219. }
  220. TEST_CASE("auto-unlink on shutdown", "[actor]") {
  221. rt::system_context_test_t ctx1;
  222. rt::system_context_test_t ctx2;
  223. const char l1[] = "abc";
  224. const char l2[] = "def";
  225. auto sup1 = ctx1.create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).locality(l1).finish();
  226. auto sup2 = ctx2.create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).locality(l2).finish();
  227. auto act_c = sup1->create_actor<rt::actor_test_t>().timeout(rt::default_timeout).finish();
  228. auto act_s = sup2->create_actor<rt::actor_test_t>().timeout(rt::default_timeout).finish();
  229. auto &addr_s = act_s->get_address();
  230. act_c->configurer = [&](auto &, r::plugin::plugin_base_t &plugin) {
  231. plugin.with_casted<r::plugin::link_client_plugin_t>([&](auto &p) { p.link(addr_s, false, [&](auto &) {}); });
  232. };
  233. sup1->do_process();
  234. REQUIRE(act_c->get_state() == r::state_t::INITIALIZING);
  235. act_c->do_shutdown();
  236. sup1->do_process();
  237. REQUIRE(act_c->get_state() == r::state_t::SHUT_DOWN);
  238. REQUIRE(sup1->get_state() == r::state_t::SHUT_DOWN);
  239. sup2->do_process();
  240. REQUIRE(sup2->get_state() == r::state_t::OPERATIONAL);
  241. sup2->do_shutdown();
  242. sup2->do_process();
  243. REQUIRE(sup2->get_state() == r::state_t::SHUT_DOWN);
  244. }
  245. TEST_CASE("link to operational only", "[actor]") {
  246. rt::system_context_test_t ctx1;
  247. rt::system_context_test_t ctx2;
  248. rt::system_context_test_t ctx3;
  249. const char l1[] = "abc";
  250. const char l2[] = "def";
  251. const char l3[] = "ghi";
  252. auto sup1 = ctx1.create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).locality(l1).finish();
  253. auto sup2 = ctx2.create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).locality(l2).finish();
  254. auto sup3 = ctx3.create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).locality(l3).finish();
  255. auto act_c = sup1->create_actor<rt::actor_test_t>().timeout(rt::default_timeout).finish();
  256. auto act_s1 = sup2->create_actor<rt::actor_test_t>().timeout(rt::default_timeout).finish();
  257. auto act_s2 = sup3->create_actor<rt::actor_test_t>().timeout(rt::default_timeout).finish();
  258. auto &addr_s1 = act_s1->get_address();
  259. auto &addr_s2 = act_s2->get_address();
  260. auto process_12 = [&]() {
  261. while (!sup1->get_leader_queue().empty() || !sup2->get_leader_queue().empty()) {
  262. sup1->do_process();
  263. sup2->do_process();
  264. }
  265. };
  266. auto process_123 = [&]() {
  267. while (!sup1->get_leader_queue().empty() || !sup2->get_leader_queue().empty() ||
  268. !sup3->get_leader_queue().empty()) {
  269. sup1->do_process();
  270. sup2->do_process();
  271. sup3->do_process();
  272. }
  273. };
  274. act_c->configurer = [&](auto &, r::plugin::plugin_base_t &plugin) {
  275. plugin.with_casted<r::plugin::link_client_plugin_t>([&](auto &p) { p.link(addr_s1, true, [&](auto &) {}); });
  276. };
  277. act_s1->configurer = [&](auto &, r::plugin::plugin_base_t &plugin) {
  278. plugin.with_casted<r::plugin::link_client_plugin_t>([&](auto &p) { p.link(addr_s2, true, [&](auto &) {}); });
  279. };
  280. process_12();
  281. CHECK(act_c->get_state() == r::state_t::INITIALIZING);
  282. CHECK(act_s1->get_state() == r::state_t::INITIALIZING);
  283. process_123();
  284. CHECK(act_c->get_state() == r::state_t::OPERATIONAL);
  285. CHECK(act_s1->get_state() == r::state_t::OPERATIONAL);
  286. CHECK(act_s2->get_state() == r::state_t::OPERATIONAL);
  287. sup1->do_shutdown();
  288. sup2->do_shutdown();
  289. sup3->do_shutdown();
  290. process_123();
  291. CHECK(act_c->get_state() == r::state_t::SHUT_DOWN);
  292. CHECK(act_s1->get_state() == r::state_t::SHUT_DOWN);
  293. CHECK(act_s2->get_state() == r::state_t::SHUT_DOWN);
  294. }
  295. TEST_CASE("unlink notify / response race", "[actor]") {
  296. rt::system_context_test_t system_context;
  297. const char l1[] = "abc";
  298. const char l2[] = "def";
  299. auto sup1 =
  300. system_context.create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).locality(l1).finish();
  301. auto sup2 = sup1->create_actor<rt::supervisor_test_t>().timeout(rt::default_timeout).locality(l2).finish();
  302. auto act_s = sup1->create_actor<rt::actor_test_t>().timeout(rt::default_timeout).finish();
  303. auto act_c = sup2->create_actor<rt::actor_test_t>().timeout(rt::default_timeout).finish();
  304. auto &addr_s = act_s->get_address();
  305. act_c->configurer = [&](auto &, r::plugin::plugin_base_t &plugin) {
  306. plugin.with_casted<r::plugin::link_client_plugin_t>([&](auto &p) { p.link(addr_s, true, [&](auto &) {}); });
  307. };
  308. while (!sup1->get_leader_queue().empty() || !sup2->get_leader_queue().empty()) {
  309. sup1->do_process();
  310. sup2->do_process();
  311. }
  312. REQUIRE(sup1->get_state() == r::state_t::OPERATIONAL);
  313. act_s->do_shutdown();
  314. act_c->do_shutdown();
  315. sup1->do_process();
  316. // extract unlink request to let it produce unlink notify
  317. auto unlink_request = sup2->get_leader_queue().back();
  318. REQUIRE(unlink_request->type_index == r::message::unlink_request_t::message_type);
  319. sup2->get_leader_queue().pop_back();
  320. sup2->do_process();
  321. sup1->do_shutdown();
  322. while (!sup1->get_leader_queue().empty() || !sup2->get_leader_queue().empty()) {
  323. sup1->do_process();
  324. sup2->do_process();
  325. }
  326. CHECK(sup1->active_timers.size() == 0);
  327. CHECK(sup1->get_state() == r::state_t::SHUT_DOWN);
  328. }
  329. TEST_CASE("link errors", "[actor]") {
  330. rt::system_context_test_t ctx1;
  331. rt::system_context_test_t ctx2;
  332. const char l1[] = "abc";
  333. const char l2[] = "def";
  334. auto sup1 = ctx1.create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).locality(l1).finish();
  335. auto sup2 = ctx2.create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).locality(l2).finish();
  336. auto process_12 = [&]() {
  337. while (!sup1->get_leader_queue().empty() || !sup2->get_leader_queue().empty()) {
  338. sup1->do_process();
  339. sup2->do_process();
  340. }
  341. };
  342. SECTION("double link attempt") {
  343. auto act_c = sup1->create_actor<double_linked_actor_t>().timeout(rt::default_timeout).finish();
  344. auto act_s = sup2->create_actor<rt::actor_test_t>().timeout(rt::default_timeout).finish();
  345. act_c->target = act_s->get_address();
  346. process_12();
  347. REQUIRE(act_c->message1);
  348. CHECK(!act_c->message1->payload.ee);
  349. REQUIRE(act_c->message2);
  350. CHECK(act_c->message2->payload.ee);
  351. CHECK(act_c->message2->payload.ee->ec.message() == std::string("already linked"));
  352. }
  353. SECTION("not linkeable") {
  354. auto act_s = sup2->create_actor<rt::actor_test_t>().timeout(rt::default_timeout).finish();
  355. sup2->do_process();
  356. act_s->access<rt::to::resources>()->acquire();
  357. act_s->do_shutdown();
  358. sup2->do_process();
  359. REQUIRE(act_s->get_state() == r::state_t::SHUTTING_DOWN);
  360. SECTION("check error") {
  361. r::extended_error_ptr_t err;
  362. auto act_c = sup1->create_actor<rt::actor_test_t>().timeout(rt::default_timeout).finish();
  363. act_c->configurer = [&](auto &, r::plugin::plugin_base_t &plugin) {
  364. plugin.with_casted<r::plugin::link_client_plugin_t>(
  365. [&](auto &p) { p.link(act_s->get_address(), false, [&](auto &ec) { err = ec; }); });
  366. };
  367. process_12();
  368. CHECK(act_c->get_state() == r::state_t::SHUT_DOWN);
  369. REQUIRE(err);
  370. CHECK(err->ec.message() == std::string("actor is not linkeable"));
  371. }
  372. SECTION("get the error during shutdown") {
  373. auto act_c = sup1->create_actor<rt::actor_test_t>().timeout(rt::default_timeout).finish();
  374. sup1->do_process();
  375. CHECK(act_c->get_state() == r::state_t::OPERATIONAL);
  376. auto plugin1 = act_c->access<rt::to::get_plugin>(r::plugin::link_client_plugin_t::class_identity);
  377. auto p1 = static_cast<r::plugin::link_client_plugin_t *>(plugin1);
  378. p1->link(act_s->get_address(), false, [&](auto &) {});
  379. act_c->access<rt::to::resources>()->acquire();
  380. act_c->do_shutdown();
  381. process_12();
  382. CHECK(act_c->get_state() == r::state_t::SHUTTING_DOWN);
  383. act_c->access<rt::to::resources>()->release();
  384. }
  385. act_s->access<rt::to::resources>()->release();
  386. }
  387. SECTION("unlink during shutring down") {
  388. auto act_c = sup1->create_actor<rt::actor_test_t>().timeout(rt::default_timeout).finish();
  389. auto act_s = sup2->create_actor<rt::actor_test_t>().timeout(rt::default_timeout).finish();
  390. act_c->configurer = [&](auto &, r::plugin::plugin_base_t &plugin) {
  391. plugin.with_casted<r::plugin::link_client_plugin_t>(
  392. [&](auto &p) { p.link(act_s->get_address(), false, [&](auto &) {}); });
  393. };
  394. process_12();
  395. CHECK(sup1->get_state() == r::state_t::OPERATIONAL);
  396. CHECK(sup2->get_state() == r::state_t::OPERATIONAL);
  397. act_c->do_shutdown();
  398. act_c->access<rt::to::resources>()->acquire();
  399. sup1->do_process();
  400. CHECK(act_c->get_state() == r::state_t::SHUTTING_DOWN);
  401. act_s->do_shutdown();
  402. sup2->do_process();
  403. sup1->do_process();
  404. CHECK(act_c->get_state() == r::state_t::SHUTTING_DOWN);
  405. act_c->access<rt::to::resources>()->release();
  406. process_12();
  407. }
  408. sup1->do_shutdown();
  409. sup2->do_shutdown();
  410. process_12();
  411. CHECK(sup1->get_state() == r::state_t::SHUT_DOWN);
  412. CHECK(sup2->get_state() == r::state_t::SHUT_DOWN);
  413. }
  414. TEST_CASE("proper shutdown order, defined by linkage", "[actor]") {
  415. r::system_context_t system_context;
  416. auto sup = system_context.create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).finish();
  417. auto act_1 = sup->create_actor<tracked_actor_t>().timeout(rt::default_timeout).finish();
  418. auto act_2 = sup->create_actor<tracked_actor_t>().timeout(rt::default_timeout).finish();
  419. auto act_3 = sup->create_actor<tracked_actor_t>().timeout(rt::default_timeout).finish();
  420. /*
  421. printf("a1 = %p(%p), a2 = %p(%p), a3 = %p(%p)\n", act_1.get(), act_1->get_address().get(),
  422. act_2.get(), act_2->get_address().get(), act_3.get(), act_3->get_address().get());
  423. */
  424. std::uint32_t event_id = 1;
  425. auto shutdowner = [&](auto &me) {
  426. auto &self = static_cast<tracked_actor_t &>(me);
  427. self.shutdown_event = event_id++;
  428. };
  429. act_1->shutdowner = act_2->shutdowner = act_3->shutdowner = shutdowner;
  430. act_1->configurer = [&](auto &, r::plugin::plugin_base_t &plugin) {
  431. plugin.with_casted<r::plugin::link_client_plugin_t>([&](auto &p) {
  432. p.link(act_2->get_address(), false);
  433. p.link(act_3->get_address(), false);
  434. });
  435. };
  436. act_2->configurer = [&](auto &, r::plugin::plugin_base_t &plugin) {
  437. plugin.with_casted<r::plugin::link_client_plugin_t>([&](auto &p) { p.link(act_3->get_address(), false); });
  438. };
  439. sup->do_process();
  440. REQUIRE(sup->get_state() == r::state_t::OPERATIONAL);
  441. sup->do_shutdown();
  442. sup->do_process();
  443. REQUIRE(sup->get_state() == r::state_t::SHUT_DOWN);
  444. CHECK(act_1->shutdown_event == 1);
  445. CHECK(act_2->shutdown_event == 2);
  446. CHECK(act_3->shutdown_event == 3);
  447. }
  448. TEST_CASE("unlink of root supervisor", "[actor]") {
  449. rt::system_context_test_t ctx;
  450. rt::system_context_test_t ctx1;
  451. rt::system_context_test_t ctx2;
  452. const char l1[] = "abc";
  453. const char l2[] = "def";
  454. auto sup1 = ctx1.create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).locality(l1).finish();
  455. auto sup2 = ctx2.create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).locality(l2).finish();
  456. sup2->configurer = [&](auto &, r::plugin::plugin_base_t &plugin) {
  457. plugin.with_casted<r::plugin::link_client_plugin_t>(
  458. [&](auto &p) { p.link(sup1->get_address(), false, [&](auto &) {}); });
  459. };
  460. auto process_12 = [&]() {
  461. while (!sup1->get_leader_queue().empty() || !sup2->get_leader_queue().empty()) {
  462. sup1->do_process();
  463. sup2->do_process();
  464. }
  465. };
  466. process_12();
  467. REQUIRE(sup1->get_state() == r::state_t::OPERATIONAL);
  468. REQUIRE(sup2->get_state() == r::state_t::OPERATIONAL);
  469. sup1->do_shutdown();
  470. sup1->do_process();
  471. sup2->do_shutdown();
  472. process_12();
  473. CHECK(sup1->get_state() == r::state_t::SHUT_DOWN);
  474. CHECK(sup2->get_state() == r::state_t::SHUT_DOWN);
  475. }
  476. TEST_CASE("ignore unlink", "[actor]") {
  477. rt::system_context_test_t ctx;
  478. auto sup = ctx.create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).finish();
  479. auto act_c = sup->create_actor<ignore_unlink_actor_t>().timeout(rt::default_timeout).finish();
  480. auto act_s = sup->create_actor<rt::actor_test_t>().timeout(rt::default_timeout).finish();
  481. act_c->configurer = [&](auto &, r::plugin::plugin_base_t &plugin) {
  482. plugin.with_casted<r::plugin::link_client_plugin_t>([&](auto &p) { p.link(act_s->get_address(), true); });
  483. };
  484. sup->do_process();
  485. REQUIRE(sup->get_state() == r::state_t::OPERATIONAL);
  486. act_s->do_shutdown();
  487. sup->do_process();
  488. CHECK(act_c->get_state() == r::state_t::OPERATIONAL);
  489. CHECK(act_s->get_state() == r::state_t::SHUT_DOWN);
  490. CHECK(act_c->server_addr == act_s->get_address());
  491. sup->do_shutdown();
  492. sup->do_process();
  493. }