079-peer.cpp 14 KB


  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. // SPDX-FileCopyrightText: 2019-2024 Ivan Baidakou
  3. #include "test-utils.h"
  4. #include "access.h"
  5. #include "utils/tls.h"
  6. #include "utils/format.hpp"
  7. #include "proto/bep_support.h"
  8. #include "model/cluster.h"
  9. #include "model/messages.h"
  10. #include "net/names.h"
  11. #include "net/messages.h"
  12. #include "net/peer_actor.h"
  13. #include "transport/stream.h"
  14. #include "diff-builder.h"
  15. #include "constants.h"
  16. #include <rotor/asio.hpp>
  17. #include <boost/algorithm/string/replace.hpp>
  18. using namespace syncspirit;
  19. using namespace syncspirit::test;
  20. using namespace syncspirit::model;
  21. using namespace syncspirit::net;
  22. namespace asio = boost::asio;
  23. namespace sys = boost::system;
  24. namespace r = rotor;
  25. namespace ra = r::asio;
  26. using configure_callback_t = std::function<void(r::plugin::plugin_base_t &)>;
  27. auto timeout = r::pt::time_duration{r::pt::millisec{1500}};
  28. auto host = "127.0.0.1";
  29. struct supervisor_t : ra::supervisor_asio_t {
  30. using parent_t = ra::supervisor_asio_t;
  31. using parent_t::parent_t;
  32. void configure(r::plugin::plugin_base_t &plugin) noexcept override {
  33. parent_t::configure(plugin);
  34. plugin.with_casted<r::plugin::registry_plugin_t>([&](auto &p) {
  35. p.register_name(names::coordinator, get_address());
  36. p.register_name(names::peer_supervisor, get_address());
  37. });
  38. if (configure_callback) {
  39. configure_callback(plugin);
  40. }
  41. }
  42. void on_child_shutdown(actor_base_t *actor) noexcept override {
  43. if (actor) {
  44. spdlog::info("child shutdown: {}, reason: {}", actor->get_identity(),
  45. actor->get_shutdown_reason()->message());
  46. }
  47. parent_t::on_child_shutdown(actor);
  48. }
  49. void shutdown_finish() noexcept override {
  50. parent_t::shutdown_finish();
  51. if (acceptor) {
  52. acceptor->cancel();
  53. }
  54. }
  55. auto get_state() noexcept { return state; }
  56. asio::ip::tcp::acceptor *acceptor = nullptr;
  57. configure_callback_t configure_callback;
  58. };
  59. using supervisor_ptr_t = r::intrusive_ptr_t<supervisor_t>;
  60. using actor_ptr_t = r::intrusive_ptr_t<peer_actor_t>;
  61. struct fixture_t : private model::diff::contact_visitor_t {
  62. using acceptor_t = asio::ip::tcp::acceptor;
  63. fixture_t() noexcept : ctx(io_ctx), acceptor(io_ctx), peer_sock(io_ctx) {
  64. utils::set_default("trace");
  65. log = utils::get_logger("fixture");
  66. }
  67. virtual void run(bool add_peer = true) noexcept {
  68. known_peer = add_peer;
  69. auto strand = std::make_shared<asio::io_context::strand>(io_ctx);
  70. sup = ctx.create_supervisor<supervisor_t>().strand(strand).timeout(timeout).create_registry().finish();
  71. sup->configure_callback = [&](r::plugin::plugin_base_t &plugin) {
  72. plugin.template with_casted<r::plugin::starter_plugin_t>([&](auto &p) {
  73. using cluster_diff_ptr_t = r::intrusive_ptr_t<model::message::model_update_t>;
  74. using cluster_diff_t = typename cluster_diff_ptr_t::element_type;
  75. using contact_diff_ptr_t = r::intrusive_ptr_t<model::message::contact_update_t>;
  76. using contact_diff_t = typename contact_diff_ptr_t::element_type;
  77. p.subscribe_actor(r::lambda<cluster_diff_t>([&](cluster_diff_t &msg) {
  78. LOG_INFO(log, "received cluster diff message");
  79. auto &diff = msg.payload.diff;
  80. auto r = diff->apply(*cluster);
  81. if (!r) {
  82. LOG_ERROR(log, "error updating model: {}", r.assume_error().message());
  83. sup->do_shutdown();
  84. }
  85. }));
  86. p.subscribe_actor(r::lambda<contact_diff_t>([&](contact_diff_t &msg) {
  87. LOG_INFO(log, "received contact diff message");
  88. auto &diff = msg.payload.diff;
  89. auto r = diff->apply(*cluster);
  90. if (!r) {
  91. LOG_ERROR(log, "error updating model: {}", r.assume_error().message());
  92. sup->do_shutdown();
  93. }
  94. }));
  95. });
  96. };
  97. sup->start();
  98. sup->do_process();
  99. my_keys = utils::generate_pair("my").value();
  100. auto md = model::device_id_t::from_cert(my_keys.cert_data).value();
  101. my_device = device_t::create(md, "my-device").value();
  102. peer_keys = utils::generate_pair("peer").value();
  103. auto pd = model::device_id_t::from_cert(peer_keys.cert_data).value();
  104. peer_device = device_t::create(pd, "peer-device").value();
  105. cluster = new cluster_t(my_device, 1, 1);
  106. cluster->get_devices().put(my_device);
  107. if (add_peer) {
  108. cluster->get_devices().put(peer_device);
  109. }
  110. auto ep = asio::ip::tcp::endpoint(asio::ip::make_address(host), 0);
  111. acceptor.open(ep.protocol());
  112. acceptor.bind(ep);
  113. acceptor.listen();
  114. auto local_ep = acceptor.local_endpoint();
  115. acceptor.async_accept(peer_sock, [this](auto ec) { this->accept(ec); });
  116. sup->acceptor = &acceptor;
  117. auto uri_str = fmt::format("tcp://{}:{}/", local_ep.address(), local_ep.port());
  118. LOG_TRACE(log, "Connecting to {}", uri_str);
  119. auto uri = utils::parse(uri_str);
  120. auto cfg = transport::transport_config_t{{}, uri, *sup, {}, true};
  121. client_trans = transport::initiate_stream(cfg);
  122. auto ip = asio::ip::make_address(host);
  123. auto peer_ep = tcp::endpoint(ip, local_ep.port());
  124. auto addresses = std::vector<tcp::endpoint>{peer_ep};
  125. auto addresses_ptr = std::make_shared<decltype(addresses)>(addresses);
  126. transport::error_fn_t on_error = [&](auto &ec) { on_client_error(ec); };
  127. transport::connect_fn_t on_connect = [addresses_ptr, this](const tcp::endpoint &ep) {
  128. LOG_INFO(log, "active/connected");
  129. // main();
  130. };
  131. client_trans->async_connect(addresses_ptr, on_connect, on_error);
  132. io_ctx.run();
  133. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  134. }
  135. virtual void main() noexcept {}
  136. virtual actor_ptr_t create_actor(uint32_t rx_timeout = 2000) noexcept {
  137. if (known_peer) {
  138. auto builder = diff_builder_t(*cluster);
  139. builder.update_state(*peer_device, {}, model::device_state_t::connecting).apply(*sup);
  140. }
  141. auto bep_config = config::bep_config_t();
  142. bep_config.rx_buff_size = 1024;
  143. bep_config.rx_buff_size = rx_timeout;
  144. return sup->create_actor<actor_ptr_t::element_type>()
  145. .timeout(timeout)
  146. .cluster(cluster)
  147. .coordinator(sup->get_address())
  148. .bep_config(bep_config)
  149. .transport(peer_trans)
  150. .peer_device_id(peer_device->device_id())
  151. .device_name("peer-device")
  152. .peer_proto("tcp")
  153. .autoshutdown_supervisor(true)
  154. .finish();
  155. }
  156. virtual void on_client_error(const sys::error_code &ec) noexcept { LOG_WARN(log, "client err: {}", ec.message()); }
  157. virtual void accept(const sys::error_code &ec) noexcept {
  158. LOG_INFO(log, "accept, ec: {}, remote = {}", ec.message(), peer_sock.remote_endpoint());
  159. auto uri = utils::parse("tcp://127.0.0.1:0/");
  160. auto cfg = transport::transport_config_t{{}, uri, *sup, std::move(peer_sock), false};
  161. peer_trans = transport::initiate_stream(cfg);
  162. main();
  163. }
  164. virtual void on_client_write(std::size_t bytes) noexcept { LOG_INFO(log, "client sent {} bytes", bytes); }
  165. virtual void on_client_read(std::size_t bytes) noexcept {
  166. LOG_INFO(log, "client received {} bytes", bytes);
  167. auto result = proto::parse_bep(asio::buffer(rx_buff, bytes)).value();
  168. auto hello = std::get_if<proto::message::Hello>(&result.message);
  169. if (hello) {
  170. return on_hello(std::move(*hello));
  171. }
  172. }
  173. virtual void on_hello(proto::message::Hello msg) noexcept {
  174. LOG_INFO(log, "client received hello message from {}, {}/{}", msg->device_name(), msg->client_name(),
  175. msg->client_version());
  176. }
  177. virtual void send_hello() noexcept {
  178. proto::make_hello_message(tx_buff, "self-name");
  179. transport::io_fn_t on_write = [&](size_t bytes) { on_client_write(bytes); };
  180. transport::io_fn_t on_read = [&](size_t bytes) { on_client_read(bytes); };
  181. transport::error_fn_t on_error = [&](auto &ec) { on_client_error(ec); };
  182. auto tx_buff_ = asio::buffer(tx_buff.data(), tx_buff.size());
  183. auto rx_buff_ = asio::buffer(rx_buff, sizeof(rx_buff));
  184. client_trans->async_recv(rx_buff_, on_read, on_error);
  185. client_trans->async_send(tx_buff_, on_write, on_error);
  186. }
  187. cluster_ptr_t cluster;
  188. supervisor_ptr_t sup;
  189. asio::io_context io_ctx;
  190. ra::system_context_asio_t ctx;
  191. acceptor_t acceptor;
  192. asio::ip::tcp::socket peer_sock;
  193. utils::logger_t log;
  194. utils::key_pair_t peer_keys;
  195. utils::key_pair_t my_keys;
  196. model::device_ptr_t peer_device;
  197. model::device_ptr_t my_device;
  198. transport::stream_sp_t peer_trans;
  199. transport::stream_sp_t client_trans;
  200. fmt::memory_buffer tx_buff;
  201. char rx_buff[2000];
  202. bool known_peer = false;
  203. };
  204. void test_shutdown_on_hello_timeout() {
  205. struct F : fixture_t {
  206. void main() noexcept override { auto act = create_actor(1); }
  207. void run(bool add_peer = true) noexcept override {
  208. fixture_t::run(add_peer);
  209. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  210. }
  211. };
  212. F().run();
  213. }
  214. void test_online_on_hello() {
  215. struct F : fixture_t {
  216. void main() noexcept override {
  217. create_actor();
  218. send_hello();
  219. }
  220. void on_hello(proto::message::Hello msg) noexcept override {
  221. auto peer = cluster->get_devices().by_sha256(peer_device->device_id().get_sha256());
  222. CHECK(peer->get_state() == device_state_t::online);
  223. }
  224. };
  225. F().run();
  226. }
  227. void test_hello_from_unknown() {
  228. struct F : fixture_t {
  229. void main() noexcept override {
  230. create_actor();
  231. send_hello();
  232. }
  233. void on_hello(proto::message::Hello msg) noexcept override {
  234. CHECK(cluster->get_devices().size() == 1);
  235. auto &unknown_devices = cluster->get_unknown_devices();
  236. CHECK(unknown_devices.size() == 1);
  237. auto peer = unknown_devices.by_sha256(peer_device->device_id().get_sha256());
  238. REQUIRE(peer);
  239. CHECK(peer->get_name() == "self-name");
  240. CHECK(peer->get_client_name() == constants::client_name);
  241. CHECK(peer->get_client_version() == constants::client_version);
  242. CHECK(peer->get_address() == "tcp://0.0.0.0:0");
  243. auto delta = pt::microsec_clock::local_time() - peer->get_last_seen();
  244. CHECK(delta.seconds() <= 2);
  245. }
  246. };
  247. F().run(false);
  248. }
  249. void test_hello_from_known_unknown() {
  250. struct F : fixture_t {
  251. void main() noexcept override {
  252. diff_builder_t(*cluster).add_unknown_device(peer_device->device_id(), {}).apply(*sup);
  253. REQUIRE(cluster->get_unknown_devices().size() == 1);
  254. create_actor();
  255. send_hello();
  256. }
  257. void on_hello(proto::message::Hello msg) noexcept override {
  258. CHECK(cluster->get_devices().size() == 1);
  259. auto &unknown_devices = cluster->get_unknown_devices();
  260. CHECK(unknown_devices.size() == 1);
  261. auto peer = unknown_devices.by_sha256(peer_device->device_id().get_sha256());
  262. REQUIRE(peer);
  263. CHECK(peer->get_name() == "self-name");
  264. CHECK(peer->get_client_name() == constants::client_name);
  265. CHECK(peer->get_client_version() == constants::client_version);
  266. CHECK(peer->get_address() == "tcp://0.0.0.0:0");
  267. auto delta = pt::microsec_clock::local_time() - peer->get_last_seen();
  268. CHECK(delta.seconds() <= 2);
  269. }
  270. };
  271. F().run(false);
  272. }
  273. void test_hello_from_ignored() {
  274. struct F : fixture_t {
  275. void main() noexcept override {
  276. diff_builder_t(*cluster).add_ignored_device(peer_device->device_id(), {}).apply(*sup);
  277. REQUIRE(cluster->get_ignored_devices().size() == 1);
  278. create_actor();
  279. send_hello();
  280. }
  281. void on_hello(proto::message::Hello msg) noexcept override {
  282. CHECK(cluster->get_devices().size() == 1);
  283. auto &ignored_devices = cluster->get_ignored_devices();
  284. CHECK(ignored_devices.size() == 1);
  285. auto peer = ignored_devices.by_sha256(peer_device->device_id().get_sha256());
  286. REQUIRE(peer);
  287. CHECK(peer->get_name() == "self-name");
  288. CHECK(peer->get_client_name() == constants::client_name);
  289. CHECK(peer->get_client_version() == constants::client_version);
  290. CHECK(peer->get_address() == "tcp://0.0.0.0:0");
  291. auto delta = pt::microsec_clock::local_time() - peer->get_last_seen();
  292. CHECK(delta.seconds() <= 2);
  293. REQUIRE(cluster->get_unknown_devices().size() == 0);
  294. }
  295. };
  296. F().run(false);
  297. }
  298. int _init() {
  299. REGISTER_TEST_CASE(test_shutdown_on_hello_timeout, "test_shutdown_on_hello_timeout", "[peer]");
  300. REGISTER_TEST_CASE(test_online_on_hello, "test_online_on_hello", "[peer]");
  301. REGISTER_TEST_CASE(test_hello_from_unknown, "test_hello_from_unknown", "[peer]");
  302. REGISTER_TEST_CASE(test_hello_from_known_unknown, "test_hello_from_known_unknown", "[peer]");
  303. REGISTER_TEST_CASE(test_hello_from_ignored, "test_hello_from_ignored", "[peer]");
  304. return 1;
  305. }
  306. static int v = _init();