supervisor.h 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541
  1. #pragma once
  2. //
  3. // Copyright (c) 2019-2020 Ivan Baidakou (basiliscos) (the dot dmol at gmail dot com)
  4. //
  5. // Distributed under the MIT Software License
  6. //
  7. #include "actor_base.h"
  8. #include "handler.h"
  9. #include "message.h"
  10. #include "messages.hpp"
  11. #include "subscription.h"
  12. #include "system_context.h"
  13. #include "supervisor_config.h"
  14. #include "address_mapping.h"
  15. #include <functional>
  16. #include <unordered_map>
  17. namespace rotor {
  18. /** \struct supervisor_t
  19. * \brief supervisor is responsible for managing actors (workers) lifetime
  20. *
  21. * Supervisor starts, stops actors (children/workers) and process messages.
  22. * The message processing is basically sorting messages by their destination
  23. * {@link address_t}: if an address belongs to the supervisor, then message
  24. * is dispatched locally, otherwise the message is forwarded to supervisor,
  25. * which owns address.
  26. *
  27. * During message dispatching phase, supervisor examines handlers
  28. * ({@link handler_base_t}), if they are local, then a message in immediately
  29. * delivered to it (i.e. a local actor is invoked immediately), otherwise
  30. * is is forwarded for delivery to the supervisor, which owns the handler.
  31. *
  32. * Supervisor is responsible for managing it's local actors lifetime, i.e.
  33. * sending initialization, start, shutdown requests etc.
  34. *
  35. * Supervisor is locality-aware: i.e. if two supervisors have the same
  36. * locality (i.e. executed in the same thread/event loop), it takes advantage
  37. * of this and immediately delivers message to the target supervisor
  38. * without involving any synchronization mechanisms. In other words,
  39. * a message is delivered to any actor of the locality, even if the
  40. * actor is not child of the current supervisor.
  41. *
  42. * As supervisor is special kind of actor, it should be possible to spawn
  43. * other supervisors constructing tree-like organization of responsibilities.
  44. *
  45. * Unlike Erlang's supervisor, rotor's supervisor does not spawn actors
  46. * if they terminated. It should be possible, hovewer, to implement it in derived
  47. * classes with application-specific logic.
  48. *
  49. * This supervisor class is abstract, and the concrete implementation is
  50. * is event-loop specific, i.e. it should know how to start/stop shutdown
  51. * timers, how to trigger messages processing in thread-safe way, how
  52. * to deliver message to a supervisor in a thread-safe way etc.
  53. *
  54. */
  55. struct supervisor_t : public actor_base_t {
  56. // clang-format off
  57. /** \brief the default list of plugins for an supervisor
  58. *
  59. * The order of plugins is very important, as they are initialized in the direct order
  60. * and deinitilized in the reverse order.
  61. *
  62. */
  63. using plugins_list_t = std::tuple<
  64. plugin::address_maker_plugin_t,
  65. plugin::locality_plugin_t,
  66. plugin::delivery_plugin_t<plugin::default_local_delivery_t>,
  67. plugin::lifetime_plugin_t,
  68. plugin::init_shutdown_plugin_t,
  69. plugin::foreigners_support_plugin_t,
  70. plugin::child_manager_plugin_t,
  71. plugin::link_server_plugin_t,
  72. plugin::link_client_plugin_t,
  73. plugin::registry_plugin_t,
  74. plugin::starter_plugin_t>;
  75. // clang-format on
  76. /** \brief injects an alias for supervisor_config_t */
  77. using config_t = supervisor_config_t;
  78. /** \brief injects templated supervisor_config_builder_t */
  79. template <typename Supervisor> using config_builder_t = supervisor_config_builder_t<Supervisor>;
  80. /** \brief constructs new supervisor with optional parent supervisor */
  81. supervisor_t(supervisor_config_t &config);
  82. supervisor_t(const supervisor_t &) = delete;
  83. supervisor_t(supervisor_t &&) = delete;
  84. virtual void do_initialize(system_context_t *ctx) noexcept override;
  85. /** \brief process queue of messages of locality leader
  86. *
  87. * The locality leaders queue `queue` of messages is processed.
  88. *
  89. * -# It takes message from the queue
  90. * -# If the message destination address belongs to the foreing the supervisor,
  91. * then it is forwarded to it immediately.
  92. * -# Otherwise, the message is local, i.e. either for the supervisor or one
  93. * of its non-supervisor children (internal), or to other supervisor within
  94. * the same locality.
  95. * -# in the former case the message is immediately delivered locally in
  96. * the context of current supervisor; in the latter case in the context
  97. * of other supervsior. In the both cases `deliver_local` method is used.
  98. *
  99. * It is expected, that derived classes should invoke `do_process` message,
  100. * whenever it is known that there are messages for processing. The invocation
  101. * should be performed in safe thread/loop context.
  102. *
  103. * The method should be invoked in event-loop context only.
  104. *
  105. */
  106. inline void do_process() noexcept { delivery->process(); }
  107. /** \brief creates new {@link address_t} linked with the supervisor */
  108. virtual address_ptr_t make_address() noexcept;
  109. /** \brief removes the subscription point: local address and (foreign-or-local)
  110. * handler pair
  111. */
  112. virtual void commit_unsubscription(const subscription_info_ptr_t &info) noexcept;
  113. /** \brief thread-safe version of `do_process`
  114. *
  115. * Starts supervisor to processing messages queue in safe thread/loop
  116. * context. Once it becomes empty, the method returns
  117. */
  118. virtual void start() noexcept = 0;
  119. /** \brief thread-safe version of `do_shutdown`, i.e. send shutdown request
  120. * let it be processed by the supervisor */
  121. virtual void shutdown() noexcept = 0;
  122. void do_shutdown() noexcept override;
  123. void shutdown_finish() noexcept override;
  124. /** \brief supervisor hook for reaction on child actor init */
  125. virtual void on_child_init(actor_base_t *actor, const std::error_code &ec) noexcept;
  126. /** \brief supervisor hook for reaction on child actor shutdown */
  127. virtual void on_child_shutdown(actor_base_t *actor, const std::error_code &ec) noexcept;
  128. /** \brief enqueues messages thread safe way and triggers processing
  129. *
  130. * This is the only method for deliver message outside of `rotor` context.
  131. * Basically it is `put` and `process` in the event loop context.
  132. *
  133. * The thread-safety should be guaranteed by derived class and/or used event-loop.
  134. *
  135. * This method is used for messaging between supervisors with different
  136. * localities, event loops or threads.
  137. *
  138. */
  139. virtual void enqueue(message_ptr_t message) noexcept = 0;
  140. /** \brief puts a message into internal supevisor queue for further processing
  141. *
  142. * This is thread-unsafe method. The `enqueue` method should be used to put
  143. * a new message from external context in thread-safe way.
  144. *
  145. */
  146. inline void put(message_ptr_t message) { locality_leader->queue.emplace_back(std::move(message)); }
  147. /** \brief templated version of `subscribe_actor` */
  148. template <typename Handler> void subscribe(actor_base_t &actor, Handler &&handler) {
  149. supervisor->subscribe(actor.address, wrap_handler(actor, std::move(handler)));
  150. }
  151. /** \brief convenient templated version of `unsubscribe_actor */
  152. template <typename Handler> inline void unsubscribe_actor(const address_ptr_t &addr, Handler &&handler) noexcept {
  153. handler_ptr_t wrapped_handler(std::forward<Handler>(handler));
  154. lifetime->unsubscribe(wrapped_handler, addr);
  155. }
  156. /** \brief creates child-actor builder */
  157. template <typename Actor> auto create_actor() {
  158. using builder_t = typename Actor::template config_builder_t<Actor>;
  159. assert(manager && "child_manager_plugin_t should be already initialized");
  160. return builder_t([this](auto &actor) { manager->create_child(actor); }, this);
  161. }
  162. /** \brief convenient method for request building
  163. *
  164. * The built request isn't sent immediately, but only after invoking `send(timeout)`
  165. *
  166. */
  167. template <typename T, typename... Args>
  168. request_builder_t<T> do_request(actor_base_t &actor, const address_ptr_t &dest_addr, const address_ptr_t &reply_to,
  169. Args &&...args) noexcept {
  170. return request_builder_t<T>(*this, actor, dest_addr, reply_to, std::forward<Args>(args)...);
  171. }
  172. /**
  173. * \brief main subscription implementation
  174. *
  175. * The subscription point is materialized inot subscription info. If address is
  176. * internal/local, then it is immediately confirmed to the source actor as
  177. * {@link payload::subscription_confirmation_t}.
  178. *
  179. * Otherwise, if the address is external (foreign), then subscription request
  180. * is forwarded to approriate supervisor as {@link payload::external_subscription_t}
  181. * request.
  182. *
  183. * The materialized subscription info is returned.
  184. *
  185. */
  186. subscription_info_ptr_t subscribe(const handler_ptr_t &handler, const address_ptr_t &addr,
  187. const actor_base_t *owner_ptr, owner_tag_t owner_tag) noexcept;
  188. using actor_base_t::subscribe;
  189. /** \brief returns registry actor address (if it was defined or registry actor was created) */
  190. inline const address_ptr_t &get_registry_address() const noexcept { return registry_address; }
  191. /** \brief generic non-public fields accessor */
  192. template <typename T> auto &access() noexcept;
  193. /** \brief generic non-public methods accessor */
  194. template <typename T, typename... Args> auto access(Args... args) noexcept;
  195. protected:
  196. /** \brief creates new address with respect to supervisor locality mark */
  197. virtual address_ptr_t instantiate_address(const void *locality) noexcept;
  198. /** \brief timer to response with timeout procuder type */
  199. using request_map_t = std::unordered_map<request_id_t, request_curry_t>;
  200. /** \brief invoked as timer callback; creates response or just clean up for previously set request */
  201. void on_request_trigger(request_id_t timer_id, bool cancelled) noexcept;
  202. /** \brief starts non-recurring timer (to be implemented in descendants) */
  203. virtual void do_start_timer(const pt::time_duration &interval, timer_handler_base_t &handler) noexcept = 0;
  204. /** \brief cancels timer (to be implemented in descendants) */
  205. virtual void do_cancel_timer(request_id_t timer_id) noexcept = 0;
  206. /** \brief intercepts message delivery for the tagged handler */
  207. virtual void intercept(message_ptr_t &message, const void *tag, const continuation_t &continuation) noexcept;
  208. /** \brief non-owning pointer to system context. */
  209. system_context_t *context;
  210. /** \brief queue of unprocessed messages */
  211. messages_queue_t queue;
  212. /** \brief counter for request/timer ids */
  213. request_id_t last_req_id;
  214. /** \brief timer to response with timeout procuder */
  215. request_map_t request_map;
  216. /** \brief main subscription support class */
  217. subscription_t subscription_map;
  218. /** \brief non-owning pointer to parent supervisor, `NULL` for root supervisor */
  219. supervisor_t *parent;
  220. /** \brief delivery plugin pointer */
  221. plugin::delivery_plugin_base_t *delivery = nullptr;
  222. /** \brief child manager plugin pointer */
  223. plugin::child_manager_plugin_t *manager = nullptr;
  224. /** \brief root supervisor for the locality */
  225. supervisor_t *locality_leader;
  226. private:
  227. bool create_registry;
  228. bool synchronize_start;
  229. address_ptr_t registry_address;
  230. supervisor_policy_t policy;
  231. /** \brief per-actor and per-message request tracking support */
  232. address_mapping_t address_mapping;
  233. template <typename T> friend struct request_builder_t;
  234. template <typename Supervisor> friend struct actor_config_builder_t;
  235. friend struct plugin::delivery_plugin_base_t;
  236. friend struct actor_base_t;
  237. template <typename T> friend struct plugin::delivery_plugin_t;
  238. void discard_request(request_id_t request_id) noexcept;
  239. inline request_id_t next_request_id() noexcept {
  240. request_map_t::iterator it;
  241. do {
  242. it = locality_leader->request_map.find(++locality_leader->last_req_id);
  243. } while (it != locality_leader->request_map.end());
  244. return locality_leader->last_req_id;
  245. }
  246. };
  247. using supervisor_ptr_t = intrusive_ptr_t<supervisor_t>;
  248. /* third-party classes implementations */
  249. template <typename Supervisor> auto system_context_t::create_supervisor() {
  250. using builder_t = typename Supervisor::template config_builder_t<Supervisor>;
  251. return builder_t(
  252. [this](auto &actor) {
  253. if (supervisor) {
  254. on_error(make_error_code(error_code_t::supervisor_defined));
  255. actor.reset();
  256. } else {
  257. this->supervisor = actor;
  258. actor->do_initialize(this);
  259. }
  260. },
  261. *this);
  262. }
  263. template <typename M, typename... Args> void actor_base_t::send(const address_ptr_t &addr, Args &&...args) {
  264. supervisor->put(make_message<M>(addr, std::forward<Args>(args)...));
  265. }
  266. template <typename Delegate, typename Method>
  267. void actor_base_t::start_timer(request_id_t request_id, const pt::time_duration &interval, Delegate &delegate,
  268. Method method) noexcept {
  269. using final_handler_t = timer_handler_t<Delegate, Method>;
  270. auto handler = std::make_unique<final_handler_t>(this, request_id, &delegate, std::forward<Method>(method));
  271. supervisor->do_start_timer(interval, *handler);
  272. timers_map.emplace(request_id, std::move(handler));
  273. }
  274. template <typename Delegate, typename Method>
  275. request_id_t actor_base_t::start_timer(const pt::time_duration &interval, Delegate &delegate, Method method) noexcept {
  276. auto request_id = supervisor->next_request_id();
  277. start_timer(request_id, interval, delegate, std::forward<Method>(method));
  278. return request_id;
  279. }
  280. /** \brief wraps handler (pointer to member function) and actor address into intrusive pointer */
  281. template <typename Handler> handler_ptr_t wrap_handler(actor_base_t &actor, Handler &&handler) {
  282. using final_handler_t = handler_t<Handler>;
  283. auto handler_raw = new final_handler_t(actor, std::move(handler));
  284. return handler_ptr_t{handler_raw};
  285. }
  286. template <typename Handler> subscription_info_ptr_t actor_base_t::subscribe(Handler &&h) noexcept {
  287. auto wrapped_handler = wrap_handler(*this, std::move(h));
  288. return supervisor->subscribe(wrapped_handler, address, this, owner_tag_t::ANONYMOUS);
  289. }
  290. template <typename Handler>
  291. subscription_info_ptr_t actor_base_t::subscribe(Handler &&h, const address_ptr_t &addr) noexcept {
  292. auto wrapped_handler = wrap_handler(*this, std::move(h));
  293. return supervisor->subscribe(wrapped_handler, addr, this, owner_tag_t::ANONYMOUS);
  294. }
  295. namespace plugin {
  296. template <typename Handler>
  297. subscription_info_ptr_t plugin_base_t::subscribe(Handler &&h, const address_ptr_t &addr) noexcept {
  298. using final_handler_t = handler_t<Handler>;
  299. handler_ptr_t wrapped_handler(new final_handler_t(*this, std::move(h)));
  300. auto info = actor->supervisor->subscribe(wrapped_handler, addr, actor, owner_tag_t::PLUGIN);
  301. own_subscriptions.emplace_back(info);
  302. return info;
  303. }
  304. template <typename Handler> subscription_info_ptr_t plugin_base_t::subscribe(Handler &&h) noexcept {
  305. return subscribe(std::forward<Handler>(h), actor->address);
  306. }
  307. template <> inline auto &plugin_base_t::access<plugin::starter_plugin_t>() noexcept { return own_subscriptions; }
  308. template <typename Handler> subscription_info_ptr_t starter_plugin_t::subscribe_actor(Handler &&handler) noexcept {
  309. auto &address = actor->get_address();
  310. return subscribe_actor(std::forward<Handler>(handler), address);
  311. }
  312. template <typename Handler>
  313. subscription_info_ptr_t starter_plugin_t::subscribe_actor(Handler &&handler, const address_ptr_t &addr) noexcept {
  314. auto wrapped_handler = wrap_handler(*actor, std::move(handler));
  315. auto info = actor->get_supervisor().subscribe(wrapped_handler, addr, actor, owner_tag_t::PLUGIN);
  316. assert(std::count_if(tracked.begin(), tracked.end(), [&](auto &it) { return *it == *info; }) == 0 &&
  317. "already subscribed");
  318. tracked.emplace_back(info);
  319. access<starter_plugin_t>().emplace_back(info);
  320. return info;
  321. }
  322. template <typename LocalDelivery> void delivery_plugin_t<LocalDelivery>::process() noexcept {
  323. while (queue->size()) {
  324. auto message = queue->front();
  325. auto &dest = message->address;
  326. queue->pop_front();
  327. auto &dest_sup = dest->supervisor;
  328. auto internal = &dest_sup == actor;
  329. if (internal) { /* subscriptions are handled by me */
  330. auto *local_recipients = subscription_map->get_recipients(*message);
  331. if (local_recipients) {
  332. LocalDelivery::delivery(message, *local_recipients);
  333. }
  334. } else if (dest_sup.address->same_locality(*address)) {
  335. auto *local_recipients = dest_sup.subscription_map.get_recipients(*message);
  336. if (local_recipients) {
  337. LocalDelivery::delivery(message, *local_recipients);
  338. }
  339. } else {
  340. dest_sup.enqueue(std::move(message));
  341. }
  342. }
  343. }
  344. } // namespace plugin
  345. template <typename Handler, typename Enabled> void actor_base_t::unsubscribe(Handler &&h) noexcept {
  346. supervisor->unsubscribe_actor(address, wrap_handler(*this, std::move(h)));
  347. }
  348. template <typename Handler, typename Enabled>
  349. void actor_base_t::unsubscribe(Handler &&h, address_ptr_t &addr) noexcept {
  350. supervisor->unsubscribe_actor(addr, wrap_handler(*this, std::move(h)));
  351. }
  352. template <typename T>
  353. template <typename... Args>
  354. request_builder_t<T>::request_builder_t(supervisor_t &sup_, actor_base_t &actor_, const address_ptr_t &destination_,
  355. const address_ptr_t &reply_to_, Args &&...args)
  356. : sup{sup_}, actor{actor_}, request_id{sup.next_request_id()}, destination{destination_}, reply_to{reply_to_},
  357. do_install_handler{false} {
  358. auto addr = sup.address_mapping.get_mapped_address(actor_, response_message_t::message_type);
  359. if (addr) {
  360. imaginary_address = addr;
  361. } else {
  362. // subscribe to imaginary address instead of real one because of
  363. // 1. faster dispatching
  364. // 2. need to distinguish between "timeout guarded responses" and "responses to own requests"
  365. imaginary_address = sup.make_address();
  366. do_install_handler = true;
  367. }
  368. req.reset(
  369. new request_message_t{destination, request_id, imaginary_address, reply_to_, std::forward<Args>(args)...});
  370. }
  371. template <typename T> request_id_t request_builder_t<T>::send(pt::time_duration timeout) noexcept {
  372. if (do_install_handler) {
  373. install_handler();
  374. }
  375. auto fn = &request_traits_t<T>::make_error_response;
  376. sup.request_map.emplace(request_id, request_curry_t{fn, reply_to, req});
  377. sup.put(req);
  378. sup.start_timer(request_id, timeout, sup, &supervisor_t::on_request_trigger);
  379. return request_id;
  380. }
  381. template <typename T> void request_builder_t<T>::install_handler() noexcept {
  382. auto handler = lambda<response_message_t>([supervisor = &sup](response_message_t &msg) {
  383. auto request_id = msg.payload.request_id();
  384. auto it = supervisor->request_map.find(request_id);
  385. if (it != supervisor->request_map.end()) {
  386. auto &orig_addr = it->second.origin;
  387. supervisor->template send<wrapped_res_t>(orig_addr, msg.payload);
  388. supervisor->discard_request(request_id);
  389. }
  390. // if a response to request has arrived and no timer can be found
  391. // that means that either timeout timer already triggered
  392. // and error-message already delivered or response is not expected.
  393. // just silently drop it anyway
  394. });
  395. auto wrapped_handler = wrap_handler(sup, std::move(handler));
  396. auto info = sup.subscribe(wrapped_handler, imaginary_address, &actor, owner_tag_t::SUPERVISOR);
  397. sup.address_mapping.set(actor, info);
  398. }
  399. /** \brief makes an reqest to the destination address with the message constructed from `args`
  400. *
  401. * The `reply_to` address is defaulted to actor's main address.1
  402. *
  403. */
  404. template <typename Request, typename... Args>
  405. request_builder_t<typename request_wrapper_t<Request>::request_t> actor_base_t::request(const address_ptr_t &dest_addr,
  406. Args &&...args) {
  407. using request_t = typename request_wrapper_t<Request>::request_t;
  408. return supervisor->do_request<request_t>(*this, dest_addr, address, std::forward<Args>(args)...);
  409. }
  410. /** \brief makes an reqest to the destination address with the message constructed from `args`
  411. *
  412. * The `reply_addr` is used to specify the exact destinatiion address, where reply should be
  413. * delivered.
  414. *
  415. */
  416. template <typename Request, typename... Args>
  417. request_builder_t<typename request_wrapper_t<Request>::request_t>
  418. actor_base_t::request_via(const address_ptr_t &dest_addr, const address_ptr_t &reply_addr, Args &&...args) {
  419. using request_t = typename request_wrapper_t<Request>::request_t;
  420. return supervisor->do_request<request_t>(*this, dest_addr, reply_addr, std::forward<Args>(args)...);
  421. }
  422. template <typename Request> auto actor_base_t::make_response(Request &message, const std::error_code &ec) {
  423. using payload_t = typename Request::payload_t::request_t;
  424. using traits_t = request_traits_t<payload_t>;
  425. return traits_t::make_error_response(message.payload.reply_to, message, ec);
  426. }
  427. template <typename Request, typename... Args> auto actor_base_t::make_response(Request &message, Args &&...args) {
  428. using payload_t = typename Request::payload_t::request_t;
  429. using traits_t = request_traits_t<payload_t>;
  430. using response_t = typename traits_t::response::wrapped_t;
  431. using request_ptr_t = typename traits_t::request::message_ptr_t;
  432. return make_message<response_t>(message.payload.reply_to, request_ptr_t{&message}, std::forward<Args>(args)...);
  433. }
  434. template <typename Request, typename... Args> void actor_base_t::reply_to(Request &message, Args &&...args) {
  435. supervisor->put(make_response<Request>(message, std::forward<Args>(args)...));
  436. }
  437. template <typename Request> void actor_base_t::reply_with_error(Request &message, const std::error_code &ec) {
  438. supervisor->put(make_response<Request>(message, ec));
  439. }
  440. template <typename Actor>
  441. actor_config_builder_t<Actor>::actor_config_builder_t(install_action_t &&action_, supervisor_t *supervisor_)
  442. : install_action{std::move(action_)}, supervisor{supervisor_},
  443. system_context{*supervisor_->context}, config{supervisor_} {
  444. init_ctor();
  445. }
  446. template <typename Actor> intrusive_ptr_t<Actor> actor_config_builder_t<Actor>::finish() && {
  447. intrusive_ptr_t<Actor> actor_ptr;
  448. if (!validate()) {
  449. auto ec = make_error_code(error_code_t::actor_misconfigured);
  450. system_context.on_error(ec);
  451. } else {
  452. auto &cfg = static_cast<typename builder_t::config_t &>(config);
  453. auto actor = new Actor(cfg);
  454. actor_ptr.reset(actor);
  455. install_action(actor_ptr);
  456. }
  457. return actor_ptr;
  458. }
  459. } // namespace rotor