actor_base.h 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494
  1. #pragma once
  2. //
  3. // Copyright (c) 2019-2022 Ivan Baidakou (basiliscos) (the dot dmol at gmail dot com)
  4. //
  5. // Distributed under the MIT Software License
  6. //
  7. #include "forward.hpp"
  8. #include "address.hpp"
  9. #include "actor_config.h"
  10. #include "messages.hpp"
  11. #include "state.h"
  12. #include "handler.h"
  13. #include "extended_error.h"
  14. #include "timer_handler.hpp"
  15. #include <set>
  16. #if defined(_MSC_VER)
  17. #pragma warning(push)
  18. #pragma warning(disable : 4251)
  19. #endif
  20. namespace rotor {
  21. /** \struct actor_base_t
  22. * \brief universal primitive of concurrent computation
  23. *
  24. * The class is base class for user-defined actors. It is expected that
  25. * actors will react on incoming messages (e.g. by changing internal
  26. * /private state) or send (other) messages to other actors, or do
  27. * some side-effects (I/O, etc.).
  28. *
  29. * Message passing interface is asynchronous, they are send to {@link supervisor_t}.
  30. *
  31. * Every actor belong to some {@link supervisor_t}, which "injects" the thread-safe
  32. * execution context, in a sense, that the actor can call it's own methods as well
  33. * as supervisors without any need of synchonization.
  34. *
  35. * All actor methods are thread-unsafe, i.e. should not be called with except of
  36. * it's own supervisor. Communication with actor should be performed via messages.
  37. *
  38. * Actor is addressed by it's "main" address; however it is possible for an actor
  39. * to have multiple identities aka "virtual" addresses.
  40. *
  41. */
  42. struct ROTOR_API actor_base_t : public arc_base_t<actor_base_t> {
  43. /** \brief injects an alias for actor_config_t */
  44. using config_t = actor_config_t;
  45. /** \brief injects templated actor_config_builder_t */
  46. template <typename Actor> using config_builder_t = actor_config_builder_t<Actor>;
  47. /** \brief SFINAE handler detector
  48. *
  49. * Either handler can be constructed from memeber-to-function-pointer or
  50. * it is already constructed and have a base `handler_base_t`
  51. */
  52. template <typename Handler>
  53. using is_handler =
  54. std::enable_if_t<std::is_member_function_pointer_v<Handler> || std::is_base_of_v<handler_base_t, Handler>>;
  55. // clang-format off
  56. /** \brief the default list of plugins for an actor
  57. *
  58. * The order of plugins is very important, as they are initialized in the direct order
  59. * and deinitilized in the reverse order.
  60. *
  61. */
  62. using plugins_list_t = std::tuple<
  63. plugin::address_maker_plugin_t,
  64. plugin::lifetime_plugin_t,
  65. plugin::init_shutdown_plugin_t,
  66. plugin::link_server_plugin_t,
  67. plugin::link_client_plugin_t,
  68. plugin::registry_plugin_t,
  69. plugin::resources_plugin_t,
  70. plugin::starter_plugin_t>;
  71. // clang-format on
  72. /** \brief constructs actor and links it's supervisor
  73. *
  74. * An actor cannot outlive it's supervisor.
  75. *
  76. * Sets internal actor state to `NEW`
  77. *
  78. */
  79. actor_base_t(config_t &cfg);
  80. virtual ~actor_base_t();
  81. /** \brief early actor initialization (pre-initialization)
  82. *
  83. * Actor's plugins are activated, "main" address is created
  84. * (via {@link plugin::address_maker_plugin_t}), state is set
  85. * to `INITIALIZING` (via {@link plugin::init_shutdown_plugin_t}).
  86. *
  87. */
  88. virtual void do_initialize(system_context_t *ctx) noexcept;
  89. /** \brief convenient method to send actor's supervisor shutdown trigger message
  90. *
  91. * If actor is already shutting down, the method will do nothing, otherwise
  92. * it will send shutdown trigger to its supervisor.
  93. *
  94. * The shutdown reason is forwarded "as is". If it is missing, than it will
  95. * be constructed with the error code "normal shutdown".
  96. */
  97. virtual void do_shutdown(const extended_error_ptr_t &reason = {}) noexcept;
  98. /** \brief actor is fully initialized and it's supervisor has sent signal to start
  99. *
  100. * The actor state is set to `OPERATIONAL`.
  101. *
  102. */
  103. virtual void on_start() noexcept;
  104. /** \brief sends message to the destination address
  105. *
  106. * Internally it just constructs new message in supervisor's outbound queue.
  107. *
  108. */
  109. template <typename M, typename... Args> void send(const address_ptr_t &addr, Args &&...args);
  110. /** \brief returns request builder for destination address using the "main" actor address
  111. *
  112. * The `args` are forwarded for construction of the request. The request is not actually sent,
  113. * until `send` method of {@link request_builder_t} will be invoked.
  114. *
  115. * Supervisor will spawn timeout timer upon `timeout` method.
  116. */
  117. template <typename R, typename... Args>
  118. request_builder_t<typename request_wrapper_t<R>::request_t> request(const address_ptr_t &dest_addr, Args &&...args);
  119. /** \brief returns request builder for destination address using the specified address for reply
  120. *
  121. * It is assumed, that the specified address belongs to the actor.
  122. *
  123. * The method is useful, when a different behavior is needed for the same
  124. * message response types. It serves at some extend as virtual dispatching within
  125. * the actor.
  126. *
  127. * See the description of `request` method.
  128. *
  129. */
  130. template <typename R, typename... Args>
  131. request_builder_t<typename request_wrapper_t<R>::request_t>
  132. request_via(const address_ptr_t &dest_addr, const address_ptr_t &reply_addr, Args &&...args);
  133. /** \brief convenient method for constructing and sending response to a request
  134. *
  135. * `args` are forwarded to response payload constuction
  136. */
  137. template <typename Request, typename... Args> void reply_to(Request &message, Args &&...args);
  138. /** \brief convenient method for constructing and sending error response to a request */
  139. template <typename Request> void reply_with_error(Request &message, const extended_error_ptr_t &ec);
  140. /** \brief makes response to the request, but does not send it.
  141. *
  142. * The return type is intrusive pointer to the message, not the message itself.
  143. *
  144. * It can be useful for delayed responses. The response can be dispatched later via
  145. * supervisor->put(std::move(response_ptr));
  146. *
  147. */
  148. template <typename Request, typename... Args> auto make_response(Request &message, Args &&...args);
  149. /** \brief makes error response to the request, but does not send it.
  150. *
  151. * The return type is intrusive pointer to the message, not the message itself.
  152. *
  153. * It can be useful for delayed responses. The response can be dispatched later via
  154. * supervisor->put(std::move(response_ptr));
  155. *
  156. */
  157. template <typename Request> auto make_response(Request &message, const extended_error_ptr_t &ec);
  158. /** \brief subscribes actor's handler to process messages on the specified address */
  159. template <typename Handler> subscription_info_ptr_t subscribe(Handler &&h, const address_ptr_t &addr) noexcept;
  160. /** \brief subscribes actor's handler to process messages on the actor's "main" address */
  161. template <typename Handler> subscription_info_ptr_t subscribe(Handler &&h) noexcept;
  162. /** \brief unsubscribes actor's handler from process messages on the specified address */
  163. template <typename Handler, typename = is_handler<Handler>>
  164. void unsubscribe(Handler &&h, address_ptr_t &addr) noexcept;
  165. /** \brief unsubscribes actor's handler from processing messages on the actor's "main" address */
  166. template <typename Handler, typename = is_handler<Handler>> void unsubscribe(Handler &&h) noexcept;
  167. /* \brief initiates handler unsubscription from the address
  168. *
  169. * If the address is local, then unsubscription confirmation is sent immediately,
  170. * otherwise {@link payload::external_subscription_t} request is sent to the external
  171. * supervisor, which owns the address.
  172. *
  173. * The optional call can be providded to be called upon message destruction.
  174. *
  175. */
  176. /** \brief initiates handler unsubscription from the default actor address */
  177. inline void unsubscribe(const handler_ptr_t &h) noexcept { lifetime->unsubscribe(h, address); }
  178. /** \brief starts plugins activation */
  179. void activate_plugins() noexcept;
  180. /** \brief finishes plugin activation, successful or not */
  181. void commit_plugin_activation(plugin::plugin_base_t &plugin, bool success) noexcept;
  182. /** \brief starts plugins deactivation */
  183. void deactivate_plugins() noexcept;
  184. /** \brief finishes plugin deactivation */
  185. void commit_plugin_deactivation(plugin::plugin_base_t &plugin) noexcept;
  186. /** \brief propaagtes subscription message to corresponding actors */
  187. void on_subscription(message::subscription_t &message) noexcept;
  188. /** \brief propaagtes unsubscription message to corresponding actors */
  189. void on_unsubscription(message::unsubscription_t &message) noexcept;
  190. /** \brief propaagtes external unsubscription message to corresponding actors */
  191. void on_unsubscription_external(message::unsubscription_external_t &message) noexcept;
  192. /** \brief creates new unique address for an actor (via address_maker plugin) */
  193. address_ptr_t create_address() noexcept;
  194. /** \brief starts shutdown procedure, e.g. upon receiving shutdown request
  195. *
  196. * The actor state is set to SHUTTING_DOWN.
  197. *
  198. */
  199. virtual void shutdown_start() noexcept;
  200. /** \brief polls plugins for shutdown
  201. *
  202. * The poll is performed in the reverse order. If all plugins, with active
  203. * shutdown reaction confirm they are ready to shutdown, then the
  204. * `shutdown_finish` method is invoked.
  205. *
  206. */
  207. void shutdown_continue() noexcept;
  208. /** \brief finalizes shutdown
  209. *
  210. * The shutdown response is sent and actor state is set to SHUT_DOWN.
  211. *
  212. * This is the last action in the shutdown sequence.
  213. * No further methods will be invoked on the actor.
  214. *
  215. * All unfinished requests and untriggered timers will be cancelled
  216. * by force in the method.
  217. *
  218. */
  219. virtual void shutdown_finish() noexcept;
  220. /** \brief starts initialization procedure
  221. *
  222. * The actor state is set to INITIALIZING.
  223. *
  224. */
  225. virtual void init_start() noexcept;
  226. /** \brief polls plugins whether they completed initialization.
  227. *
  228. * The poll is performed in the direct order. If all plugins, with active
  229. * init reaction confirm they are ready, then the `init_finish` method
  230. * is invoked.
  231. *
  232. */
  233. void init_continue() noexcept;
  234. /** \brief finalizes initialization
  235. *
  236. * The init response is sent and actor state is set to INITIALIZED.
  237. *
  238. */
  239. virtual void init_finish() noexcept;
  240. /** \brief main callback for plugin configuration when it's ready */
  241. virtual void configure(plugin::plugin_base_t &plugin) noexcept;
  242. /** \brief generic non-public fields accessor */
  243. template <typename T> auto &access() noexcept;
  244. /** \brief generic non-public methods accessor */
  245. template <typename T, typename... Args> auto access(Args... args) noexcept;
  246. /** \brief generic non-public fields accessor */
  247. template <typename T> auto &access() const noexcept;
  248. /** \brief generic non-public methods accessor */
  249. template <typename T, typename... Args> auto access(Args... args) const noexcept;
  250. /** \brief returns actor's main address */
  251. inline const address_ptr_t &get_address() const noexcept { return address; }
  252. /** \brief returns actor's supervisor */
  253. inline supervisor_t &get_supervisor() const noexcept { return *supervisor; }
  254. /** \brief spawns a new one-shot timer
  255. *
  256. * \param interval specifies amount of time, after which the timer will trigger.
  257. * \param delegate is an object of arbitrary class.
  258. * \param method is the pointer-to-member-function of the object, which will be
  259. * invoked upon timer triggering or cancellation.
  260. *
  261. * The `method` parameter should have the following signature:
  262. *
  263. * void Delegate::on_timer(request_id_t, bool cancelled) noexcept;
  264. *
  265. * `start_timer` returns timer identity. It will be supplied to the specified callback,
  266. * or the timer can be cancelled via it.
  267. */
  268. template <typename Delegate, typename Method>
  269. request_id_t start_timer(const pt::time_duration &interval, Delegate &delegate, Method method) noexcept;
  270. /** \brief cancels previously started timer
  271. *
  272. * If timer hasn't been triggered, then it is cancelled and the callback will be invoked
  273. * with `true` to mark that it was cancelled.
  274. *
  275. * Upon cancellation the timer callback will be invoked immediately, in the context of caller.
  276. */
  277. void cancel_timer(request_id_t request_id) noexcept;
  278. /** \brief returns actor shutdwon reason
  279. *
  280. * The shutdown reason should be available if actors' state is already `SHUTTING_DOWN`
  281. *
  282. */
  283. inline const extended_error_ptr_t &get_shutdown_reason() const noexcept { return shutdown_reason; }
  284. /** \brief retuns human-readable actor identity
  285. *
  286. * The identity can be assigned either directly in ctor, or via address_maker plugin
  287. *
  288. */
  289. inline const std::string &get_identity() const noexcept { return identity; }
  290. /** \brief flag to mark, that actor is already executing initialization */
  291. static const constexpr std::uint32_t PROGRESS_INIT = 1 << 0;
  292. /** \brief flag to mark, that actor is already executing shutdown */
  293. static const constexpr std::uint32_t PROGRESS_SHUTDOWN = 1 << 1;
  294. /** \brief flag to mark, that actor is already executing shutdown
  295. *
  296. * When actor is shutdown due to failure, if this flag is ON, then
  297. * it will trigger it's supervisor shutdown.
  298. *
  299. * This policy is ignored when actor is spawned.
  300. *
  301. */
  302. static const constexpr std::uint32_t ESCALATE_FALIURE = 1 << 2;
  303. /** \brief flag to mark, that actor trigger supervisor shutdown
  304. *
  305. * When actor is shutdown (for whatever reason), if this flag is ON, then
  306. * it will trigger it's supervisor shutdown.
  307. *
  308. * This policy is ignored when actor is spawned.
  309. *
  310. */
  311. static const constexpr std::uint32_t AUTOSHUTDOWN_SUPERVISOR = 1 << 3;
  312. /** \brief whether spawner should create a new instance of the actor
  313. *
  314. * When then actor is spawned via a spawner, and it becomes down,
  315. * the spawner will ask the curretn instance whether it should
  316. * spawn another one.
  317. *
  318. * This method is consulted, only when spawner's restart_policy_t is
  319. * `ask_actor`.
  320. *
  321. */
  322. virtual bool should_restart() const noexcept;
  323. protected:
  324. /** \brief timer-id to timer-handler map (type) */
  325. using timers_map_t = std::unordered_map<request_id_t, timer_handler_ptr_t>;
  326. /** \brief list of ids of active requests (type) */
  327. using requests_t = std::unordered_set<request_id_t>;
  328. /** \brief triggers timer handler associated with the timer id */
  329. void on_timer_trigger(request_id_t request_id, bool cancelled) noexcept;
  330. /** \brief starts timer with pre-forged timer id (aka request-id */
  331. template <typename Delegate, typename Method>
  332. void start_timer(request_id_t request_id, const pt::time_duration &interval, Delegate &delegate,
  333. Method method) noexcept;
  334. /** \brief helper-method, which assigns shutdown reason if it isn't set */
  335. void assign_shutdown_reason(extended_error_ptr_t reason) noexcept;
  336. /** \brief makes extended error within the context of the actor */
  337. extended_error_ptr_t make_error(const std::error_code &ec, const extended_error_ptr_t &next = {}) noexcept;
  338. /** \brief notification, when actor has been unlinked from server actor
  339. *
  340. * Returns boolean, meaning whether actor should initate shutdown. Default value is `true`.
  341. *
  342. */
  343. virtual bool on_unlink(const address_ptr_t &server_addr) noexcept;
  344. /** \brief suspended init request message */
  345. intrusive_ptr_t<message::init_request_t> init_request;
  346. /** \brief suspended shutdown request message */
  347. intrusive_ptr_t<message::shutdown_request_t> shutdown_request;
  348. /** \brief actor address */
  349. address_ptr_t address;
  350. /** \brief actor spawner address */
  351. address_ptr_t spawner_address;
  352. /** \brief actor identity, wich might have some meaning for developers */
  353. std::string identity;
  354. /** \brief non-owning pointer to actor's execution / infrastructure context */
  355. supervisor_t *supervisor;
  356. /** \brief opaque plugins storage (owning) */
  357. plugin_storage_ptr_t plugins_storage;
  358. /** \brief non-ownling list of plugins */
  359. plugins_t plugins;
  360. /** \brief timeout for actor initialization (used by supervisor) */
  361. pt::time_duration init_timeout;
  362. /** \brief timeout for actor shutdown (used by supervisorr) */
  363. pt::time_duration shutdown_timeout;
  364. /** \brief current actor state */
  365. state_t state;
  366. /** \brief non-owning pointer to address_maker plugin */
  367. plugin::address_maker_plugin_t *address_maker = nullptr;
  368. /** \brief non-owning pointer to lifetime plugin */
  369. plugin::lifetime_plugin_t *lifetime = nullptr;
  370. /** \brief non-owning pointer to link_server plugin */
  371. plugin::link_server_plugin_t *link_server = nullptr;
  372. /** \brief non-owning pointer to resources plugin */
  373. plugin::resources_plugin_t *resources = nullptr;
  374. /** \brief finds plugin by plugin class identity
  375. *
  376. * `nullptr` is returned when plugin cannot be found
  377. */
  378. plugin::plugin_base_t *get_plugin(const void *identity) const noexcept;
  379. /** \brief set of activating plugin identities */
  380. std::set<const void *> activating_plugins;
  381. /** \brief set of deactivating plugin identities */
  382. std::set<const void *> deactivating_plugins;
  383. /** \brief timer-id to timer-handler map */
  384. timers_map_t timers_map;
  385. /** \brief list of ids of active requests */
  386. requests_t active_requests;
  387. /** \brief set of currently proccessing states, i.e. init or shutdown
  388. *
  389. * This is not the same as `state_t` flag, which just marks the state.
  390. *
  391. * The `continuation_mask` is mostly used by plugins to avoid recursion
  392. *
  393. */
  394. std::uint32_t continuation_mask = 0;
  395. /** \brief explanation, why actor is been requested for shut down */
  396. extended_error_ptr_t shutdown_reason;
  397. friend struct plugin::plugin_base_t;
  398. friend struct plugin::lifetime_plugin_t;
  399. friend struct supervisor_t;
  400. template <typename T> friend struct request_builder_t;
  401. template <typename T, typename M> friend struct accessor_t;
  402. };
  403. } // namespace rotor
  404. #if defined(_MSC_VER)
  405. #pragma warning(pop)
  406. #endif