123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156 |
- //
- // Copyright (c) 2019-2020 Ivan Baidakou (basiliscos) (the dot dmol at gmail dot com)
- //
- // Distributed under the MIT Software License
- //
- #include "catch.hpp"
- #include "rotor.hpp"
- #include "supervisor_test.h"
- #include "access.h"
- namespace r = rotor;
- namespace rt = r::test;
// Counter incremented by ponger_t::on_ping for every ping message it handles.
static std::uint32_t ping_received = 0;

// Counter incremented by pinger_t::on_state each time it sends a ping message.
static std::uint32_t ping_sent = 0;

// Empty payload type carried by the ping message.
struct ping_t {};
- struct pinger_t : public r::actor_base_t {
- using r::actor_base_t::actor_base_t;
- void set_ponger_addr(const r::address_ptr_t &addr) { ponger_addr = addr; }
- void configure(r::plugin::plugin_base_t &plugin) noexcept override {
- plugin.with_casted<r::plugin::starter_plugin_t>([](auto &p) { p.subscribe_actor(&pinger_t::on_state); });
- }
- void on_start() noexcept override {
- r::actor_base_t::on_start();
- request_status();
- }
- void request_status() noexcept {
- auto &sup_addr = static_cast<r::actor_base_t &>(ponger_addr->supervisor).get_address();
- request<r::payload::state_request_t>(sup_addr, ponger_addr).send(r::pt::seconds{1});
- ++attempts;
- }
- void on_state(r::message::state_response_t &msg) noexcept {
- auto &state = msg.payload.res.state;
- if (state == r::state_t::OPERATIONAL) {
- send<ping_t>(ponger_addr);
- ponger_addr.reset();
- ping_sent++;
- } else if (attempts > 10) {
- do_shutdown();
- } else {
- request_status();
- }
- }
- std::uint32_t attempts = 0;
- r::address_ptr_t ponger_addr;
- };
- struct ponger_t : public r::actor_base_t {
- using r::actor_base_t::actor_base_t;
- void configure(r::plugin::plugin_base_t &plugin) noexcept override {
- plugin.with_casted<r::plugin::starter_plugin_t>([](auto &p) { p.subscribe_actor(&ponger_t::on_ping); });
- }
- void on_ping(r::message_t<ping_t> &) noexcept {
- ping_received++;
- do_shutdown();
- }
- };
- struct custom_sup : rt::supervisor_test_t {
- using rt::supervisor_test_t::supervisor_test_t;
- void on_child_init(actor_base_t *, const std::error_code &ec) noexcept override { error_code = ec; }
- std::error_code error_code;
- };
/*
 * Let's have the following tree of supervisors
 *
 *        S_root
 *        |    |
 *     S_A1    S_B1
 *       |      |
 *     S_A2    S_B2
 *      /        \
 *  pinger      ponger
 *
 * 1. Pinger should be able to send a ping message to ponger. The message should
 * be processed by S_1, yet it still has to be delivered to ponger
 *
 * 2. Ponger should receive the message and initiate its own shutdown procedure
 *
 * 3. As all supervisors have the same locality, the S_2 supervisor should
 * receive the ponger shutdown request and spawn a new ponger.
 *
 * 4. All messaging (except initialization) should happen in a single do_process
 * pass
 *
 */
- TEST_CASE("supervisor/locality tree ", "[supervisor]") {
- r::system_context_t system_context;
- const void *locality = &system_context;
- auto sup_root = system_context.create_supervisor<rt::supervisor_test_t>()
- .locality(locality)
- .timeout(rt::default_timeout)
- .finish();
- auto sup_A1 =
- sup_root->create_actor<rt::supervisor_test_t>().locality(locality).timeout(rt::default_timeout).finish();
- auto sup_A2 =
- sup_A1->create_actor<rt::supervisor_test_t>().locality(locality).timeout(rt::default_timeout).finish();
- auto sup_B1 =
- sup_root->create_actor<rt::supervisor_test_t>().locality(locality).timeout(rt::default_timeout).finish();
- auto sup_B2 =
- sup_B1->create_actor<rt::supervisor_test_t>().locality(locality).timeout(rt::default_timeout).finish();
- auto pinger = sup_A2->create_actor<pinger_t>().timeout(rt::default_timeout).finish();
- auto ponger = sup_B2->create_actor<ponger_t>().timeout(rt::default_timeout).finish();
- pinger->set_ponger_addr(ponger->get_address());
- sup_A2->do_process();
- REQUIRE(sup_A2->get_children_count() == 1 + 1);
- REQUIRE(sup_B2->get_children_count() == 1);
- REQUIRE(ping_sent == 1);
- REQUIRE(ping_received == 1);
- sup_root->do_shutdown();
- sup_root->do_process();
- REQUIRE(sup_A2->get_state() == r::state_t::SHUT_DOWN);
- REQUIRE(sup_B2->get_state() == r::state_t::SHUT_DOWN);
- REQUIRE(sup_A1->get_state() == r::state_t::SHUT_DOWN);
- REQUIRE(sup_B1->get_state() == r::state_t::SHUT_DOWN);
- REQUIRE(sup_root->get_state() == r::state_t::SHUT_DOWN);
- }
- TEST_CASE("failure escalation") {
- r::system_context_t system_context;
- auto sup_root =
- system_context.create_supervisor<custom_sup>().timeout(rt::default_timeout).create_registry().finish();
- auto sup_child = sup_root->create_actor<rt::supervisor_test_t>().timeout(rt::default_timeout).finish();
- r::address_ptr_t dummy_addr;
- auto act = sup_child->create_actor<rt::actor_test_t>().timeout(rt::default_timeout).finish();
- act->configurer = [&](auto &, r::plugin::plugin_base_t &plugin) {
- plugin.with_casted<r::plugin::registry_plugin_t>([&](auto &p) { p.discover_name("service-name", dummy_addr); });
- };
- sup_root->do_process();
- CHECK(act->get_state() == r::state_t::SHUT_DOWN);
- CHECK(sup_child->get_state() == r::state_t::SHUT_DOWN);
- CHECK(sup_root->get_state() == r::state_t::SHUT_DOWN);
- CHECK(sup_root->error_code.message() == "failure escalation (child actor died)");
- }
|