016-pub_sub.cpp 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. //
  2. // Copyright (c) 2019-2020 Ivan Baidakou (basiliscos) (the dot dmol at gmail dot com)
  3. //
  4. // Distributed under the MIT Software License
  5. //
  6. #include "catch.hpp"
  7. #include "rotor.hpp"
  8. #include "supervisor_test.h"
  9. #include "access.h"
  10. namespace r = rotor;
  11. namespace rt = r::test;
  /// Empty message payload: it carries no data and serves only as the message
  /// type exchanged between the publisher and the subscribers in this file.
  struct payload_t {
  };
  13. struct pub_config_t : r::actor_config_t {
  14. r::address_ptr_t pub_addr;
  15. using r::actor_config_t::actor_config_t;
  16. };
  17. template <typename Actor> struct pub_config_builder_t : r::actor_config_builder_t<Actor> {
  18. using builder_t = typename Actor::template config_builder_t<Actor>;
  19. using parent_t = r::actor_config_builder_t<Actor>;
  20. using parent_t::parent_t;
  21. builder_t &&pub_addr(const r::address_ptr_t &addr) {
  22. parent_t::config.pub_addr = addr;
  23. return std::move(*static_cast<builder_t *>(this));
  24. }
  25. bool validate() noexcept override { return parent_t::config.pub_addr && parent_t::validate(); }
  26. };
  27. struct pub_t : public r::actor_base_t {
  28. using config_t = pub_config_t;
  29. template <typename Actor> using config_builder_t = pub_config_builder_t<Actor>;
  30. explicit pub_t(config_t &cfg) : r::actor_base_t(cfg), pub_addr{cfg.pub_addr} {}
  31. void on_start() noexcept override {
  32. r::actor_base_t::on_start();
  33. send<payload_t>(pub_addr);
  34. }
  35. r::address_ptr_t pub_addr;
  36. };
  37. struct sub_t : public r::actor_base_t {
  38. using config_t = pub_config_t;
  39. template <typename Actor> using config_builder_t = pub_config_builder_t<Actor>;
  40. explicit sub_t(config_t &cfg) : r::actor_base_t(cfg), pub_addr{cfg.pub_addr} {}
  41. void configure(r::plugin::plugin_base_t &plugin) noexcept override {
  42. plugin.with_casted<r::plugin::starter_plugin_t>(
  43. [this](auto &p) { p.subscribe_actor(&sub_t::on_payload, pub_addr); });
  44. }
  45. void on_payload(r::message_t<payload_t> &) noexcept { ++received; }
  46. std::uint16_t received = 0;
  47. r::address_ptr_t pub_addr;
  48. };
  49. TEST_CASE("ping-pong", "[supervisor]") {
  50. r::system_context_t system_context;
  51. auto sup = system_context.create_supervisor<rt::supervisor_test_t>().timeout(rt::default_timeout).finish();
  52. auto pub_addr = sup->create_address();
  53. sup->create_actor<pub_t>().pub_addr(pub_addr).timeout(rt::default_timeout).finish();
  54. auto sub1 = sup->create_actor<sub_t>().pub_addr(pub_addr).timeout(rt::default_timeout).finish();
  55. auto sub2 = sup->create_actor<sub_t>().pub_addr(pub_addr).timeout(rt::default_timeout).finish();
  56. sup->do_process();
  57. REQUIRE(sub1->received == 1);
  58. REQUIRE(sub2->received == 1);
  59. sup->do_shutdown();
  60. sup->do_process();
  61. REQUIRE(sup->get_state() == r::state_t::SHUT_DOWN);
  62. REQUIRE(sup->get_leader_queue().size() == 0);
  63. REQUIRE(sup->get_points().size() == 0);
  64. CHECK(rt::empty(sup->get_subscription()));
  65. }