078-relay.cpp 13 KB


  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. // SPDX-FileCopyrightText: 2019-2024 Ivan Baidakou
  3. #include "test-utils.h"
  4. #include "access.h"
  5. #include "utils/tls.h"
  6. #include "utils/format.hpp"
  7. #include "model/cluster.h"
  8. #include "model/messages.h"
  9. #include "model/diff/contact/relay_connect_request.h"
  10. #include "net/names.h"
  11. #include "net/messages.h"
  12. #include "net/relay_actor.h"
  13. #include "transport/stream.h"
  14. #include <rotor/asio.hpp>
  15. #include <boost/algorithm/string/replace.hpp>
  16. using namespace syncspirit;
  17. using namespace syncspirit::test;
  18. using namespace syncspirit::model;
  19. using namespace syncspirit::net;
  20. namespace asio = boost::asio;
  21. namespace sys = boost::system;
  22. namespace r = rotor;
  23. namespace ra = r::asio;
  24. using configure_callback_t = std::function<void(r::plugin::plugin_base_t &)>;
  25. auto timeout = r::pt::time_duration{r::pt::millisec{1500}};
  26. auto host = "127.0.0.1";
  27. struct supervisor_t : ra::supervisor_asio_t {
  28. using parent_t = ra::supervisor_asio_t;
  29. using parent_t::parent_t;
  30. void configure(r::plugin::plugin_base_t &plugin) noexcept override {
  31. parent_t::configure(plugin);
  32. plugin.with_casted<r::plugin::registry_plugin_t>([&](auto &p) {
  33. p.register_name(names::coordinator, get_address());
  34. p.register_name(names::peer_supervisor, get_address());
  35. p.register_name(names::http11_relay, get_address());
  36. });
  37. if (configure_callback) {
  38. configure_callback(plugin);
  39. }
  40. }
  41. void on_child_shutdown(actor_base_t *actor) noexcept override {
  42. if (actor) {
  43. spdlog::info("child shutdown: {}, reason: {}", actor->get_identity(),
  44. actor->get_shutdown_reason()->message());
  45. }
  46. parent_t::on_child_shutdown(actor);
  47. }
  48. void shutdown_finish() noexcept override {
  49. parent_t::shutdown_finish();
  50. if (acceptor) {
  51. acceptor->cancel();
  52. }
  53. }
  54. auto get_state() noexcept { return state; }
  55. asio::ip::tcp::acceptor *acceptor = nullptr;
  56. configure_callback_t configure_callback;
  57. };
// Intrusive-pointer handles for the test supervisor and the relay actor under test.
using supervisor_ptr_t = r::intrusive_ptr_t<supervisor_t>;
using actor_ptr_t = r::intrusive_ptr_t<relay_actor_t>;
  60. struct fixture_t : private model::diff::contact_visitor_t {
  61. using acceptor_t = asio::ip::tcp::acceptor;
  62. fixture_t() noexcept : ctx(io_ctx), acceptor(io_ctx), peer_sock(io_ctx) {
  63. utils::set_default("trace");
  64. log = utils::get_logger("fixture");
  65. relay_config = config::relay_config_t{
  66. true,
  67. true,
  68. utils::parse("https://some-endpoint.com/"),
  69. 1024 * 1024,
  70. };
  71. }
  72. void run() noexcept {
  73. auto strand = std::make_shared<asio::io_context::strand>(io_ctx);
  74. sup = ctx.create_supervisor<supervisor_t>().strand(strand).timeout(timeout).create_registry().finish();
  75. sup->configure_callback = [&](r::plugin::plugin_base_t &plugin) {
  76. plugin.template with_casted<r::plugin::starter_plugin_t>([&](auto &p) {
  77. using contact_update_t = model::message::contact_update_t;
  78. p.subscribe_actor(r::lambda<contact_update_t>([&](contact_update_t &msg) { on(msg); }));
  79. using http_req_t = net::message::http_request_t;
  80. p.subscribe_actor(r::lambda<http_req_t>([&](http_req_t &req) {
  81. LOG_INFO(log, "received http request");
  82. http::response<http::string_body> res;
  83. res.result(200);
  84. res.body() = public_relays;
  85. sup->reply_to(req, std::move(res), public_relays.size());
  86. }));
  87. using connect_req_t = net::message::connect_request_t;
  88. p.subscribe_actor(r::lambda<connect_req_t>([&](connect_req_t &req) {
  89. LOG_INFO(log, "(connect request)");
  90. on(req);
  91. }));
  92. });
  93. };
  94. sup->start();
  95. sup->do_process();
  96. auto ep = asio::ip::tcp::endpoint(asio::ip::make_address(host), 0);
  97. acceptor.open(ep.protocol());
  98. acceptor.bind(ep);
  99. acceptor.listen();
  100. listening_ep = acceptor.local_endpoint();
  101. my_keys = utils::generate_pair("me").value();
  102. relay_keys = utils::generate_pair("relay").value();
  103. peer_keys = utils::generate_pair("peer").value();
  104. auto md = model::device_id_t::from_cert(my_keys.cert_data).value();
  105. auto rd = model::device_id_t::from_cert(relay_keys.cert_data).value();
  106. auto pd = model::device_id_t::from_cert(peer_keys.cert_data).value();
  107. my_device = device_t::create(md, "my-device").value();
  108. relay_device = device_t::create(rd, "relay-device").value();
  109. peer_device = device_t::create(rd, "peer-device").value();
  110. public_relays = generate_public_relays(listening_ep, relay_device);
  111. log->debug("public relays json: {}", public_relays);
  112. initiate_accept();
  113. cluster = new cluster_t(my_device, 1, 1);
  114. cluster->get_devices().put(my_device);
  115. cluster->get_devices().put(peer_device);
  116. session_key = "lorem-imspum-dolor";
  117. main();
  118. }
  119. virtual void main() noexcept {}
  120. virtual std::string generate_public_relays(const asio::ip::tcp::endpoint &,
  121. model::device_ptr_t &relay_device) noexcept {
  122. std::string pattern = R""(
  123. {
  124. "relays": [
  125. {
  126. "url": "##URL##&pingInterval=0m5s&networkTimeout=2m0s&sessionLimitBps=0&globalLimitBps=0&statusAddr=:22070&providedBy=ina",
  127. "location": {
  128. "latitude": 50.1049,
  129. "longitude": 8.6295,
  130. "city": "Frankfurt am Main",
  131. "country": "DE",
  132. "continent": "EU"
  133. }
  134. }
  135. ]
  136. }
  137. )"";
  138. auto url = fmt::format("relay://{}/?id={}", listening_ep, relay_device->device_id().get_value());
  139. return boost::algorithm::replace_first_copy(pattern, "##URL##", url);
  140. }
  141. virtual void initiate_accept() noexcept {
  142. acceptor.async_accept(peer_sock, [this](auto ec) { this->accept(ec); });
  143. sup->acceptor = &acceptor;
  144. }
  145. virtual void accept(const sys::error_code &ec) noexcept {
  146. LOG_INFO(log, "accept (relay), ec: {}, remote = {}", ec.message(), peer_sock.remote_endpoint());
  147. auto uri = utils::parse("tcp://127.0.0.1:0/");
  148. auto cfg = transport::transport_config_t{{}, uri, *sup, std::move(peer_sock), false};
  149. relay_trans = transport::initiate_stream(cfg);
  150. relay_read();
  151. }
  152. virtual actor_ptr_t create_actor() noexcept {
  153. return sup->create_actor<actor_ptr_t::element_type>()
  154. .timeout(timeout)
  155. .cluster(cluster)
  156. .relay_config(relay_config)
  157. .escalate_failure()
  158. .finish();
  159. }
  160. virtual void on(net::message::connect_request_t &req) noexcept {
  161. auto &uri = req.payload.request_payload.uri;
  162. log->info("requested connect to {}", uri);
  163. auto cfg = transport::transport_config_t{{}, uri, *sup, {}, true};
  164. auto ip = asio::ip::make_address(host);
  165. auto peer_ep = tcp::endpoint(ip, uri->port_number());
  166. auto addresses = std::vector<tcp::endpoint>{peer_ep};
  167. auto addresses_ptr = std::make_shared<decltype(addresses)>(addresses);
  168. auto trans = transport::initiate_stream(cfg);
  169. transport::error_fn_t on_error = [&](auto &ec) { LOG_WARN(log, "active/connect, err: {}", ec.message()); };
  170. using ptr_t = model::intrusive_ptr_t<std::decay_t<decltype(req)>>;
  171. auto ptr = ptr_t(&req);
  172. transport::connect_fn_t on_connect = [ptr, trans, addresses_ptr, this](const tcp::endpoint &ep) {
  173. LOG_INFO(log, "active/connected");
  174. sup->reply_to(*ptr, trans, ep);
  175. };
  176. trans->async_connect(addresses_ptr, on_connect, on_error);
  177. }
  178. void send_relay(const proto::relay::message_t &msg) noexcept {
  179. proto::relay::serialize(msg, relay_tx);
  180. transport::error_fn_t on_error = [&](auto &ec) { LOG_WARN(log, "relay/write, err: {}", ec.message()); };
  181. transport::io_fn_t on_write = [&](size_t bytes) { LOG_TRACE(log, "relay/write, {} bytes", bytes); };
  182. relay_trans->async_send(asio::buffer(relay_tx), on_write, on_error);
  183. }
  184. void on(proto::relay::ping_t &) noexcept {
  185. };
  186. void on(proto::relay::pong_t &) noexcept {
  187. };
  188. void on(proto::relay::join_relay_request_t &) noexcept {
  189. LOG_INFO(log, "join_relay_request_t");
  190. send_relay(proto::relay::response_t{0, "ok"});
  191. };
  192. void on(proto::relay::join_session_request_t &) noexcept {
  193. };
  194. void on(proto::relay::response_t &) noexcept {
  195. };
  196. void on(proto::relay::connect_request_t &) noexcept {
  197. };
  198. void on(proto::relay::session_invitation_t &) noexcept {
  199. };
  200. virtual void on(model::message::contact_update_t &update) noexcept {
  201. auto &diff = *update.payload.diff;
  202. auto r = diff.apply(*cluster);
  203. if (!r) {
  204. LOG_ERROR(log, "error applying diff: {}", r.error().message());
  205. }
  206. r = diff.visit(*this, nullptr);
  207. if (!r) {
  208. LOG_ERROR(log, "error visiting diff: {}", r.error().message());
  209. }
  210. }
  211. void relay_read() noexcept {
  212. transport::error_fn_t on_error = [&](auto &ec) { LOG_WARN(log, "relay/read, err: {}", ec.message()); };
  213. transport::io_fn_t on_read = [&](size_t bytes) {
  214. LOG_TRACE(log, "relay/read, {} bytes", bytes);
  215. auto msg = proto::relay::parse({relay_rx.data(), bytes});
  216. auto wrapped = std::get_if<proto::relay::wrapped_message_t>(&msg);
  217. if (!wrapped) {
  218. LOG_ERROR(log, "relay/read non-message?");
  219. return;
  220. }
  221. std::visit([&](auto &it) { on(it); }, wrapped->message);
  222. };
  223. relay_rx.resize(1500);
  224. auto buff = asio::buffer(relay_rx.data(), relay_rx.size());
  225. relay_trans->async_recv(buff, on_read, on_error);
  226. LOG_TRACE(log, "relay/async recv");
  227. }
  228. config::relay_config_t relay_config;
  229. cluster_ptr_t cluster;
  230. asio::io_context io_ctx;
  231. ra::system_context_asio_t ctx;
  232. acceptor_t acceptor;
  233. supervisor_ptr_t sup;
  234. asio::ip::tcp::endpoint listening_ep;
  235. utils::logger_t log;
  236. asio::ip::tcp::socket peer_sock;
  237. std::string public_relays;
  238. utils::key_pair_t my_keys;
  239. utils::key_pair_t relay_keys;
  240. utils::key_pair_t peer_keys;
  241. model::device_ptr_t my_device;
  242. model::device_ptr_t relay_device;
  243. model::device_ptr_t peer_device;
  244. transport::stream_sp_t relay_trans;
  245. std::string relay_rx;
  246. std::string relay_tx;
  247. std::string session_key;
  248. };
  249. void test_master_connect() {
  250. struct F : fixture_t {
  251. void main() noexcept override {
  252. auto act = create_actor();
  253. io_ctx.run();
  254. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  255. REQUIRE(my_device->get_uris().size() == 1);
  256. CHECK(my_device->get_uris()[0]->scheme() == "relay");
  257. sup->shutdown();
  258. io_ctx.restart();
  259. io_ctx.run();
  260. CHECK(my_device->get_uris().size() == 0);
  261. io_ctx.restart();
  262. io_ctx.run();
  263. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  264. }
  265. void on(model::message::contact_update_t &update) noexcept override {
  266. LOG_INFO(log, "contact_update_t");
  267. fixture_t::on(update);
  268. io_ctx.stop();
  269. }
  270. };
  271. F().run();
  272. }
  273. void test_passive() {
  274. struct F : fixture_t {
  275. void main() noexcept override {
  276. auto act = create_actor();
  277. io_ctx.run();
  278. CHECK(sent);
  279. CHECK(received);
  280. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  281. sup->shutdown();
  282. io_ctx.restart();
  283. io_ctx.run();
  284. CHECK(my_device->get_uris().size() == 0);
  285. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  286. }
  287. void on(model::message::contact_update_t &update) noexcept override {
  288. LOG_INFO(log, "contact_update_t");
  289. fixture_t::on(update);
  290. if (my_device->get_uris().size() == 1 && !sent) {
  291. sent = true;
  292. auto msg = proto::relay::session_invitation_t{
  293. std::string(peer_device->device_id().get_sha256()), session_key, {}, 12345, true};
  294. send_relay(msg);
  295. }
  296. }
  297. outcome::result<void> operator()(const model::diff::contact::relay_connect_request_t &diff,
  298. void *) noexcept override {
  299. CHECK(diff.peer == peer_device->device_id());
  300. CHECK(diff.session_key == session_key);
  301. CHECK(diff.relay.port() == 12345);
  302. CHECK(diff.relay.address().to_string() == "127.0.0.1");
  303. received = true;
  304. io_ctx.stop();
  305. return outcome::success();
  306. }
  307. bool sent = false;
  308. bool received = false;
  309. };
  310. F().run();
  311. }
// Registers the relay test cases with the test framework; the return value
// only exists so the call can be used as a static initializer.
int _init() {
    REGISTER_TEST_CASE(test_master_connect, "test_master_connect", "[relay]");
    REGISTER_TEST_CASE(test_passive, "test_passive", "[relay]");
    return 1;
}

// Triggers the registration during static initialization.
static int v = _init();