079-peer.cpp 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. // SPDX-FileCopyrightText: 2019-2025 Ivan Baidakou
  3. #include "test-utils.h"
  4. #include "access.h"
  5. #include "utils/tls.h"
  6. #include "utils/format.hpp"
  7. #include "proto/bep_support.h"
  8. #include "model/cluster.h"
  9. #include "model/messages.h"
  10. #include "model/diff/cluster_visitor.h"
  11. #include "net/names.h"
  12. #include "net/messages.h"
  13. #include "net/peer_actor.h"
  14. #include "transport/stream.h"
  15. #include "diff-builder.h"
  16. #include "constants.h"
  17. #include <rotor/asio.hpp>
  18. #include <boost/algorithm/string/replace.hpp>
  19. using namespace syncspirit;
  20. using namespace syncspirit::test;
  21. using namespace syncspirit::model;
  22. using namespace syncspirit::net;
  23. using namespace std::chrono_literals;
  24. namespace asio = boost::asio;
  25. namespace sys = boost::system;
  26. namespace r = rotor;
  27. namespace ra = r::asio;
  28. using configure_callback_t = std::function<void(r::plugin::plugin_base_t &)>;
  29. auto timeout = r::pt::time_duration{r::pt::millisec{1500}};
  30. auto host = "127.0.0.1";
  31. struct supervisor_t : ra::supervisor_asio_t {
  32. using parent_t = ra::supervisor_asio_t;
  33. using parent_t::parent_t;
  34. void configure(r::plugin::plugin_base_t &plugin) noexcept override {
  35. parent_t::configure(plugin);
  36. plugin.with_casted<r::plugin::registry_plugin_t>([&](auto &p) {
  37. p.register_name(names::coordinator, get_address());
  38. p.register_name(names::peer_supervisor, get_address());
  39. });
  40. if (configure_callback) {
  41. configure_callback(plugin);
  42. }
  43. }
  44. void on_child_shutdown(actor_base_t *actor) noexcept override {
  45. if (actor) {
  46. spdlog::info("child shutdown: {}, reason: {}", actor->get_identity(),
  47. actor->get_shutdown_reason()->message());
  48. }
  49. parent_t::on_child_shutdown(actor);
  50. }
  51. void shutdown_finish() noexcept override {
  52. parent_t::shutdown_finish();
  53. if (acceptor) {
  54. acceptor->cancel();
  55. }
  56. }
  57. auto get_state() noexcept { return state; }
  58. asio::ip::tcp::acceptor *acceptor = nullptr;
  59. configure_callback_t configure_callback;
  60. };
  61. using supervisor_ptr_t = r::intrusive_ptr_t<supervisor_t>;
  62. using actor_ptr_t = r::intrusive_ptr_t<peer_actor_t>;
  63. struct fixture_t : private model::diff::cluster_visitor_t {
  64. using acceptor_t = asio::ip::tcp::acceptor;
  65. using timer_t = asio::deadline_timer;
  66. using timer_ptr_t = std::unique_ptr<timer_t>;
  67. fixture_t() noexcept : ctx(io_ctx), acceptor(io_ctx), peer_sock(io_ctx), pre_main_counter{2} {
  68. test::init_logging();
  69. log = utils::get_logger("fixture");
  70. }
  71. virtual void run(bool add_peer = true) noexcept {
  72. known_peer = add_peer;
  73. auto strand = std::make_shared<asio::io_context::strand>(io_ctx);
  74. sup = ctx.create_supervisor<supervisor_t>().strand(strand).timeout(timeout).create_registry().finish();
  75. sup->configure_callback = [&](r::plugin::plugin_base_t &plugin) {
  76. plugin.template with_casted<r::plugin::starter_plugin_t>([&](auto &p) {
  77. using cluster_diff_ptr_t = r::intrusive_ptr_t<model::message::model_update_t>;
  78. using cluster_diff_t = typename cluster_diff_ptr_t::element_type;
  79. p.subscribe_actor(r::lambda<cluster_diff_t>([&](cluster_diff_t &msg) {
  80. LOG_INFO(log, "received cluster diff message");
  81. auto &diff = msg.payload.diff;
  82. auto r = diff->apply(*cluster, get_apply_controller());
  83. if (!r) {
  84. LOG_ERROR(log, "error updating model: {}", r.assume_error().message());
  85. sup->do_shutdown();
  86. }
  87. }));
  88. });
  89. };
  90. sup->start();
  91. sup->do_process();
  92. my_keys = utils::generate_pair("my").value();
  93. auto md = model::device_id_t::from_cert(my_keys.cert_data).value();
  94. my_device = device_t::create(md, "my-device").value();
  95. peer_keys = utils::generate_pair("peer").value();
  96. auto pd = model::device_id_t::from_cert(peer_keys.cert_data).value();
  97. peer_device = device_t::create(pd, "peer-device").value();
  98. cluster = new cluster_t(my_device, 1);
  99. cluster->get_devices().put(my_device);
  100. if (add_peer) {
  101. cluster->get_devices().put(peer_device);
  102. }
  103. auto ep = asio::ip::tcp::endpoint(asio::ip::make_address(host), 0);
  104. acceptor.open(ep.protocol());
  105. acceptor.bind(ep);
  106. acceptor.listen();
  107. auto local_ep = acceptor.local_endpoint();
  108. acceptor.async_accept(peer_sock, [this](auto ec) { this->accept(ec); });
  109. sup->acceptor = &acceptor;
  110. auto uri_str = fmt::format("tcp://{}:{}/", local_ep.address(), local_ep.port());
  111. LOG_TRACE(log, "Connecting to {}", uri_str);
  112. auto uri = utils::parse(uri_str);
  113. auto cfg = transport::transport_config_t{{}, uri, *sup, {}, true};
  114. client_trans = transport::initiate_stream(cfg);
  115. auto ip = asio::ip::make_address(host);
  116. auto peer_ep = tcp::endpoint(ip, local_ep.port());
  117. auto addresses = std::vector<tcp::endpoint>{peer_ep};
  118. auto addresses_ptr = std::make_shared<decltype(addresses)>(addresses);
  119. transport::error_fn_t on_error = [&](auto &ec) { on_client_error(ec); };
  120. transport::connect_fn_t on_connect = [addresses_ptr, this](const tcp::endpoint &) {
  121. LOG_INFO(log, "active/connected");
  122. try_main();
  123. };
  124. client_trans->async_connect(addresses_ptr, on_connect, on_error);
  125. std::this_thread::sleep_for(1ms);
  126. io_ctx.run();
  127. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  128. }
  129. void try_main() {
  130. --pre_main_counter;
  131. LOG_DEBUG(log, "try_main, counter = {}", pre_main_counter);
  132. if (!pre_main_counter) {
  133. main();
  134. }
  135. }
  136. virtual void main() noexcept {}
  137. virtual actor_ptr_t create_actor(uint32_t rx_timeout = 2000) noexcept {
  138. if (known_peer) {
  139. auto builder = diff_builder_t(*cluster);
  140. builder.update_state(*peer_device, {}, model::device_state_t::connecting).apply(*sup);
  141. }
  142. auto bep_config = config::bep_config_t();
  143. bep_config.rx_buff_size = 1024;
  144. bep_config.rx_timeout = rx_timeout;
  145. LOG_INFO(log, "crearing actor, timeout = {}", rx_timeout);
  146. return sup->create_actor<actor_ptr_t::element_type>()
  147. .timeout(timeout)
  148. .cluster(cluster)
  149. .coordinator(sup->get_address())
  150. .bep_config(bep_config)
  151. .transport(peer_trans)
  152. .peer_device_id(peer_device->device_id())
  153. .device_name("peer-device")
  154. .peer_proto("tcp")
  155. .autoshutdown_supervisor(true)
  156. .finish();
  157. }
  158. virtual void on_client_error(const sys::error_code &ec) noexcept { LOG_WARN(log, "client err: {}", ec.message()); }
  159. virtual void accept(const sys::error_code &ec) noexcept {
  160. LOG_INFO(log, "accept, ec: {}, remote = {}", ec.message(), peer_sock.remote_endpoint());
  161. auto uri = utils::parse("tcp://127.0.0.1:0/");
  162. auto cfg = transport::transport_config_t{{}, uri, *sup, std::move(peer_sock), false};
  163. peer_trans = transport::initiate_stream(cfg);
  164. try_main();
  165. }
  166. virtual void on_client_write(std::size_t bytes) noexcept { LOG_INFO(log, "client sent {} bytes", bytes); }
  167. virtual void on_client_read(std::size_t bytes) noexcept {
  168. LOG_INFO(log, "client received {} bytes", bytes);
  169. auto result = proto::parse_bep(asio::buffer(rx_buff, bytes)).value();
  170. auto hello = std::get_if<proto::message::Hello>(&result.message);
  171. // On some platforms 'on_client_read' triggers before 'on_server_send', but as we
  172. // check server code here, there might be problems. So, give server-side
  173. // some time via timer.
  174. if (hello) {
  175. assert(!hello_timer);
  176. hello_timer.reset(new timer_t(ctx.get_io_context()));
  177. hello_timer->expires_from_now(r::pt::milliseconds{1});
  178. hello_timer->async_wait([hello = std::move(*hello), this](const sys::error_code &ec) mutable {
  179. LOG_INFO(log, "on hello timer, ec: {}", ec.value());
  180. on_hello(std::move(hello));
  181. hello_timer.reset();
  182. });
  183. }
  184. }
  185. virtual void on_hello(proto::message::Hello msg) noexcept {
  186. LOG_INFO(log, "client received hello message from {}, {}/{}", msg->device_name(), msg->client_name(),
  187. msg->client_version());
  188. }
  189. virtual void send_hello() noexcept {
  190. proto::make_hello_message(tx_buff, "self-name");
  191. transport::io_fn_t on_write = [&](size_t bytes) { on_client_write(bytes); };
  192. transport::io_fn_t on_read = [&](size_t bytes) { on_client_read(bytes); };
  193. transport::error_fn_t on_error = [&](auto &ec) { on_client_error(ec); };
  194. auto tx_buff_ = asio::buffer(tx_buff.data(), tx_buff.size());
  195. auto rx_buff_ = asio::buffer(rx_buff, sizeof(rx_buff));
  196. client_trans->async_recv(rx_buff_, on_read, on_error);
  197. client_trans->async_send(tx_buff_, on_write, on_error);
  198. }
  199. cluster_ptr_t cluster;
  200. supervisor_ptr_t sup;
  201. asio::io_context io_ctx;
  202. ra::system_context_asio_t ctx;
  203. acceptor_t acceptor;
  204. asio::ip::tcp::socket peer_sock;
  205. utils::logger_t log;
  206. utils::key_pair_t peer_keys;
  207. utils::key_pair_t my_keys;
  208. model::device_ptr_t peer_device;
  209. model::device_ptr_t my_device;
  210. transport::stream_sp_t peer_trans;
  211. transport::stream_sp_t client_trans;
  212. fmt::memory_buffer tx_buff;
  213. timer_ptr_t hello_timer;
  214. char rx_buff[2000];
  215. bool known_peer = false;
  216. int pre_main_counter;
  217. };
  218. void test_shutdown_on_hello_timeout() {
  219. struct F : fixture_t {
  220. void main() noexcept override { auto act = create_actor(1); }
  221. void run(bool add_peer = true) noexcept override {
  222. fixture_t::run(add_peer);
  223. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  224. }
  225. };
  226. F().run();
  227. }
  228. void test_online_on_hello() {
  229. struct F : fixture_t {
  230. void main() noexcept override {
  231. create_actor();
  232. send_hello();
  233. }
  234. void on_hello(proto::message::Hello) noexcept override {
  235. auto peer = cluster->get_devices().by_sha256(peer_device->device_id().get_sha256());
  236. CHECK(peer->get_state() == device_state_t::online);
  237. }
  238. };
  239. F().run();
  240. }
  241. void test_hello_from_unknown() {
  242. struct F : fixture_t {
  243. void main() noexcept override {
  244. create_actor();
  245. send_hello();
  246. }
  247. void on_hello(proto::message::Hello) noexcept override {
  248. CHECK(cluster->get_devices().size() == 1);
  249. auto &unknown_devices = cluster->get_pending_devices();
  250. CHECK(unknown_devices.size() == 1);
  251. auto peer = unknown_devices.by_sha256(peer_device->device_id().get_sha256());
  252. REQUIRE(peer);
  253. CHECK(peer->get_name() == "self-name");
  254. CHECK(peer->get_client_name() == constants::client_name);
  255. CHECK(peer->get_client_version() == constants::client_version);
  256. CHECK(peer->get_address() == "tcp://0.0.0.0:0");
  257. auto delta = pt::microsec_clock::local_time() - peer->get_last_seen();
  258. CHECK(delta.seconds() <= 2);
  259. }
  260. };
  261. F().run(false);
  262. }
  263. void test_hello_from_known_unknown() {
  264. struct F : fixture_t {
  265. void main() noexcept override {
  266. diff_builder_t(*cluster).add_unknown_device(peer_device->device_id(), {}).apply(*sup);
  267. REQUIRE(cluster->get_pending_devices().size() == 1);
  268. create_actor();
  269. send_hello();
  270. }
  271. void on_hello(proto::message::Hello) noexcept override {
  272. CHECK(cluster->get_devices().size() == 1);
  273. auto &unknown_devices = cluster->get_pending_devices();
  274. CHECK(unknown_devices.size() == 1);
  275. auto peer = unknown_devices.by_sha256(peer_device->device_id().get_sha256());
  276. REQUIRE(peer);
  277. CHECK(peer->get_name() == "self-name");
  278. CHECK(peer->get_client_name() == constants::client_name);
  279. CHECK(peer->get_client_version() == constants::client_version);
  280. CHECK(peer->get_address() == "tcp://0.0.0.0:0");
  281. auto delta = pt::microsec_clock::local_time() - peer->get_last_seen();
  282. CHECK(delta.seconds() <= 2);
  283. }
  284. };
  285. F().run(false);
  286. }
  287. void test_hello_from_ignored() {
  288. struct F : fixture_t {
  289. void main() noexcept override {
  290. diff_builder_t(*cluster).add_ignored_device(peer_device->device_id(), {}).apply(*sup);
  291. REQUIRE(cluster->get_ignored_devices().size() == 1);
  292. create_actor();
  293. send_hello();
  294. }
  295. void on_hello(proto::message::Hello) noexcept override {
  296. CHECK(cluster->get_devices().size() == 1);
  297. auto &ignored_devices = cluster->get_ignored_devices();
  298. CHECK(ignored_devices.size() == 1);
  299. auto peer = ignored_devices.by_sha256(peer_device->device_id().get_sha256());
  300. REQUIRE(peer);
  301. CHECK(peer->get_name() == "self-name");
  302. CHECK(peer->get_client_name() == constants::client_name);
  303. CHECK(peer->get_client_version() == constants::client_version);
  304. CHECK(peer->get_address() == "tcp://0.0.0.0:0");
  305. auto delta = pt::microsec_clock::local_time() - peer->get_last_seen();
  306. CHECK(delta.seconds() <= 2);
  307. REQUIRE(cluster->get_pending_devices().size() == 0);
  308. }
  309. };
  310. F().run(false);
  311. }
  312. int _init() {
  313. REGISTER_TEST_CASE(test_shutdown_on_hello_timeout, "test_shutdown_on_hello_timeout", "[peer]");
  314. REGISTER_TEST_CASE(test_online_on_hello, "test_online_on_hello", "[peer]");
  315. REGISTER_TEST_CASE(test_hello_from_unknown, "test_hello_from_unknown", "[peer]");
  316. REGISTER_TEST_CASE(test_hello_from_known_unknown, "test_hello_from_known_unknown", "[peer]");
  317. REGISTER_TEST_CASE(test_hello_from_ignored, "test_hello_from_ignored", "[peer]");
  318. return 1;
  319. }
  320. static int v = _init();