030-registry.cpp 20 KB


  1. //
  2. // Copyright (c) 2019-2021 Ivan Baidakou (basiliscos) (the dot dmol at gmail dot com)
  3. //
  4. // Distributed under the MIT Software License
  5. //
  6. #include "access.h"
  7. #include "catch.hpp"
  8. #include "rotor.hpp"
  9. #include "supervisor_test.h"
  10. #include "actor_test.h"
  11. namespace r = rotor;
  12. namespace rt = r::test;
  13. struct manual_actor_t : public r::actor_base_t {
  14. using r::actor_base_t::actor_base_t;
  15. // no registry plugin
  16. // clang-format off
  17. using plugins_list_t = std::tuple<
  18. r::plugin::address_maker_plugin_t,
  19. r::plugin::lifetime_plugin_t,
  20. r::plugin::init_shutdown_plugin_t,
  21. r::plugin::starter_plugin_t>;
  22. // clang-format on
  23. using discovery_reply_t = r::intrusive_ptr_t<r::message::discovery_response_t>;
  24. using future_reply_t = r::intrusive_ptr_t<r::message::discovery_future_t>;
  25. using registration_reply_t = r::intrusive_ptr_t<r::message::registration_response_t>;
  26. r::address_ptr_t registry_addr;
  27. discovery_reply_t discovery_reply;
  28. future_reply_t future_reply;
  29. registration_reply_t registration_reply;
  30. void configure(r::plugin::plugin_base_t &plugin) noexcept override {
  31. r::actor_base_t::configure(plugin);
  32. plugin.with_casted<r::plugin::starter_plugin_t>([](auto &p) {
  33. p.subscribe_actor(&manual_actor_t::on_discovery);
  34. p.subscribe_actor(&manual_actor_t::on_registration_reply);
  35. p.subscribe_actor(&manual_actor_t::on_future);
  36. });
  37. }
  38. void query_name(const std::string &name) {
  39. request<r::payload::discovery_request_t>(registry_addr, name).send(rt::default_timeout);
  40. }
  41. r::request_id_t promise_name(const std::string &name) {
  42. return request<r::payload::discovery_promise_t>(registry_addr, name).send(rt::default_timeout);
  43. }
  44. void cancel_name(r::request_id_t request_id) {
  45. using payload_t = r::message::discovery_cancel_t::payload_t;
  46. send<payload_t>(registry_addr, request_id, address);
  47. }
  48. void register_name(const std::string &name) {
  49. request<r::payload::registration_request_t>(registry_addr, name, address).send(rt::default_timeout);
  50. }
  51. void unregister_all() { send<r::payload::deregistration_notify_t>(registry_addr, address); }
  52. void unregister_name(const std::string &name) { send<r::payload::deregistration_service_t>(registry_addr, name); }
  53. void on_discovery(r::message::discovery_response_t &reply) noexcept { discovery_reply.reset(&reply); }
  54. void on_future(r::message::discovery_future_t &reply) noexcept { future_reply.reset(&reply); }
  55. void on_registration_reply(r::message::registration_response_t &reply) noexcept {
  56. registration_reply.reset(&reply);
  57. }
  58. };
  59. struct sample_actor_t : rt::actor_test_t {
  60. using rt::actor_test_t::actor_test_t;
  61. r::address_ptr_t service_addr;
  62. };
  63. TEST_CASE("supervisor related tests", "[registry][supervisor]") {
  64. r::system_context_t system_context;
  65. rt::supervisor_test_ptr_t sup;
  66. SECTION("no registry on supervisor by default") {
  67. sup = system_context.create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).finish();
  68. sup->do_process();
  69. CHECK(!sup->access<rt::to::registry>());
  70. }
  71. SECTION("registry is created, when asked") {
  72. sup = system_context.create_supervisor<rt::supervisor_test_t>()
  73. .timeout(rt::default_timeout)
  74. .create_registry(true)
  75. .finish();
  76. sup->do_process();
  77. CHECK(sup->access<rt::to::registry>());
  78. }
  79. SECTION("registry is inherited") {
  80. sup = system_context.create_supervisor<rt::supervisor_test_t>()
  81. .timeout(rt::default_timeout)
  82. .create_registry(true)
  83. .finish();
  84. auto sup2 = sup->create_actor<rt::supervisor_test_t>().timeout(rt::default_timeout).finish();
  85. sup->do_process();
  86. CHECK(sup->access<rt::to::registry>());
  87. CHECK(sup2->access<rt::to::registry>());
  88. }
  89. SECTION("registry is set from different locality") {
  90. const char locality1[] = "abc";
  91. const char locality2[] = "def";
  92. sup = system_context.create_supervisor<rt::supervisor_test_t>()
  93. .timeout(rt::default_timeout)
  94. .locality(locality1)
  95. .finish();
  96. auto reg = sup->create_actor<r::registry_t>().timeout(rt::default_timeout).finish();
  97. sup->do_process();
  98. CHECK(!sup->access<rt::to::registry>());
  99. auto sup2 = sup->create_actor<rt::supervisor_test_t>()
  100. .timeout(rt::default_timeout)
  101. .locality(locality2)
  102. .registry_address(reg->get_address())
  103. .finish();
  104. while (!sup->get_leader_queue().empty() || !sup2->get_leader_queue().empty()) {
  105. sup->do_process();
  106. sup2->do_process();
  107. }
  108. CHECK(sup2->access<rt::to::registry>());
  109. sup2->do_shutdown();
  110. while (!sup->get_leader_queue().empty() || !sup2->get_leader_queue().empty()) {
  111. sup->do_process();
  112. sup2->do_process();
  113. }
  114. }
  115. sup->do_shutdown();
  116. sup->do_process();
  117. }
  118. TEST_CASE("registry actor (server)", "[registry][supervisor]") {
  119. r::system_context_t system_context;
  120. auto sup = system_context.create_supervisor<rt::supervisor_test_t>()
  121. .timeout(rt::default_timeout)
  122. .create_registry(true)
  123. .finish();
  124. auto act = sup->create_actor<manual_actor_t>().timeout(rt::default_timeout).finish();
  125. act->registry_addr = sup->access<rt::to::registry>();
  126. sup->do_process();
  127. SECTION("discovery non-exsiting name") {
  128. act->query_name("some-name");
  129. sup->do_process();
  130. REQUIRE((bool)act->discovery_reply);
  131. auto &ec = act->discovery_reply->payload.ee->ec;
  132. CHECK(ec == r::error_code_t::unknown_service);
  133. CHECK(ec.message() == "the requested service name is not registered");
  134. }
  135. SECTION("duplicate registration attempt") {
  136. act->register_name("nnn");
  137. sup->do_process();
  138. REQUIRE((bool)act->registration_reply);
  139. REQUIRE(!act->registration_reply->payload.ee);
  140. act->register_name("nnn");
  141. sup->do_process();
  142. auto &ec = act->registration_reply->payload.ee->ec;
  143. REQUIRE((bool)ec);
  144. REQUIRE(ec == r::error_code_t::already_registered);
  145. REQUIRE(ec.message() == "service name is already registered");
  146. }
  147. SECTION("reg 2 names, check, unreg on, check") {
  148. act->register_name("s1");
  149. sup->do_process();
  150. REQUIRE((bool)act->registration_reply);
  151. REQUIRE(!act->registration_reply->payload.ee);
  152. act->query_name("s1");
  153. sup->do_process();
  154. REQUIRE((bool)act->discovery_reply);
  155. REQUIRE(!act->discovery_reply->payload.ee);
  156. REQUIRE(act->discovery_reply->payload.res.service_addr.get() == act->get_address().get());
  157. act->register_name("s2");
  158. sup->do_process();
  159. REQUIRE((bool)act->registration_reply);
  160. REQUIRE(!act->registration_reply->payload.ee);
  161. act->query_name("s2");
  162. sup->do_process();
  163. REQUIRE((bool)act->discovery_reply);
  164. REQUIRE(!act->discovery_reply->payload.ee);
  165. REQUIRE(act->discovery_reply->payload.res.service_addr.get() == act->get_address().get());
  166. act->register_name("s3");
  167. sup->do_process();
  168. REQUIRE((bool)act->registration_reply);
  169. REQUIRE(!act->registration_reply->payload.ee);
  170. act->unregister_name("s2");
  171. act->query_name("s2");
  172. sup->do_process();
  173. REQUIRE(act->discovery_reply->payload.ee->ec == r::error_code_t::unknown_service);
  174. act->unregister_all();
  175. act->query_name("s1");
  176. sup->do_process();
  177. REQUIRE(act->discovery_reply->payload.ee->ec == r::error_code_t::unknown_service);
  178. act->query_name("s3");
  179. sup->do_process();
  180. REQUIRE(act->discovery_reply->payload.ee->ec == r::error_code_t::unknown_service);
  181. }
  182. SECTION("promise & future") {
  183. REQUIRE(!act->future_reply);
  184. SECTION("promise, register, future") {
  185. act->promise_name("s1");
  186. act->register_name("s1");
  187. sup->do_process();
  188. CHECK(act->future_reply);
  189. CHECK(act->future_reply->payload.res.service_addr.get() == act->get_address().get());
  190. }
  191. SECTION("future, register, promise") {
  192. act->register_name("s1");
  193. act->promise_name("s1");
  194. sup->do_process();
  195. CHECK(act->future_reply);
  196. CHECK(act->future_reply->payload.res.service_addr.get() == act->get_address().get());
  197. }
  198. SECTION("cancel") {
  199. auto req_id = act->promise_name("s1");
  200. act->cancel_name(req_id);
  201. sup->do_process();
  202. auto plugin = static_cast<r::actor_base_t *>(sup.get())->access<rt::to::get_plugin>(
  203. r::plugin::child_manager_plugin_t::class_identity);
  204. auto &reply = act->future_reply;
  205. CHECK(reply->payload.ee);
  206. CHECK(reply->payload.ee->ec.message() == "request has been cancelled");
  207. auto &actors_map = static_cast<r::plugin::child_manager_plugin_t *>(plugin)->access<rt::to::actors_map>();
  208. auto actor_state = actors_map.find(act->registry_addr);
  209. auto &registry = actor_state->second.actor;
  210. auto &promises = static_cast<r::registry_t *>(registry.get())->access<rt::to::promises>();
  211. CHECK(promises.empty());
  212. }
  213. }
  214. sup->do_shutdown();
  215. sup->do_process();
  216. }
  217. TEST_CASE("registry plugin (client)", "[registry][supervisor]") {
  218. r::system_context_t system_context;
  219. auto sup = system_context.create_supervisor<rt::supervisor_test_t>()
  220. .timeout(rt::default_timeout)
  221. .create_registry(true)
  222. .finish();
  223. SECTION("common case (just discover)") {
  224. auto act_s = sup->create_actor<sample_actor_t>().timeout(rt::default_timeout).finish();
  225. act_s->configurer = [&](auto &actor, r::plugin::plugin_base_t &plugin) {
  226. plugin.with_casted<r::plugin::registry_plugin_t>(
  227. [&actor](auto &p) { p.register_name("service-name", actor.get_address()); });
  228. };
  229. sup->do_process();
  230. REQUIRE(sup->get_state() == r::state_t::OPERATIONAL);
  231. auto act_c = sup->create_actor<sample_actor_t>().timeout(rt::default_timeout).finish();
  232. act_c->configurer = [&](auto &, r::plugin::plugin_base_t &plugin) {
  233. plugin.with_casted<r::plugin::registry_plugin_t>(
  234. [&](auto &p) { p.discover_name("service-name", act_c->service_addr); });
  235. };
  236. sup->do_process();
  237. CHECK(act_c->get_state() == r::state_t::OPERATIONAL);
  238. CHECK(act_c->service_addr == act_s->get_address());
  239. sup->do_shutdown();
  240. sup->do_process();
  241. CHECK(act_c->get_state() == r::state_t::SHUT_DOWN);
  242. CHECK(act_s->get_state() == r::state_t::SHUT_DOWN);
  243. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  244. }
  245. SECTION("common case (discover & link)") {
  246. auto act_s = sup->create_actor<sample_actor_t>().timeout(rt::default_timeout).finish();
  247. act_s->configurer = [&](auto &actor, r::plugin::plugin_base_t &plugin) {
  248. plugin.with_casted<r::plugin::registry_plugin_t>(
  249. [&actor](auto &p) { p.register_name("service-name", actor.get_address()); });
  250. };
  251. sup->do_process();
  252. REQUIRE(sup->get_state() == r::state_t::OPERATIONAL);
  253. auto act_c = sup->create_actor<sample_actor_t>().timeout(rt::default_timeout).finish();
  254. int succeses = 0;
  255. act_c->configurer = [&](auto &, r::plugin::plugin_base_t &plugin) {
  256. plugin.with_casted<r::plugin::registry_plugin_t>([&](auto &p) {
  257. p.discover_name("service-name", act_c->service_addr)
  258. .link(true)
  259. .callback([&](auto /*phase*/, auto &ec) mutable {
  260. REQUIRE(!ec);
  261. ++succeses;
  262. });
  263. });
  264. };
  265. sup->do_process();
  266. CHECK(act_c->get_state() == r::state_t::OPERATIONAL);
  267. CHECK(act_c->service_addr == act_s->get_address());
  268. CHECK(succeses == 2);
  269. sup->do_shutdown();
  270. sup->do_process();
  271. CHECK(act_c->get_state() == r::state_t::SHUT_DOWN);
  272. CHECK(act_s->get_state() == r::state_t::SHUT_DOWN);
  273. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  274. }
  275. SECTION("common case (promise & link)") {
  276. auto act_c = sup->create_actor<sample_actor_t>().timeout(rt::default_timeout).finish();
  277. int succeses = 0;
  278. act_c->configurer = [&](auto &, r::plugin::plugin_base_t &plugin) {
  279. plugin.with_casted<r::plugin::registry_plugin_t>([&](auto &p) {
  280. p.discover_name("service-name", act_c->service_addr, true)
  281. .link(true)
  282. .callback([&](auto /*phase*/, auto &ec) mutable {
  283. REQUIRE(!ec);
  284. ++succeses;
  285. });
  286. });
  287. };
  288. sup->do_process();
  289. CHECK(succeses == 0);
  290. SECTION("successful link") {
  291. auto act_s = sup->create_actor<sample_actor_t>().timeout(rt::default_timeout).finish();
  292. act_s->configurer = [&](auto &actor, r::plugin::plugin_base_t &plugin) {
  293. plugin.with_casted<r::plugin::registry_plugin_t>(
  294. [&actor](auto &p) { p.register_name("service-name", actor.get_address()); });
  295. };
  296. sup->do_process();
  297. CHECK(succeses == 2);
  298. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  299. CHECK(act_c->get_state() == r::state_t::OPERATIONAL);
  300. CHECK(act_c->service_addr == act_s->get_address());
  301. }
  302. SECTION("cancel promise") {
  303. CHECK(act_c->get_state() == r::state_t::INITIALIZING);
  304. act_c->do_shutdown();
  305. sup->do_process();
  306. CHECK(act_c->get_state() == r::state_t::SHUT_DOWN);
  307. auto plugin = act_c->access<rt::to::get_plugin>(r::plugin::registry_plugin_t::class_identity);
  308. auto p = static_cast<r::plugin::registry_plugin_t *>(plugin);
  309. auto &dm = p->access<rt::to::discovery_map>();
  310. CHECK(dm.size() == 0);
  311. }
  312. sup->do_shutdown();
  313. sup->do_process();
  314. CHECK(act_c->get_state() == r::state_t::SHUT_DOWN);
  315. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  316. }
  317. SECTION("discovery non-existing name => fail to init") {
  318. auto act = sup->create_actor<sample_actor_t>().timeout(rt::default_timeout).finish();
  319. act->configurer = [&](auto &, r::plugin::plugin_base_t &plugin) {
  320. plugin.with_casted<r::plugin::registry_plugin_t>(
  321. [&act](auto &p) { p.discover_name("non-existing-service", act->service_addr); });
  322. };
  323. sup->do_process();
  324. REQUIRE(sup->get_state() == r::state_t::SHUT_DOWN);
  325. auto &reason = act->get_shutdown_reason();
  326. CHECK(reason->ec == r::shutdown_code_t::supervisor_shutdown);
  327. CHECK(reason->ec.message() == "actor shutdown has been requested by supervisor");
  328. CHECK(reason->next->ec == r::shutdown_code_t::child_init_failed);
  329. CHECK(reason->next->ec.message() == "supervisor shutdown due to child init failure");
  330. auto &down_reason = reason->next->next->next;
  331. REQUIRE(down_reason);
  332. CHECK(down_reason->ec == r::error_code_t::discovery_failed);
  333. CHECK(down_reason->ec.message() == "discovery has been failed");
  334. }
  335. SECTION("double name registration => fail") {
  336. auto act1 = sup->create_actor<sample_actor_t>().timeout(rt::default_timeout).finish();
  337. auto act2 = sup->create_actor<sample_actor_t>().timeout(rt::default_timeout).finish();
  338. printf("act1 = %p(%p), act2 = %p(%p)\n", (void *)act1.get(), (void *)act1->get_address().get(),
  339. (void *)act2.get(), (void *)act2->get_address().get());
  340. auto configurer = [](auto &actor, r::plugin::plugin_base_t &plugin) {
  341. plugin.with_casted<r::plugin::registry_plugin_t>(
  342. [&actor](auto &p) { p.register_name("service-name", actor.get_address()); });
  343. };
  344. act1->configurer = configurer;
  345. act2->configurer = configurer;
  346. sup->do_process();
  347. CHECK(act1->get_state() == r::state_t::SHUT_DOWN);
  348. CHECK(act2->get_state() == r::state_t::SHUT_DOWN);
  349. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  350. auto &reason = act2->get_shutdown_reason();
  351. auto &down_reason = reason->next->next->next;
  352. REQUIRE(down_reason);
  353. CHECK(down_reason->ec == r::error_code_t::registration_failed);
  354. CHECK(down_reason->ec.message() == "registration has been failed");
  355. }
  356. }
  357. TEST_CASE("notify linked clients about going to shutdown", "[registry][supervisor]") {
  358. r::system_context_t system_context;
  359. auto sup = system_context.create_supervisor<rt::supervisor_test_t>()
  360. .timeout(rt::default_timeout)
  361. .create_registry(true)
  362. .finish();
  363. auto act1 = sup->create_actor<sample_actor_t>().timeout(rt::default_timeout).finish();
  364. act1->configurer = [&](auto &, r::plugin::plugin_base_t &plugin) {
  365. plugin.with_casted<r::plugin::registry_plugin_t>([&act1](auto &p) {
  366. p.register_name("my-actor", act1->get_address());
  367. p.discover_name("non-existing-service", act1->service_addr, true);
  368. });
  369. };
  370. auto act2 = sup->create_actor<sample_actor_t>().timeout(rt::default_timeout).finish();
  371. act2->configurer = [&](auto &, r::plugin::plugin_base_t &plugin) {
  372. plugin.with_casted<r::plugin::registry_plugin_t>(
  373. [&act1](auto &p) { p.discover_name("my-actor", act1->service_addr, true).link(false); });
  374. };
  375. sup->do_process();
  376. REQUIRE(act1->get_state() == r::state_t::INITIALIZING);
  377. REQUIRE(act2->get_state() == r::state_t::OPERATIONAL);
  378. REQUIRE(sup->get_state() == r::state_t::INITIALIZING);
  379. act1->do_shutdown();
  380. sup->do_process();
  381. CHECK(act1->get_state() == r::state_t::SHUT_DOWN);
  382. CHECK(act2->get_state() == r::state_t::SHUT_DOWN);
  383. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  384. }
  385. TEST_CASE("no problems when supervisor registers self in a registry", "[registry][supervisor]") {
  386. r::system_context_t system_context;
  387. auto sup = system_context.create_supervisor<rt::supervisor_test_t>()
  388. .timeout(rt::default_timeout)
  389. .create_registry(true)
  390. .configurer([](auto &actor, r::plugin::plugin_base_t &plugin) {
  391. plugin.with_casted<r::plugin::registry_plugin_t>(
  392. [&actor](auto &p) { p.register_name("service-name", actor.get_address()); });
  393. })
  394. .finish();
  395. SECTION("single supervisor and it's registry") {
  396. sup->do_process();
  397. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  398. }
  399. SECTION("supervisor + actor") {
  400. sup->do_process();
  401. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  402. auto act = sup->create_actor<sample_actor_t>().timeout(rt::default_timeout).finish();
  403. act->configurer = [&](auto &, r::plugin::plugin_base_t &plugin) {
  404. plugin.with_casted<r::plugin::registry_plugin_t>(
  405. [&](auto &p) { p.discover_name("service-name", act->service_addr, false).link(false); });
  406. };
  407. sup->do_process();
  408. CHECK(act->get_state() == r::state_t::OPERATIONAL);
  409. }
  410. sup->do_shutdown();
  411. sup->do_process();
  412. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  413. }