registry.h 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. #pragma once
  2. //
  3. // Copyright (c) 2019-2020 Ivan Baidakou (basiliscos) (the dot dmol at gmail dot com)
  4. //
  5. // Distributed under the MIT Software License
  6. //
  7. #include "plugin_base.h"
  8. #include "link_client.h"
  9. #include <string>
  10. #include <unordered_map>
  11. namespace rotor::plugin {
  12. /** \struct registry_plugin_t
  13. *
  14. * \brief handy access to {@link registry_t}, for name registration and discovery
  15. *
  16. * Can use {@link registry_t} to register name/address during actor init phase
  17. * and perform the reverse operation to deregister name/address during shutdown phase.
  18. *
  19. * Simialary the {@link registry_t} can be used to discover name in the registry
  20. * and link the current actor to the target address during init phase; there is
  21. * no reverse operation during shutdown phase, because unlinking will be handled
  22. * by {@link plugin::link_client_plugin_t} plugin.
  23. *
  24. */
  25. struct registry_plugin_t : public plugin_base_t {
  26. using plugin_base_t::plugin_base_t;
  27. /** \brief phase for each discovery task: discovering or linking */
  28. enum class phase_t { discovering, linking };
  29. /** \struct discovery_task_t
  30. * \brief helper class to invoke callback upon address discovery
  31. */
  32. struct discovery_task_t {
  33. /** \brief callback for the discovery progress */
  34. using callback_t = std::function<void(phase_t phase, const std::error_code &)>;
  35. /** \brief sets that linking should be performed on operational-only discovered address */
  36. discovery_task_t &link(bool operational_only_ = true) noexcept {
  37. link_on_discovery = true;
  38. operational_only = operational_only_;
  39. return *this;
  40. }
  41. /** \brief discovery progress callback setter */
  42. template <typename Callback> void callback(Callback &&cb) noexcept {
  43. task_callback = std::forward<Callback>(cb);
  44. }
  45. private:
  46. discovery_task_t(registry_plugin_t &plugin_, address_ptr_t *address_, std::string service_name_, bool delayed_)
  47. : plugin{plugin_}, address(address_), service_name{service_name_}, delayed{delayed_} {}
  48. operator bool() const noexcept { return address; }
  49. void on_discovery(const std::error_code &ec) noexcept;
  50. void continue_init(const std::error_code &ec) noexcept;
  51. registry_plugin_t &plugin;
  52. address_ptr_t *address;
  53. std::string service_name;
  54. bool delayed;
  55. bool link_on_discovery = false;
  56. bool operational_only = false;
  57. bool requested = false;
  58. request_id_t request_id = 0;
  59. callback_t task_callback;
  60. friend struct registry_plugin_t;
  61. };
  62. /** The plugin unique identity to allow further static_cast'ing*/
  63. static const void *class_identity;
  64. const void *identity() const noexcept override;
  65. void activate(actor_base_t *actor) noexcept override;
  66. /** \brief reaction on registration response */
  67. virtual void on_registration(message::registration_response_t &) noexcept;
  68. /** \brief reaction on discovery response */
  69. virtual void on_discovery(message::discovery_response_t &) noexcept;
  70. /** \brief reaction on discovery future */
  71. virtual void on_future(message::discovery_future_t &message) noexcept;
  72. /** \brief enqueues name/address registration
  73. *
  74. * It links with registry actor first upon demand, and then sends to it
  75. * name registration request(s).
  76. */
  77. virtual void register_name(const std::string &name, const address_ptr_t &address) noexcept;
  78. /** \brief creates name discovery task
  79. *
  80. * The address pointer is the place, where the discovered address should be stored.
  81. *
  82. * The `delayed` means: if the name is missing in the registry, do not response
  83. * with error (which will cause shutdown of client), but wait until the name be
  84. * registered, and only then reply with found address. In other words: instead
  85. * of sending discovery request, it will send discovery future.
  86. *
  87. */
  88. virtual discovery_task_t &discover_name(const std::string &name, address_ptr_t &address,
  89. bool delayed = false) noexcept;
  90. bool handle_shutdown(message::shutdown_request_t *message) noexcept override;
  91. bool handle_init(message::init_request_t *message) noexcept override;
  92. /** \brief generic non-public fields accessor */
  93. template <typename T> auto &access() noexcept;
  94. private:
  95. template <typename Message> void process_discovery(Message &message) noexcept {
  96. auto &service = message.payload.req->payload.request_payload.service_name;
  97. auto &ec = message.payload.ec;
  98. auto it = discovery_map.find(service);
  99. assert(it != discovery_map.end());
  100. if (!ec) {
  101. *it->second.address = message.payload.res.service_addr;
  102. }
  103. it->second.on_discovery(ec);
  104. }
  105. enum class state_t { REGISTERING, LINKING, OPERATIONAL, UNREGISTERING };
  106. struct register_info_t {
  107. address_ptr_t address;
  108. state_t state;
  109. };
  110. using register_map_t = std::unordered_map<std::string, register_info_t>;
  111. using discovery_map_t = std::unordered_map<std::string, discovery_task_t>;
  112. enum plugin_state_t : std::uint32_t {
  113. CONFIGURED = 1 << 0,
  114. LINKING = 1 << 1,
  115. LINKED = 1 << 2,
  116. };
  117. std::uint32_t plugin_state = 0;
  118. register_map_t register_map;
  119. discovery_map_t discovery_map;
  120. void link_registry() noexcept;
  121. void on_link(const std::error_code &ec) noexcept;
  122. bool has_registering() noexcept;
  123. virtual void continue_init(const std::error_code &ec) noexcept;
  124. };
  125. } // namespace rotor::plugin