123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484 |
- //
- // Copyright (c) 2019-2020 Ivan Baidakou (basiliscos) (the dot dmol at gmail dot com)
- //
- // Distributed under the MIT Software License
- //
- #include "catch.hpp"
- #include "rotor.hpp"
- #include "supervisor_test.h"
- #include "actor_test.h"
- #include "access.h"
- namespace r = rotor;
- namespace rt = rotor::test;
- static std::uint32_t destroyed = 0;
- struct init_shutdown_plugin_t;
namespace payload {

/// Empty payload carried by the sample messages used throughout this file.
struct sample_payload_t {};

} // namespace payload
- namespace message {
- using sample_payload_t = r::message_t<payload::sample_payload_t>;
- }
- struct sample_sup_t : public rt::supervisor_test_t {
- using sup_base_t = rt::supervisor_test_t;
- using plugins_list_t = std::tuple<r::plugin::address_maker_plugin_t, r::plugin::locality_plugin_t,
- r::plugin::delivery_plugin_t<r::plugin::local_delivery_t>,
- r::plugin::lifetime_plugin_t, init_shutdown_plugin_t, /* use custom */
- r::plugin::foreigners_support_plugin_t, r::plugin::child_manager_plugin_t,
- r::plugin::starter_plugin_t>;
- std::uint32_t initialized = 0;
- std::uint32_t init_invoked = 0;
- std::uint32_t shutdown_started = 0;
- std::uint32_t shutdown_finished = 0;
- std::uint32_t shutdown_conf_invoked = 0;
- r::address_ptr_t shutdown_addr;
- using rt::supervisor_test_t::supervisor_test_t;
- ~sample_sup_t() override { ++destroyed; }
- void do_initialize(r::system_context_t *ctx) noexcept override {
- ++initialized;
- sup_base_t::do_initialize(ctx);
- }
- void shutdown_finish() noexcept override {
- ++shutdown_finished;
- rt::supervisor_test_t::shutdown_finish();
- }
- };
- struct init_shutdown_plugin_t : r::plugin::init_shutdown_plugin_t {
- using parent_t = r::plugin::init_shutdown_plugin_t;
- void deactivate() noexcept override { parent_t::deactivate(); }
- bool handle_shutdown(r::message::shutdown_request_t *message) noexcept override {
- auto sup = static_cast<sample_sup_t *>(actor);
- sup->shutdown_started++;
- return parent_t::handle_shutdown(message);
- }
- bool handle_init(r::message::init_request_t *message) noexcept override {
- auto sup = static_cast<sample_sup_t *>(actor);
- sup->init_invoked++;
- return parent_t::handle_init(message);
- }
- };
- struct sample_plugin_t : r::plugin::plugin_base_t {
- using parent_t = r::plugin::plugin_base_t;
- static const void *class_identity;
- const void *identity() const noexcept override { return class_identity; }
- void activate(r::actor_base_t *actor_) noexcept override {
- parent_t::activate(actor_);
- subscribe(&sample_plugin_t::on_message)->tag_io();
- }
- void deactivate() noexcept override { parent_t::deactivate(); }
- void on_message(message::sample_payload_t &) noexcept { message_received = true; }
- bool message_received = false;
- };
- const void *sample_plugin_t::class_identity = &sample_plugin_t::class_identity;
- struct sample_sup2_t : public rt::supervisor_test_t {
- using sup_base_t = rt::supervisor_test_t;
- std::uint32_t initialized = 0;
- std::uint32_t init_invoked = 0;
- std::uint32_t shutdown_finished = 0;
- std::uint32_t shutdown_conf_invoked = 0;
- r::address_ptr_t shutdown_addr;
- actor_base_t *init_child = nullptr;
- actor_base_t *shutdown_child = nullptr;
- std::error_code init_ec;
- std::error_code shutdown_ec;
- using rt::supervisor_test_t::supervisor_test_t;
- ~sample_sup2_t() override { ++destroyed; }
- void do_initialize(r::system_context_t *ctx) noexcept override {
- ++initialized;
- sup_base_t::do_initialize(ctx);
- }
- void init_finish() noexcept override {
- ++init_invoked;
- sup_base_t::init_finish();
- }
- virtual void shutdown_finish() noexcept override {
- ++shutdown_finished;
- rt::supervisor_test_t::shutdown_finish();
- }
- void on_child_init(actor_base_t *actor, const std::error_code &ec) noexcept override {
- init_child = actor;
- init_ec = ec;
- }
- void on_child_shutdown(actor_base_t *actor, const std::error_code &ec) noexcept override {
- shutdown_child = actor;
- shutdown_ec = ec;
- }
- };
- struct sample_sup3_t : public rt::supervisor_test_t {
- using sup_base_t = rt::supervisor_test_t;
- using rt::supervisor_test_t::supervisor_test_t;
- std::uint32_t received = 0;
- void make_subscription() noexcept {
- subscribe(&sample_sup3_t::on_sample);
- send<payload::sample_payload_t>(address);
- }
- void on_sample(message::sample_payload_t &) noexcept { ++received; }
- };
- struct sample_sup4_t : public rt::supervisor_test_t {
- using sup_base_t = rt::supervisor_test_t;
- using rt::supervisor_test_t::supervisor_test_t;
- std::uint32_t counter = 0;
- void intercept(r::message_ptr_t &, const void *tag, const r::continuation_t &continuation) noexcept override {
- CHECK(tag == rotor::tags::io);
- if (++counter % 2) {
- continuation();
- }
- }
- };
- struct unsubscriber_sup_t : public rt::supervisor_test_t {
- using sup_base_t = rt::supervisor_test_t;
- using rt::supervisor_test_t::supervisor_test_t;
- void configure(r::plugin::plugin_base_t &plugin) noexcept override {
- plugin.with_casted<r::plugin::starter_plugin_t>(
- [](auto &p) { p.subscribe_actor(&unsubscriber_sup_t::on_sample); });
- }
- void on_start() noexcept override {
- rt::supervisor_test_t::on_start();
- unsubscribe(&unsubscriber_sup_t::on_sample);
- }
- void on_sample(message::sample_payload_t &) noexcept {}
- };
- struct sample_actor_t : public r::actor_base_t {
- using r::actor_base_t::actor_base_t;
- };
- struct sample_actor2_t : public rt::actor_test_t {
- using rt::actor_test_t::actor_test_t;
- void configure(r::plugin::plugin_base_t &plugin) noexcept override {
- plugin.with_casted<r::plugin::address_maker_plugin_t>([&](auto &p) { alternative = p.create_address(); });
- plugin.with_casted<r::plugin::starter_plugin_t>([&](auto &p) {
- p.subscribe_actor(&sample_actor2_t::on_link, alternative);
- send<payload::sample_payload_t>(alternative);
- });
- }
- void on_link(message::sample_payload_t &) noexcept { ++received; }
- r::address_ptr_t alternative;
- int received = 0;
- };
- struct sample_actor3_t : public rt::actor_test_t {
- using rt::actor_test_t::actor_test_t;
- void shutdown_start() noexcept override {
- rt::actor_test_t::shutdown_start();
- resources->acquire();
- }
- };
- struct sample_actor4_t : public rt::actor_test_t {
- using rt::actor_test_t::actor_test_t;
- void configure(r::plugin::plugin_base_t &plugin) noexcept override {
- rt::actor_test_t::configure(plugin);
- plugin.with_casted<r::plugin::starter_plugin_t>(
- [&](auto &p) { p.subscribe_actor(&sample_actor4_t::on_message)->tag_io(); });
- }
- void on_start() noexcept override {
- rt::actor_test_t::on_start();
- send<payload::sample_payload_t>(get_address());
- send<payload::sample_payload_t>(get_address());
- }
- void on_message(message::sample_payload_t &) noexcept { ++received; }
- std::size_t received = 0;
- };
- struct sample_actor5_t : public rt::actor_test_t {
- using rt::actor_test_t::actor_test_t;
- // clang-format off
- using plugins_list_t = std::tuple<
- r::plugin::address_maker_plugin_t,
- r::plugin::lifetime_plugin_t,
- r::plugin::init_shutdown_plugin_t,
- r::plugin::link_server_plugin_t,
- r::plugin::link_client_plugin_t,
- r::plugin::registry_plugin_t,
- r::plugin::resources_plugin_t,
- r::plugin::starter_plugin_t,
- sample_plugin_t
- >;
- // clang-format on
- void on_start() noexcept override {
- rt::actor_test_t::on_start();
- send<payload::sample_payload_t>(get_address());
- send<payload::sample_payload_t>(get_address());
- }
- };
- struct sample_actor6_t : public rt::actor_test_t {
- using rt::actor_test_t::actor_test_t;
- void on_start() noexcept override {
- rt::actor_test_t::on_start();
- start_timer(r::pt::minutes(1), *this, &sample_actor6_t::on_timer);
- }
- void on_timer(r::request_id_t, bool cancelled) noexcept { this->cancelled = cancelled; }
- bool cancelled = false;
- };
- TEST_CASE("on_initialize, on_start, simple on_shutdown (handled by plugin)", "[supervisor]") {
- destroyed = 0;
- r::system_context_t *system_context = new r::system_context_t{};
- auto sup = system_context->create_supervisor<sample_sup_t>().timeout(rt::default_timeout).finish();
- REQUIRE(&sup->get_supervisor() == sup.get());
- REQUIRE(sup->initialized == 1);
- sup->do_process();
- CHECK(sup->init_invoked == 1);
- CHECK(sup->shutdown_started == 0);
- CHECK(sup->shutdown_conf_invoked == 0);
- CHECK(sup->active_timers.size() == 0);
- CHECK(sup->get_state() == r::state_t::OPERATIONAL);
- sup->do_shutdown();
- sup->do_process();
- REQUIRE(sup->shutdown_started == 1);
- REQUIRE(sup->shutdown_finished == 1);
- REQUIRE(sup->active_timers.size() == 0);
- REQUIRE(sup->get_state() == r::state_t::SHUT_DOWN);
- REQUIRE(sup->get_leader_queue().size() == 0);
- REQUIRE(sup->get_points().size() == 0);
- CHECK(rt::empty(sup->get_subscription()));
- REQUIRE(destroyed == 0);
- delete system_context;
- sup->shutdown_addr.reset();
- sup.reset();
- REQUIRE(destroyed == 1);
- }
- TEST_CASE("on_initialize, on_start, simple on_shutdown", "[supervisor]") {
- destroyed = 0;
- r::system_context_t *system_context = new r::system_context_t{};
- auto sup = system_context->create_supervisor<sample_sup2_t>().timeout(rt::default_timeout).finish();
- REQUIRE(&sup->get_supervisor() == sup.get());
- REQUIRE(sup->initialized == 1);
- REQUIRE(sup->init_child == nullptr);
- sup->do_process();
- REQUIRE(sup->init_invoked == 1);
- REQUIRE(sup->shutdown_conf_invoked == 0);
- REQUIRE(sup->active_timers.size() == 0);
- REQUIRE(sup->get_state() == r::state_t::OPERATIONAL);
- sup->do_shutdown();
- sup->do_process();
- REQUIRE(sup->shutdown_finished == 1);
- REQUIRE(sup->active_timers.size() == 0);
- REQUIRE(sup->get_state() == r::state_t::SHUT_DOWN);
- REQUIRE(sup->get_leader_queue().size() == 0);
- REQUIRE(sup->get_points().size() == 0);
- CHECK(rt::empty(sup->get_subscription()));
- REQUIRE(sup->shutdown_child == nullptr);
- REQUIRE(destroyed == 0);
- delete system_context;
- sup->shutdown_addr.reset();
- sup.reset();
- REQUIRE(destroyed == 1);
- }
- TEST_CASE("start/shutdown 1 child & 1 supervisor", "[supervisor]") {
- r::system_context_ptr_t system_context = new r::system_context_t();
- auto sup = system_context->create_supervisor<sample_sup2_t>().timeout(rt::default_timeout).finish();
- auto act = sup->create_actor<sample_actor_t>().timeout(rt::default_timeout).finish();
- /* for better coverage */
- auto last = sup->access<rt::to::last_req_id>();
- auto &request_map = sup->access<rt::to::request_map>();
- request_map[last + 1] = r::request_curry_t();
- sup->do_process();
- request_map.clear();
- CHECK(sup->access<rt::to::last_req_id>() > 1);
- CHECK(sup->get_state() == r::state_t::OPERATIONAL);
- CHECK(act->access<rt::to::state>() == r::state_t::OPERATIONAL);
- CHECK(act->access<rt::to::resources>()->has() == 0);
- CHECK(sup->init_child == act.get());
- CHECK(!sup->init_ec);
- CHECK(sup->shutdown_child == nullptr);
- sup->do_shutdown();
- sup->do_process();
- CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
- CHECK(act->access<rt::to::state>() == r::state_t::SHUT_DOWN);
- CHECK(sup->shutdown_child == act.get());
- CHECK(!sup->shutdown_ec);
- }
- TEST_CASE("custom subscription", "[supervisor]") {
- r::system_context_ptr_t system_context = new r::system_context_t();
- auto sup = system_context->create_supervisor<sample_sup3_t>().timeout(rt::default_timeout).finish();
- sup->do_process();
- CHECK(sup->get_state() == r::state_t::OPERATIONAL);
- sup->make_subscription();
- sup->do_process();
- CHECK(sup->received == 1);
- sup->do_shutdown();
- sup->do_process();
- CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
- }
- TEST_CASE("shutdown immediately", "[supervisor]") {
- r::system_context_ptr_t system_context = new r::system_context_t();
- auto sup = system_context->create_supervisor<sample_sup3_t>().timeout(rt::default_timeout).finish();
- sup->do_shutdown();
- sup->do_process();
- CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
- }
- TEST_CASE("self unsubscriber", "[actor]") {
- r::system_context_ptr_t system_context = new r::system_context_t();
- auto sup = system_context->create_supervisor<unsubscriber_sup_t>().timeout(rt::default_timeout).finish();
- sup->do_process();
- CHECK(sup->get_state() == r::state_t::OPERATIONAL);
- sup->do_shutdown();
- sup->do_process();
- CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
- }
- TEST_CASE("alternative address subscriber", "[actor]") {
- r::system_context_ptr_t system_context = new r::system_context_t();
- auto sup = system_context->create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).finish();
- auto act = sup->create_actor<sample_actor2_t>().timeout(rt::default_timeout).finish();
- sup->do_process();
- CHECK(sup->get_state() == r::state_t::OPERATIONAL);
- CHECK(act->get_state() == r::state_t::OPERATIONAL);
- CHECK(act->received == 1);
- sup->do_shutdown();
- sup->do_process();
- CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
- CHECK(act->get_state() == r::state_t::SHUT_DOWN);
- }
- TEST_CASE("acquire resources on shutdown start", "[actor]") {
- r::system_context_ptr_t system_context = new r::system_context_t();
- auto sup = system_context->create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).finish();
- auto act = sup->create_actor<sample_actor3_t>().timeout(rt::default_timeout).finish();
- sup->do_process();
- CHECK(sup->get_state() == r::state_t::OPERATIONAL);
- sup->do_shutdown();
- sup->do_process();
- CHECK(act->get_state() == r::state_t::SHUTTING_DOWN);
- act->access<rt::to::resources>()->release();
- sup->do_process();
- CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
- CHECK(act->get_state() == r::state_t::SHUT_DOWN);
- }
- TEST_CASE("io tagging & intercepting", "[actor]") {
- r::system_context_ptr_t system_context = new r::system_context_t();
- auto sup = system_context->create_supervisor<sample_sup4_t>().timeout(rt::default_timeout).finish();
- auto act = sup->create_actor<sample_actor4_t>().timeout(rt::default_timeout).finish();
- sup->do_process();
- CHECK(sup->get_state() == r::state_t::OPERATIONAL);
- CHECK(act->received == 1);
- CHECK(sup->counter == 2);
- sup->do_shutdown();
- sup->do_process();
- CHECK(act->get_state() == r::state_t::SHUT_DOWN);
- CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
- }
- TEST_CASE("io tagging (in plugin) & intercepting", "[actor]") {
- r::system_context_ptr_t system_context = new r::system_context_t();
- auto sup = system_context->create_supervisor<sample_sup4_t>().timeout(rt::default_timeout).finish();
- auto act = sup->create_actor<sample_actor5_t>().timeout(rt::default_timeout).finish();
- sup->do_process();
- CHECK(sup->get_state() == r::state_t::OPERATIONAL);
- CHECK(sup->counter == 2);
- auto plugin = act->access<rt::to::get_plugin>(sample_plugin_t::class_identity);
- CHECK(plugin);
- CHECK(static_cast<sample_plugin_t *>(plugin)->message_received);
- sup->do_shutdown();
- sup->do_process();
- CHECK(act->get_state() == r::state_t::SHUT_DOWN);
- CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
- }
- TEST_CASE("timers cancellation", "[actor]") {
- r::system_context_ptr_t system_context = new r::system_context_t();
- auto sup = system_context->create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).finish();
- auto act = sup->create_actor<sample_actor6_t>().timeout(rt::default_timeout).finish();
- sup->do_process();
- CHECK(act->get_state() == r::state_t::OPERATIONAL);
- CHECK(sup->get_state() == r::state_t::OPERATIONAL);
- CHECK(!act->access<rt::to::timers_map>().empty());
- sup->do_shutdown();
- sup->do_process();
- CHECK(act->get_state() == r::state_t::SHUT_DOWN);
- CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
- CHECK(act->access<rt::to::timers_map>().empty());
- }
|