|
- #pragma once
- #include "actor_base.h"
- #include "handler.h"
- #include "message.h"
- #include "subscription.h"
- #include "system_context.h"
- #include "supervisor_config.h"
- #include "address_mapping.h"
- #include "error_code.h"
- #include "spawner.h"
- #include <functional>
- #include <unordered_map>
- #include <unordered_set>
- #include <boost/lockfree/queue.hpp>
- #if defined(_MSC_VER)
- #pragma warning(push)
- #pragma warning(disable : 4251)
- #endif
- namespace rotor {
- struct ROTOR_API supervisor_t : public actor_base_t {
-
-
- using plugins_list_t = std::tuple<
- plugin::address_maker_plugin_t,
- plugin::locality_plugin_t,
- plugin::delivery_plugin_t<plugin::default_local_delivery_t>,
- plugin::lifetime_plugin_t,
- plugin::init_shutdown_plugin_t,
- plugin::foreigners_support_plugin_t,
- plugin::child_manager_plugin_t,
- plugin::link_server_plugin_t,
- plugin::link_client_plugin_t,
- plugin::registry_plugin_t,
- plugin::resources_plugin_t,
- plugin::starter_plugin_t>;
-
-
- using config_t = supervisor_config_t;
-
- template <typename Supervisor> using config_builder_t = supervisor_config_builder_t<Supervisor>;
-
- supervisor_t(supervisor_config_t &config);
- supervisor_t(const supervisor_t &) = delete;
- supervisor_t(supervisor_t &&) = delete;
- ~supervisor_t();
- virtual void do_initialize(system_context_t *ctx) noexcept override;
-
- inline size_t do_process() noexcept { return locality_leader->delivery->process(); }
-
- virtual address_ptr_t make_address() noexcept;
-
- virtual void commit_unsubscription(const subscription_info_ptr_t &info) noexcept;
-
- virtual void start() noexcept = 0;
- void on_start() noexcept override;
-
- virtual void shutdown() noexcept = 0;
- void do_shutdown(const extended_error_ptr_t &reason = {}) noexcept override;
- void shutdown_finish() noexcept override;
-
- virtual void on_child_init(actor_base_t *actor, const extended_error_ptr_t &ec) noexcept;
-
- virtual void on_child_shutdown(actor_base_t *actor) noexcept;
-
- virtual void enqueue(message_ptr_t message) noexcept = 0;
-
- inline void put(message_ptr_t message) { locality_leader->queue.emplace_back(std::move(message)); }
-
- template <typename Handler> void subscribe(actor_base_t &actor, Handler &&handler) {
- supervisor->subscribe(actor.address, wrap_handler(actor, std::move(handler)));
- }
-
- template <typename Handler> inline void unsubscribe_actor(const address_ptr_t &addr, Handler &&handler) noexcept {
- handler_ptr_t wrapped_handler(std::forward<Handler>(handler));
- lifetime->unsubscribe(wrapped_handler, addr);
- }
-
- template <typename Actor> auto create_actor() {
- using builder_t = typename Actor::template config_builder_t<Actor>;
- assert(manager && "child_manager_plugin_t should be already initialized");
- return builder_t([this](auto &actor) { manager->create_child(actor); }, this);
- }
-
- template <typename T, typename... Args>
- request_builder_t<T> do_request(actor_base_t &actor, const address_ptr_t &dest_addr, const address_ptr_t &reply_to,
- Args &&...args) noexcept {
- return request_builder_t<T>(*this, actor, dest_addr, reply_to, std::forward<Args>(args)...);
- }
-
- subscription_info_ptr_t subscribe(const handler_ptr_t &handler, const address_ptr_t &addr,
- const actor_base_t *owner_ptr, owner_tag_t owner_tag) noexcept;
-
- spawner_t spawn(factory_t) noexcept;
- using actor_base_t::subscribe;
-
- inline const address_ptr_t &get_registry_address() const noexcept { return registry_address; }
-
- template <typename T> auto &access() noexcept;
-
- template <typename T, typename... Args> auto access(Args... args) noexcept;
-
- using inbound_queue_t = boost::lockfree::queue<message_base_t *>;
- protected:
-
- virtual address_ptr_t instantiate_address(const void *locality) noexcept;
-
- using request_map_t = std::unordered_map<request_id_t, request_curry_t>;
-
- void on_request_trigger(request_id_t timer_id, bool cancelled) noexcept;
-
- virtual void do_start_timer(const pt::time_duration &interval, timer_handler_base_t &handler) noexcept = 0;
-
- virtual void do_cancel_timer(request_id_t timer_id) noexcept = 0;
-
- virtual void intercept(message_ptr_t &message, const void *tag, const continuation_t &continuation) noexcept;
-
- system_context_t *context;
-
- messages_queue_t queue;
-
- request_id_t last_req_id;
-
- request_map_t request_map;
-
- subscription_t subscription_map;
-
- supervisor_t *parent;
-
- plugin::delivery_plugin_base_t *delivery = nullptr;
-
- plugin::child_manager_plugin_t *manager = nullptr;
-
- supervisor_t *locality_leader;
-
- inbound_queue_t inbound_queue;
-
- size_t inbound_queue_size;
-
- pt::time_duration poll_duration;
-
- const std::atomic_bool *shutdown_flag = nullptr;
-
- pt::time_duration shutdown_poll_frequency = pt::millisec{100};
- private:
- using actors_set_t = std::unordered_set<const actor_base_t *>;
- bool create_registry;
- bool synchronize_start;
- address_ptr_t registry_address;
- actors_set_t alive_actors;
- supervisor_policy_t policy;
-
- address_mapping_t address_mapping;
- template <typename T> friend struct request_builder_t;
- template <typename Supervisor> friend struct actor_config_builder_t;
- friend struct plugin::delivery_plugin_base_t;
- friend struct actor_base_t;
- template <typename T> friend struct plugin::delivery_plugin_t;
- void discard_request(request_id_t request_id) noexcept;
- void uplift_last_message() noexcept;
- void on_shutdown_check_timer(request_id_t, bool cancelled) noexcept;
- inline request_id_t next_request_id() noexcept {
- AGAIN:
- auto &map = locality_leader->request_map;
- auto it = map.find(++locality_leader->last_req_id);
- if (it != map.end()) {
- goto AGAIN;
- }
- return locality_leader->last_req_id;
- }
- };
- using supervisor_ptr_t = intrusive_ptr_t<supervisor_t>;
- template <typename Supervisor> auto system_context_t::create_supervisor() {
- using builder_t = typename Supervisor::template config_builder_t<Supervisor>;
- return builder_t(
- [this](auto &actor) {
- if (supervisor) {
- auto ec = make_error_code(error_code_t::supervisor_defined);
- on_error(actor.get(), make_error(identity(), ec));
- actor.reset();
- } else {
- this->supervisor = actor;
- actor->do_initialize(this);
- }
- },
- *this);
- }
- template <typename M, typename... Args> void actor_base_t::send(const address_ptr_t &addr, Args &&...args) {
- supervisor->put(make_message<M>(addr, std::forward<Args>(args)...));
- }
- template <typename Delegate, typename Method>
- void actor_base_t::start_timer(request_id_t request_id, const pt::time_duration &interval, Delegate &delegate,
- Method method) noexcept {
- using final_handler_t = timer_handler_t<Delegate, Method>;
- auto handler = std::make_unique<final_handler_t>(this, request_id, &delegate, std::forward<Method>(method));
- supervisor->do_start_timer(interval, *handler);
- timers_map.emplace(request_id, std::move(handler));
- }
- template <typename Delegate, typename Method, typename>
- request_id_t actor_base_t::start_timer(const pt::time_duration &interval, Delegate &delegate, Method method) noexcept {
- auto request_id = supervisor->next_request_id();
- start_timer(request_id, interval, delegate, std::forward<Method>(method));
- return request_id;
- }
- template <typename Handler> handler_ptr_t wrap_handler(actor_base_t &actor, Handler &&handler) {
- using final_handler_t = handler_t<Handler>;
- auto handler_raw = new final_handler_t(actor, std::move(handler));
- return handler_ptr_t{handler_raw};
- }
- template <typename Handler> subscription_info_ptr_t actor_base_t::subscribe(Handler &&h) noexcept {
- auto wrapped_handler = wrap_handler(*this, std::move(h));
- return supervisor->subscribe(wrapped_handler, address, this, owner_tag_t::ANONYMOUS);
- }
- template <typename Handler>
- subscription_info_ptr_t actor_base_t::subscribe(Handler &&h, const address_ptr_t &addr) noexcept {
- auto wrapped_handler = wrap_handler(*this, std::move(h));
- return supervisor->subscribe(wrapped_handler, addr, this, owner_tag_t::ANONYMOUS);
- }
- namespace plugin {
- template <typename Handler>
- subscription_info_ptr_t plugin_base_t::subscribe(Handler &&h, const address_ptr_t &addr) noexcept {
- using final_handler_t = handler_t<Handler>;
- handler_ptr_t wrapped_handler(new final_handler_t(*this, std::move(h)));
- auto info = actor->supervisor->subscribe(wrapped_handler, addr, actor, owner_tag_t::PLUGIN);
- own_subscriptions.emplace_back(info);
- return info;
- }
- template <typename Handler> subscription_info_ptr_t plugin_base_t::subscribe(Handler &&h) noexcept {
- return subscribe(std::forward<Handler>(h), actor->address);
- }
- template <> inline auto &plugin_base_t::access<plugin::starter_plugin_t>() noexcept { return own_subscriptions; }
- template <typename Handler> subscription_info_ptr_t starter_plugin_t::subscribe_actor(Handler &&handler) noexcept {
- auto &address = actor->get_address();
- return subscribe_actor(std::forward<Handler>(handler), address);
- }
- template <typename Handler>
- subscription_info_ptr_t starter_plugin_t::subscribe_actor(Handler &&handler, const address_ptr_t &addr) noexcept {
- auto wrapped_handler = wrap_handler(*actor, std::move(handler));
- auto info = actor->get_supervisor().subscribe(wrapped_handler, addr, actor, owner_tag_t::PLUGIN);
- assert(std::count_if(tracked.begin(), tracked.end(), [&](auto &it) { return *it == *info; }) == 0 &&
- "already subscribed");
- tracked.emplace_back(info);
- access<starter_plugin_t>().emplace_back(info);
- return info;
- }
- template <> inline size_t delivery_plugin_t<plugin::local_delivery_t>::process() noexcept {
- size_t enqueued_messages{0};
- while (queue->size()) {
- auto message = message_ptr_t(queue->front().detach(), false);
- auto &dest = message->address;
- queue->pop_front();
- auto internal = dest->same_locality(*address);
- if (internal) {
- auto local_recipients = subscription_map->get_recipients(*message);
- if (local_recipients) {
- plugin::local_delivery_t::delivery(message, *local_recipients);
- }
- } else {
- dest->supervisor.enqueue(std::move(message));
- ++enqueued_messages;
- }
- }
- return enqueued_messages;
- }
- template <> inline size_t delivery_plugin_t<plugin::inspected_local_delivery_t>::process() noexcept {
- size_t enqueued_messages{0};
- while (queue->size()) {
- auto message = message_ptr_t(queue->front().detach(), false);
- auto &dest = message->address;
- queue->pop_front();
- auto internal = dest->same_locality(*address);
- const subscription_t::joint_handlers_t *local_recipients = nullptr;
- bool delivery_attempt = false;
- if (internal) {
- local_recipients = subscription_map->get_recipients(*message);
- delivery_attempt = true;
- } else {
- dest->supervisor.enqueue(std::move(message));
- ++enqueued_messages;
- }
- if (local_recipients) {
- plugin::inspected_local_delivery_t::delivery(message, *local_recipients, stringifier);
- } else {
- if (delivery_attempt) {
- plugin::inspected_local_delivery_t::discard(message, stringifier);
- }
- }
- }
- return enqueued_messages;
- }
- }
- template <typename Handler, typename Enabled> void actor_base_t::unsubscribe(Handler &&h) noexcept {
- supervisor->unsubscribe_actor(address, wrap_handler(*this, std::move(h)));
- }
- template <typename Handler, typename Enabled>
- void actor_base_t::unsubscribe(Handler &&h, address_ptr_t &addr) noexcept {
- supervisor->unsubscribe_actor(addr, wrap_handler(*this, std::move(h)));
- }
- template <typename T>
- template <typename... Args>
- request_builder_t<T>::request_builder_t(supervisor_t &sup_, actor_base_t &actor_, const address_ptr_t &destination_,
- const address_ptr_t &reply_to_, Args &&...args)
- : sup{sup_}, actor{actor_}, request_id{sup.next_request_id()}, destination{destination_}, reply_to{reply_to_},
- do_install_handler{false} {
- auto addr = sup.address_mapping.get_mapped_address(actor_, response_message_t::message_type);
- if (addr) {
- imaginary_address = addr;
- } else {
-
-
-
- imaginary_address = sup.make_address();
- do_install_handler = true;
- }
- req.reset(
- new request_message_t{destination, request_id, imaginary_address, reply_to_, std::forward<Args>(args)...});
- }
- template <typename T> request_id_t request_builder_t<T>::send(const pt::time_duration &timeout_) noexcept {
- if (do_install_handler) {
- install_handler();
- }
- auto fn = &request_traits_t<T>::make_error_response;
- sup.request_map.emplace(request_id, request_curry_t{fn, reply_to, req, &actor});
- sup.put(req);
- sup.start_timer(request_id, timeout_, sup, &supervisor_t::on_request_trigger);
- actor.active_requests.emplace(request_id);
- return request_id;
- }
- template <typename T> void request_builder_t<T>::install_handler() noexcept {
- auto handler = lambda<response_message_t>([supervisor = &sup](response_message_t &msg) {
- auto request_id = msg.payload.request_id();
- auto &request_map = supervisor->request_map;
- auto it = request_map.find(request_id);
-
-
-
-
- if (it != request_map.end()) {
- auto &curry = it->second;
- auto &orig_addr = curry.origin;
- supervisor->template send<wrapped_res_t>(orig_addr, msg.payload);
- supervisor->discard_request(request_id);
-
- supervisor->uplift_last_message();
- }
- });
- auto wrapped_handler = wrap_handler(sup, std::move(handler));
- auto info = sup.subscribe(wrapped_handler, imaginary_address, &actor, owner_tag_t::SUPERVISOR);
- sup.address_mapping.set(actor, info);
- }
- template <typename Request, typename... Args>
- request_builder_t<typename request_wrapper_t<Request>::request_t> actor_base_t::request(const address_ptr_t &dest_addr,
- Args &&...args) {
- using request_t = typename request_wrapper_t<Request>::request_t;
- return supervisor->do_request<request_t>(*this, dest_addr, address, std::forward<Args>(args)...);
- }
- template <typename Request, typename... Args>
- request_builder_t<typename request_wrapper_t<Request>::request_t>
- actor_base_t::request_via(const address_ptr_t &dest_addr, const address_ptr_t &reply_addr, Args &&...args) {
- using request_t = typename request_wrapper_t<Request>::request_t;
- return supervisor->do_request<request_t>(*this, dest_addr, reply_addr, std::forward<Args>(args)...);
- }
- template <typename Request> auto actor_base_t::make_response(Request &message, const extended_error_ptr_t &ec) {
- using payload_t = typename Request::payload_t::request_t;
- using traits_t = request_traits_t<payload_t>;
- return traits_t::make_error_response(message.payload.reply_to, message, ec);
- }
- template <typename Request, typename... Args> auto actor_base_t::make_response(Request &message, Args &&...args) {
- using payload_t = typename Request::payload_t::request_t;
- using req_traits_t = request_traits_t<payload_t>;
- using response_t = typename req_traits_t::response::wrapped_t;
- using request_ptr_t = typename req_traits_t::request::message_ptr_t;
- return make_message<response_t>(message.payload.reply_to, request_ptr_t{&message}, std::forward<Args>(args)...);
- }
- template <typename Request, typename... Args> void actor_base_t::reply_to(Request &message, Args &&...args) {
- supervisor->put(make_response<Request>(message, std::forward<Args>(args)...));
- }
- template <typename Request> void actor_base_t::reply_with_error(Request &message, const extended_error_ptr_t &ec) {
- supervisor->put(make_response<Request>(message, ec));
- }
- template <typename Actor>
- actor_config_builder_t<Actor>::actor_config_builder_t(install_action_t &&action_, supervisor_t *supervisor_)
- : install_action{std::move(action_)}, supervisor{supervisor_},
- system_context{*supervisor_->context}, config{supervisor_} {
- init_ctor();
- }
- template <typename Actor> intrusive_ptr_t<Actor> actor_config_builder_t<Actor>::finish() && {
- intrusive_ptr_t<Actor> actor_ptr;
- if (!validate()) {
- auto ec = make_error_code(error_code_t::actor_misconfigured);
- system_context.on_error(actor_ptr.get(), make_error(system_context.identity(), ec));
- } else {
- auto &cfg = static_cast<typename builder_t::config_t &>(config);
- auto actor = new Actor(cfg);
- actor_ptr.reset(actor);
- install_action(actor_ptr);
- }
- return actor_ptr;
- }
- }
- #if defined(_MSC_VER)
- #pragma warning(pop)
- #endif
|