supervisor.h 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536
  1. #pragma once
  2. //
  3. // Copyright (c) 2019 Ivan Baidakou (basiliscos) (the dot dmol at gmail dot com)
  4. //
  5. // Distributed under the MIT Software License
  6. //
  7. #include "actor_base.h"
  8. #include "handler.hpp"
  9. #include "message.h"
  10. #include "messages.hpp"
  11. #include "subscription.h"
  12. #include "system_context.h"
  13. #include "supervisor_config.h"
  14. #include "address_mapping.h"
  15. #include <chrono>
  16. #include <deque>
  17. #include <functional>
  18. #include <unordered_map>
  19. namespace rotor {
  20. namespace pt = boost::posix_time;
  21. struct supervisor_t;
  22. /** \brief constucts actor on the supervisor */
  23. template <typename Actor, typename Supervisor, typename... Args>
  24. intrusive_ptr_t<Actor> make_actor(Supervisor &sup, Args... args);
  25. /** \struct supervisor_t
  26. * \brief supervisor is responsible for managing actors (workers) lifetime
  27. *
  28. * Supervisor starts, stops actors (children/workers) and process messages.
  29. * The message processing is basically sorting messages by their destination
  30. * {@link address_t}: if an address belongs to the supervisor, then message
  31. * is dispatched locally, otherwise the message is forwarded to supervisor,
  32. * which owns address.
  33. *
  34. * During message dispatching phase, supervisor examines handlers
  35. * ({@link handler_base_t}), if they are local, then a message in immediately
  36. * delivered to it (i.e. a local actor is invoked immediately), otherwise
  37. * is is forwarded for delivery to the supervisor, which owns the handler.
  38. *
  39. * Supervisor is responsible for managing it's local actors lifetime, i.e.
  40. * sending initialization, start, shutdown requests etc.
  41. *
  42. * Supervisor is locality-aware: i.e. if two supervisors have the same
  43. * locality (i.e. executed in the same thread/event loop), it takes advantage
  44. * of this and immediately delivers message to the target supervisor
  45. * without involving any synchronization mechanisms. In other words,
  46. * a message is delivered to any actor of the locality, even if the
  47. * actor is not child of the current supervisor.
  48. *
  49. * As supervisor is special kind of actor, it should be possible to spawn
  50. * other supervisors constructing tree-like organization of responsibilities.
  51. *
  52. * Unlike Erlang's supervisor, rotor's supervisor does not spawn actors
  53. * if they terminated. It should be, hovewer, to implement it in derived
  54. * classes with application-specific logic.
  55. *
  56. * This supervisor class is abstract, and the concrete implementation is
  57. * is event-loop specific, i.e. it should know how to start/stop shutdown
  58. * timers, how to trigger messages processing in thread-safe way, how
  59. * to deliver message to a supervisor in a thread-safe way etc.
  60. *
  61. */
  62. struct supervisor_t : public actor_base_t {
  63. /** \brief timer identifier type in the scope of the supervisor */
  64. using timer_id_t = std::uint32_t;
  65. /** \brief constructs new supervisor with optional parent supervisor */
  66. supervisor_t(supervisor_t *sup, const supervisor_config_t &config);
  67. supervisor_t(const supervisor_t &) = delete;
  68. supervisor_t(supervisor_t &&) = delete;
  69. virtual void do_initialize(system_context_t *ctx) noexcept override;
  70. /** \brief process queue of messages of locality leader
  71. *
  72. * The locality leaders queue `queue` of messages is processed.
  73. *
  74. * -# It takes message from the queue
  75. * -# If the message destination address belongs to the foreing the supervisor,
  76. * then it is forwarded to it immediately.
  77. * -# Otherwise, the message is local, i.e. either for the supervisor or one
  78. * of its non-supervisor children (internal), or to other supervisor within
  79. * the same locality.
  80. * -# in the former case the message is immediately delivered locally in
  81. * the context of current supervisor; in the latter case in the context
  82. * of other supervsior. In the both cases `deliver_local` method is used.
  83. *
  84. * It is expected, that derived classes should invoke `do_process` message,
  85. * whenever it is known that there are messages for processing. The invocation
  86. * should be performed in safe thread/loop context.
  87. *
  88. * The method should be invoked in event-loop context only.
  89. *
  90. */
  91. virtual void do_process() noexcept;
  92. /** \brief delivers an message for self of one of child-actors (non-supervisors)
  93. *
  94. * Supervisor iterates on subscriptions (handlers) on the message destination adddress:
  95. *
  96. * -# If the handler is local (i.e. it's actor belongs to the same supervisor),
  97. * -# Otherwise the message is forwarded for delivery for the foreign supervisor,
  98. * which owns the handler.
  99. *
  100. */
  101. void deliver_local(message_ptr_t &&msg) noexcept;
  102. /** \brief unsubcribes all actor's handlers */
  103. virtual void unsubscribe_actor(const actor_ptr_t &actor) noexcept;
  104. /** \brief creates new {@link address_t} linked with the supervisor */
  105. virtual address_ptr_t make_address() noexcept;
  106. /** \brief removes the subscription point: local address and (foreign-or-local)
  107. * handler pair
  108. */
  109. virtual void commit_unsubscription(const address_ptr_t &addr, const handler_ptr_t &handler) noexcept;
  110. /** \brief records just created actor and starts its initialization
  111. *
  112. * The initialization rquest is sent to the just created actor. If the
  113. * actor will not confirm initialization within timeout (specified in the message payload),
  114. * the actor will be asked for shut down.
  115. */
  116. virtual void on_create(message_t<payload::create_actor_t> &msg) noexcept;
  117. /** \brief sends {@link payload::start_actor_t} to the initialized actor */
  118. virtual void on_initialize_confirm(message::init_response_t &msg) noexcept;
  119. virtual void on_shutdown_trigger(message::shutdown_trigger_t &) noexcept override;
  120. /** \brief forgets just shutted down actor
  121. *
  122. * Internal structures related to the actor are released.
  123. *
  124. */
  125. virtual void on_shutdown_confirm(message::shutdown_response_t &msg) noexcept;
  126. /** \brief subscribes external handler to local address */
  127. virtual void on_external_subs(message_t<payload::external_subscription_t> &message) noexcept;
  128. /** \brief message interface for `commit_unsubscription` */
  129. virtual void on_commit_unsubscription(message_t<payload::commit_unsubscription_t> &message) noexcept;
  130. /** \brief delivers a message to local handler, which was originally send to external address
  131. *
  132. * The handler is subscribed to the external address, that's why the message was forwarded
  133. * from external supervisor to the local supervisor to process the call (invoke the local handler).
  134. *
  135. */
  136. virtual void on_call(message_t<payload::handler_call_t> &message) noexcept;
  137. /** \brief answers about actor's state, identified by it's address
  138. *
  139. * If there is no information about the address (including the case when an actor
  140. * is not yet created or already destroyed), then it replies with `UNKNOWN` status.
  141. *
  142. * It replies to the address specified to the `reply_addr` specified in
  143. * the message {@link payload::state_request_t}.
  144. *
  145. */
  146. virtual void on_state_request(message::state_request_t &message) noexcept;
  147. /** \brief starts non-recurring timer, identified by `timer_id`
  148. *
  149. * Once timer triggers, it will invoke `on_timer_trigger(timer_id)` method;
  150. * othewise, if it is no longer needed, it should be cancelled via
  151. * `cancel_timer` method
  152. *
  153. */
  154. virtual void start_timer(const pt::time_duration &send, timer_id_t timer_id) noexcept = 0;
  155. /** \brief cancels previously started timer */
  156. virtual void cancel_timer(timer_id_t timer_id) noexcept = 0;
  157. /** \brief triggers an action associated with the timer
  158. *
  159. * Currently it just delivers response timeout, if any.
  160. *
  161. */
  162. virtual void on_timer_trigger(timer_id_t timer_id);
  163. /** \brief thread-safe version of `do_process`
  164. *
  165. * Starts supervisor to processing messages queue in safe thread/loop
  166. * context. Once it becomes empty, the method returns
  167. */
  168. virtual void start() noexcept = 0;
  169. /** \brief thread-safe version of `do_shutdown`, i.e. send shutdown request
  170. * let it be processed by the supervisor */
  171. virtual void shutdown() noexcept = 0;
  172. virtual void do_shutdown() noexcept override;
  173. virtual void shutdown_finish() noexcept override;
  174. /** \brief enqueues messages thread safe way and triggers processing
  175. *
  176. * This is the only method for deliver message outside of `rotor` context.
  177. * Basically it is `put` and `process` in the event loop context.
  178. *
  179. * The thread-safety should be guaranteed by derived class and/or used event-loop.
  180. *
  181. * This method is used for messaging between supervisors with different
  182. * localities, or actors which use different loops/threads.
  183. *
  184. */
  185. virtual void enqueue(message_ptr_t message) noexcept = 0;
  186. /** \brief returns pointer to parent supervisor, may be NULL */
  187. inline supervisor_t *get_parent_supervisor() noexcept { return parent; }
  188. /** \brief puts a message into internal supevisor queue for further processing
  189. *
  190. * This is thread-unsafe method. The `enqueue` method should be used to put
  191. * a new message from external context in thread-safe way.
  192. *
  193. */
  194. inline void put(message_ptr_t message) { locality_leader->queue.emplace_back(std::move(message)); }
  195. /**
  196. * \brief subscribes an handler to an address.
  197. *
  198. * If the address is local, then subscription point is recorded and
  199. * {@link payload::subscription_confirmation_t} is send to the handler's actor.
  200. *
  201. * Otherwise, if the address is external (foreign), then subscription request
  202. * is forwarded to approriate supervisor as {@link payload::external_subscription_t}
  203. * request.
  204. *
  205. */
  206. inline void subscribe_actor(const address_ptr_t &addr, const handler_ptr_t &handler) {
  207. if (&addr->supervisor == &supervisor) {
  208. auto subs_info = subscription_map.try_emplace(addr, *this);
  209. subs_info.first->second.subscribe(handler);
  210. send<payload::subscription_confirmation_t>(handler->actor_ptr->get_address(), addr, handler);
  211. } else {
  212. send<payload::external_subscription_t>(addr->supervisor.address, addr, handler);
  213. }
  214. }
  215. /** \brief templated version of `subscribe_actor` */
  216. template <typename Handler> void subscribe_actor(actor_base_t &actor, Handler &&handler) {
  217. supervisor.subscribe_actor(actor.get_address(), wrap_handler(actor, std::move(handler)));
  218. }
  219. /** \brief convenient templated version of `unsubscribe_actor */
  220. template <typename Handler> inline void unsubscribe_actor(const address_ptr_t &addr, Handler &&handler) noexcept {
  221. handler_ptr_t wrapped_handler(std::forward<Handler>(handler));
  222. unsubscribe(wrapped_handler, addr);
  223. }
  224. /** \brief creates actor, records it in internal structures and returns
  225. * intrusive pointer to it
  226. */
  227. template <typename Actor, typename... Args>
  228. intrusive_ptr_t<Actor> create_actor(const pt::time_duration &timeout, Args... args) {
  229. auto &&actor = make_actor<Actor>(*this, timeout, std::forward<Args>(args)...);
  230. static_cast<supervisor_behavior_t *>(behavior)->on_create_child(actor->get_address());
  231. return actor;
  232. }
  233. /** \brief returns system context */
  234. inline system_context_t *get_context() noexcept { return context; }
  235. /** \brief convenient method for request building
  236. *
  237. * The built request isn't sent immediately, but only after invoking `send(timeout)`
  238. *
  239. */
  240. template <typename T, typename... Args>
  241. request_builder_t<T> do_request(actor_base_t &actor, const address_ptr_t &dest_addr, const address_ptr_t &reply_to,
  242. Args &&... args) noexcept {
  243. return request_builder_t<T>(*this, actor, dest_addr, reply_to, std::forward<Args>(args)...);
  244. }
  245. /** \brief child actror housekeeping strcuture */
  246. struct actor_state_t {
  247. /** \brief intrusive pointer to actor */
  248. actor_ptr_t actor;
  249. /** \brief whethe the shutdown request is already sent */
  250. bool shutdown_requesting;
  251. };
  252. protected:
  253. virtual actor_behavior_t *create_behavior() noexcept override;
  254. /** \brief creates new address with respect to supervisor locality mark */
  255. virtual address_ptr_t instantiate_address(const void *locality) noexcept;
  256. /** \brief structure to hold messages (intrusive pointers) */
  257. using queue_t = std::deque<message_ptr_t>;
  258. /** \brief address-to-subscription map type */
  259. using subscription_map_t = std::unordered_map<address_ptr_t, subscription_t>;
  260. /** \brief (local) address-to-child_actor map type */
  261. using actors_map_t = std::unordered_map<address_ptr_t, actor_state_t>;
  262. /** \brief timer to response with timeout procuder type */
  263. using request_map_t = std::unordered_map<timer_id_t, request_curry_t>;
  264. /** \brief removes actor from supervisor. It is assumed, that actor it shutted down. */
  265. virtual void remove_actor(actor_base_t &actor) noexcept;
  266. /** \brief non-owning pointer to parent supervisor, `NULL` for root supervisor */
  267. supervisor_t *parent;
  268. /** \brief non-owning pointer to system context. */
  269. system_context_t *context;
  270. /** \brief root supervisor for the locality */
  271. supervisor_t *locality_leader;
  272. /** \brief queue of unprocessed messages */
  273. queue_t queue;
  274. /** \brief local and external subscriptions for the addresses generated by the supervisor
  275. *
  276. * key: address, value: {@link subscription_t}
  277. *
  278. */
  279. subscription_map_t subscription_map;
  280. /** \brief local address to local actor (intrusive pointer) mapping */
  281. actors_map_t actors_map;
  282. /** \brief counter for request/timer ids */
  283. timer_id_t last_req_id;
  284. /** \brief timer to response with timeout procuder */
  285. request_map_t request_map;
  286. /** \brief shutdown timeout value (copied from config) */
  287. pt::time_duration shutdown_timeout;
  288. /** \brief reaction on child-actors termination */
  289. supervisor_policy_t policy;
  290. /** \brief per-actor and per-message request tracking support */
  291. address_mapping_t address_mapping;
  292. template <typename T> friend struct request_builder_t;
  293. friend struct supervisor_behavior_t;
  294. };
  295. using supervisor_ptr_t = intrusive_ptr_t<supervisor_t>;
  296. /* third-party classes implementations */
  297. template <typename Supervisor, typename... Args>
  298. auto system_context_t::create_supervisor(Args &&... args) -> intrusive_ptr_t<Supervisor> {
  299. using wrapper_t = intrusive_ptr_t<Supervisor>;
  300. auto raw_object = new Supervisor{std::forward<Args>(args)...};
  301. raw_object->do_initialize(this);
  302. supervisor = supervisor_ptr_t{raw_object};
  303. return wrapper_t{raw_object};
  304. }
  305. template <typename M, typename... Args> void actor_base_t::send(const address_ptr_t &addr, Args &&... args) {
  306. supervisor.put(make_message<M>(addr, std::forward<Args>(args)...));
  307. }
  308. /** \brief wraps handler (pointer to member function) and actor address into intrusive pointer */
  309. template <typename Handler> handler_ptr_t wrap_handler(actor_base_t &actor, Handler &&handler) {
  310. using final_handler_t = handler_t<Handler>;
  311. auto handler_raw = new final_handler_t(actor, std::move(handler));
  312. return handler_ptr_t{handler_raw};
  313. }
  314. template <typename Handler> handler_ptr_t actor_base_t::subscribe(Handler &&h) noexcept {
  315. auto wrapped_handler = wrap_handler(*this, std::move(h));
  316. supervisor.subscribe_actor(address, wrapped_handler);
  317. return wrapped_handler;
  318. }
  319. template <typename Handler> handler_ptr_t actor_base_t::subscribe(Handler &&h, address_ptr_t &addr) noexcept {
  320. auto wrapped_handler = wrap_handler(*this, std::move(h));
  321. supervisor.subscribe_actor(addr, wrapped_handler);
  322. return wrapped_handler;
  323. }
  324. template <typename Handler, typename Enabled> void actor_base_t::unsubscribe(Handler &&h) noexcept {
  325. supervisor.unsubscribe_actor(address, wrap_handler(*this, std::move(h)));
  326. }
  327. template <typename Handler, typename Enabled>
  328. void actor_base_t::unsubscribe(Handler &&h, address_ptr_t &addr) noexcept {
  329. supervisor.unsubscribe_actor(addr, wrap_handler(*this, std::move(h)));
  330. }
  331. namespace details {
  332. template <typename Actor, typename Supervisor, typename IsSupervisor = void> struct actor_ctor_t;
  333. /** \brief constructs new actor (derived from supervisor), SFINAE-class */
  334. template <typename Actor, typename Supervisor>
  335. struct actor_ctor_t<Actor, Supervisor, std::enable_if_t<std::is_base_of_v<supervisor_t, Actor>>> {
  336. /** \brief constructs new actor (derived from supervisor) */
  337. template <typename... Args>
  338. static auto construct(Supervisor *sup, Args... args) noexcept -> intrusive_ptr_t<Actor> {
  339. return new Actor{sup, std::forward<Args>(args)...};
  340. }
  341. };
  342. /** \brief constructs new actor (not derived from supervisor), SFINAE-class */
  343. template <typename Actor, typename Supervisor>
  344. struct actor_ctor_t<Actor, Supervisor, std::enable_if_t<!std::is_base_of_v<supervisor_t, Actor>>> {
  345. /** \brief constructs new actor (not derived from supervisor) */
  346. template <typename... Args>
  347. static auto construct(Supervisor *sup, Args... args) noexcept -> intrusive_ptr_t<Actor> {
  348. return new Actor{*sup, std::forward<Args>(args)...};
  349. }
  350. };
  351. } // namespace details
  352. /** \brief convenience method for creating an actor in the scope of supervisor
  353. *
  354. * Actor performs early initialization, and further init will be request-based
  355. * and initiated * by the supervisor.
  356. *
  357. */
  358. template <typename Actor, typename Supervisor, typename... Args>
  359. intrusive_ptr_t<Actor> make_actor(Supervisor &sup, const pt::time_duration &timeout, Args... args) {
  360. using ctor_t = details::actor_ctor_t<Actor, Supervisor>;
  361. auto context = sup.get_context();
  362. auto actor = ctor_t::construct(&sup, std::forward<Args>(args)...);
  363. actor->do_initialize(context);
  364. sup.template send<payload::create_actor_t>(sup.get_address(), actor, timeout);
  365. return actor;
  366. }
  367. template <typename T>
  368. template <typename... Args>
  369. request_builder_t<T>::request_builder_t(supervisor_t &sup_, actor_base_t &actor_, const address_ptr_t &destination_,
  370. const address_ptr_t &reply_to_, Args &&... args)
  371. : sup{sup_}, actor{actor_}, request_id{++sup.last_req_id}, destination{destination_}, reply_to{reply_to_},
  372. do_install_handler{false} {
  373. auto addr = sup.address_mapping.get_addr(actor_, response_message_t::message_type);
  374. if (addr) {
  375. imaginary_address = addr;
  376. } else {
  377. // subscribe to imaginary address instead of real one because of
  378. // 1. faster dispatching
  379. // 2. need to distinguish between "timeout guarded responses" and "responses to own requests"
  380. imaginary_address = sup.make_address();
  381. do_install_handler = true;
  382. }
  383. req.reset(new request_message_t{destination, request_id, imaginary_address, std::forward<Args>(args)...});
  384. }
  385. template <typename T> std::uint32_t request_builder_t<T>::send(pt::time_duration timeout) noexcept {
  386. if (do_install_handler) {
  387. install_handler();
  388. }
  389. auto fn = &request_traits_t<T>::make_error_response;
  390. sup.request_map.emplace(request_id, request_curry_t{fn, reply_to, req});
  391. sup.put(req);
  392. sup.start_timer(timeout, request_id);
  393. return request_id;
  394. }
  395. template <typename T> void request_builder_t<T>::install_handler() noexcept {
  396. auto handler = lambda<response_message_t>([supervisor = &sup](response_message_t &msg) {
  397. auto request_id = msg.payload.request_id();
  398. auto it = supervisor->request_map.find(request_id);
  399. if (it != supervisor->request_map.end()) {
  400. supervisor->cancel_timer(request_id);
  401. auto &orig_addr = it->second.reply_to;
  402. supervisor->template send<wrapped_res_t>(orig_addr, msg.payload);
  403. supervisor->request_map.erase(it);
  404. }
  405. // if a response to request has arrived and no timer can be found
  406. // that means that either timeout timer already triggered
  407. // and error-message already delivered or response is not expected.
  408. // just silently drop it anyway
  409. });
  410. auto handler_ptr = sup.subscribe(handler, imaginary_address);
  411. sup.address_mapping.set(actor, response_message_t::message_type, handler_ptr, imaginary_address);
  412. }
  413. /** \brief makes an reqest to the destination address with the message constructed from `args`
  414. *
  415. * The `reply_to` address is defaulted to actor's main address.1
  416. *
  417. */
  418. template <typename Request, typename... Args>
  419. request_builder_t<typename request_wrapper_t<Request>::request_t> actor_base_t::request(const address_ptr_t &dest_addr,
  420. Args &&... args) {
  421. using request_t = typename request_wrapper_t<Request>::request_t;
  422. return supervisor.do_request<request_t>(*this, dest_addr, address, std::forward<Args>(args)...);
  423. }
  424. /** \brief makes an reqest to the destination address with the message constructed from `args`
  425. *
  426. * The `reply_addr` is used to specify the exact destinatiion address, where reply should be
  427. * delivered.
  428. *
  429. */
  430. template <typename Request, typename... Args>
  431. request_builder_t<typename request_wrapper_t<Request>::request_t>
  432. actor_base_t::request_via(const address_ptr_t &dest_addr, const address_ptr_t &reply_addr, Args &&... args) {
  433. using request_t = typename request_wrapper_t<Request>::request_t;
  434. return supervisor.do_request<request_t>(*this, dest_addr, reply_addr, std::forward<Args>(args)...);
  435. }
  436. template <typename Request, typename... Args> void actor_base_t::reply_to(Request &message, Args &&... args) {
  437. using payload_t = typename Request::payload_t::request_t;
  438. using traits_t = request_traits_t<payload_t>;
  439. using response_t = typename traits_t::response::wrapped_t;
  440. using request_ptr_t = typename traits_t::request::message_ptr_t;
  441. send<response_t>(message.payload.reply_to, request_ptr_t{&message}, std::forward<Args>(args)...);
  442. }
  443. template <typename Request, typename... Args>
  444. void actor_base_t::reply_with_error(Request &message, const std::error_code &ec) {
  445. using payload_t = typename Request::payload_t::request_t;
  446. using traits_t = request_traits_t<payload_t>;
  447. auto response = traits_t::make_error_response(message.payload.reply_to, message, ec);
  448. supervisor.put(std::move(response));
  449. }
  450. } // namespace rotor