075-controller.cpp 56 KB


  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. // SPDX-FileCopyrightText: 2019-2024 Ivan Baidakou
  3. #include "test-utils.h"
  4. #include "access.h"
  5. #include "test_supervisor.h"
  6. #include "model/cluster.h"
  7. #include "diff-builder.h"
  8. #include "hasher/hasher_proxy_actor.h"
  9. #include "hasher/hasher_actor.h"
  10. #include "net/controller_actor.h"
  11. #include "net/names.h"
  12. #include "fs/messages.h"
  13. #include "utils/error_code.h"
  14. #include "proto/bep_support.h"
  15. #include <boost/core/demangle.hpp>
  16. using namespace syncspirit;
  17. using namespace syncspirit::test;
  18. using namespace syncspirit::model;
  19. using namespace syncspirit::net;
  20. using namespace syncspirit::hasher;
  21. namespace {
  22. struct sample_peer_config_t : public r::actor_config_t {
  23. model::device_id_t peer_device_id;
  24. };
  25. template <typename Actor> struct sample_peer_config_builder_t : r::actor_config_builder_t<Actor> {
  26. using builder_t = typename Actor::template config_builder_t<Actor>;
  27. using parent_t = r::actor_config_builder_t<Actor>;
  28. using parent_t::parent_t;
  29. builder_t &&peer_device_id(const model::device_id_t &value) && noexcept {
  30. parent_t::config.peer_device_id = value;
  31. return std::move(*static_cast<typename parent_t::builder_t *>(this));
  32. }
  33. };
  34. struct sample_peer_t : r::actor_base_t {
  35. using config_t = sample_peer_config_t;
  36. template <typename Actor> using config_builder_t = sample_peer_config_builder_t<Actor>;
  37. using remote_message_t = r::intrusive_ptr_t<net::message::forwarded_message_t>;
  38. using remote_messages_t = std::list<remote_message_t>;
  39. struct block_response_t {
  40. std::string name;
  41. size_t block_index;
  42. std::string data;
  43. sys::error_code ec;
  44. };
  45. using block_responses_t = std::list<block_response_t>;
  46. using block_request_t = r::intrusive_ptr_t<net::message::block_request_t>;
  47. using block_requests_t = std::list<block_request_t>;
  48. using uploaded_blocks_t = std::list<proto::message::Response>;
  49. sample_peer_t(config_t &config) : r::actor_base_t{config}, peer_device{config.peer_device_id} {
  50. log = utils::get_logger("test.sample_peer");
  51. }
  52. void configure(r::plugin::plugin_base_t &plugin) noexcept override {
  53. r::actor_base_t::configure(plugin);
  54. plugin.with_casted<r::plugin::address_maker_plugin_t>([&](auto &p) { p.set_identity("sample_peer", false); });
  55. plugin.with_casted<r::plugin::starter_plugin_t>([&](auto &p) {
  56. p.subscribe_actor(&sample_peer_t::on_start_reading);
  57. p.subscribe_actor(&sample_peer_t::on_termination);
  58. p.subscribe_actor(&sample_peer_t::on_transfer);
  59. p.subscribe_actor(&sample_peer_t::on_block_request);
  60. });
  61. }
  62. void shutdown_start() noexcept override {
  63. LOG_TRACE(log, "{}, shutdown_start", identity);
  64. if (controller) {
  65. send<net::payload::termination_t>(controller, shutdown_reason);
  66. }
  67. r::actor_base_t::shutdown_start();
  68. }
  69. void shutdown_finish() noexcept override {
  70. r::actor_base_t::shutdown_finish();
  71. LOG_TRACE(log, "{}, shutdown_finish, blocks requested = {}", identity, blocks_requested);
  72. if (controller) {
  73. send<net::payload::termination_t>(controller, shutdown_reason);
  74. }
  75. }
  76. void on_start_reading(net::message::start_reading_t &msg) noexcept {
  77. LOG_TRACE(log, "{}, on_start_reading", identity);
  78. controller = msg.payload.controller;
  79. reading = msg.payload.start;
  80. }
  81. void on_termination(net::message::termination_signal_t &msg) noexcept {
  82. LOG_TRACE(log, "{}, on_termination", identity);
  83. if (!shutdown_reason) {
  84. auto &ee = msg.payload.ee;
  85. auto reason = ee->message();
  86. LOG_TRACE(log, "{}, on_termination: {}", identity, reason);
  87. do_shutdown(ee);
  88. }
  89. }
  90. void on_transfer(net::message::transfer_data_t &message) noexcept {
  91. auto &data = message.payload.data;
  92. LOG_TRACE(log, "{}, on_transfer, bytes = {}", identity, data.size());
  93. auto buff = boost::asio::buffer(data.data(), data.size());
  94. auto result = proto::parse_bep(buff);
  95. auto orig = std::move(result.value().message);
  96. auto variant = net::payload::forwarded_message_t();
  97. std::visit(
  98. [&](auto &msg) {
  99. using boost::core::demangle;
  100. using T = std::decay_t<decltype(msg)>;
  101. LOG_TRACE(log, "{}, received '{}' message", identity, demangle(typeid(T).name()));
  102. using V = net::payload::forwarded_message_t;
  103. if constexpr (std::is_constructible_v<V, T>) {
  104. variant = std::move(msg);
  105. } else if constexpr (std::is_same_v<T, proto::message::Response>) {
  106. uploaded_blocks.push_back(std::move(msg));
  107. }
  108. },
  109. orig);
  110. auto fwd_msg = new net::message::forwarded_message_t(address, std::move(variant));
  111. messages.emplace_back(fwd_msg);
  112. }
  113. void process_block_requests() noexcept {
  114. auto condition = [&]() -> bool {
  115. if (block_requests.size() && block_responses.size()) {
  116. auto &req = block_requests.front();
  117. auto &res = block_responses.front();
  118. auto &req_payload = req->payload.request_payload;
  119. if (req_payload.block.block_index() == res.block_index) {
  120. auto &name = res.name;
  121. return name.empty() || name == req_payload.file->get_name();
  122. }
  123. }
  124. return false;
  125. };
  126. while (condition()) {
  127. auto &reply = block_responses.front();
  128. auto &request = *block_requests.front();
  129. log->debug("{}, matched '{}', replying..., ec = {}", identity, reply.name, reply.ec.value());
  130. if (!reply.ec) {
  131. reply_to(request, reply.data);
  132. } else {
  133. reply_with_error(request, make_error(reply.ec));
  134. }
  135. block_responses.pop_front();
  136. block_requests.pop_front();
  137. }
  138. }
  139. void on_block_request(net::message::block_request_t &req) noexcept {
  140. block_requests.push_front(&req);
  141. ++blocks_requested;
  142. log->debug("{}, requesting block # {}", identity,
  143. block_requests.front()->payload.request_payload.block.block_index());
  144. if (block_responses.size()) {
  145. log->debug("{}, top response block # {}", identity, block_responses.front().block_index);
  146. }
  147. process_block_requests();
  148. }
  149. void forward(net::payload::forwarded_message_t payload) noexcept {
  150. send<net::payload::forwarded_message_t>(controller, std::move(payload));
  151. }
  152. static const constexpr size_t next_block = 1000000;
  153. void push_block(std::string_view data, size_t index, std::string name = {}) {
  154. if (index == next_block) {
  155. index = block_responses.size();
  156. }
  157. block_responses.push_back(block_response_t{std::move(name), index, std::string(data), {}});
  158. }
  159. void push_block(sys::error_code ec, size_t index) {
  160. if (index == next_block) {
  161. index = block_responses.size();
  162. }
  163. block_responses.push_back(block_response_t{std::string{}, index, std::string{}, ec});
  164. }
  165. size_t blocks_requested = 0;
  166. bool reading = false;
  167. remote_messages_t messages;
  168. r::address_ptr_t controller;
  169. model::device_id_t peer_device;
  170. utils::logger_t log;
  171. block_requests_t block_requests;
  172. block_responses_t block_responses;
  173. uploaded_blocks_t uploaded_blocks;
  174. };
  175. struct fixture_t {
  176. using peer_ptr_t = r::intrusive_ptr_t<sample_peer_t>;
  177. using target_ptr_t = r::intrusive_ptr_t<net::controller_actor_t>;
  178. using blk_req_t = fs::message::block_request_t;
  179. using blk_req_ptr_t = r::intrusive_ptr_t<blk_req_t>;
  180. using blk_res_t = fs::message::block_response_t;
  181. using blk_res_ptr_t = r::intrusive_ptr_t<blk_res_t>;
  182. using block_requests_t = std::deque<blk_req_ptr_t>;
  183. using block_responses_t = std::deque<r::message_ptr_t>;
  184. fixture_t(bool auto_start_, int64_t max_sequence_, bool auto_share_ = true) noexcept
  185. : auto_start{auto_start_}, max_sequence{max_sequence_}, auto_share{auto_share_} {
  186. utils::set_default("trace");
  187. }
  188. virtual void run() noexcept {
  189. auto peer_id =
  190. device_id_t::from_string("VUV42CZ-IQD5A37-RPEBPM4-VVQK6E4-6WSKC7B-PVJQHHD-4PZD44V-ENC6WAZ").value();
  191. peer_device = device_t::create(peer_id, "peer-device").value();
  192. auto my_id =
  193. device_id_t::from_string("KHQNO2S-5QSILRK-YX4JZZ4-7L77APM-QNVGZJT-EKU7IFI-PNEPBMY-4MXFMQD").value();
  194. my_device = device_t::create(my_id, "my-device").value();
  195. cluster = new cluster_t(my_device, 1);
  196. cluster->get_devices().put(my_device);
  197. cluster->get_devices().put(peer_device);
  198. auto folder_id_1 = "1234-5678";
  199. auto folder_id_2 = "5555";
  200. auto builder = diff_builder_t(*cluster);
  201. auto sha256 = peer_id.get_sha256();
  202. builder.upsert_folder(folder_id_1, "")
  203. .upsert_folder(folder_id_2, "")
  204. .configure_cluster(sha256)
  205. .add(sha256, folder_id_1, 123, max_sequence)
  206. .finish();
  207. REQUIRE(builder.apply());
  208. if (auto_share) {
  209. REQUIRE(builder.share_folder(peer_id.get_sha256(), folder_id_1).apply());
  210. }
  211. r::system_context_t ctx;
  212. sup = ctx.create_supervisor<supervisor_t>().timeout(timeout).create_registry().finish();
  213. sup->cluster = cluster;
  214. sup->configure_callback = [&](r::plugin::plugin_base_t &plugin) {
  215. plugin.template with_casted<r::plugin::registry_plugin_t>(
  216. [&](auto &p) { p.register_name(net::names::fs_actor, sup->get_address()); });
  217. plugin.template with_casted<r::plugin::starter_plugin_t>([&](auto &p) {
  218. p.subscribe_actor(r::lambda<blk_req_t>([&](blk_req_t &msg) {
  219. block_requests.push_back(&msg);
  220. if (block_responses.size()) {
  221. sup->put(block_responses.front());
  222. block_responses.pop_front();
  223. }
  224. }));
  225. });
  226. };
  227. sup->start();
  228. sup->do_process();
  229. CHECK(static_cast<r::actor_base_t *>(sup.get())->access<to::state>() == r::state_t::OPERATIONAL);
  230. sup->create_actor<hasher_actor_t>().index(1).timeout(timeout).finish();
  231. sup->create_actor<hasher::hasher_proxy_actor_t>()
  232. .timeout(timeout)
  233. .hasher_threads(1)
  234. .name(net::names::hasher_proxy)
  235. .finish();
  236. peer_actor = sup->create_actor<sample_peer_t>().timeout(timeout).finish();
  237. auto &folders = cluster->get_folders();
  238. folder_1 = folders.by_id(folder_id_1);
  239. folder_2 = folders.by_id(folder_id_2);
  240. folder_1_peer = folder_1->get_folder_infos().by_device_id(peer_id.get_sha256());
  241. target = sup->create_actor<controller_actor_t>()
  242. .peer(peer_device)
  243. .peer_addr(peer_actor->get_address())
  244. .request_pool(1024)
  245. .outgoing_buffer_max(1024'000)
  246. .cluster(cluster)
  247. .sequencer(sup->sequencer)
  248. .timeout(timeout)
  249. .request_timeout(timeout)
  250. .finish();
  251. sup->do_process();
  252. CHECK(static_cast<r::actor_base_t *>(target.get())->access<to::state>() == r::state_t::OPERATIONAL);
  253. target_addr = target->get_address();
  254. if (auto_start) {
  255. REQUIRE(peer_actor->reading);
  256. REQUIRE(peer_actor->messages.size() == 1);
  257. auto &msg = (*peer_actor->messages.front()).payload;
  258. REQUIRE(std::get_if<proto::message::ClusterConfig>(&msg));
  259. peer_actor->messages.pop_front();
  260. }
  261. main(builder);
  262. sup->shutdown();
  263. sup->do_process();
  264. CHECK(static_cast<r::actor_base_t *>(sup.get())->access<to::state>() == r::state_t::SHUT_DOWN);
  265. }
  266. virtual void main(diff_builder_t &) noexcept {}
  267. bool auto_start;
  268. bool auto_share;
  269. int64_t max_sequence;
  270. peer_ptr_t peer_actor;
  271. target_ptr_t target;
  272. r::address_ptr_t target_addr;
  273. r::pt::time_duration timeout = r::pt::millisec{10};
  274. cluster_ptr_t cluster;
  275. device_ptr_t peer_device;
  276. device_ptr_t my_device;
  277. r::intrusive_ptr_t<supervisor_t> sup;
  278. r::system_context_t ctx;
  279. model::folder_ptr_t folder_1;
  280. model::folder_info_ptr_t folder_1_peer;
  281. model::folder_ptr_t folder_2;
  282. block_requests_t block_requests;
  283. block_responses_t block_responses;
  284. };
  285. } // namespace
  286. void test_startup() {
  287. struct F : fixture_t {
  288. using fixture_t::fixture_t;
  289. void main(diff_builder_t &) noexcept override {
  290. REQUIRE(peer_actor->reading);
  291. REQUIRE(peer_actor->messages.size() == 1);
  292. auto &msg = (*peer_actor->messages.front()).payload;
  293. REQUIRE(std::get_if<proto::message::ClusterConfig>(&msg));
  294. peer_actor->messages.pop_front();
  295. CHECK(peer_actor->messages.empty());
  296. auto cc = proto::ClusterConfig{};
  297. auto payload = proto::message::ClusterConfig(new proto::ClusterConfig(cc));
  298. peer_actor->forward(std::move(payload));
  299. sup->do_process();
  300. CHECK(static_cast<r::actor_base_t *>(target.get())->access<to::state>() == r::state_t::OPERATIONAL);
  301. CHECK(peer_actor->messages.empty());
  302. }
  303. };
  304. F(false, 10, false).run();
  305. }
  306. void test_index_receiving() {
  307. struct F : fixture_t {
  308. using fixture_t::fixture_t;
  309. void main(diff_builder_t &) noexcept override {
  310. auto cc = proto::ClusterConfig{};
  311. auto index = proto::Index{};
  312. SECTION("wrong index") {
  313. peer_actor->forward(proto::message::ClusterConfig(new proto::ClusterConfig(cc)));
  314. index.set_folder("non-existing-folder");
  315. peer_actor->forward(proto::message::Index(new proto::Index(index)));
  316. sup->do_process();
  317. CHECK(static_cast<r::actor_base_t *>(target.get())->access<to::state>() == r::state_t::SHUT_DOWN);
  318. CHECK(static_cast<r::actor_base_t *>(peer_actor.get())->access<to::state>() == r::state_t::SHUT_DOWN);
  319. }
  320. SECTION("index is applied") {
  321. auto folder = cc.add_folders();
  322. folder->set_id(std::string(folder_1->get_id()));
  323. auto d_peer = folder->add_devices();
  324. d_peer->set_id(std::string(peer_device->device_id().get_sha256()));
  325. REQUIRE(cluster->get_pending_folders().size() == 0);
  326. d_peer->set_max_sequence(10);
  327. d_peer->set_index_id(folder_1_peer->get_index());
  328. peer_actor->forward(proto::message::ClusterConfig(new proto::ClusterConfig(cc)));
  329. index.set_folder(std::string(folder_1->get_id()));
  330. auto file = index.add_files();
  331. file->set_name("some-dir");
  332. file->set_type(proto::FileInfoType::DIRECTORY);
  333. file->set_sequence(10);
  334. auto v = file->mutable_version();
  335. auto c = v->add_counters();
  336. c->set_id(peer_device->as_uint());
  337. c->set_value(1);
  338. peer_actor->forward(proto::message::Index(new proto::Index(index)));
  339. sup->do_process();
  340. CHECK(static_cast<r::actor_base_t *>(target.get())->access<to::state>() == r::state_t::OPERATIONAL);
  341. CHECK(static_cast<r::actor_base_t *>(peer_actor.get())->access<to::state>() == r::state_t::OPERATIONAL);
  342. auto &folder_infos = folder_1->get_folder_infos();
  343. auto folder_peer = folder_infos.by_device(*peer_device);
  344. REQUIRE(folder_peer);
  345. CHECK(folder_peer->get_max_sequence() == 10ul);
  346. REQUIRE(folder_peer->get_file_infos().size() == 1);
  347. CHECK(folder_peer->get_file_infos().begin()->item->get_name() == file->name());
  348. auto folder_my = folder_infos.by_device(*my_device);
  349. REQUIRE(folder_my);
  350. CHECK(folder_my->get_max_sequence() == 1ul);
  351. REQUIRE(folder_my->get_file_infos().size() == 1);
  352. CHECK(folder_my->get_file_infos().begin()->item->get_name() == file->name());
  353. SECTION("then index update is applied") {
  354. auto index_update = proto::IndexUpdate{};
  355. index_update.set_folder(std::string(folder_1->get_id()));
  356. auto file = index_update.add_files();
  357. file->set_name("some-dir-2");
  358. file->set_type(proto::FileInfoType::DIRECTORY);
  359. file->set_sequence(folder_1_peer->get_max_sequence() + 1);
  360. auto v = file->mutable_version();
  361. auto c = v->add_counters();
  362. c->set_id(peer_device->as_uint());
  363. c->set_value(1);
  364. peer_actor->forward(proto::message::IndexUpdate(new proto::IndexUpdate(index_update)));
  365. sup->do_process();
  366. CHECK(static_cast<r::actor_base_t *>(target.get())->access<to::state>() == r::state_t::OPERATIONAL);
  367. CHECK(static_cast<r::actor_base_t *>(peer_actor.get())->access<to::state>() ==
  368. r::state_t::OPERATIONAL);
  369. CHECK(folder_peer->get_max_sequence() == file->sequence());
  370. REQUIRE(folder_peer->get_file_infos().size() == 2);
  371. CHECK(folder_peer->get_file_infos().by_name("some-dir-2"));
  372. CHECK(folder_my->get_max_sequence() == 2ul);
  373. REQUIRE(folder_my->get_file_infos().size() == 2);
  374. CHECK(folder_my->get_file_infos().by_name("some-dir-2"));
  375. }
  376. }
  377. }
  378. };
  379. F(true, 10).run();
  380. }
  381. void test_index_sending() {
  382. struct F : fixture_t {
  383. using fixture_t::fixture_t;
  384. void main(diff_builder_t &) noexcept override {
  385. proto::FileInfo pr_file_info;
  386. pr_file_info.set_name("link");
  387. pr_file_info.set_type(proto::FileInfoType::SYMLINK);
  388. pr_file_info.set_symlink_target("/some/where");
  389. auto builder = diff_builder_t(*cluster);
  390. builder.local_update(folder_1->get_id(), pr_file_info);
  391. builder.apply(*sup);
  392. auto folder_1_my = folder_1->get_folder_infos().by_device(*my_device);
  393. auto cc = proto::ClusterConfig{};
  394. auto folder = cc.add_folders();
  395. folder->set_id(std::string(folder_1->get_id()));
  396. auto d_peer = folder->add_devices();
  397. d_peer->set_id(std::string(peer_device->device_id().get_sha256()));
  398. d_peer->set_max_sequence(folder_1_peer->get_max_sequence());
  399. d_peer->set_index_id(folder_1_peer->get_index());
  400. SECTION("peer has outdated by sequence view") {
  401. auto d_my = folder->add_devices();
  402. d_my->set_id(std::string(my_device->device_id().get_sha256()));
  403. d_my->set_max_sequence(folder_1_my->get_max_sequence() - 1);
  404. d_my->set_index_id(folder_1_my->get_index());
  405. peer_actor->forward(proto::message::ClusterConfig(new proto::ClusterConfig(cc)));
  406. sup->do_process();
  407. auto &queue = peer_actor->messages;
  408. REQUIRE(queue.size() == 2);
  409. auto msg = &(*queue.front()).payload;
  410. auto &my_index = *std::get<proto::message::Index>(*msg);
  411. REQUIRE(my_index.files_size() == 0);
  412. queue.pop_front();
  413. msg = &(*queue.front()).payload;
  414. auto &my_index_update = *std::get<proto::message::IndexUpdate>(*msg);
  415. REQUIRE(my_index_update.files_size() == 1);
  416. }
  417. SECTION("peer has outdated by index view") {
  418. auto d_my = folder->add_devices();
  419. d_my->set_id(std::string(my_device->device_id().get_sha256()));
  420. d_my->set_max_sequence(folder_1_my->get_max_sequence());
  421. d_my->set_index_id(folder_1_my->get_index() + 5);
  422. peer_actor->forward(proto::message::ClusterConfig(new proto::ClusterConfig(cc)));
  423. sup->do_process();
  424. auto &queue = peer_actor->messages;
  425. REQUIRE(queue.size() == 2);
  426. auto msg = &(*queue.front()).payload;
  427. auto &my_index = *std::get<proto::message::Index>(*msg);
  428. REQUIRE(my_index.files_size() == 0);
  429. queue.pop_front();
  430. msg = &(*queue.front()).payload;
  431. auto &my_index_update = *std::get<proto::message::IndexUpdate>(*msg);
  432. REQUIRE(my_index_update.files_size() == 1);
  433. }
  434. SECTION("peer has actual view") {
  435. auto d_my = folder->add_devices();
  436. d_my->set_id(std::string(my_device->device_id().get_sha256()));
  437. d_my->set_max_sequence(folder_1_my->get_max_sequence());
  438. d_my->set_index_id(folder_1_my->get_index());
  439. peer_actor->forward(proto::message::ClusterConfig(new proto::ClusterConfig(cc)));
  440. sup->do_process();
  441. auto &queue = peer_actor->messages;
  442. REQUIRE(queue.size() == 0);
  443. }
  444. }
  445. };
  446. F(true, 10).run();
  447. }
  448. void test_downloading() {
  449. struct F : fixture_t {
  450. using fixture_t::fixture_t;
  451. void main(diff_builder_t &) noexcept override {
  452. auto &folder_infos = folder_1->get_folder_infos();
  453. auto folder_my = folder_infos.by_device(*my_device);
  454. auto cc = proto::ClusterConfig{};
  455. auto folder = cc.add_folders();
  456. folder->set_id(std::string(folder_1->get_id()));
  457. auto d_peer = folder->add_devices();
  458. d_peer->set_id(std::string(peer_device->device_id().get_sha256()));
  459. d_peer->set_max_sequence(folder_1_peer->get_max_sequence());
  460. d_peer->set_index_id(folder_1_peer->get_index());
  461. auto d_my = folder->add_devices();
  462. d_my->set_id(std::string(my_device->device_id().get_sha256()));
  463. d_my->set_max_sequence(folder_my->get_max_sequence());
  464. d_my->set_index_id(folder_my->get_index());
  465. SECTION("cluster config & index has a new file => download it") {
  466. peer_actor->forward(proto::message::ClusterConfig(new proto::ClusterConfig(cc)));
  467. auto index = proto::Index{};
  468. index.set_folder(std::string(folder_1->get_id()));
  469. auto file = index.add_files();
  470. file->set_name("some-file");
  471. file->set_type(proto::FileInfoType::FILE);
  472. file->set_sequence(folder_1_peer->get_max_sequence());
  473. file->set_block_size(5);
  474. file->set_size(5);
  475. auto version = file->mutable_version();
  476. auto counter = version->add_counters();
  477. counter->set_id(1ul);
  478. counter->set_value(1ul);
  479. auto b1 = file->add_blocks();
  480. b1->set_hash(utils::sha256_digest("12345").value());
  481. b1->set_offset(0);
  482. b1->set_size(5);
  483. auto folder_my = folder_infos.by_device(*my_device);
  484. CHECK(folder_my->get_max_sequence() == 0ul);
  485. CHECK(!folder_my->get_folder()->is_synchronizing());
  486. peer_actor->forward(proto::message::Index(new proto::Index(index)));
  487. sup->do_process();
  488. CHECK(folder_my->get_folder()->is_synchronizing());
  489. peer_actor->push_block("12345", 0);
  490. peer_actor->process_block_requests();
  491. sup->do_process();
  492. CHECK(!folder_my->get_folder()->is_synchronizing());
  493. REQUIRE(folder_my);
  494. CHECK(folder_my->get_max_sequence() == 1ul);
  495. REQUIRE(folder_my->get_file_infos().size() == 1);
  496. auto f = folder_my->get_file_infos().begin()->item;
  497. REQUIRE(f);
  498. CHECK(f->get_name() == file->name());
  499. CHECK(f->get_size() == 5);
  500. CHECK(f->get_blocks().size() == 1);
  501. CHECK(f->is_locally_available());
  502. CHECK(!f->is_locked());
  503. CHECK(peer_actor->blocks_requested == 1);
  504. auto &queue = peer_actor->messages;
  505. REQUIRE(queue.size() > 0);
  506. auto msg = &(*queue.front()).payload;
  507. auto &my_index = *std::get<proto::message::Index>(*msg);
  508. REQUIRE(my_index.files_size() == 0);
  509. queue.pop_front();
  510. msg = &(*queue.back()).payload;
  511. auto &my_index_update = *std::get<proto::message::IndexUpdate>(*msg);
  512. REQUIRE(my_index_update.files_size() == 1);
  513. SECTION("dont redownload file only if metadata has changed") {
  514. auto index_update = proto::IndexUpdate{};
  515. index_update.set_folder(index.folder());
  516. file->set_sequence(folder_1_peer->get_max_sequence() + 1);
  517. counter->set_value(2ul);
  518. *index_update.add_files() = *file;
  519. peer_actor->forward(proto::message::IndexUpdate(new proto::IndexUpdate(index_update)));
  520. sup->do_process();
  521. CHECK(peer_actor->blocks_requested == 1);
  522. CHECK(folder_my->get_max_sequence() == 2ul);
  523. f = folder_my->get_file_infos().begin()->item;
  524. CHECK(f->is_locally_available());
  525. CHECK(f->get_sequence() == 2ul);
  526. }
  527. }
  528. SECTION("download 2 files") {
  529. peer_actor->forward(proto::message::ClusterConfig(new proto::ClusterConfig(cc)));
  530. auto index = proto::Index{};
  531. index.set_folder(std::string(folder_1->get_id()));
  532. auto file_1 = index.add_files();
  533. file_1->set_name("file-1");
  534. file_1->set_type(proto::FileInfoType::FILE);
  535. file_1->set_sequence(folder_1_peer->get_max_sequence());
  536. file_1->set_block_size(5);
  537. file_1->set_size(5);
  538. auto version_1 = file_1->mutable_version();
  539. auto counter_1 = version_1->add_counters();
  540. counter_1->set_id(1ul);
  541. counter_1->set_value(1ul);
  542. auto file_2 = index.add_files();
  543. file_2->set_name("file-2");
  544. file_2->set_type(proto::FileInfoType::FILE);
  545. file_2->set_sequence(folder_1_peer->get_max_sequence());
  546. file_2->set_block_size(5);
  547. file_2->set_size(5);
  548. auto version_2 = file_2->mutable_version();
  549. auto counter_2 = version_2->add_counters();
  550. counter_2->set_id(1ul);
  551. counter_2->set_value(2ul);
  552. auto b1 = file_1->add_blocks();
  553. b1->set_hash(utils::sha256_digest("12345").value());
  554. b1->set_offset(0);
  555. b1->set_size(5);
  556. SECTION("with different blocks") {
  557. auto b2 = file_2->add_blocks();
  558. b2->set_hash(utils::sha256_digest("67890").value());
  559. b2->set_offset(0);
  560. b2->set_size(5);
  561. auto folder_my = folder_infos.by_device(*my_device);
  562. CHECK(folder_my->get_max_sequence() == 0ul);
  563. CHECK(!folder_my->get_folder()->is_synchronizing());
  564. peer_actor->forward(proto::message::Index(new proto::Index(index)));
  565. peer_actor->push_block("12345", 0, file_1->name());
  566. peer_actor->push_block("67890", 0, file_2->name());
  567. sup->do_process();
  568. CHECK(!folder_my->get_folder()->is_synchronizing());
  569. CHECK(peer_actor->blocks_requested == 2);
  570. REQUIRE(folder_my);
  571. CHECK(folder_my->get_max_sequence() == 2ul);
  572. REQUIRE(folder_my->get_file_infos().size() == 2);
  573. {
  574. auto f = folder_my->get_file_infos().by_name(file_1->name());
  575. REQUIRE(f);
  576. CHECK(f->get_size() == 5);
  577. CHECK(f->get_blocks().size() == 1);
  578. CHECK(f->is_locally_available());
  579. CHECK(!f->is_locked());
  580. }
  581. {
  582. auto f = folder_my->get_file_infos().by_name(file_2->name());
  583. REQUIRE(f);
  584. CHECK(f->get_size() == 5);
  585. CHECK(f->get_blocks().size() == 1);
  586. CHECK(f->is_locally_available());
  587. CHECK(!f->is_locked());
  588. }
  589. }
  590. SECTION("with the same block") {
  591. *file_2->add_blocks() = *b1;
  592. auto folder_my = folder_infos.by_device(*my_device);
  593. CHECK(folder_my->get_max_sequence() == 0ul);
  594. CHECK(!folder_my->get_folder()->is_synchronizing());
  595. peer_actor->forward(proto::message::Index(new proto::Index(index)));
  596. peer_actor->push_block("12345", 0, file_1->name());
  597. sup->do_process();
  598. CHECK(!folder_my->get_folder()->is_synchronizing());
  599. CHECK(peer_actor->blocks_requested == 1);
  600. REQUIRE(folder_my);
  601. CHECK(folder_my->get_max_sequence() == 2ul);
  602. REQUIRE(folder_my->get_file_infos().size() == 2);
  603. {
  604. auto f = folder_my->get_file_infos().by_name(file_1->name());
  605. REQUIRE(f);
  606. CHECK(f->get_size() == 5);
  607. CHECK(f->get_blocks().size() == 1);
  608. CHECK(f->is_locally_available());
  609. CHECK(!f->is_locked());
  610. }
  611. {
  612. auto f = folder_my->get_file_infos().by_name(file_2->name());
  613. REQUIRE(f);
  614. CHECK(f->get_size() == 5);
  615. CHECK(f->get_blocks().size() == 1);
  616. CHECK(f->is_locally_available());
  617. CHECK(!f->is_locked());
  618. }
  619. }
  620. SECTION("with the same blocks") {
  621. auto concurrent_writes = GENERATE(1, 5);
  622. cluster->modify_write_requests(concurrent_writes);
  623. *file_2->add_blocks() = *b1;
  624. *file_2->add_blocks() = *b1;
  625. file_2->set_size(10);
  626. auto folder_my = folder_infos.by_device(*my_device);
  627. CHECK(folder_my->get_max_sequence() == 0ul);
  628. CHECK(!folder_my->get_folder()->is_synchronizing());
  629. peer_actor->forward(proto::message::Index(new proto::Index(index)));
  630. peer_actor->push_block("12345", 0, file_1->name());
  631. sup->do_process();
  632. CHECK(!folder_my->get_folder()->is_synchronizing());
  633. CHECK(peer_actor->blocks_requested == 1);
  634. REQUIRE(folder_my);
  635. CHECK(folder_my->get_max_sequence() == 2ul);
  636. REQUIRE(folder_my->get_file_infos().size() == 2);
  637. {
  638. auto f = folder_my->get_file_infos().by_name(file_1->name());
  639. REQUIRE(f);
  640. CHECK(f->get_size() == 5);
  641. CHECK(f->get_blocks().size() == 1);
  642. CHECK(f->is_locally_available());
  643. CHECK(!f->is_locked());
  644. }
  645. {
  646. auto f = folder_my->get_file_infos().by_name(file_2->name());
  647. REQUIRE(f);
  648. CHECK(f->get_size() == 10);
  649. CHECK(f->get_blocks().size() == 2);
  650. CHECK(f->is_locally_available());
  651. CHECK(!f->is_locked());
  652. }
  653. }
  654. }
  655. SECTION("don't attempt to download a file, which is deleted") {
  656. auto folder_peer = folder_infos.by_device(*peer_device);
  657. auto pr_fi = proto::FileInfo{};
  658. pr_fi.set_name("some-file");
  659. pr_fi.set_type(proto::FileInfoType::FILE);
  660. pr_fi.set_sequence(folder_1_peer->get_max_sequence());
  661. pr_fi.set_block_size(5);
  662. pr_fi.set_size(5);
  663. auto b1 = pr_fi.add_blocks();
  664. b1->set_hash(utils::sha256_digest("12345").value());
  665. b1->set_offset(0);
  666. b1->set_size(5);
  667. auto b = model::block_info_t::create(*b1).value();
  668. auto uuid = sup->sequencer->next_uuid();
  669. auto file_info = model::file_info_t::create(uuid, pr_fi, folder_peer).value();
  670. file_info->assign_block(b, 0);
  671. folder_peer->add(file_info, true);
  672. cluster->get_blocks().put(b);
  673. d_peer->set_max_sequence(folder_1_peer->get_max_sequence() + 1);
  674. peer_actor->forward(proto::message::ClusterConfig(new proto::ClusterConfig(cc)));
  675. sup->do_process();
  676. auto index = proto::IndexUpdate{};
  677. index.set_folder(std::string(folder_1->get_id()));
  678. auto file = index.add_files();
  679. file->set_name("some-file");
  680. file->set_type(proto::FileInfoType::FILE);
  681. file->set_deleted(true);
  682. file->set_sequence(folder_1_peer->get_max_sequence() + 1);
  683. file->set_block_size(0);
  684. file->set_size(0);
  685. auto v = file->mutable_version();
  686. auto c = v->add_counters();
  687. c->set_id(peer_device->as_uint());
  688. c->set_value(1);
  689. peer_actor->forward(proto::message::IndexUpdate(new proto::IndexUpdate(index)));
  690. sup->do_process();
  691. CHECK(folder_my->get_max_sequence() == 1ul);
  692. REQUIRE(folder_my->get_file_infos().size() == 1);
  693. auto f = folder_my->get_file_infos().begin()->item;
  694. REQUIRE(f);
  695. CHECK(f->get_name() == pr_fi.name());
  696. CHECK(f->get_size() == 0);
  697. CHECK(f->get_blocks().size() == 0);
  698. CHECK(f->is_locally_available());
  699. CHECK(f->is_deleted());
  700. CHECK(!f->is_locked());
  701. CHECK(f->get_sequence() == 1ul);
  702. CHECK(peer_actor->blocks_requested == 0);
  703. }
  704. SECTION("new file via index_update => download it") {
  705. peer_actor->forward(proto::message::ClusterConfig(new proto::ClusterConfig(cc)));
  706. auto index = proto::Index{};
  707. index.set_folder(std::string(folder_1->get_id()));
  708. peer_actor->forward(proto::message::Index(new proto::Index(index)));
  709. auto index_update = proto::IndexUpdate{};
  710. index_update.set_folder(std::string(folder_1->get_id()));
  711. auto file = index_update.add_files();
  712. file->set_name("some-file");
  713. file->set_type(proto::FileInfoType::FILE);
  714. file->set_sequence(folder_1_peer->get_max_sequence() + 1);
  715. file->set_block_size(5);
  716. file->set_size(5);
  717. auto version = file->mutable_version();
  718. auto counter = version->add_counters();
  719. counter->set_id(1);
  720. counter->set_value(peer_device->as_uint());
  721. auto b1 = file->add_blocks();
  722. b1->set_hash(utils::sha256_digest("12345").value());
  723. b1->set_offset(0);
  724. b1->set_size(5);
  725. peer_actor->forward(proto::message::IndexUpdate(new proto::IndexUpdate(index_update)));
  726. peer_actor->push_block("12345", 0);
  727. sup->do_process();
  728. auto folder_my = folder_infos.by_device(*my_device);
  729. CHECK(folder_my->get_max_sequence() == 1);
  730. REQUIRE(folder_my->get_file_infos().size() == 1);
  731. auto f = folder_my->get_file_infos().begin()->item;
  732. REQUIRE(f);
  733. CHECK(f->get_name() == file->name());
  734. CHECK(f->get_size() == 5);
  735. CHECK(f->get_blocks().size() == 1);
  736. CHECK(f->is_locally_available());
  737. CHECK(!f->is_locked());
  738. auto fp = folder_1_peer->get_file_infos().begin()->item;
  739. REQUIRE(fp);
  740. CHECK(!fp->is_locked());
  741. }
  742. SECTION("deleted file, has been restored => download it") {
  743. peer_actor->forward(proto::message::ClusterConfig(new proto::ClusterConfig(cc)));
  744. sup->do_process();
  745. auto index = proto::Index{};
  746. index.set_folder(std::string(folder_1->get_id()));
  747. auto file_1 = index.add_files();
  748. file_1->set_name("some-file");
  749. file_1->set_type(proto::FileInfoType::FILE);
  750. file_1->set_sequence(folder_1_peer->get_max_sequence());
  751. file_1->set_deleted(true);
  752. auto v1 = file_1->mutable_version();
  753. auto c1 = v1->add_counters();
  754. c1->set_id(1u);
  755. c1->set_value(1u);
  756. peer_actor->forward(proto::message::Index(new proto::Index(index)));
  757. sup->do_process();
  758. CHECK(!folder_my->get_folder()->is_synchronizing());
  759. auto folder_my = folder_infos.by_device(*my_device);
  760. CHECK(folder_my->get_max_sequence() == 1ul);
  761. auto index_update = proto::IndexUpdate{};
  762. index_update.set_folder(std::string(folder_1->get_id()));
  763. auto file_2 = index_update.add_files();
  764. file_2->set_name("some-file");
  765. file_2->set_type(proto::FileInfoType::FILE);
  766. file_2->set_sequence(folder_1_peer->get_max_sequence() + 1);
  767. file_2->set_block_size(128 * 1024);
  768. file_2->set_size(5);
  769. auto v2 = file_2->mutable_version();
  770. auto c2 = v2->add_counters();
  771. c2->set_id(1u);
  772. c2->set_value(2u);
  773. auto b1 = file_2->add_blocks();
  774. b1->set_hash(utils::sha256_digest("12345").value());
  775. b1->set_offset(0);
  776. b1->set_size(5);
  777. peer_actor->forward(proto::message::IndexUpdate(new proto::IndexUpdate(index_update)));
  778. peer_actor->push_block("12345", 0);
  779. sup->do_process();
  780. REQUIRE(folder_my->get_file_infos().size() == 1);
  781. auto f = folder_my->get_file_infos().begin()->item;
  782. REQUIRE(f);
  783. CHECK(f->get_name() == file_1->name());
  784. CHECK(f->get_size() == 5);
  785. CHECK(f->get_blocks().size() == 1);
  786. CHECK(f->is_locally_available());
  787. CHECK(!f->is_locked());
  788. CHECK(!f->is_deleted());
  789. }
  790. SECTION("download a file, which has the same blocks locally") {
  791. peer_actor->forward(proto::message::ClusterConfig(new proto::ClusterConfig(cc)));
  792. sup->do_process();
  793. auto index = proto::Index{};
  794. index.set_folder(std::string(folder_1->get_id()));
  795. auto file_1 = index.add_files();
  796. file_1->set_name("some-file");
  797. file_1->set_type(proto::FileInfoType::FILE);
  798. file_1->set_sequence(folder_1_peer->get_max_sequence());
  799. auto v1 = file_1->mutable_version();
  800. auto c1 = v1->add_counters();
  801. c1->set_id(1u);
  802. c1->set_value(1u);
  803. file_1->set_block_size(5);
  804. file_1->set_size(10);
  805. auto b1 = file_1->add_blocks();
  806. b1->set_hash(utils::sha256_digest("12345").value());
  807. b1->set_offset(0);
  808. b1->set_size(5);
  809. auto bi_1 = model::block_info_t::create(*b1).value();
  810. auto b2 = file_1->add_blocks();
  811. b2->set_hash(utils::sha256_digest("67890").value());
  812. b2->set_offset(5);
  813. b2->set_size(5);
  814. auto bi_2 = model::block_info_t::create(*b2).value();
  815. auto &blocks = cluster->get_blocks();
  816. blocks.put(bi_1);
  817. blocks.put(bi_2);
  818. auto pr_my = proto::FileInfo{};
  819. pr_my.set_name("some-file.source");
  820. pr_my.set_type(proto::FileInfoType::FILE);
  821. pr_my.set_sequence(2ul);
  822. pr_my.set_block_size(5);
  823. pr_my.set_size(5);
  824. auto uuid = sup->sequencer->next_uuid();
  825. auto file_my = model::file_info_t::create(uuid, pr_my, folder_my).value();
  826. file_my->assign_block(bi_1, 0);
  827. file_my->mark_local_available(0);
  828. folder_my->add(file_my, true);
  829. peer_actor->forward(proto::message::Index(new proto::Index(index)));
  830. peer_actor->push_block("67890", 1);
  831. cluster->modify_write_requests(10);
  832. sup->do_process();
  833. REQUIRE(folder_my->get_file_infos().size() == 2);
  834. auto f = folder_my->get_file_infos().by_name(file_1->name());
  835. REQUIRE(f);
  836. CHECK(f->get_name() == file_1->name());
  837. CHECK(f->get_size() == 10);
  838. CHECK(f->get_blocks().size() == 2);
  839. CHECK(f->is_locally_available());
  840. CHECK(!f->is_locked());
  841. }
  842. }
  843. };
  844. F(true, 10).run();
  845. }
  846. void test_downloading_errors() {
  847. struct F : fixture_t {
  848. using fixture_t::fixture_t;
  849. void main(diff_builder_t &) noexcept override {
  850. auto &folder_infos = folder_1->get_folder_infos();
  851. auto folder_my = folder_infos.by_device(*my_device);
  852. auto cc = proto::ClusterConfig{};
  853. auto folder = cc.add_folders();
  854. folder->set_id(std::string(folder_1->get_id()));
  855. auto d_peer = folder->add_devices();
  856. d_peer->set_id(std::string(peer_device->device_id().get_sha256()));
  857. d_peer->set_max_sequence(folder_1_peer->get_max_sequence());
  858. d_peer->set_index_id(folder_1_peer->get_index());
  859. auto d_my = folder->add_devices();
  860. d_my->set_id(std::string(my_device->device_id().get_sha256()));
  861. d_my->set_max_sequence(folder_my->get_max_sequence());
  862. d_my->set_index_id(folder_my->get_index());
  863. peer_actor->forward(proto::message::ClusterConfig(new proto::ClusterConfig(cc)));
  864. auto index = proto::Index{};
  865. index.set_folder(std::string(folder_1->get_id()));
  866. auto file = index.add_files();
  867. file->set_name("some-file");
  868. file->set_type(proto::FileInfoType::FILE);
  869. file->set_sequence(folder_1_peer->get_max_sequence());
  870. file->set_block_size(5);
  871. file->set_size(5);
  872. auto version = file->mutable_version();
  873. auto counter = version->add_counters();
  874. counter->set_id(1ul);
  875. counter->set_value(1ul);
  876. auto b1 = file->add_blocks();
  877. b1->set_hash(utils::sha256_digest("12345").value());
  878. b1->set_offset(0);
  879. b1->set_size(5);
  880. CHECK(folder_my->get_max_sequence() == 0ul);
  881. peer_actor->forward(proto::message::Index(new proto::Index(index)));
  882. SECTION("general error, ok, do not shutdown") {
  883. auto ec = utils::make_error_code(utils::request_error_code_t::generic);
  884. peer_actor->push_block(ec, 0);
  885. }
  886. SECTION("hash mismatch, do not shutdown") { peer_actor->push_block("zzz", 0); }
  887. sup->do_process();
  888. CHECK(peer_actor->blocks_requested == 1);
  889. CHECK(static_cast<r::actor_base_t *>(target.get())->access<to::state>() == r::state_t::OPERATIONAL);
  890. auto folder_peer = folder_infos.by_device(*peer_device);
  891. REQUIRE(folder_peer->get_file_infos().size() == 1);
  892. auto f = folder_peer->get_file_infos().begin()->item;
  893. REQUIRE(f);
  894. CHECK(f->is_unreachable());
  895. CHECK(!f->is_locally_locked());
  896. CHECK(!f->is_locked());
  897. auto lf = f->local_file();
  898. CHECK(!lf->is_locally_locked());
  899. CHECK(!lf->is_locked());
  900. CHECK(!folder_my->get_folder()->is_synchronizing());
  901. sup->do_process();
  902. }
  903. };
  904. F(true, 10).run();
  905. }
  906. void test_download_from_scratch() {
  907. struct F : fixture_t {
  908. using fixture_t::fixture_t;
  909. void main(diff_builder_t &) noexcept override {
  910. sup->do_process();
  911. auto builder = diff_builder_t(*cluster);
  912. auto sha256 = peer_device->device_id().get_sha256();
  913. auto cc = proto::ClusterConfig{};
  914. auto folder = cc.add_folders();
  915. folder->set_id(std::string(folder_1->get_id()));
  916. auto d_peer = folder->add_devices();
  917. d_peer->set_id(std::string(peer_device->device_id().get_sha256()));
  918. d_peer->set_max_sequence(15);
  919. d_peer->set_index_id(12345);
  920. peer_actor->forward(proto::message::ClusterConfig(new proto::ClusterConfig(cc)));
  921. sup->do_process();
  922. builder.share_folder(sha256, folder_1->get_id()).apply(*sup);
  923. auto index = proto::Index{};
  924. index.set_folder(std::string(folder_1->get_id()));
  925. auto file = index.add_files();
  926. file->set_name("some-file");
  927. file->set_type(proto::FileInfoType::FILE);
  928. file->set_sequence(154);
  929. file->set_block_size(5);
  930. file->set_size(5);
  931. auto version = file->mutable_version();
  932. auto counter = version->add_counters();
  933. counter->set_id(1ul);
  934. counter->set_value(1ul);
  935. auto b1 = file->add_blocks();
  936. b1->set_hash(utils::sha256_digest("12345").value());
  937. b1->set_offset(0);
  938. b1->set_size(5);
  939. peer_actor->forward(proto::message::Index(new proto::Index(index)));
  940. peer_actor->push_block("12345", 0, file->name());
  941. sup->do_process();
  942. auto folder_my = folder_1->get_folder_infos().by_device(*my_device);
  943. CHECK(folder_my->get_max_sequence() == 1ul);
  944. CHECK(!folder_my->get_folder()->is_synchronizing());
  945. auto f = folder_my->get_file_infos().by_name(file->name());
  946. REQUIRE(f);
  947. CHECK(f->get_size() == 5);
  948. CHECK(f->get_blocks().size() == 1);
  949. CHECK(f->is_locally_available());
  950. CHECK(!f->is_locked());
  951. }
  952. };
  953. F(false, 10, false).run();
  954. }
  955. void test_my_sharing() {
  956. struct F : fixture_t {
  957. using fixture_t::fixture_t;
  958. void main(diff_builder_t &) noexcept override {
  959. sup->do_process();
  960. auto cc = proto::ClusterConfig{};
  961. peer_actor->forward(proto::message::ClusterConfig(new proto::ClusterConfig(cc)));
  962. // nothing is shared
  963. sup->do_process();
  964. REQUIRE(static_cast<r::actor_base_t *>(target.get())->access<to::state>() == r::state_t::OPERATIONAL);
  965. REQUIRE(static_cast<r::actor_base_t *>(peer_actor.get())->access<to::state>() == r::state_t::OPERATIONAL);
  966. REQUIRE(peer_actor->messages.size() == 1);
  967. auto peer_msg = &peer_actor->messages.front()->payload;
  968. auto peer_cluster_msg = std::get_if<proto::message::ClusterConfig>(peer_msg);
  969. REQUIRE(peer_cluster_msg);
  970. REQUIRE(*peer_cluster_msg);
  971. REQUIRE((*peer_cluster_msg)->folders_size() == 0);
  972. // share folder_1
  973. peer_actor->messages.clear();
  974. auto sha256 = peer_device->device_id().get_sha256();
  975. diff_builder_t(*cluster).share_folder(sha256, folder_1->get_id()).apply(*sup);
  976. REQUIRE(static_cast<r::actor_base_t *>(target.get())->access<to::state>() == r::state_t::OPERATIONAL);
  977. REQUIRE(static_cast<r::actor_base_t *>(peer_actor.get())->access<to::state>() == r::state_t::OPERATIONAL);
  978. REQUIRE(peer_actor->messages.size() == 1);
  979. peer_msg = &peer_actor->messages.front()->payload;
  980. peer_cluster_msg = std::get_if<proto::message::ClusterConfig>(peer_msg);
  981. REQUIRE(peer_cluster_msg);
  982. REQUIRE(*peer_cluster_msg);
  983. REQUIRE((*peer_cluster_msg)->folders_size() == 1);
  984. // unshare folder_1
  985. auto peer_fi = folder_1->get_folder_infos().by_device(*peer_device);
  986. peer_actor->messages.clear();
  987. diff_builder_t(*cluster).unshare_folder(*peer_fi).apply(*sup);
  988. REQUIRE(static_cast<r::actor_base_t *>(target.get())->access<to::state>() == r::state_t::OPERATIONAL);
  989. REQUIRE(static_cast<r::actor_base_t *>(peer_actor.get())->access<to::state>() == r::state_t::OPERATIONAL);
  990. REQUIRE(peer_actor->messages.size() == 1);
  991. peer_msg = &peer_actor->messages.front()->payload;
  992. peer_cluster_msg = std::get_if<proto::message::ClusterConfig>(peer_msg);
  993. REQUIRE(peer_cluster_msg);
  994. REQUIRE(*peer_cluster_msg);
  995. REQUIRE((*peer_cluster_msg)->folders_size() == 0);
  996. }
  997. };
  998. F(false, 10, false).run();
  999. }
  1000. void test_sending_index_updates() {
  1001. struct F : fixture_t {
  1002. using fixture_t::fixture_t;
  1003. void main(diff_builder_t &) noexcept override {
  1004. auto &folder_infos = folder_1->get_folder_infos();
  1005. auto folder_my = folder_infos.by_device(*my_device);
  1006. auto cc = proto::ClusterConfig{};
  1007. auto folder = cc.add_folders();
  1008. folder->set_id(std::string(folder_1->get_id()));
  1009. auto d_peer = folder->add_devices();
  1010. d_peer->set_id(std::string(peer_device->device_id().get_sha256()));
  1011. d_peer->set_max_sequence(folder_1_peer->get_max_sequence());
  1012. d_peer->set_index_id(folder_1_peer->get_index());
  1013. auto d_my = folder->add_devices();
  1014. d_my->set_id(std::string(my_device->device_id().get_sha256()));
  1015. d_my->set_max_sequence(folder_my->get_max_sequence());
  1016. d_my->set_index_id(folder_my->get_index());
  1017. auto index = proto::Index{};
  1018. auto folder_id = std::string(folder_1->get_id());
  1019. index.set_folder(folder_id);
  1020. peer_actor->forward(proto::message::ClusterConfig(new proto::ClusterConfig(cc)));
  1021. peer_actor->forward(proto::message::Index(new proto::Index(index)));
  1022. sup->do_process();
  1023. auto builder = diff_builder_t(*cluster);
  1024. auto pr_file = proto::FileInfo();
  1025. pr_file.set_name("a.txt");
  1026. peer_actor->messages.clear();
  1027. builder.local_update(folder_id, pr_file).apply(*sup);
  1028. REQUIRE(peer_actor->messages.size() == 1);
  1029. auto &msg = peer_actor->messages.front();
  1030. auto &index_update = *std::get<proto::message::IndexUpdate>(msg->payload);
  1031. REQUIRE(index_update.files_size() == 1);
  1032. CHECK(index_update.files(0).name() == "a.txt");
  1033. }
  1034. };
  1035. F(true, 10).run();
  1036. }
  1037. void test_uploading() {
  1038. struct F : fixture_t {
  1039. using fixture_t::fixture_t;
  1040. void main(diff_builder_t &) noexcept override {
  1041. auto &folder_infos = folder_1->get_folder_infos();
  1042. auto folder_my = folder_infos.by_device(*my_device);
  1043. auto cc = proto::ClusterConfig{};
  1044. auto folder = cc.add_folders();
  1045. folder->set_id(std::string(folder_1->get_id()));
  1046. auto d_peer = folder->add_devices();
  1047. d_peer->set_id(std::string(peer_device->device_id().get_sha256()));
  1048. d_peer->set_max_sequence(folder_1_peer->get_max_sequence());
  1049. d_peer->set_index_id(folder_1_peer->get_index());
  1050. auto d_my = folder->add_devices();
  1051. d_my->set_id(std::string(my_device->device_id().get_sha256()));
  1052. d_my->set_max_sequence(folder_my->get_max_sequence());
  1053. d_my->set_index_id(folder_my->get_index());
  1054. auto pr_fi = proto::FileInfo{};
  1055. pr_fi.set_name("data.bin");
  1056. pr_fi.set_type(proto::FileInfoType::FILE);
  1057. pr_fi.set_sequence(folder_1_peer->get_max_sequence());
  1058. pr_fi.set_block_size(5);
  1059. pr_fi.set_size(5);
  1060. auto version = pr_fi.mutable_version();
  1061. auto counter = version->add_counters();
  1062. counter->set_id(1);
  1063. counter->set_value(my_device->as_uint());
  1064. auto b1 = pr_fi.add_blocks();
  1065. b1->set_hash(utils::sha256_digest("12345").value());
  1066. b1->set_offset(0);
  1067. b1->set_size(5);
  1068. auto b = model::block_info_t::create(*b1).value();
  1069. auto uuid = sup->sequencer->next_uuid();
  1070. auto file_info = model::file_info_t::create(uuid, pr_fi, folder_my).value();
  1071. file_info->assign_block(b, 0);
  1072. folder_my->add(file_info, true);
  1073. auto req = proto::Request();
  1074. req.set_id(1);
  1075. req.set_folder(std::string(folder_1->get_id()));
  1076. req.set_name("data.bin");
  1077. req.set_offset(0);
  1078. req.set_size(5);
  1079. peer_actor->forward(proto::message::ClusterConfig(new proto::ClusterConfig(cc)));
  1080. SECTION("upload regular file, no hash") {
  1081. peer_actor->forward(proto::message::Request(new proto::Request(req)));
  1082. auto req_ptr = proto::message::Request(new proto::Request(req));
  1083. auto res = r::make_message<fs::payload::block_response_t>(target->get_address(), std::move(req_ptr),
  1084. sys::error_code{}, std::string("12345"));
  1085. block_responses.push_back(res);
  1086. sup->do_process();
  1087. REQUIRE(block_requests.size() == 1);
  1088. CHECK(block_requests[0]->payload.remote_request->id() == 1);
  1089. CHECK(block_requests[0]->payload.remote_request->name() == "data.bin");
  1090. REQUIRE(peer_actor->uploaded_blocks.size() == 1);
  1091. auto &peer_res = *peer_actor->uploaded_blocks.front();
  1092. CHECK(peer_res.id() == 1);
  1093. CHECK(peer_res.code() == proto::ErrorCode::NO_BEP_ERROR);
  1094. CHECK(peer_res.data() == "12345");
  1095. }
  1096. }
  1097. };
  1098. F(true, 10).run();
  1099. }
  1100. void test_peer_removal() {
  1101. struct F : fixture_t {
  1102. using fixture_t::fixture_t;
  1103. void main(diff_builder_t &builder) noexcept override {
  1104. builder.remove_peer(*peer_device).apply(*sup);
  1105. CHECK(static_cast<r::actor_base_t *>(target.get())->access<to::state>() == r::state_t::SHUT_DOWN);
  1106. CHECK(static_cast<r::actor_base_t *>(peer_actor.get())->access<to::state>() == r::state_t::SHUT_DOWN);
  1107. CHECK(target->get_shutdown_reason()->root()->ec == utils::error_code_t::peer_has_been_removed);
  1108. }
  1109. };
  1110. F(true, 10).run();
  1111. }
  1112. int _init() {
  1113. REGISTER_TEST_CASE(test_startup, "test_startup", "[net]");
  1114. REGISTER_TEST_CASE(test_index_receiving, "test_index_receiving", "[net]");
  1115. REGISTER_TEST_CASE(test_index_sending, "test_index_sending", "[net]");
  1116. REGISTER_TEST_CASE(test_downloading, "test_downloading", "[net]");
  1117. REGISTER_TEST_CASE(test_downloading_errors, "test_downloading_errors", "[net]");
  1118. REGISTER_TEST_CASE(test_download_from_scratch, "test_download_from_scratch", "[net]");
  1119. REGISTER_TEST_CASE(test_my_sharing, "test_my_sharing", "[net]");
  1120. REGISTER_TEST_CASE(test_sending_index_updates, "test_sending_index_updates", "[net]");
  1121. REGISTER_TEST_CASE(test_uploading, "test_uploading", "[net]");
  1122. REGISTER_TEST_CASE(test_peer_removal, "test_peer_removal", "[net]");
  1123. return 1;
  1124. }
  1125. static int v = _init();