actor_base.h 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476
  1. #pragma once
  2. //
  3. // Copyright (c) 2019-2022 Ivan Baidakou (basiliscos) (the dot dmol at gmail dot com)
  4. //
  5. // Distributed under the MIT Software License
  6. //
  7. #include "forward.hpp"
  8. #include "address.hpp"
  9. #include "actor_config.h"
  10. #include "messages.hpp"
  11. #include "state.h"
  12. #include "handler.h"
  13. #include "extended_error.h"
  14. #include "timer_handler.hpp"
  15. #include <set>
  16. namespace rotor {
  17. /** \struct actor_base_t
  18. * \brief universal primitive of concurrent computation
  19. *
  20. * The class is the base class for user-defined actors. It is expected that
  21. * actors will react to incoming messages (e.g. by changing internal or
  22. * private state), send (other) messages to other actors, or perform
  23. * some side-effects (I/O, etc.).
  24. *
  25. * Message passing interface is asynchronous: messages are sent to {@link supervisor_t}.
  26. *
  27. * Every actor belongs to some {@link supervisor_t}, which "injects" the thread-safe
  28. * execution context, in the sense that the actor can call its own methods as well
  29. * as its supervisor's methods without any need of synchronization.
  30. *
  31. * All actor methods are thread-unsafe, i.e. they should not be called from anywhere
  32. * except its own supervisor. Communication with an actor should be performed via messages.
  33. *
  34. * Actor is addressed by its "main" address; however it is possible for an actor
  35. * to have multiple identities aka "virtual" addresses.
  36. *
  37. */
  38. struct actor_base_t : public arc_base_t<actor_base_t> {
  39. /** \brief injects an alias for actor_config_t */
  40. using config_t = actor_config_t;
  41. /** \brief injects templated actor_config_builder_t */
  42. template <typename Actor> using config_builder_t = actor_config_builder_t<Actor>;
  43. /** \brief SFINAE handler detector
  44. *
  45. * Either the handler can be constructed from a pointer-to-member-function, or
  46. * it is already constructed and has the base class `handler_base_t`
  47. */
  48. template <typename Handler>
  49. using is_handler =
  50. std::enable_if_t<std::is_member_function_pointer_v<Handler> || std::is_base_of_v<handler_base_t, Handler>>;
  51. // clang-format off
  52. /** \brief the default list of plugins for an actor
  53. *
  54. * The order of plugins is very important, as they are initialized in the direct order
  55. * and deinitialized in the reverse order.
  56. *
  57. */
  58. using plugins_list_t = std::tuple<
  59. plugin::address_maker_plugin_t,
  60. plugin::lifetime_plugin_t,
  61. plugin::init_shutdown_plugin_t,
  62. plugin::link_server_plugin_t,
  63. plugin::link_client_plugin_t,
  64. plugin::registry_plugin_t,
  65. plugin::resources_plugin_t,
  66. plugin::starter_plugin_t>;
  67. // clang-format on
  68. /** \brief constructs actor and links it to its supervisor
  69. *
  70. * An actor cannot outlive its supervisor.
  71. *
  72. * Sets internal actor state to `NEW`
  73. *
  74. */
  75. actor_base_t(config_t &cfg);
  76. virtual ~actor_base_t();
  77. /** \brief early actor initialization (pre-initialization)
  78. *
  79. * Actor's plugins are activated, "main" address is created
  80. * (via {@link plugin::address_maker_plugin_t}), state is set
  81. * to `INITIALIZING` (via {@link plugin::init_shutdown_plugin_t}).
  82. *
  83. */
  84. virtual void do_initialize(system_context_t *ctx) noexcept;
  85. /** \brief convenient method to send the shutdown trigger message to the actor's supervisor
  86. *
  87. * If the actor is already shutting down, the method will do nothing, otherwise
  88. * it will send the shutdown trigger to its supervisor.
  89. *
  90. * The shutdown reason is forwarded "as is". If it is missing, then it will
  91. * be constructed with the error code "normal shutdown".
  92. */
  93. virtual void do_shutdown(const extended_error_ptr_t &reason = {}) noexcept;
  94. /** \brief actor is fully initialized and its supervisor has sent the signal to start
  95. *
  96. * The actor state is set to `OPERATIONAL`.
  97. *
  98. */
  99. virtual void on_start() noexcept;
  100. /** \brief sends message to the destination address
  101. *
  102. * Internally it just constructs a new message in the supervisor's outbound queue.
  103. *
  104. */
  105. template <typename M, typename... Args> void send(const address_ptr_t &addr, Args &&...args);
  106. /** \brief returns request builder for destination address using the "main" actor address
  107. *
  108. * The `args` are forwarded for construction of the request. The request is not actually sent
  109. * until the `send` method of {@link request_builder_t} is invoked.
  110. *
  111. * Supervisor will spawn timeout timer upon `timeout` method.
  112. */
  113. template <typename R, typename... Args>
  114. request_builder_t<typename request_wrapper_t<R>::request_t> request(const address_ptr_t &dest_addr, Args &&...args);
  115. /** \brief returns request builder for destination address using the specified address for reply
  116. *
  117. * It is assumed that the specified address belongs to the actor.
  118. *
  119. * The method is useful when a different behavior is needed for the same
  120. * message response types. It serves to some extent as virtual dispatching within
  121. * the actor.
  122. *
  123. * See the description of `request` method.
  124. *
  125. */
  126. template <typename R, typename... Args>
  127. request_builder_t<typename request_wrapper_t<R>::request_t>
  128. request_via(const address_ptr_t &dest_addr, const address_ptr_t &reply_addr, Args &&...args);
  129. /** \brief convenient method for constructing and sending response to a request
  130. *
  131. * `args` are forwarded to response payload construction
  132. */
  133. template <typename Request, typename... Args> void reply_to(Request &message, Args &&...args);
  134. /** \brief convenient method for constructing and sending error response to a request */
  135. template <typename Request> void reply_with_error(Request &message, const extended_error_ptr_t &ec);
  136. /** \brief makes response to the request, but does not send it.
  137. *
  138. * The return type is an intrusive pointer to the message, not the message itself.
  139. *
  140. * It can be useful for delayed responses. The response can be dispatched later via
  141. * supervisor->put(std::move(response_ptr));
  142. *
  143. */
  144. template <typename Request, typename... Args> auto make_response(Request &message, Args &&...args);
  145. /** \brief makes error response to the request, but does not send it.
  146. *
  147. * The return type is an intrusive pointer to the message, not the message itself.
  148. *
  149. * It can be useful for delayed responses. The response can be dispatched later via
  150. * supervisor->put(std::move(response_ptr));
  151. *
  152. */
  153. template <typename Request> auto make_response(Request &message, const extended_error_ptr_t &ec);
  154. /** \brief subscribes actor's handler to process messages on the specified address */
  155. template <typename Handler> subscription_info_ptr_t subscribe(Handler &&h, const address_ptr_t &addr) noexcept;
  156. /** \brief subscribes actor's handler to process messages on the actor's "main" address */
  157. template <typename Handler> subscription_info_ptr_t subscribe(Handler &&h) noexcept;
  158. /** \brief unsubscribes actor's handler from processing messages on the specified address */
  159. template <typename Handler, typename = is_handler<Handler>>
  160. void unsubscribe(Handler &&h, address_ptr_t &addr) noexcept;
  161. /** \brief unsubscribes actor's handler from processing messages on the actor's "main" address */
  162. template <typename Handler, typename = is_handler<Handler>> void unsubscribe(Handler &&h) noexcept;
  163. /* \brief initiates handler unsubscription from the address
  164. *
  165. * If the address is local, then unsubscription confirmation is sent immediately,
  166. * otherwise {@link payload::external_subscription_t} request is sent to the external
  167. * supervisor, which owns the address.
  168. *
  169. * The optional call can be provided to be called upon message destruction.
  170. * NOTE(review): no declaration follows this comment; it presumably describes a removed overload - confirm.
  171. */
  172. /** \brief initiates handler unsubscription from the default actor address */
  173. inline void unsubscribe(const handler_ptr_t &h) noexcept { lifetime->unsubscribe(h, address); }
  174. /** \brief starts plugins activation */
  175. void activate_plugins() noexcept;
  176. /** \brief finishes plugin activation, successful or not */
  177. void commit_plugin_activation(plugin::plugin_base_t &plugin, bool success) noexcept;
  178. /** \brief starts plugins deactivation */
  179. void deactivate_plugins() noexcept;
  180. /** \brief finishes plugin deactivation */
  181. void commit_plugin_deactivation(plugin::plugin_base_t &plugin) noexcept;
  182. /** \brief propagates subscription message to corresponding actors */
  183. void on_subscription(message::subscription_t &message) noexcept;
  184. /** \brief propagates unsubscription message to corresponding actors */
  185. void on_unsubscription(message::unsubscription_t &message) noexcept;
  186. /** \brief propagates external unsubscription message to corresponding actors */
  187. void on_unsubscription_external(message::unsubscription_external_t &message) noexcept;
  188. /** \brief creates new unique address for an actor (via address_maker plugin) */
  189. address_ptr_t create_address() noexcept;
  190. /** \brief starts shutdown procedure, e.g. upon receiving shutdown request
  191. *
  192. * The actor state is set to SHUTTING_DOWN.
  193. *
  194. */
  195. virtual void shutdown_start() noexcept;
  196. /** \brief polls plugins for shutdown
  197. *
  198. * The poll is performed in the reverse order. If all plugins with an active
  199. * shutdown reaction confirm they are ready to shut down, then the
  200. * `shutdown_finish` method is invoked.
  201. *
  202. */
  203. void shutdown_continue() noexcept;
  204. /** \brief finalizes shutdown
  205. *
  206. * The shutdown response is sent and actor state is set to SHUT_DOWN.
  207. *
  208. * This is the last action in the shutdown sequence.
  209. * No further methods will be invoked on the actor.
  210. *
  211. */
  212. virtual void shutdown_finish() noexcept;
  213. /** \brief starts initialization procedure
  214. *
  215. * The actor state is set to INITIALIZING.
  216. *
  217. */
  218. virtual void init_start() noexcept;
  219. /** \brief polls plugins whether they completed initialization.
  220. *
  221. * The poll is performed in the direct order. If all plugins with an active
  222. * init reaction confirm they are ready, then the `init_finish` method
  223. * is invoked.
  224. *
  225. */
  226. void init_continue() noexcept;
  227. /** \brief finalizes initialization
  228. *
  229. * The init response is sent and actor state is set to INITIALIZED.
  230. *
  231. */
  232. virtual void init_finish() noexcept;
  233. /** \brief main callback for plugin configuration when it's ready */
  234. virtual void configure(plugin::plugin_base_t &plugin) noexcept;
  235. /** \brief generic non-public fields accessor */
  236. template <typename T> auto &access() noexcept;
  237. /** \brief generic non-public methods accessor */
  238. template <typename T, typename... Args> auto access(Args... args) noexcept;
  239. /** \brief generic non-public fields accessor (const version) */
  240. template <typename T> auto &access() const noexcept;
  241. /** \brief generic non-public methods accessor (const version) */
  242. template <typename T, typename... Args> auto access(Args... args) const noexcept;
  243. /** \brief returns actor's main address */
  244. inline const address_ptr_t &get_address() const noexcept { return address; }
  245. /** \brief returns actor's supervisor */
  246. inline supervisor_t &get_supervisor() const noexcept { return *supervisor; }
  247. /** \brief spawns a new one-shot timer
  248. *
  249. * \param interval specifies the amount of time after which the timer will trigger.
  250. * \param delegate is an object of arbitrary class.
  251. * \param method is the pointer-to-member-function of the object, which will be
  252. * invoked upon timer triggering or cancellation.
  253. *
  254. * The `method` parameter should have the following signature:
  255. *
  256. * void Delegate::on_timer(request_id_t, bool cancelled) noexcept;
  257. *
  258. * `start_timer` returns the timer identity. It will be supplied to the specified callback,
  259. * and the timer can be cancelled via it.
  260. */
  261. template <typename Delegate, typename Method>
  262. request_id_t start_timer(const pt::time_duration &interval, Delegate &delegate, Method method) noexcept;
  263. /** \brief cancels previously started timer
  264. *
  265. * If the timer hasn't been triggered, then it is cancelled and the callback will be invoked
  266. * with `true` to mark that it was cancelled.
  267. *
  268. * Upon cancellation the timer callback will be invoked immediately, in the context of the caller.
  269. */
  270. void cancel_timer(request_id_t request_id) noexcept;
  271. /** \brief returns actor shutdown reason
  272. *
  273. * The shutdown reason should be available if the actor's state is already `SHUTTING_DOWN`
  274. *
  275. */
  276. inline const extended_error_ptr_t &get_shutdown_reason() const noexcept { return shutdown_reason; }
  277. /** \brief returns human-readable actor identity
  278. *
  279. * The identity can be assigned either directly in ctor, or via address_maker plugin
  280. *
  281. */
  282. inline const std::string &get_identity() const noexcept { return identity; }
  283. /** \brief flag to mark that the actor is already executing initialization */
  284. static const constexpr std::uint32_t PROGRESS_INIT = 1 << 0;
  285. /** \brief flag to mark that the actor is already executing shutdown */
  286. static const constexpr std::uint32_t PROGRESS_SHUTDOWN = 1 << 1;
  287. /** \brief flag to mark that the actor's failure triggers its supervisor shutdown
  288. *
  289. * When the actor is shut down due to failure, if this flag is ON, then
  290. * it will trigger its supervisor shutdown.
  291. *
  292. * This policy is ignored when the actor is spawned.
  293. *
  294. */
  295. static const constexpr std::uint32_t ESCALATE_FALIURE = 1 << 2;
  296. /** \brief flag to mark that the actor triggers its supervisor shutdown
  297. *
  298. * When the actor is shut down (for whatever reason), if this flag is ON, then
  299. * it will trigger its supervisor shutdown.
  300. *
  301. * This policy is ignored when the actor is spawned.
  302. *
  303. */
  304. static const constexpr std::uint32_t AUTOSHUTDOWN_SUPERVISOR = 1 << 3;
  305. /** \brief whether spawner should create a new instance of the actor
  306. *
  307. * When the actor is spawned via a spawner, and it goes down,
  308. * the spawner will ask the current instance whether it should
  309. * spawn another one.
  310. *
  311. * This method is consulted only when the spawner's restart_policy_t is
  312. * `ask_actor`.
  313. *
  314. */
  315. virtual bool should_restart() const noexcept;
  316. protected:
  317. /** \brief timer-id to timer-handler map (type) */
  318. using timers_map_t = std::unordered_map<request_id_t, timer_handler_ptr_t>;
  319. /** \brief triggers timer handler associated with the timer id */
  320. void on_timer_trigger(request_id_t request_id, bool cancelled) noexcept;
  321. /** \brief starts timer with pre-forged timer id (aka request-id) */
  322. template <typename Delegate, typename Method>
  323. void start_timer(request_id_t request_id, const pt::time_duration &interval, Delegate &delegate,
  324. Method method) noexcept;
  325. /** \brief helper-method, which assigns shutdown reason if it isn't set */
  326. void assign_shutdown_reason(extended_error_ptr_t reason) noexcept;
  327. /** \brief makes extended error within the context of the actor */
  328. extended_error_ptr_t make_error(const std::error_code &ec, const extended_error_ptr_t &next = {}) noexcept;
  329. /** \brief notification, when actor has been unlinked from server actor
  330. *
  331. * Returns boolean, meaning whether the actor should initiate shutdown. Default value is `true`.
  332. *
  333. */
  334. virtual bool on_unlink(const address_ptr_t &server_addr) noexcept;
  335. /** \brief suspended init request message */
  336. intrusive_ptr_t<message::init_request_t> init_request;
  337. /** \brief suspended shutdown request message */
  338. intrusive_ptr_t<message::shutdown_request_t> shutdown_request;
  339. /** \brief actor address */
  340. address_ptr_t address;
  341. /** \brief actor spawner address */
  342. address_ptr_t spawner_address;
  343. /** \brief actor identity, which might have some meaning for developers */
  344. std::string identity;
  345. /** \brief non-owning pointer to actor's execution / infrastructure context */
  346. supervisor_t *supervisor;
  347. /** \brief opaque plugins storage (owning) */
  348. plugin_storage_ptr_t plugins_storage;
  349. /** \brief non-owning list of plugins */
  350. plugins_t plugins;
  351. /** \brief timeout for actor initialization (used by supervisor) */
  352. pt::time_duration init_timeout;
  353. /** \brief timeout for actor shutdown (used by supervisor) */
  354. pt::time_duration shutdown_timeout;
  355. /** \brief current actor state */
  356. state_t state;
  357. /** \brief non-owning pointer to address_maker plugin */
  358. plugin::address_maker_plugin_t *address_maker = nullptr;
  359. /** \brief non-owning pointer to lifetime plugin */
  360. plugin::lifetime_plugin_t *lifetime = nullptr;
  361. /** \brief non-owning pointer to link_server plugin */
  362. plugin::link_server_plugin_t *link_server = nullptr;
  363. /** \brief non-owning pointer to resources plugin */
  364. plugin::resources_plugin_t *resources = nullptr;
  365. /** \brief finds plugin by plugin class identity
  366. *
  367. * `nullptr` is returned when the plugin cannot be found
  368. */
  369. plugin::plugin_base_t *get_plugin(const void *identity) const noexcept;
  370. /** \brief set of activating plugin identities */
  371. std::set<const void *> activating_plugins;
  372. /** \brief set of deactivating plugin identities */
  373. std::set<const void *> deactivating_plugins;
  374. /** \brief timer-id to timer-handler map */
  375. timers_map_t timers_map;
  376. /** \brief set of currently processing states, i.e. init or shutdown
  377. *
  378. * This is not the same as the `state_t` flag, which just marks the state.
  379. *
  380. * The `continuation_mask` is mostly used by plugins to avoid recursion
  381. *
  382. */
  383. std::uint32_t continuation_mask = 0;
  384. /** \brief explanation of why the actor has been requested to shut down */
  385. extended_error_ptr_t shutdown_reason;
  386. friend struct plugin::plugin_base_t;
  387. friend struct plugin::lifetime_plugin_t;
  388. friend struct supervisor_t;
  389. template <typename T> friend struct request_builder_t;
  390. template <typename T, typename M> friend struct accessor_t;
  391. };
  392. } // namespace rotor