077-initiator.cpp 34 KB


  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. // SPDX-FileCopyrightText: 2019-2024 Ivan Baidakou
  3. #include "test-utils.h"
  4. #include "access.h"
  5. #include "utils/tls.h"
  6. #include "utils/format.hpp"
  7. #include "model/cluster.h"
  8. #include "model/messages.h"
  9. #include "net/names.h"
  10. #include "net/initiator_actor.h"
  11. #include "net/resolver_actor.h"
  12. #include "proto/relay_support.h"
  13. #include "transport/stream.h"
  14. #include <rotor/asio.hpp>
  15. using namespace syncspirit;
  16. using namespace syncspirit::test;
  17. using namespace syncspirit::model;
  18. using namespace syncspirit::net;
  19. namespace asio = boost::asio;
  20. namespace sys = boost::system;
  21. namespace r = rotor;
  22. namespace ra = r::asio;
  23. using configure_callback_t = std::function<void(r::plugin::plugin_base_t &)>;
  24. using finish_callback_t = std::function<void()>;
  25. auto timeout = r::pt::time_duration{r::pt::millisec{2000}};
  26. auto host = "127.0.0.1";
  27. struct supervisor_t : ra::supervisor_asio_t {
  28. using ra::supervisor_asio_t::supervisor_asio_t;
  29. void configure(r::plugin::plugin_base_t &plugin) noexcept override {
  30. ra::supervisor_asio_t::configure(plugin);
  31. plugin.with_casted<r::plugin::registry_plugin_t>(
  32. [&](auto &p) { p.register_name(names::coordinator, get_address()); });
  33. if (configure_callback) {
  34. configure_callback(plugin);
  35. }
  36. }
  37. void shutdown_finish() noexcept override {
  38. ra::supervisor_asio_t::shutdown_finish();
  39. if (finish_callback) {
  40. finish_callback();
  41. }
  42. }
  43. auto get_state() noexcept { return state; }
  44. finish_callback_t finish_callback;
  45. configure_callback_t configure_callback;
  46. };
  47. using supervisor_ptr_t = r::intrusive_ptr_t<supervisor_t>;
  48. using actor_ptr_t = r::intrusive_ptr_t<initiator_actor_t>;
  49. struct fixture_t {
  50. using acceptor_t = asio::ip::tcp::acceptor;
  51. using ready_ptr_t = r::intrusive_ptr_t<net::message::peer_connected_t>;
  52. using diff_ptr_t = r::intrusive_ptr_t<model::message::model_update_t>;
  53. using diff_msgs_t = std::vector<diff_ptr_t>;
  54. fixture_t() noexcept : ctx(io_ctx), acceptor(io_ctx), peer_sock(io_ctx) {
  55. utils::set_default("trace");
  56. log = utils::get_logger("fixture");
  57. }
  58. virtual void finish() {
  59. acceptor.cancel();
  60. if (peer_trans) {
  61. peer_trans->cancel();
  62. }
  63. }
  64. void run() noexcept {
  65. auto strand = std::make_shared<asio::io_context::strand>(io_ctx);
  66. sup = ctx.create_supervisor<supervisor_t>().strand(strand).timeout(timeout).create_registry().finish();
  67. sup->configure_callback = [&](r::plugin::plugin_base_t &plugin) {
  68. plugin.template with_casted<r::plugin::starter_plugin_t>([&](auto &p) {
  69. using connected_t = typename ready_ptr_t::element_type;
  70. using diff_t = typename diff_ptr_t::element_type;
  71. p.subscribe_actor(r::lambda<connected_t>([&](connected_t &msg) {
  72. connected_message = &msg;
  73. LOG_INFO(log, "received message::peer_connected_t");
  74. }));
  75. p.subscribe_actor(r::lambda<diff_t>([&](diff_t &msg) {
  76. diff_msgs.emplace_back(&msg);
  77. LOG_INFO(log, "received diff message");
  78. }));
  79. });
  80. };
  81. sup->finish_callback = [&]() { finish(); };
  82. sup->start();
  83. sup->create_actor<resolver_actor_t>().resolve_timeout(timeout / 2).timeout(timeout).finish();
  84. sup->do_process();
  85. my_keys = utils::generate_pair("me").value();
  86. peer_keys = utils::generate_pair("peer").value();
  87. auto md = model::device_id_t::from_cert(my_keys.cert_data).value();
  88. auto pd = model::device_id_t::from_cert(peer_keys.cert_data).value();
  89. my_device = device_t::create(md, "my-device").value();
  90. peer_device = device_t::create(pd, "peer-device").value();
  91. auto ep = asio::ip::tcp::endpoint(asio::ip::make_address(host), 0);
  92. acceptor.open(ep.protocol());
  93. acceptor.bind(ep);
  94. acceptor.listen();
  95. listening_ep = acceptor.local_endpoint();
  96. peer_uri = utils::parse(get_uri(listening_ep));
  97. log->debug("listening on {}", peer_uri);
  98. initiate_accept();
  99. cluster = new cluster_t(my_device, 1, 1);
  100. cluster->get_devices().put(my_device);
  101. cluster->get_devices().put(peer_device);
  102. main();
  103. }
  104. virtual void initiate_accept() noexcept {
  105. acceptor.async_accept(peer_sock, [this](auto ec) { this->accept(ec); });
  106. }
  107. virtual std::string get_uri(const asio::ip::tcp::endpoint &) noexcept {
  108. return fmt::format("tcp://{}", listening_ep);
  109. }
  110. virtual void accept(const sys::error_code &ec) noexcept {
  111. LOG_INFO(log, "accept, ec: {}", ec.message());
  112. peer_trans = transport::initiate_tls_passive(*sup, peer_keys, std::move(peer_sock));
  113. initiate_peer_handshake();
  114. }
  115. virtual void initiate_peer_handshake() noexcept {
  116. transport::handshake_fn_t handshake_fn = [this](bool valid_peer, utils::x509_t &, const tcp::endpoint &,
  117. const model::device_id_t *) {
  118. valid_handshake = valid_peer;
  119. on_peer_handshake();
  120. };
  121. transport::error_fn_t on_error = [](const auto &) {};
  122. peer_trans->async_handshake(handshake_fn, on_error);
  123. }
  124. virtual void on_peer_handshake() noexcept { LOG_INFO(log, "peer handshake"); }
  125. void initiate_active() noexcept {
  126. auto ip = asio::ip::make_address(host);
  127. auto ep = tcp::endpoint(ip, listening_ep.port());
  128. auto addresses = std::vector<tcp::endpoint>{ep};
  129. peer_trans = transport::initiate_tls_active(*sup, peer_keys, my_device->device_id(), peer_uri);
  130. transport::error_fn_t on_error = [&](auto &ec) {
  131. LOG_WARN(log, "initiate_active/connect, err: {}", ec.message());
  132. };
  133. transport::connect_fn_t on_connect = [&](auto) {
  134. LOG_INFO(log, "initiate_active/peer connect");
  135. active_connect();
  136. };
  137. peer_trans->async_connect(addresses, on_connect, on_error);
  138. }
  139. virtual void active_connect() {
  140. LOG_TRACE(log, "active_connect");
  141. transport::handshake_fn_t handshake_fn = [this](bool, utils::x509_t &, const tcp::endpoint &,
  142. const model::device_id_t *) {
  143. valid_handshake = true;
  144. LOG_INFO(log, "test_passive_success/peer handshake");
  145. };
  146. transport::error_fn_t on_hs_error = [&](const auto &ec) {
  147. LOG_WARN(log, "test_passive_success/peer handshake, err: {}", ec.message());
  148. };
  149. peer_trans->async_handshake(handshake_fn, on_hs_error);
  150. }
  151. virtual void main() noexcept {}
  152. virtual actor_ptr_t create_actor() noexcept {
  153. return sup->create_actor<initiator_actor_t>()
  154. .timeout(timeout)
  155. .peer_device_id(peer_device->device_id())
  156. .relay_session(relay_session)
  157. .relay_enabled(true)
  158. .uris(utils::uri_container_t{peer_uri})
  159. .cluster(use_model ? cluster : nullptr)
  160. .sink(sup->get_address())
  161. .ssl_pair(&my_keys)
  162. .router(*sup)
  163. .escalate_failure()
  164. .finish();
  165. }
  166. virtual actor_ptr_t create_passive_actor() noexcept {
  167. return sup->create_actor<initiator_actor_t>()
  168. .timeout(timeout)
  169. .sock(std::move(peer_sock))
  170. .ssl_pair(&my_keys)
  171. .router(*sup)
  172. .cluster(cluster)
  173. .sink(sup->get_address())
  174. .escalate_failure()
  175. .finish();
  176. }
  177. cluster_ptr_t cluster;
  178. asio::io_context io_ctx{1};
  179. ra::system_context_asio_t ctx;
  180. acceptor_t acceptor;
  181. supervisor_ptr_t sup;
  182. asio::ip::tcp::endpoint listening_ep;
  183. utils::logger_t log;
  184. asio::ip::tcp::socket peer_sock;
  185. config::bep_config_t bep_config;
  186. utils::key_pair_t my_keys;
  187. utils::key_pair_t peer_keys;
  188. utils::uri_ptr_t peer_uri;
  189. model::device_ptr_t my_device;
  190. model::device_ptr_t peer_device;
  191. transport::stream_sp_t peer_trans;
  192. ready_ptr_t connected_message;
  193. diff_msgs_t diff_msgs;
  194. std::string relay_session;
  195. bool use_model = true;
  196. bool valid_handshake = false;
  197. };
  198. void test_connect_timeout() {
  199. struct F : fixture_t {
  200. void initiate_accept() noexcept override {}
  201. void main() noexcept override {
  202. auto act = create_actor();
  203. io_ctx.run();
  204. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  205. CHECK(!connected_message);
  206. }
  207. };
  208. F().run();
  209. }
  210. void test_connect_unsupported_proto() {
  211. struct F : fixture_t {
  212. std::string get_uri(const asio::ip::tcp::endpoint &) noexcept override {
  213. return fmt::format("xxx://{}", listening_ep);
  214. }
  215. void main() noexcept override {
  216. create_actor();
  217. io_ctx.run();
  218. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  219. CHECK(!connected_message);
  220. }
  221. };
  222. F().run();
  223. }
  224. void test_handshake_timeout() {
  225. struct F : fixture_t {
  226. void accept(const sys::error_code &ec) noexcept override { LOG_INFO(log, "accept (ignoring)", ec.message()); }
  227. void main() noexcept override {
  228. auto act = create_actor();
  229. io_ctx.run();
  230. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  231. CHECK(!connected_message);
  232. REQUIRE(diff_msgs.size() == 2);
  233. CHECK(diff_msgs[0]->payload.diff->apply(*cluster));
  234. CHECK(peer_device->get_state() == device_state_t::dialing);
  235. CHECK(diff_msgs[1]->payload.diff->apply(*cluster));
  236. CHECK(peer_device->get_state() == device_state_t::offline);
  237. }
  238. };
  239. F().run();
  240. }
  241. void test_handshake_garbage() {
  242. struct F : fixture_t {
  243. void accept(const sys::error_code &) noexcept override {
  244. auto buff = asio::buffer("garbage-garbage-garbage");
  245. peer_sock.write_some(buff);
  246. }
  247. void main() noexcept override {
  248. auto act = create_actor();
  249. io_ctx.run();
  250. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  251. CHECK(!connected_message);
  252. REQUIRE(diff_msgs.size() == 2);
  253. CHECK(diff_msgs[0]->payload.diff->apply(*cluster));
  254. CHECK(peer_device->get_state() == device_state_t::dialing);
  255. CHECK(diff_msgs[1]->payload.diff->apply(*cluster));
  256. CHECK(peer_device->get_state() == device_state_t::offline);
  257. }
  258. };
  259. F().run();
  260. }
  261. void test_connection_refused() {
  262. struct F : fixture_t {
  263. std::string get_uri(const asio::ip::tcp::endpoint &) noexcept override {
  264. return fmt::format("tcp://{}:0", host);
  265. }
  266. void main() noexcept override {
  267. auto act = create_actor();
  268. io_ctx.run();
  269. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  270. CHECK(!connected_message);
  271. }
  272. };
  273. F().run();
  274. }
  275. void test_connection_refused_no_model() {
  276. struct F : fixture_t {
  277. F() { use_model = false; }
  278. std::string get_uri(const asio::ip::tcp::endpoint &) noexcept override {
  279. return fmt::format("tcp://{}:0", host);
  280. }
  281. void main() noexcept override {
  282. auto act = create_actor();
  283. io_ctx.run();
  284. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  285. CHECK(!connected_message);
  286. }
  287. };
  288. F().run();
  289. }
  290. void test_resolve_failure() {
  291. struct F : fixture_t {
  292. std::string get_uri(const asio::ip::tcp::endpoint &) noexcept override { return "tcp://x.example.com"; }
  293. void main() noexcept override {
  294. auto act = create_actor();
  295. io_ctx.run();
  296. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  297. CHECK(!connected_message);
  298. }
  299. };
  300. F().run();
  301. }
  302. void test_success() {
  303. struct F : fixture_t {
  304. void main() noexcept override {
  305. auto act = create_actor();
  306. io_ctx.run();
  307. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  308. REQUIRE(connected_message);
  309. CHECK(connected_message->payload.proto == "tcp");
  310. CHECK(connected_message->payload.peer_device_id == peer_device->device_id());
  311. CHECK(valid_handshake);
  312. sup->do_shutdown();
  313. sup->do_process();
  314. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  315. REQUIRE(diff_msgs.size() == 1);
  316. CHECK(diff_msgs[0]->payload.diff->apply(*cluster));
  317. CHECK(peer_device->get_state() == device_state_t::dialing);
  318. }
  319. };
  320. F().run();
  321. }
  322. void test_success_no_model() {
  323. struct F : fixture_t {
  324. F() { use_model = false; }
  325. void main() noexcept override {
  326. auto act = create_actor();
  327. io_ctx.run();
  328. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  329. CHECK(connected_message);
  330. CHECK(connected_message->payload.peer_device_id == peer_device->device_id());
  331. CHECK(valid_handshake);
  332. sup->do_shutdown();
  333. sup->do_process();
  334. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  335. REQUIRE(diff_msgs.size() == 0);
  336. }
  337. };
  338. F().run();
  339. }
  340. struct passive_fixture_t : fixture_t {
  341. actor_ptr_t act;
  342. bool active_connect_invoked = false;
  343. void active_connect() override {
  344. LOG_TRACE(log, "active_connect");
  345. if (!act || active_connect_invoked) {
  346. return;
  347. }
  348. active_connect_invoked = true;
  349. active_connect_impl();
  350. }
  351. virtual void active_connect_impl() { fixture_t::active_connect(); }
  352. void accept(const sys::error_code &ec) noexcept override {
  353. LOG_INFO(log, "test_passive_success/accept, ec: {}", ec.message());
  354. act = create_passive_actor();
  355. sup->do_process();
  356. active_connect();
  357. }
  358. };
  359. void test_passive_success() {
  360. struct F : passive_fixture_t {
  361. void main() noexcept override {
  362. initiate_active();
  363. io_ctx.run();
  364. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  365. REQUIRE(connected_message);
  366. CHECK(connected_message->payload.proto == "tcp");
  367. CHECK(connected_message->payload.peer_device_id == peer_device->device_id());
  368. CHECK(valid_handshake);
  369. sup->do_shutdown();
  370. sup->do_process();
  371. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  372. }
  373. };
  374. F().run();
  375. }
  376. void test_passive_garbage() {
  377. struct F : passive_fixture_t {
  378. tcp::socket client_sock;
  379. tcp::resolver::results_type addresses;
  380. F() : client_sock{io_ctx} {}
  381. void active_connect_impl() noexcept override {
  382. tcp::resolver resolver(io_ctx);
  383. addresses = resolver.resolve(host, std::to_string(listening_ep.port()));
  384. asio::async_connect(client_sock, addresses.begin(), addresses.end(), [&](auto ec, auto) {
  385. LOG_INFO(log, "test_passive_garbage/peer connect, ec: {}", ec.message());
  386. auto buff = asio::buffer("garbage-garbage-garbage");
  387. client_sock.write_some(buff);
  388. sup->do_process();
  389. });
  390. }
  391. void main() noexcept override {
  392. initiate_active();
  393. io_ctx.run();
  394. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  395. CHECK(!connected_message);
  396. }
  397. };
  398. F().run();
  399. }
  400. void test_passive_timeout() {
  401. struct F : passive_fixture_t {
  402. void active_connect() noexcept override { LOG_INFO(log, "test_passive_timeout/active_connect NOOP"); }
  403. void main() noexcept override {
  404. initiate_active();
  405. io_ctx.run();
  406. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  407. CHECK(!connected_message);
  408. }
  409. };
  410. F().run();
  411. }
  412. struct passive_relay_fixture_t : fixture_t {
  413. std::string rx_buff;
  414. bool initiate_handshake = true;
  415. passive_relay_fixture_t() {
  416. relay_session = "relay-session-key";
  417. rx_buff.resize(128);
  418. }
  419. void on_read(size_t bytes) noexcept {
  420. LOG_TRACE(log, "read (relay/passive), {} bytes", bytes);
  421. auto r = proto::relay::parse({rx_buff.data(), bytes});
  422. auto &wrapped = std::get<proto::relay::wrapped_message_t>(r);
  423. auto &msg = std::get<proto::relay::join_session_request_t>(wrapped.message);
  424. CHECK(msg.key == relay_session);
  425. relay_reply();
  426. }
  427. virtual void on_write(size_t bytes) noexcept {
  428. LOG_TRACE(log, "write (relay/passive), {} bytes", bytes);
  429. if (initiate_handshake) {
  430. auto upgradeable = static_cast<transport::upgradeable_stream_base_t *>(peer_trans.get());
  431. auto ssl = transport::ssl_junction_t{my_device->device_id(), &peer_keys, false, "bep"};
  432. peer_trans = upgradeable->upgrade(ssl, true);
  433. initiate_peer_handshake();
  434. }
  435. }
  436. virtual void relay_reply() noexcept { write(proto::relay::response_t{0, "success"}); }
  437. virtual void write(const proto::relay::message_t &msg) noexcept {
  438. proto::relay::serialize(msg, rx_buff);
  439. transport::error_fn_t err_fn([&](auto ec) { log->error("(relay/passive), read_err: {}", ec.message()); });
  440. transport::io_fn_t write_fn = [this](size_t bytes) { on_write(bytes); };
  441. peer_trans->async_send(asio::buffer(rx_buff), write_fn, err_fn);
  442. }
  443. void accept(const sys::error_code &ec) noexcept override {
  444. LOG_INFO(log, "accept (relay/passive), ec: {}", ec.message());
  445. auto uri = utils::parse("tcp://127.0.0.1:0/");
  446. auto cfg = transport::transport_config_t{{}, uri, *sup, std::move(peer_sock), false};
  447. peer_trans = transport::initiate_stream(cfg);
  448. transport::error_fn_t read_err_fn([&](auto ec) { log->error("(relay/passive), read_err: {}", ec.message()); });
  449. transport::io_fn_t read_fn = [this](size_t bytes) { on_read(bytes); };
  450. peer_trans->async_recv(asio::buffer(rx_buff), read_fn, read_err_fn);
  451. }
  452. };
  453. void test_relay_passive_success() {
  454. struct F : passive_relay_fixture_t {
  455. void main() noexcept override {
  456. auto act = create_actor();
  457. io_ctx.run();
  458. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  459. REQUIRE(connected_message);
  460. CHECK(connected_message->payload.proto == "relay");
  461. CHECK(connected_message->payload.peer_device_id == peer_device->device_id());
  462. CHECK(valid_handshake);
  463. sup->do_shutdown();
  464. sup->do_process();
  465. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  466. CHECK(diff_msgs.size() == 0);
  467. }
  468. };
  469. F().run();
  470. }
  471. void test_relay_passive_garbage() {
  472. struct F : passive_relay_fixture_t {
  473. void write(const proto::relay::message_t &) noexcept override {
  474. rx_buff = "garbage-garbage-garbage";
  475. initiate_handshake = false;
  476. transport::error_fn_t err_fn([&](auto ec) { log->error("(relay/passive), read_err: {}", ec.message()); });
  477. transport::io_fn_t write_fn = [this](size_t bytes) { on_write(bytes); };
  478. peer_trans->async_send(asio::buffer(rx_buff), write_fn, err_fn);
  479. }
  480. void main() noexcept override {
  481. create_actor();
  482. io_ctx.run();
  483. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  484. CHECK(!connected_message);
  485. CHECK(!valid_handshake);
  486. sup->do_shutdown();
  487. sup->do_process();
  488. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  489. CHECK(diff_msgs.size() == 0);
  490. }
  491. };
  492. F().run();
  493. }
  494. void test_relay_passive_wrong_message() {
  495. struct F : passive_relay_fixture_t {
  496. void relay_reply() noexcept override { write(proto::relay::pong_t{}); }
  497. void main() noexcept override {
  498. initiate_handshake = false;
  499. auto act = create_actor();
  500. io_ctx.run();
  501. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  502. CHECK(!connected_message);
  503. CHECK(!valid_handshake);
  504. sup->do_shutdown();
  505. sup->do_process();
  506. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  507. CHECK(diff_msgs.size() == 0);
  508. }
  509. };
  510. F().run();
  511. }
  512. void test_relay_passive_unsuccessful_join() {
  513. struct F : passive_relay_fixture_t {
  514. void relay_reply() noexcept override { write(proto::relay::response_t{5, "some-fail-reason"}); }
  515. void main() noexcept override {
  516. initiate_handshake = false;
  517. auto act = create_actor();
  518. io_ctx.run();
  519. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  520. CHECK(!connected_message);
  521. CHECK(!valid_handshake);
  522. sup->do_shutdown();
  523. sup->do_process();
  524. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  525. CHECK(diff_msgs.size() == 0);
  526. }
  527. };
  528. F().run();
  529. }
  530. void test_relay_malformed_uri() {
  531. struct F : fixture_t {
  532. std::string get_uri(const asio::ip::tcp::endpoint &) noexcept override {
  533. return fmt::format("relay://{}", listening_ep);
  534. }
  535. void main() noexcept override {
  536. auto act = create_actor();
  537. io_ctx.run();
  538. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  539. CHECK(!connected_message);
  540. CHECK(!valid_handshake);
  541. sup->do_shutdown();
  542. sup->do_process();
  543. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  544. CHECK(diff_msgs.size() == 2);
  545. }
  546. };
  547. F().run();
  548. }
  549. void test_relay_active_wrong_relay_device_id() {
  550. struct F : fixture_t {
  551. std::string get_uri(const asio::ip::tcp::endpoint &) noexcept override {
  552. return fmt::format("relay://{}?id={}", listening_ep, my_device->device_id().get_value());
  553. }
  554. void main() noexcept override {
  555. auto act = create_actor();
  556. io_ctx.run();
  557. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  558. CHECK(!connected_message);
  559. CHECK(!valid_handshake);
  560. sup->do_shutdown();
  561. sup->do_process();
  562. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  563. CHECK(diff_msgs.size() == 2);
  564. }
  565. };
  566. F().run();
  567. }
  568. struct active_relay_fixture_t : fixture_t {
  569. utils::key_pair_t relay_keys;
  570. model::device_id_t relay_device;
  571. std::string rx_buff;
  572. std::string session_key = "lorem-session-dolor";
  573. transport::stream_sp_t relay_trans;
  574. bool session_mode = false;
  575. active_relay_fixture_t() {
  576. relay_keys = utils::generate_pair("relay").value();
  577. relay_device = model::device_id_t::from_cert(relay_keys.cert_data).value();
  578. rx_buff.resize(128);
  579. }
  580. std::string get_uri(const asio::ip::tcp::endpoint &) noexcept override {
  581. return fmt::format("relay://{}?id={}", listening_ep, relay_device.get_value());
  582. }
  583. void accept(const sys::error_code &ec) noexcept override {
  584. LOG_INFO(log, "relay/accept, ec: {}", ec.message());
  585. if (!session_mode) {
  586. relay_trans = transport::initiate_tls_passive(*sup, relay_keys, std::move(peer_sock));
  587. transport::handshake_fn_t handshake_fn = [this](bool valid_peer, utils::x509_t &, const tcp::endpoint &,
  588. const model::device_id_t *) {
  589. valid_handshake = valid_peer;
  590. on_relay_handshake();
  591. };
  592. transport::error_fn_t on_error = [](const auto &) {};
  593. relay_trans->async_handshake(handshake_fn, on_error);
  594. return;
  595. }
  596. auto uri = utils::parse("tcp://127.0.0.1:0/");
  597. auto cfg = transport::transport_config_t{{}, uri, *sup, std::move(peer_sock), false};
  598. peer_trans = transport::initiate_stream(cfg);
  599. transport::error_fn_t read_err_fn([&](auto ec) { log->error("(relay/active), read_err: {}", ec.message()); });
  600. transport::io_fn_t read_fn = [this](size_t bytes) { on_read_peer(bytes); };
  601. peer_trans->async_recv(asio::buffer(rx_buff), read_fn, read_err_fn);
  602. }
  603. virtual void on_relay_handshake() noexcept {
  604. transport::error_fn_t read_err_fn([&](auto ec) { log->error("(relay/active), read_err: {}", ec.message()); });
  605. transport::io_fn_t read_fn = [this](size_t bytes) { on_read(bytes); };
  606. relay_trans->async_recv(asio::buffer(rx_buff), read_fn, read_err_fn);
  607. }
  608. virtual void relay_reply() noexcept {
  609. write(relay_trans, proto::relay::session_invitation_t{std::string(peer_device->device_id().get_sha256()),
  610. session_key, "", listening_ep.port(), false});
  611. }
  612. virtual void session_reply() noexcept { write(peer_trans, proto::relay::response_t{0, "ok"}); }
  613. virtual void write(transport::stream_sp_t &stream, const proto::relay::message_t &msg) noexcept {
  614. proto::relay::serialize(msg, rx_buff);
  615. transport::error_fn_t err_fn([&](auto ec) { log->error("(relay/passive), read_err: {}", ec.message()); });
  616. transport::io_fn_t write_fn = [this](size_t bytes) { on_write(bytes); };
  617. stream->async_send(asio::buffer(rx_buff), write_fn, err_fn);
  618. }
  619. virtual void on_read_peer(size_t bytes) {
  620. log->debug("(relay/active) read peer {} bytes", bytes);
  621. auto r = proto::relay::parse({rx_buff.data(), bytes});
  622. auto &wrapped = std::get<proto::relay::wrapped_message_t>(r);
  623. auto &msg = std::get<proto::relay::join_session_request_t>(wrapped.message);
  624. CHECK(msg.key == session_key);
  625. session_reply();
  626. }
  627. virtual void on_read(size_t bytes) {
  628. log->debug("(relay/active) read {} bytes", bytes);
  629. auto r = proto::relay::parse({rx_buff.data(), bytes});
  630. auto &wrapped = std::get<proto::relay::wrapped_message_t>(r);
  631. auto &msg = std::get<proto::relay::connect_request_t>(wrapped.message);
  632. CHECK(msg.device_id == peer_device->device_id().get_sha256());
  633. relay_reply();
  634. }
  635. virtual void on_write(size_t bytes) {
  636. log->debug("(relay/active) write {} bytes", bytes);
  637. if (!session_mode) {
  638. acceptor.async_accept(peer_sock, [this](auto ec) { this->accept(ec); });
  639. session_mode = true;
  640. } else {
  641. auto upgradeable = static_cast<transport::upgradeable_stream_base_t *>(peer_trans.get());
  642. auto ssl = transport::ssl_junction_t{my_device->device_id(), &peer_keys, false, "bep"};
  643. peer_trans = upgradeable->upgrade(ssl, false);
  644. initiate_peer_handshake();
  645. }
  646. }
  647. };
  648. void test_relay_active_success() {
  649. struct F : active_relay_fixture_t {
  650. void main() noexcept override {
  651. auto act = create_actor();
  652. io_ctx.run();
  653. CHECK(sup->get_state() == r::state_t::OPERATIONAL);
  654. REQUIRE(connected_message);
  655. CHECK(connected_message->payload.proto == "relay");
  656. CHECK(connected_message->payload.peer_device_id == peer_device->device_id());
  657. CHECK(valid_handshake);
  658. sup->do_shutdown();
  659. sup->do_process();
  660. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  661. REQUIRE(diff_msgs.size() == 1);
  662. CHECK(diff_msgs[0]->payload.diff->apply(*cluster));
  663. CHECK(peer_device->get_state() == device_state_t::dialing);
  664. }
  665. };
  666. F().run();
  667. }
  668. void test_relay_active_not_enabled() {
  669. struct F : active_relay_fixture_t {
  670. actor_ptr_t create_actor() noexcept override {
  671. return sup->create_actor<initiator_actor_t>()
  672. .timeout(timeout)
  673. .peer_device_id(peer_device->device_id())
  674. .relay_session(relay_session)
  675. .uris({peer_uri})
  676. .cluster(use_model ? cluster : nullptr)
  677. .sink(sup->get_address())
  678. .ssl_pair(&my_keys)
  679. .router(*sup)
  680. .escalate_failure()
  681. .finish();
  682. }
  683. void main() noexcept override {
  684. auto act = create_actor();
  685. io_ctx.run();
  686. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  687. sup->do_shutdown();
  688. sup->do_process();
  689. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  690. CHECK(peer_device->get_state() == device_state_t::offline);
  691. }
  692. };
  693. F().run();
  694. }
  695. void test_relay_wrong_device() {
  696. struct F : active_relay_fixture_t {
  697. void relay_reply() noexcept override {
  698. write(relay_trans, proto::relay::session_invitation_t{std::string(relay_device.get_sha256()), session_key,
  699. "", listening_ep.port(), false});
  700. }
  701. void on_write(size_t) override {}
  702. void main() noexcept override {
  703. auto act = create_actor();
  704. io_ctx.run();
  705. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  706. CHECK(!connected_message);
  707. CHECK(valid_handshake);
  708. sup->do_shutdown();
  709. sup->do_process();
  710. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  711. CHECK(diff_msgs.size() == 2);
  712. }
  713. };
  714. F().run();
  715. }
  716. void test_relay_non_connectable() {
  717. struct F : active_relay_fixture_t {
  718. void relay_reply() noexcept override {
  719. write(relay_trans, proto::relay::session_invitation_t{std::string(peer_device->device_id().get_sha256()),
  720. session_key, "", 0, false});
  721. }
  722. void main() noexcept override {
  723. auto act = create_actor();
  724. io_ctx.run();
  725. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  726. CHECK(!connected_message);
  727. sup->do_shutdown();
  728. sup->do_process();
  729. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  730. CHECK(diff_msgs.size() == 2);
  731. }
  732. };
  733. F().run();
  734. }
  735. void test_relay_malformed_address() {
  736. struct F : active_relay_fixture_t {
  737. void relay_reply() noexcept override {
  738. write(relay_trans, proto::relay::session_invitation_t{std::string(peer_device->device_id().get_sha256()),
  739. session_key, "8.8.8.8z", listening_ep.port(), false});
  740. }
  741. void main() noexcept override {
  742. auto act = create_actor();
  743. io_ctx.run();
  744. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  745. CHECK(!connected_message);
  746. sup->do_shutdown();
  747. sup->do_process();
  748. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  749. CHECK(diff_msgs.size() == 2);
  750. }
  751. };
  752. F().run();
  753. }
  754. void test_relay_garbage_reply() {
  755. struct F : active_relay_fixture_t {
  756. void write(transport::stream_sp_t &stream, const proto::relay::message_t &) noexcept override {
  757. rx_buff = "garbage-garbage-garbage";
  758. transport::error_fn_t err_fn([&](auto ec) { log->error("(relay/passive), read_err: {}", ec.message()); });
  759. transport::io_fn_t write_fn = [this](size_t bytes) { on_write(bytes); };
  760. stream->async_send(asio::buffer(rx_buff), write_fn, err_fn);
  761. }
  762. void on_write(size_t) override {}
  763. void main() noexcept override {
  764. auto act = create_actor();
  765. io_ctx.run();
  766. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  767. CHECK(!connected_message);
  768. sup->do_shutdown();
  769. sup->do_process();
  770. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  771. CHECK(diff_msgs.size() == 2);
  772. }
  773. };
  774. F().run();
  775. }
  776. void test_relay_non_invitation_reply() {
  777. struct F : active_relay_fixture_t {
  778. void relay_reply() noexcept override { write(relay_trans, proto::relay::pong_t{}); }
  779. void on_write(size_t) override {}
  780. void main() noexcept override {
  781. auto act = create_actor();
  782. io_ctx.run();
  783. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  784. CHECK(!connected_message);
  785. sup->do_shutdown();
  786. sup->do_process();
  787. CHECK(sup->get_state() == r::state_t::SHUT_DOWN);
  788. CHECK(diff_msgs.size() == 2);
  789. }
  790. };
  791. F().run();
  792. }
  793. int _init() {
  794. REGISTER_TEST_CASE(test_connect_unsupported_proto, "test_connect_unsupported_proto", "[initiator]");
  795. REGISTER_TEST_CASE(test_connect_timeout, "test_connect_timeout", "[initiator]");
  796. REGISTER_TEST_CASE(test_handshake_timeout, "test_handshake_timeout", "[initiator]");
  797. REGISTER_TEST_CASE(test_handshake_garbage, "test_handshake_garbage", "[initiator]");
  798. REGISTER_TEST_CASE(test_connection_refused, "test_connection_refused", "[initiator]");
  799. REGISTER_TEST_CASE(test_connection_refused_no_model, "test_connection_refused_no_model", "[initiator]");
  800. REGISTER_TEST_CASE(test_resolve_failure, "test_resolve_failure", "[initiator]");
  801. REGISTER_TEST_CASE(test_success, "test_success", "[initiator]");
  802. REGISTER_TEST_CASE(test_success_no_model, "test_success_no_model", "[initiator]");
  803. REGISTER_TEST_CASE(test_passive_success, "test_passive_success", "[initiator]");
  804. REGISTER_TEST_CASE(test_passive_garbage, "test_passive_garbage", "[initiator]");
  805. REGISTER_TEST_CASE(test_passive_timeout, "test_passive_timeout", "[initiator]");
  806. REGISTER_TEST_CASE(test_relay_passive_success, "test_relay_passive_success", "[initiator]");
  807. REGISTER_TEST_CASE(test_relay_passive_garbage, "test_relay_passive_garbage", "[initiator]");
  808. REGISTER_TEST_CASE(test_relay_passive_wrong_message, "test_relay_passive_wrong_message", "[initiator]");
  809. REGISTER_TEST_CASE(test_relay_passive_unsuccessful_join, "test_relay_passive_unsuccessful_join", "[initiator]");
  810. REGISTER_TEST_CASE(test_relay_malformed_uri, "test_relay_malformed_uri", "[initiator]");
  811. REGISTER_TEST_CASE(test_relay_active_wrong_relay_device_id, "test_relay_active_wrong_relay_device_id",
  812. "[initiator]");
  813. REGISTER_TEST_CASE(test_relay_active_success, "test_relay_active_success", "[initiator]");
  814. REGISTER_TEST_CASE(test_relay_active_not_enabled, "test_relay_active_not_enabled", "[initiator]");
  815. REGISTER_TEST_CASE(test_relay_wrong_device, "test_relay_wrong_device", "[initiator]");
  816. REGISTER_TEST_CASE(test_relay_non_connectable, "test_relay_non_connectable", "[initiator]");
  817. REGISTER_TEST_CASE(test_relay_malformed_address, "test_relay_malformed_address", "[initiator]");
  818. REGISTER_TEST_CASE(test_relay_garbage_reply, "test_relay_garbage_reply", "[initiator]");
  819. REGISTER_TEST_CASE(test_relay_non_invitation_reply, "test_relay_non_invitation_reply", "[initiator]");
  820. return 1;
  821. }
  822. static int v = _init();