actor_base.h 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303
  1. #pragma once
  2. //
  3. // Copyright (c) 2019 Ivan Baidakou (basiliscos) (the dot dmol at gmail dot com)
  4. //
  5. // Distributed under the MIT Software License
  6. //
  7. #include "address.hpp"
  8. #include "messages.hpp"
  9. #include "behavior.h"
  10. #include "state.h"
  11. #include "handler.hpp"
  12. #include <unordered_map>
  13. #include <list>
  14. namespace rotor {
  15. struct supervisor_t;
  16. struct system_context_t;
  17. struct handler_base_t;
  18. /** \brief intrusive pointer for handler */
  19. using handler_ptr_t = intrusive_ptr_t<handler_base_t>;
  20. /** \brief SFINAE handler detector
  21. *
  22. * Either handler can be constructed from memeber-to-function-pointer or
  23. * it is already constructed and have a base `handler_base_t`
  24. */
  25. template <typename Handler>
  26. using is_handler =
  27. std::enable_if_t<std::is_member_function_pointer_v<Handler> || std::is_base_of_v<handler_base_t, Handler>>;
  28. /** \struct actor_base_t
  29. * \brief universal primitive of concurrent computation
  30. *
  31. * The class is base class for user-defined actors. It is expected that
  32. * actors will react on incoming messages (e.g. by changing internal
  33. * /private state) or send (other) messages to other actors, or do
  34. * some side-effects (I/O, etc.).
  35. *
  36. * Message passing interface is asynchronous, they are send to {@link supervisor_t}.
  37. *
  38. * Every actor belong to some {@link supervisor_t}, which "injects" the thread-safe
  39. * execution context, in a sense, that the actor can call it's own methods as well
  40. * as supervirors without any need of synchonization.
  41. *
  42. * All actor methods are thread-unsafe, i.e. should not be called with except of
  43. * it's own supervisor. Communication with actor should be performed via messages.
  44. *
  45. * Actor is addressed by it's "main" address; however it is possible for an actor
  46. * to have multiple identities aka "virtual" addresses.
  47. *
  48. */
  49. struct actor_base_t : public arc_base_t<actor_base_t> {
  50. /** \struct subscription_point_t
  51. * \brief pair of {@link handler_base_t} linked to particular {@link address_t}
  52. */
  53. struct subscription_point_t {
  54. /** \brief intrusive pointer to messages' handler */
  55. handler_ptr_t handler;
  56. /** \brief intrusive pointer to address */
  57. address_ptr_t address;
  58. };
  59. /** \brief alias to the list of {@link subscription_point_t} */
  60. using subscription_points_t = std::list<subscription_point_t>;
  61. /** \brief constructs actor and links it's supervisor
  62. *
  63. * An actor cannot outlive it's supervisor.
  64. *
  65. * Sets internal actor state to `NEW`
  66. *
  67. */
  68. actor_base_t(supervisor_t &supervisor_);
  69. virtual ~actor_base_t();
  70. /** \brief early actor initialization (pre-initialization)
  71. *
  72. * Actor's "main" address is created, actor's behavior is created.
  73. *
  74. * Actor performs subsscription on all major methods, defined by
  75. * `rotor` framework; sets internal actor state to `INITIALIZING`.
  76. *
  77. */
  78. virtual void do_initialize(system_context_t *ctx) noexcept;
  79. /** \brief convenient method to send actor's supervisor shutdown trigger message */
  80. virtual void do_shutdown() noexcept;
  81. /** \brief creates actor's address by delegating the call to supervisor */
  82. virtual address_ptr_t create_address() noexcept;
  83. /** \brief returns actor's "main" address (intrusive pointer) */
  84. inline address_ptr_t get_address() const noexcept { return address; }
  85. /** \brief returns actor's supervisor */
  86. inline supervisor_t &get_supervisor() const noexcept { return supervisor; }
  87. /** \brief returns actor's state */
  88. inline state_t &get_state() noexcept { return state; }
  89. /** \brief returns actor's subscription points */
  90. inline subscription_points_t &get_subscription_points() noexcept { return points; }
  91. /** \brief records init request and may be triggers actor initialization
  92. *
  93. * After recoding the init request message, it invokes `init_start` method,
  94. * which triggers initialization sequece (configured by
  95. * {@link actor_behavior_t} ).
  96. *
  97. */
  98. virtual void on_initialize(message::init_request_t &) noexcept;
  99. /** \brief start confirmation from supervisor
  100. *
  101. * Sets internal actor state to `OPERATIONAL`.
  102. *
  103. */
  104. virtual void on_start(message_t<payload::start_actor_t> &) noexcept;
  105. /** \brief records shutdown request and may be triggers actor shutdown
  106. *
  107. * After recording the shutdown request it invokes the `shutdown_start`
  108. * method, which triggers shutdown actions sequence (configured by
  109. * {@link actor_behavior_t} ).
  110. */
  111. virtual void on_shutdown(message::shutdown_request_t &) noexcept;
  112. /** \brief initiates actor's shutdown
  113. *
  114. * If there is a supervisor, the message is forwarded to it to
  115. * send shutdown request.
  116. *
  117. */
  118. virtual void on_shutdown_trigger(message::shutdown_trigger_t &) noexcept;
  119. /** \brief records subsciption point */
  120. virtual void on_subscription(message_t<payload::subscription_confirmation_t> &) noexcept;
  121. /** \brief forgets the subscription point
  122. *
  123. * If there is no more subscription points, the on_unsubscription event is
  124. * triggerred on {@link actor_behavior_t}.
  125. *
  126. */
  127. virtual void on_unsubscription(message_t<payload::unsubscription_confirmation_t> &) noexcept;
  128. /** \brief forgets the subscription point for external address
  129. *
  130. * The {@link payload::commit_unsubscription_t} is sent to the external {@link supervisor_t}
  131. * after removing the subscription .
  132. *
  133. * If there is no more subscription points, the on_unsubscription event is
  134. * triggerred on {@link actor_behavior_t}.
  135. *
  136. */
  137. virtual void on_external_unsubscription(message_t<payload::external_unsubscription_t> &) noexcept;
  138. /** \brief sends message to the destination address
  139. *
  140. * Internally it just constructs new message in supervisor's outbound queue.
  141. *
  142. */
  143. template <typename M, typename... Args> void send(const address_ptr_t &addr, Args &&... args);
  144. /** \brief returns request builder for destination address using the "main" actor address
  145. *
  146. * The `args` are forwarded for construction of the request. The request is not actually sent,
  147. * until `send` method of {@link request_builder_t} will be invoked.
  148. *
  149. * Supervisor will spawn timeout timer upon `timeout` method.
  150. */
  151. template <typename R, typename... Args>
  152. request_builder_t<typename request_wrapper_t<R>::request_t> request(const address_ptr_t &dest_addr,
  153. Args &&... args);
  154. /** \brief returns request builder for destination address using the specified address for reply
  155. *
  156. * It is assumed, that the specified address belongs to the actor.
  157. *
  158. * The method is useful, when a different behavior is needed for the same
  159. * message response types. It serves at some extend as virtual dispatching within
  160. * the actor.
  161. *
  162. * See the description of `request` method.
  163. *
  164. */
  165. template <typename R, typename... Args>
  166. request_builder_t<typename request_wrapper_t<R>::request_t>
  167. request_via(const address_ptr_t &dest_addr, const address_ptr_t &reply_addr, Args &&... args);
  168. /** \brief convenient method for constructing and sending response to a request
  169. *
  170. * `args` are forwarded to response payload constuction
  171. */
  172. template <typename Request, typename... Args> void reply_to(Request &message, Args &&... args);
  173. /** \brief convenient method for constructing and sending error response to a request */
  174. template <typename Request, typename... Args> void reply_with_error(Request &message, const std::error_code &ec);
  175. /** \brief subscribes actor's handler to process messages on the specified address */
  176. template <typename Handler> handler_ptr_t subscribe(Handler &&h, address_ptr_t &addr) noexcept;
  177. /** \brief subscribes actor's handler to process messages on the actor's "main" address */
  178. template <typename Handler> handler_ptr_t subscribe(Handler &&h) noexcept;
  179. /** \brief unsubscribes actor's handler from process messages on the specified address */
  180. template <typename Handler, typename = is_handler<Handler>>
  181. void unsubscribe(Handler &&h, address_ptr_t &addr) noexcept;
  182. /** \brief unsubscribes actor's handler from processing messages on the actor's "main" address */
  183. template <typename Handler, typename = is_handler<Handler>> void unsubscribe(Handler &&h) noexcept;
  184. /** \brief initiates handler unsubscription from the address
  185. *
  186. * If the address is local, then unsubscription confirmation is sent immediately,
  187. * otherwise {@link payload::external_subscription_t} request is sent to the external
  188. * supervisor, which owns the address.
  189. *
  190. * The optional call can be providded to be called upon message destruction.
  191. *
  192. */
  193. void unsubscribe(const handler_ptr_t &h, const address_ptr_t &addr, const payload::callback_ptr_t & = {}) noexcept;
  194. /** \brief initiates handler unsubscription from the default actor address */
  195. inline void unsubscribe(const handler_ptr_t &h) noexcept { unsubscribe(h, address); }
  196. protected:
  197. /** \brief constructs actor's behavior on early stage */
  198. virtual actor_behavior_t *create_behavior() noexcept;
  199. /** \brief removes the subscription point */
  200. virtual void remove_subscription(const address_ptr_t &addr, const handler_ptr_t &handler) noexcept;
  201. /** \brief starts initialization
  202. *
  203. * Some resources might be acquired synchronously, if needed. If resources need
  204. * to be acquired asynchronously this method should be overriden, and
  205. * invoked only after resources acquisition.
  206. *
  207. * In internals it forwards initialization sequence to the behavior.
  208. *
  209. */
  210. virtual void init_start() noexcept;
  211. /** \brief finializes initialization */
  212. virtual void init_finish() noexcept;
  213. /** \brief strart releasing acquired resources
  214. *
  215. * The method can be overriden in derived classes to initiate the
  216. * release of resources, i.e. (asynchronously) close all opened
  217. * sockets before confirm shutdown to supervisor.
  218. *
  219. * It actually forwards shutdown for the behavior
  220. *
  221. */
  222. virtual void shutdown_start() noexcept;
  223. /** \brief finalize shutdown, release aquired resources
  224. *
  225. * This is the last action in the shutdown sequence.
  226. * No further methods will be invoked.
  227. *
  228. */
  229. virtual void shutdown_finish() noexcept;
  230. /** \brief actor's execution / infrastructure context */
  231. supervisor_t &supervisor;
  232. /** \brief current actor state */
  233. state_t state;
  234. /** \brief actor's behavior, used for runtime customization of actors's
  235. * behavioral aspects
  236. */
  237. actor_behavior_t *behavior;
  238. /** \brief actor address */
  239. address_ptr_t address;
  240. /** \brief recorded subscription points (i.e. handler/address pairs) */
  241. subscription_points_t points;
  242. /** \brief suspended init request message */
  243. intrusive_ptr_t<message::init_request_t> init_request;
  244. /** \brief suspended shutdown request message */
  245. intrusive_ptr_t<message::shutdown_request_t> shutdown_request;
  246. friend struct actor_behavior_t;
  247. friend struct supervisor_t;
  248. template <typename T> friend struct request_builder_t;
  249. };
  250. /** \brief intrusive pointer for actor*/
  251. using actor_ptr_t = intrusive_ptr_t<actor_base_t>;
  252. } // namespace rotor