075-controller.cpp 56 KB


  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. // SPDX-FileCopyrightText: 2019-2024 Ivan Baidakou
  3. #include "test-utils.h"
  4. #include "access.h"
  5. #include "test_supervisor.h"
  6. #include "model/cluster.h"
  7. #include "diff-builder.h"
  8. #include "hasher/hasher_proxy_actor.h"
  9. #include "hasher/hasher_actor.h"
  10. #include "net/controller_actor.h"
  11. #include "net/names.h"
  12. #include "fs/messages.h"
  13. #include "utils/error_code.h"
  14. #include "proto/bep_support.h"
  15. #include <boost/core/demangle.hpp>
  16. using namespace syncspirit;
  17. using namespace syncspirit::test;
  18. using namespace syncspirit::model;
  19. using namespace syncspirit::net;
  20. using namespace syncspirit::hasher;
  21. namespace {
  22. struct sample_peer_config_t : public r::actor_config_t {
  23. model::device_id_t peer_device_id;
  24. };
  25. template <typename Actor> struct sample_peer_config_builder_t : r::actor_config_builder_t<Actor> {
  26. using builder_t = typename Actor::template config_builder_t<Actor>;
  27. using parent_t = r::actor_config_builder_t<Actor>;
  28. using parent_t::parent_t;
  29. builder_t &&peer_device_id(const model::device_id_t &value) && noexcept {
  30. parent_t::config.peer_device_id = value;
  31. return std::move(*static_cast<typename parent_t::builder_t *>(this));
  32. }
  33. };
  34. struct sample_peer_t : r::actor_base_t {
  35. using config_t = sample_peer_config_t;
  36. template <typename Actor> using config_builder_t = sample_peer_config_builder_t<Actor>;
  37. using remote_message_t = r::intrusive_ptr_t<net::message::forwarded_message_t>;
  38. using remote_messages_t = std::list<remote_message_t>;
  39. struct block_response_t {
  40. std::string name;
  41. size_t block_index;
  42. std::string data;
  43. sys::error_code ec;
  44. };
  45. using block_responses_t = std::list<block_response_t>;
  46. using block_request_t = r::intrusive_ptr_t<net::message::block_request_t>;
  47. using block_requests_t = std::list<block_request_t>;
  48. using uploaded_blocks_t = std::list<proto::message::Response>;
  49. sample_peer_t(config_t &config) : r::actor_base_t{config}, peer_device{config.peer_device_id} {
  50. log = utils::get_logger("test.sample_peer");
  51. }
  52. void configure(r::plugin::plugin_base_t &plugin) noexcept override {
  53. r::actor_base_t::configure(plugin);
  54. plugin.with_casted<r::plugin::address_maker_plugin_t>([&](auto &p) { p.set_identity("sample_peer", false); });
  55. plugin.with_casted<r::plugin::starter_plugin_t>([&](auto &p) {
  56. p.subscribe_actor(&sample_peer_t::on_start_reading);
  57. p.subscribe_actor(&sample_peer_t::on_termination);
  58. p.subscribe_actor(&sample_peer_t::on_transfer);
  59. p.subscribe_actor(&sample_peer_t::on_block_request);
  60. });
  61. }
  62. void shutdown_start() noexcept override {
  63. LOG_TRACE(log, "{}, shutdown_start", identity);
  64. if (controller) {
  65. send<net::payload::termination_t>(controller, shutdown_reason);
  66. }
  67. r::actor_base_t::shutdown_start();
  68. }
  69. void shutdown_finish() noexcept override {
  70. r::actor_base_t::shutdown_finish();
  71. LOG_TRACE(log, "{}, shutdown_finish, blocks requested = {}", identity, blocks_requested);
  72. if (controller) {
  73. send<net::payload::termination_t>(controller, shutdown_reason);
  74. }
  75. }
  76. void on_start_reading(net::message::start_reading_t &msg) noexcept {
  77. LOG_TRACE(log, "{}, on_start_reading", identity);
  78. controller = msg.payload.controller;
  79. reading = msg.payload.start;
  80. }
  81. void on_termination(net::message::termination_signal_t &msg) noexcept {
  82. LOG_TRACE(log, "{}, on_termination", identity);
  83. if (!shutdown_reason) {
  84. auto &ee = msg.payload.ee;
  85. auto reason = ee->message();
  86. LOG_TRACE(log, "{}, on_termination: {}", identity, reason);
  87. do_shutdown(ee);
  88. }
  89. }
  90. void on_transfer(net::message::transfer_data_t &message) noexcept {
  91. auto &data = message.payload.data;
  92. LOG_TRACE(log, "{}, on_transfer, bytes = {}", identity, data.size());
  93. auto buff = boost::asio::buffer(data.data(), data.size());
  94. auto result = proto::parse_bep(buff);
  95. auto orig = std::move(result.value().message);
  96. auto variant = net::payload::forwarded_message_t();
  97. std::visit(
  98. [&](auto &msg) {
  99. using boost::core::demangle;
  100. using T = std::decay_t<decltype(msg)>;
  101. LOG_TRACE(log, "{}, received '{}' message", identity, demangle(typeid(T).name()));
  102. using V = net::payload::forwarded_message_t;
  103. if constexpr (std::is_constructible_v<V, T>) {
  104. variant = std::move(msg);
  105. } else if constexpr (std::is_same_v<T, proto::message::Response>) {
  106. uploaded_blocks.push_back(std::move(msg));
  107. }
  108. },
  109. orig);
  110. auto fwd_msg = new net::message::forwarded_message_t(address, std::move(variant));
  111. messages.emplace_back(fwd_msg);
  112. }
  113. void process_block_requests() noexcept {
  114. auto condition = [&]() -> bool {
  115. if (block_requests.size() && block_responses.size()) {
  116. auto &req = block_requests.front();
  117. auto &res = block_responses.front();
  118. auto &req_payload = req->payload.request_payload;
  119. if (req_payload.block.block_index() == res.block_index) {
  120. auto &name = res.name;
  121. return name.empty() || name == req_payload.file->get_name();
  122. }
  123. }
  124. return false;
  125. };
  126. while (condition()) {
  127. auto &reply = block_responses.front();
  128. auto &request = *block_requests.front();
  129. log->debug("{}, matched '{}', replying..., ec = {}", identity, reply.name, reply.ec.value());
  130. if (!reply.ec) {
  131. reply_to(request, reply.data);
  132. } else {
  133. reply_with_error(request, make_error(reply.ec));
  134. }
  135. block_responses.pop_front();
  136. block_requests.pop_front();
  137. }
  138. }
  139. void on_block_request(net::message::block_request_t &req) noexcept {
  140. block_requests.push_front(&req);
  141. ++blocks_requested;
  142. log->debug("{}, requesting block # {}", identity,
  143. block_requests.front()->payload.request_payload.block.block_index());
  144. if (block_responses.size()) {
  145. log->debug("{}, top response block # {}", identity, block_responses.front().block_index);
  146. }
  147. process_block_requests();
  148. }
  149. void forward(net::payload::forwarded_message_t payload) noexcept {
  150. send<net::payload::forwarded_message_t>(controller, std::move(payload));
  151. }
  152. static const constexpr size_t next_block = 1000000;
  153. void push_block(std::string_view data, size_t index, std::string name = {}) {
  154. if (index == next_block) {
  155. index = block_responses.size();
  156. }
  157. block_responses.push_back(block_response_t{std::move(name), index, std::string(data), {}});
  158. }
  159. void push_block(sys::error_code ec, size_t index) {
  160. if (index == next_block) {
  161. index = block_responses.size();
  162. }
  163. block_responses.push_back(block_response_t{std::string{}, index, std::string{}, ec});
  164. }
  165. size_t blocks_requested = 0;
  166. bool reading = false;
  167. remote_messages_t messages;
  168. r::address_ptr_t controller;
  169. model::device_id_t peer_device;
  170. utils::logger_t log;
  171. block_requests_t block_requests;
  172. block_responses_t block_responses;
  173. uploaded_blocks_t uploaded_blocks;
  174. };
  175. struct fixture_t {
  176. using peer_ptr_t = r::intrusive_ptr_t<sample_peer_t>;
  177. using target_ptr_t = r::intrusive_ptr_t<net::controller_actor_t>;
  178. using blk_req_t = fs::message::block_request_t;
  179. using blk_req_ptr_t = r::intrusive_ptr_t<blk_req_t>;
  180. using blk_res_t = fs::message::block_response_t;
  181. using blk_res_ptr_t = r::intrusive_ptr_t<blk_res_t>;
  182. using block_requests_t = std::deque<blk_req_ptr_t>;
  183. using block_responses_t = std::deque<r::message_ptr_t>;
  184. fixture_t(bool auto_start_, int64_t max_sequence_, bool auto_share_ = true) noexcept
  185. : auto_start{auto_start_}, max_sequence{max_sequence_}, auto_share{auto_share_} {
  186. utils::set_default("trace");
  187. }
  188. virtual void run() noexcept {
  189. auto peer_id =
  190. device_id_t::from_string("VUV42CZ-IQD5A37-RPEBPM4-VVQK6E4-6WSKC7B-PVJQHHD-4PZD44V-ENC6WAZ").value();
  191. peer_device = device_t::create(peer_id, "peer-device").value();
  192. auto my_id =
  193. device_id_t::from_string("KHQNO2S-5QSILRK-YX4JZZ4-7L77APM-QNVGZJT-EKU7IFI-PNEPBMY-4MXFMQD").value();
  194. my_device = device_t::create(my_id, "my-device").value();
  195. cluster = new cluster_t(my_device, 1);
  196. cluster->get_devices().put(my_device);
  197. cluster->get_devices().put(peer_device);
  198. auto folder_id_1 = "1234-5678";
  199. auto folder_id_2 = "5555";
  200. auto builder = diff_builder_t(*cluster);
  201. auto sha256 = peer_id.get_sha256();
  202. builder.upsert_folder(folder_id_1, "")
  203. .upsert_folder(folder_id_2, "")
  204. .configure_cluster(sha256)
  205. .add(sha256, folder_id_1, 123, max_sequence)
  206. .finish();
  207. REQUIRE(builder.apply());
  208. if (auto_share) {
  209. REQUIRE(builder.share_folder(peer_id.get_sha256(), folder_id_1).apply());
  210. }
  211. r::system_context_t ctx;
  212. sup = ctx.create_supervisor<supervisor_t>().timeout(timeout).create_registry().finish();
  213. sup->cluster = cluster;
  214. sup->configure_callback = [&](r::plugin::plugin_base_t &plugin) {
  215. plugin.template with_casted<r::plugin::registry_plugin_t>(
  216. [&](auto &p) { p.register_name(net::names::fs_actor, sup->get_address()); });
  217. plugin.template with_casted<r::plugin::starter_plugin_t>([&](auto &p) {
  218. p.subscribe_actor(r::lambda<blk_req_t>([&](blk_req_t &msg) {
  219. block_requests.push_back(&msg);
  220. if (block_responses.size()) {
  221. sup->put(block_responses.front());
  222. block_responses.pop_front();
  223. }
  224. }));
  225. });
  226. };
  227. sup->start();
  228. sup->do_process();
  229. CHECK(static_cast<r::actor_base_t *>(sup.get())->access<to::state>() == r::state_t::OPERATIONAL);
  230. sup->create_actor<hasher_actor_t>().index(1).timeout(timeout).finish();
  231. sup->create_actor<hasher::hasher_proxy_actor_t>()
  232. .timeout(timeout)
  233. .hasher_threads(1)
  234. .name(net::names::hasher_proxy)
  235. .finish();
  236. peer_actor = sup->create_actor<sample_peer_t>().timeout(timeout).finish();
  237. auto &folders = cluster->get_folders();
  238. folder_1 = folders.by_id(folder_id_1);
  239. folder_2 = folders.by_id(folder_id_2);
  240. folder_1_peer = folder_1->get_folder_infos().by_device_id(peer_id.get_sha256());
  241. target = sup->create_actor<controller_actor_t>()
  242. .peer(peer_device)
  243. .peer_addr(peer_actor->get_address())
  244. .request_pool(1024)
  245. .outgoing_buffer_max(1024'000)
  246. .cluster(cluster)
  247. .sequencer(sup->sequencer)
  248. .timeout(timeout)
  249. .request_timeout(timeout)
  250. .finish();
  251. sup->do_process();
  252. CHECK(static_cast<r::actor_base_t *>(target.get())->access<to::state>() == r::state_t::OPERATIONAL);
  253. target_addr = target->get_address();
  254. if (auto_start) {
  255. REQUIRE(peer_actor->reading);
  256. REQUIRE(peer_actor->messages.size() == 1);
  257. auto &msg = (*peer_actor->messages.front()).payload;
  258. REQUIRE(std::get_if<proto::message::ClusterConfig>(&msg));
  259. peer_actor->messages.pop_front();
  260. }
  261. main(builder);
  262. sup->shutdown();
  263. sup->do_process();
  264. CHECK(static_cast<r::actor_base_t *>(sup.get())->access<to::state>() == r::state_t::SHUT_DOWN);
  265. }
  266. virtual void main(diff_builder_t &) noexcept {}
  267. bool auto_start;
  268. bool auto_share;
  269. int64_t max_sequence;
  270. peer_ptr_t peer_actor;
  271. target_ptr_t target;
  272. r::address_ptr_t target_addr;
  273. r::pt::time_duration timeout = r::pt::millisec{10};
  274. cluster_ptr_t cluster;
  275. device_ptr_t peer_device;
  276. device_ptr_t my_device;
  277. r::intrusive_ptr_t<supervisor_t> sup;
  278. r::system_context_t ctx;
  279. model::folder_ptr_t folder_1;
  280. model::folder_info_ptr_t folder_1_peer;
  281. model::folder_ptr_t folder_2;
  282. block_requests_t block_requests;
  283. block_responses_t block_responses;
  284. };
  285. } // namespace
  286. void test_startup() {
  287. struct F : fixture_t {
  288. using fixture_t::fixture_t;
  289. void main(diff_builder_t &) noexcept override {
  290. REQUIRE(peer_actor->reading);
  291. REQUIRE(peer_actor->messages.size() == 1);
  292. auto &msg = (*peer_actor->messages.front()).payload;
  293. REQUIRE(std::get_if<proto::message::ClusterConfig>(&msg));
  294. peer_actor->messages.pop_front();
  295. CHECK(peer_actor->messages.empty());
  296. auto cc = proto::ClusterConfig{};
  297. auto payload = proto::message::ClusterConfig(new proto::ClusterConfig(cc));
  298. peer_actor->forward(std::move(payload));
  299. sup->do_process();
  300. CHECK(static_cast<r::actor_base_t *>(target.get())->access<to::state>() == r::state_t::OPERATIONAL);
  301. CHECK(peer_actor->messages.empty());
  302. }
  303. };
  304. F(false, 10, false).run();
  305. }
  306. void test_index_receiving() {
  307. struct F : fixture_t {
  308. using fixture_t::fixture_t;
  309. void main(diff_builder_t &) noexcept override {
  310. auto cc = proto::ClusterConfig{};
  311. auto index = proto::Index{};
  312. SECTION("wrong index") {
  313. peer_actor->forward(proto::message::ClusterConfig(new proto::ClusterConfig(cc)));
  314. index.set_folder("non-existing-folder");
  315. peer_actor->forward(proto::message::Index(new proto::Index(index)));
  316. sup->do_process();
  317. CHECK(static_cast<r::actor_base_t *>(target.get())->access<to::state>() == r::state_t::SHUT_DOWN);
  318. CHECK(static_cast<r::actor_base_t *>(peer_actor.get())->access<to::state>() == r::state_t::SHUT_DOWN);
  319. }
  320. SECTION("index is applied") {
  321. auto folder = cc.add_folders();
  322. folder->set_id(std::string(folder_1->get_id()));
  323. auto d_peer = folder->add_devices();
  324. d_peer->set_id(std::string(peer_device->device_id().get_sha256()));
  325. REQUIRE(cluster->get_pending_folders().size() == 0);
  326. d_peer->set_max_sequence(10);
  327. d_peer->set_index_id(folder_1_peer->get_index());
  328. peer_actor->forward(proto::message::ClusterConfig(new proto::ClusterConfig(cc)));
  329. index.set_folder(std::string(folder_1->get_id()));
  330. auto file = index.add_files();
  331. file->set_name("some-dir");
  332. file->set_type(proto::FileInfoType::DIRECTORY);
  333. file->set_sequence(10);
  334. auto v = file->mutable_version();
  335. auto c = v->add_counters();
  336. c->set_id(peer_device->as_uint());
  337. c->set_value(1);
  338. peer_actor->forward(proto::message::Index(new proto::Index(index)));
  339. sup->do_process();
  340. CHECK(static_cast<r::actor_base_t *>(target.get())->access<to::state>() == r::state_t::OPERATIONAL);
  341. CHECK(static_cast<r::actor_base_t *>(peer_actor.get())->access<to::state>() == r::state_t::OPERATIONAL);
  342. auto &folder_infos = folder_1->get_folder_infos();
  343. auto folder_peer = folder_infos.by_device(*peer_device);
  344. REQUIRE(folder_peer);
  345. CHECK(folder_peer->get_max_sequence() == 10ul);
  346. REQUIRE(folder_peer->get_file_infos().size() == 1);
  347. CHECK(folder_peer->get_file_infos().begin()->item->get_name() == file->name());
  348. auto folder_my = folder_infos.by_device(*my_device);
  349. REQUIRE(folder_my);
  350. CHECK(folder_my->get_max_sequence() == 1ul);
  351. REQUIRE(folder_my->get_file_infos().size() == 1);
  352. CHECK(folder_my->get_file_infos().begin()->item->get_name() == file->name());
  353. SECTION("then index update is applied") {
  354. auto index_update = proto::IndexUpdate{};
  355. index_update.set_folder(std::string(folder_1->get_id()));
  356. auto file = index_update.add_files();
  357. file->set_name("some-dir-2");
  358. file->set_type(proto::FileInfoType::DIRECTORY);
  359. file->set_sequence(folder_1_peer->get_max_sequence() + 1);
  360. auto v = file->mutable_version();
  361. auto c = v->add_counters();
  362. c->set_id(peer_device->as_uint());
  363. c->set_value(1);
  364. peer_actor->forward(proto::message::IndexUpdate(new proto::IndexUpdate(index_update)));
  365. sup->do_process();
  366. CHECK(static_cast<r::actor_base_t *>(target.get())->access<to::state>() == r::state_t::OPERATIONAL);
  367. CHECK(static_cast<r::actor_base_t *>(peer_actor.get())->access<to::state>() ==
  368. r::state_t::OPERATIONAL);
  369. CHECK(folder_peer->get_max_sequence() == file->sequence());
  370. REQUIRE(folder_peer->get_file_infos().size() == 2);
  371. CHECK(folder_peer->get_file_infos().by_name("some-dir-2"));
  372. CHECK(folder_my->get_max_sequence() == 2ul);
  373. REQUIRE(folder_my->get_file_infos().size() == 2);
  374. CHECK(folder_my->get_file_infos().by_name("some-dir-2"));
  375. }
  376. }
  377. }
  378. };
  379. F(true, 10).run();
  380. }
  381. void test_index_sending() {
  382. struct F : fixture_t {
  383. using fixture_t::fixture_t;
  384. void main(diff_builder_t &) noexcept override {
  385. proto::FileInfo pr_file_info;
  386. pr_file_info.set_name("link");
  387. pr_file_info.set_type(proto::FileInfoType::SYMLINK);
  388. pr_file_info.set_symlink_target("/some/where");
  389. auto builder = diff_builder_t(*cluster);
  390. builder.local_update(folder_1->get_id(), pr_file_info);
  391. builder.apply(*sup);
  392. auto folder_1_my = folder_1->get_folder_infos().by_device(*my_device);
  393. auto cc = proto::ClusterConfig{};
  394. auto folder = cc.add_folders();
  395. folder->set_id(std::string(folder_1->get_id()));
  396. auto d_peer = folder->add_devices();
  397. d_peer->set_id(std::string(peer_device->device_id().get_sha256()));
  398. d_peer->set_max_sequence(folder_1_peer->get_max_sequence());
  399. d_peer->set_index_id(folder_1_peer->get_index());
  400. SECTION("peer has outdated by sequence view") {
  401. auto d_my = folder->add_devices();
  402. d_my->set_id(std::string(my_device->device_id().get_sha256()));
  403. d_my->set_max_sequence(folder_1_my->get_max_sequence() - 1);
  404. d_my->set_index_id(folder_1_my->get_index());
  405. peer_actor->forward(proto::message::ClusterConfig(new proto::ClusterConfig(cc)));
  406. sup->do_process();
  407. auto &queue = peer_actor->messages;
  408. REQUIRE(queue.size() == 2);
  409. auto msg = &(*queue.front()).payload;
  410. auto &my_index = *std::get<proto::message::Index>(*msg);
  411. REQUIRE(my_index.files_size() == 0);
  412. queue.pop_front();
  413. msg = &(*queue.front()).payload;
  414. auto &my_index_update = *std::get<proto::message::IndexUpdate>(*msg);
  415. REQUIRE(my_index_update.files_size() == 1);
  416. }
  417. SECTION("peer has outdated by index view") {
  418. auto d_my = folder->add_devices();
  419. d_my->set_id(std::string(my_device->device_id().get_sha256()));
  420. d_my->set_max_sequence(folder_1_my->get_max_sequence());
  421. d_my->set_index_id(folder_1_my->get_index() + 5);
  422. peer_actor->forward(proto::message::ClusterConfig(new proto::ClusterConfig(cc)));
  423. sup->do_process();
  424. auto &queue = peer_actor->messages;
  425. REQUIRE(queue.size() == 2);
  426. auto msg = &(*queue.front()).payload;
  427. auto &my_index = *std::get<proto::message::Index>(*msg);
  428. REQUIRE(my_index.files_size() == 0);
  429. queue.pop_front();
  430. msg = &(*queue.front()).payload;
  431. auto &my_index_update = *std::get<proto::message::IndexUpdate>(*msg);
  432. REQUIRE(my_index_update.files_size() == 1);
  433. }
  434. SECTION("peer has actual view") {
  435. auto d_my = folder->add_devices();
  436. d_my->set_id(std::string(my_device->device_id().get_sha256()));
  437. d_my->set_max_sequence(folder_1_my->get_max_sequence());
  438. d_my->set_index_id(folder_1_my->get_index());
  439. peer_actor->forward(proto::message::ClusterConfig(new proto::ClusterConfig(cc)));
  440. sup->do_process();
  441. auto &queue = peer_actor->messages;
  442. REQUIRE(queue.size() == 0);
  443. }
  444. }
  445. };
  446. F(true, 10).run();
  447. }
  448. void test_downloading() {
  449. struct F : fixture_t {
  450. using fixture_t::fixture_t;
  451. void main(diff_builder_t &) noexcept override {
  452. auto &folder_infos = folder_1->get_folder_infos();
  453. auto folder_my = folder_infos.by_device(*my_device);
  454. auto cc = proto::ClusterConfig{};
  455. auto folder = cc.add_folders();
  456. folder->set_id(std::string(folder_1->get_id()));
  457. auto d_peer = folder->add_devices();
  458. d_peer->set_id(std::string(peer_device->device_id().get_sha256()));
  459. d_peer->set_max_sequence(10);
  460. d_peer->set_index_id(folder_1_peer->get_index());
  461. auto d_my = folder->add_devices();
  462. d_my->set_id(std::string(my_device->device_id().get_sha256()));
  463. d_my->set_max_sequence(folder_my->get_max_sequence());
  464. d_my->set_index_id(folder_my->get_index());
  465. SECTION("cluster config & index has a new file => download it") {
  466. peer_actor->forward(proto::message::ClusterConfig(new proto::ClusterConfig(cc)));
  467. auto index = proto::Index{};
  468. index.set_folder(std::string(folder_1->get_id()));
  469. auto file = index.add_files();
  470. file->set_name("some-file");
  471. file->set_type(proto::FileInfoType::FILE);
  472. file->set_sequence(folder_1_peer->get_max_sequence() + 1);
  473. file->set_block_size(5);
  474. file->set_size(5);
  475. auto version = file->mutable_version();
  476. auto counter = version->add_counters();
  477. counter->set_id(1ul);
  478. counter->set_value(1ul);
  479. auto b1 = file->add_blocks();
  480. b1->set_hash(utils::sha256_digest("12345").value());
  481. b1->set_offset(0);
  482. b1->set_size(5);
  483. auto folder_my = folder_infos.by_device(*my_device);
  484. CHECK(folder_my->get_max_sequence() == 0ul);
  485. CHECK(!folder_my->get_folder()->is_synchronizing());
  486. peer_actor->forward(proto::message::Index(new proto::Index(index)));
  487. sup->do_process();
  488. CHECK(folder_my->get_folder()->is_synchronizing());
  489. peer_actor->push_block("12345", 0);
  490. peer_actor->process_block_requests();
  491. sup->do_process();
  492. CHECK(!folder_my->get_folder()->is_synchronizing());
  493. REQUIRE(folder_my);
  494. CHECK(folder_my->get_max_sequence() == 1ul);
  495. REQUIRE(folder_my->get_file_infos().size() == 1);
  496. auto f = folder_my->get_file_infos().begin()->item;
  497. REQUIRE(f);
  498. CHECK(f->get_name() == file->name());
  499. CHECK(f->get_size() == 5);
  500. CHECK(f->get_blocks().size() == 1);
  501. CHECK(f->is_locally_available());
  502. CHECK(!f->is_locked());
  503. CHECK(peer_actor->blocks_requested == 1);
  504. auto &queue = peer_actor->messages;
  505. REQUIRE(queue.size() > 0);
  506. auto msg = &(*queue.front()).payload;
  507. auto &my_index = *std::get<proto::message::Index>(*msg);
  508. REQUIRE(my_index.files_size() == 0);
  509. queue.pop_front();
  510. msg = &(*queue.back()).payload;
  511. auto &my_index_update = *std::get<proto::message::IndexUpdate>(*msg);
  512. REQUIRE(my_index_update.files_size() == 1);
  513. SECTION("dont redownload file only if metadata has changed") {
  514. auto index_update = proto::IndexUpdate{};
  515. index_update.set_folder(index.folder());
  516. file->set_sequence(folder_1_peer->get_max_sequence() + 1);
  517. counter->set_value(2ul);
  518. *index_update.add_files() = *file;
  519. peer_actor->forward(proto::message::IndexUpdate(new proto::IndexUpdate(index_update)));
  520. sup->do_process();
  521. CHECK(peer_actor->blocks_requested == 1);
  522. CHECK(folder_my->get_max_sequence() == 2ul);
  523. f = folder_my->get_file_infos().begin()->item;
  524. CHECK(f->is_locally_available());
  525. CHECK(f->get_sequence() == 2ul);
  526. }
  527. }
  528. SECTION("download 2 files") {
  529. peer_actor->forward(proto::message::ClusterConfig(new proto::ClusterConfig(cc)));
  530. auto index = proto::Index{};
  531. index.set_folder(std::string(folder_1->get_id()));
  532. auto file_1 = index.add_files();
  533. file_1->set_name("file-1");
  534. file_1->set_type(proto::FileInfoType::FILE);
  535. file_1->set_sequence(folder_1_peer->get_max_sequence() + 1);
  536. file_1->set_block_size(5);
  537. file_1->set_size(5);
  538. auto version_1 = file_1->mutable_version();
  539. auto counter_1 = version_1->add_counters();
  540. counter_1->set_id(1ul);
  541. counter_1->set_value(1ul);
  542. auto file_2 = index.add_files();
  543. file_2->set_name("file-2");
  544. file_2->set_type(proto::FileInfoType::FILE);
  545. file_2->set_sequence(folder_1_peer->get_max_sequence() + 2);
  546. file_2->set_block_size(5);
  547. file_2->set_size(5);
  548. auto version_2 = file_2->mutable_version();
  549. auto counter_2 = version_2->add_counters();
  550. counter_2->set_id(1ul);
  551. counter_2->set_value(2ul);
  552. auto b1 = file_1->add_blocks();
  553. b1->set_hash(utils::sha256_digest("12345").value());
  554. b1->set_offset(0);
  555. b1->set_size(5);
  556. SECTION("with different blocks") {
  557. auto b2 = file_2->add_blocks();
  558. b2->set_hash(utils::sha256_digest("67890").value());
  559. b2->set_offset(0);
  560. b2->set_size(5);
  561. auto folder_my = folder_infos.by_device(*my_device);
  562. CHECK(folder_my->get_max_sequence() == 0ul);
  563. CHECK(!folder_my->get_folder()->is_synchronizing());
  564. peer_actor->forward(proto::message::Index(new proto::Index(index)));
  565. peer_actor->push_block("12345", 0, file_1->name());
  566. peer_actor->push_block("67890", 0, file_2->name());
  567. sup->do_process();
  568. CHECK(!folder_my->get_folder()->is_synchronizing());
  569. CHECK(peer_actor->blocks_requested == 2);
  570. REQUIRE(folder_my);
  571. CHECK(folder_my->get_max_sequence() == 2ul);
  572. REQUIRE(folder_my->get_file_infos().size() == 2);
  573. {
  574. auto f = folder_my->get_file_infos().by_name(file_1->name());
  575. REQUIRE(f);
  576. CHECK(f->get_size() == 5);
  577. CHECK(f->get_blocks().size() == 1);
  578. CHECK(f->is_locally_available());
  579. CHECK(!f->is_locked());
  580. }
  581. {
  582. auto f = folder_my->get_file_infos().by_name(file_2->name());
  583. REQUIRE(f);
  584. CHECK(f->get_size() == 5);
  585. CHECK(f->get_blocks().size() == 1);
  586. CHECK(f->is_locally_available());
  587. CHECK(!f->is_locked());
  588. }
  589. }
  590. SECTION("with the same block") {
  591. *file_2->add_blocks() = *b1;
  592. auto folder_my = folder_infos.by_device(*my_device);
  593. CHECK(folder_my->get_max_sequence() == 0ul);
  594. CHECK(!folder_my->get_folder()->is_synchronizing());
  595. peer_actor->forward(proto::message::Index(new proto::Index(index)));
  596. peer_actor->push_block("12345", 0, file_1->name());
  597. sup->do_process();
  598. CHECK(!folder_my->get_folder()->is_synchronizing());
  599. CHECK(peer_actor->blocks_requested == 1);
  600. REQUIRE(folder_my);
  601. CHECK(folder_my->get_max_sequence() == 2ul);
  602. REQUIRE(folder_my->get_file_infos().size() == 2);
  603. {
  604. auto f = folder_my->get_file_infos().by_name(file_1->name());
  605. REQUIRE(f);
  606. CHECK(f->get_size() == 5);
  607. CHECK(f->get_blocks().size() == 1);
  608. CHECK(f->is_locally_available());
  609. CHECK(!f->is_locked());
  610. }
  611. {
  612. auto f = folder_my->get_file_infos().by_name(file_2->name());
  613. REQUIRE(f);
  614. CHECK(f->get_size() == 5);
  615. CHECK(f->get_blocks().size() == 1);
  616. CHECK(f->is_locally_available());
  617. CHECK(!f->is_locked());
  618. }
  619. }
  620. SECTION("with the same blocks") {
  621. auto concurrent_writes = GENERATE(1, 5);
  622. cluster->modify_write_requests(concurrent_writes);
  623. *file_2->add_blocks() = *b1;
  624. *file_2->add_blocks() = *b1;
  625. file_2->set_size(10);
  626. auto folder_my = folder_infos.by_device(*my_device);
  627. CHECK(folder_my->get_max_sequence() == 0ul);
  628. CHECK(!folder_my->get_folder()->is_synchronizing());
  629. peer_actor->forward(proto::message::Index(new proto::Index(index)));
  630. peer_actor->push_block("12345", 0, file_1->name());
  631. sup->do_process();
  632. CHECK(!folder_my->get_folder()->is_synchronizing());
  633. CHECK(peer_actor->blocks_requested == 1);
  634. REQUIRE(folder_my);
  635. CHECK(folder_my->get_max_sequence() == 2ul);
  636. REQUIRE(folder_my->get_file_infos().size() == 2);
  637. {
  638. auto f = folder_my->get_file_infos().by_name(file_1->name());
  639. REQUIRE(f);
  640. CHECK(f->get_size() == 5);
  641. CHECK(f->get_blocks().size() == 1);
  642. CHECK(f->is_locally_available());
  643. CHECK(!f->is_locked());
  644. }
  645. {
  646. auto f = folder_my->get_file_infos().by_name(file_2->name());
  647. REQUIRE(f);
  648. CHECK(f->get_size() == 10);
  649. CHECK(f->get_blocks().size() == 2);
  650. CHECK(f->is_locally_available());
  651. CHECK(!f->is_locked());
  652. }
  653. }
  654. }
  655. SECTION("don't attempt to download a file, which is deleted") {
  656. auto folder_peer = folder_infos.by_device(*peer_device);
  657. auto pr_fi = proto::FileInfo{};
  658. pr_fi.set_name("some-file");
  659. pr_fi.set_type(proto::FileInfoType::FILE);
  660. pr_fi.set_sequence(folder_1_peer->get_max_sequence());
  661. pr_fi.set_block_size(5);
  662. pr_fi.set_size(5);
  663. auto version = pr_fi.mutable_version();
  664. auto counter = version->add_counters();
  665. counter->set_id(1ul);
  666. counter->set_value(1ul);
  667. auto b1 = pr_fi.add_blocks();
  668. b1->set_hash(utils::sha256_digest("12345").value());
  669. b1->set_offset(0);
  670. b1->set_size(5);
  671. auto b = model::block_info_t::create(*b1).value();
  672. auto uuid = sup->sequencer->next_uuid();
  673. auto file_info = model::file_info_t::create(uuid, pr_fi, folder_peer).value();
  674. file_info->assign_block(b, 0);
  675. folder_peer->add(file_info, true);
  676. cluster->get_blocks().put(b);
  677. d_peer->set_max_sequence(folder_1_peer->get_max_sequence() + 1);
  678. peer_actor->forward(proto::message::ClusterConfig(new proto::ClusterConfig(cc)));
  679. sup->do_process();
  680. auto index = proto::IndexUpdate{};
  681. index.set_folder(std::string(folder_1->get_id()));
  682. auto file = index.add_files();
  683. file->set_name("some-file");
  684. file->set_type(proto::FileInfoType::FILE);
  685. file->set_deleted(true);
  686. file->set_sequence(folder_1_peer->get_max_sequence() + 1);
  687. file->set_block_size(0);
  688. file->set_size(0);
  689. auto v = file->mutable_version();
  690. auto c = v->add_counters();
  691. c->set_id(peer_device->as_uint());
  692. c->set_value(1);
  693. peer_actor->forward(proto::message::IndexUpdate(new proto::IndexUpdate(index)));
  694. sup->do_process();
  695. CHECK(folder_my->get_max_sequence() == 1ul);
  696. REQUIRE(folder_my->get_file_infos().size() == 1);
  697. auto f = folder_my->get_file_infos().begin()->item;
  698. REQUIRE(f);
  699. CHECK(f->get_name() == pr_fi.name());
  700. CHECK(f->get_size() == 0);
  701. CHECK(f->get_blocks().size() == 0);
  702. CHECK(f->is_locally_available());
  703. CHECK(f->is_deleted());
  704. CHECK(!f->is_locked());
  705. CHECK(f->get_sequence() == 1ul);
  706. CHECK(peer_actor->blocks_requested == 0);
  707. }
  708. SECTION("new file via index_update => download it") {
  709. peer_actor->forward(proto::message::ClusterConfig(new proto::ClusterConfig(cc)));
  710. auto index = proto::Index{};
  711. index.set_folder(std::string(folder_1->get_id()));
  712. peer_actor->forward(proto::message::Index(new proto::Index(index)));
  713. auto index_update = proto::IndexUpdate{};
  714. index_update.set_folder(std::string(folder_1->get_id()));
  715. auto file = index_update.add_files();
  716. file->set_name("some-file");
  717. file->set_type(proto::FileInfoType::FILE);
  718. file->set_sequence(folder_1_peer->get_max_sequence() + 1);
  719. file->set_block_size(5);
  720. file->set_size(5);
  721. auto version = file->mutable_version();
  722. auto counter = version->add_counters();
  723. counter->set_id(1);
  724. counter->set_value(peer_device->as_uint());
  725. auto b1 = file->add_blocks();
  726. b1->set_hash(utils::sha256_digest("12345").value());
  727. b1->set_offset(0);
  728. b1->set_size(5);
  729. peer_actor->forward(proto::message::IndexUpdate(new proto::IndexUpdate(index_update)));
  730. peer_actor->push_block("12345", 0);
  731. sup->do_process();
  732. auto folder_my = folder_infos.by_device(*my_device);
  733. CHECK(folder_my->get_max_sequence() == 1);
  734. REQUIRE(folder_my->get_file_infos().size() == 1);
  735. auto f = folder_my->get_file_infos().begin()->item;
  736. REQUIRE(f);
  737. CHECK(f->get_name() == file->name());
  738. CHECK(f->get_size() == 5);
  739. CHECK(f->get_blocks().size() == 1);
  740. CHECK(f->is_locally_available());
  741. CHECK(!f->is_locked());
  742. auto fp = folder_1_peer->get_file_infos().begin()->item;
  743. REQUIRE(fp);
  744. CHECK(!fp->is_locked());
  745. }
  746. SECTION("deleted file, has been restored => download it") {
  747. peer_actor->forward(proto::message::ClusterConfig(new proto::ClusterConfig(cc)));
  748. sup->do_process();
  749. auto index = proto::Index{};
  750. index.set_folder(std::string(folder_1->get_id()));
  751. auto file_1 = index.add_files();
  752. file_1->set_name("some-file");
  753. file_1->set_type(proto::FileInfoType::FILE);
  754. file_1->set_sequence(folder_1_peer->get_max_sequence());
  755. file_1->set_deleted(true);
  756. auto v1 = file_1->mutable_version();
  757. auto c1 = v1->add_counters();
  758. c1->set_id(1u);
  759. c1->set_value(1u);
  760. peer_actor->forward(proto::message::Index(new proto::Index(index)));
  761. sup->do_process();
  762. CHECK(!folder_my->get_folder()->is_synchronizing());
  763. auto folder_my = folder_infos.by_device(*my_device);
  764. CHECK(folder_my->get_max_sequence() == 1ul);
  765. auto index_update = proto::IndexUpdate{};
  766. index_update.set_folder(std::string(folder_1->get_id()));
  767. auto file_2 = index_update.add_files();
  768. file_2->set_name("some-file");
  769. file_2->set_type(proto::FileInfoType::FILE);
  770. file_2->set_sequence(folder_1_peer->get_max_sequence() + 1);
  771. file_2->set_block_size(128 * 1024);
  772. file_2->set_size(5);
  773. auto v2 = file_2->mutable_version();
  774. auto c2 = v2->add_counters();
  775. c2->set_id(1u);
  776. c2->set_value(2u);
  777. auto b1 = file_2->add_blocks();
  778. b1->set_hash(utils::sha256_digest("12345").value());
  779. b1->set_offset(0);
  780. b1->set_size(5);
  781. peer_actor->forward(proto::message::IndexUpdate(new proto::IndexUpdate(index_update)));
  782. peer_actor->push_block("12345", 0);
  783. sup->do_process();
  784. REQUIRE(folder_my->get_file_infos().size() == 1);
  785. auto f = folder_my->get_file_infos().begin()->item;
  786. REQUIRE(f);
  787. CHECK(f->get_name() == file_1->name());
  788. CHECK(f->get_size() == 5);
  789. CHECK(f->get_blocks().size() == 1);
  790. CHECK(f->is_locally_available());
  791. CHECK(!f->is_locked());
  792. CHECK(!f->is_deleted());
  793. }
  794. SECTION("download a file, which has the same blocks locally") {
  795. peer_actor->forward(proto::message::ClusterConfig(new proto::ClusterConfig(cc)));
  796. sup->do_process();
  797. auto index = proto::Index{};
  798. index.set_folder(std::string(folder_1->get_id()));
  799. auto file_1 = index.add_files();
  800. file_1->set_name("some-file");
  801. file_1->set_type(proto::FileInfoType::FILE);
  802. file_1->set_sequence(folder_1_peer->get_max_sequence());
  803. auto v1 = file_1->mutable_version();
  804. auto c1 = v1->add_counters();
  805. c1->set_id(1u);
  806. c1->set_value(1u);
  807. file_1->set_block_size(5);
  808. file_1->set_size(10);
  809. auto b1 = file_1->add_blocks();
  810. b1->set_hash(utils::sha256_digest("12345").value());
  811. b1->set_offset(0);
  812. b1->set_size(5);
  813. auto bi_1 = model::block_info_t::create(*b1).value();
  814. auto b2 = file_1->add_blocks();
  815. b2->set_hash(utils::sha256_digest("67890").value());
  816. b2->set_offset(5);
  817. b2->set_size(5);
  818. auto bi_2 = model::block_info_t::create(*b2).value();
  819. auto &blocks = cluster->get_blocks();
  820. blocks.put(bi_1);
  821. blocks.put(bi_2);
  822. auto pr_my = proto::FileInfo{};
  823. pr_my.set_name("some-file.source");
  824. pr_my.set_type(proto::FileInfoType::FILE);
  825. pr_my.set_sequence(2ul);
  826. pr_my.set_block_size(5);
  827. pr_my.set_size(5);
  828. auto uuid = sup->sequencer->next_uuid();
  829. auto file_my = model::file_info_t::create(uuid, pr_my, folder_my).value();
  830. file_my->assign_block(bi_1, 0);
  831. file_my->mark_local_available(0);
  832. folder_my->add(file_my, true);
  833. peer_actor->forward(proto::message::Index(new proto::Index(index)));
  834. peer_actor->push_block("67890", 1);
  835. cluster->modify_write_requests(10);
  836. sup->do_process();
  837. REQUIRE(folder_my->get_file_infos().size() == 2);
  838. auto f = folder_my->get_file_infos().by_name(file_1->name());
  839. REQUIRE(f);
  840. CHECK(f->get_name() == file_1->name());
  841. CHECK(f->get_size() == 10);
  842. CHECK(f->get_blocks().size() == 2);
  843. CHECK(f->is_locally_available());
  844. CHECK(!f->is_locked());
  845. }
  846. }
  847. };
  848. F(true, 10).run();
  849. }
  850. void test_downloading_errors() {
  851. struct F : fixture_t {
  852. using fixture_t::fixture_t;
  853. void main(diff_builder_t &) noexcept override {
  854. auto &folder_infos = folder_1->get_folder_infos();
  855. auto folder_my = folder_infos.by_device(*my_device);
  856. auto cc = proto::ClusterConfig{};
  857. auto folder = cc.add_folders();
  858. folder->set_id(std::string(folder_1->get_id()));
  859. auto d_peer = folder->add_devices();
  860. d_peer->set_id(std::string(peer_device->device_id().get_sha256()));
  861. d_peer->set_max_sequence(folder_1_peer->get_max_sequence() + 1);
  862. d_peer->set_index_id(folder_1_peer->get_index());
  863. auto d_my = folder->add_devices();
  864. d_my->set_id(std::string(my_device->device_id().get_sha256()));
  865. d_my->set_max_sequence(folder_my->get_max_sequence());
  866. d_my->set_index_id(folder_my->get_index());
  867. peer_actor->forward(proto::message::ClusterConfig(new proto::ClusterConfig(cc)));
  868. auto index = proto::Index{};
  869. index.set_folder(std::string(folder_1->get_id()));
  870. auto file = index.add_files();
  871. file->set_name("some-file");
  872. file->set_type(proto::FileInfoType::FILE);
  873. file->set_sequence(folder_1_peer->get_max_sequence() + 1);
  874. file->set_block_size(5);
  875. file->set_size(5);
  876. auto version = file->mutable_version();
  877. auto counter = version->add_counters();
  878. counter->set_id(1ul);
  879. counter->set_value(1ul);
  880. auto b1 = file->add_blocks();
  881. b1->set_hash(utils::sha256_digest("12345").value());
  882. b1->set_offset(0);
  883. b1->set_size(5);
  884. CHECK(folder_my->get_max_sequence() == 0ul);
  885. peer_actor->forward(proto::message::Index(new proto::Index(index)));
  886. SECTION("general error, ok, do not shutdown") {
  887. auto ec = utils::make_error_code(utils::request_error_code_t::generic);
  888. peer_actor->push_block(ec, 0);
  889. }
  890. SECTION("hash mismatch, do not shutdown") { peer_actor->push_block("zzz", 0); }
  891. sup->do_process();
  892. CHECK(peer_actor->blocks_requested == 1);
  893. CHECK(static_cast<r::actor_base_t *>(target.get())->access<to::state>() == r::state_t::OPERATIONAL);
  894. auto folder_peer = folder_infos.by_device(*peer_device);
  895. REQUIRE(folder_peer->get_file_infos().size() == 1);
  896. auto f = folder_peer->get_file_infos().begin()->item;
  897. REQUIRE(f);
  898. CHECK(f->is_unreachable());
  899. CHECK(!f->is_locally_locked());
  900. CHECK(!f->is_locked());
  901. CHECK(!f->local_file());
  902. CHECK(!folder_my->get_folder()->is_synchronizing());
  903. sup->do_process();
  904. }
  905. };
  906. F(true, 10).run();
  907. }
  908. void test_download_from_scratch() {
  909. struct F : fixture_t {
  910. using fixture_t::fixture_t;
  911. void main(diff_builder_t &) noexcept override {
  912. sup->do_process();
  913. auto builder = diff_builder_t(*cluster);
  914. auto sha256 = peer_device->device_id().get_sha256();
  915. auto cc = proto::ClusterConfig{};
  916. auto folder = cc.add_folders();
  917. folder->set_id(std::string(folder_1->get_id()));
  918. auto d_peer = folder->add_devices();
  919. d_peer->set_id(std::string(peer_device->device_id().get_sha256()));
  920. d_peer->set_max_sequence(15);
  921. d_peer->set_index_id(12345);
  922. peer_actor->forward(proto::message::ClusterConfig(new proto::ClusterConfig(cc)));
  923. sup->do_process();
  924. builder.share_folder(sha256, folder_1->get_id()).apply(*sup);
  925. auto index = proto::Index{};
  926. index.set_folder(std::string(folder_1->get_id()));
  927. auto file = index.add_files();
  928. file->set_name("some-file");
  929. file->set_type(proto::FileInfoType::FILE);
  930. file->set_sequence(154);
  931. file->set_block_size(5);
  932. file->set_size(5);
  933. auto version = file->mutable_version();
  934. auto counter = version->add_counters();
  935. counter->set_id(1ul);
  936. counter->set_value(1ul);
  937. auto b1 = file->add_blocks();
  938. b1->set_hash(utils::sha256_digest("12345").value());
  939. b1->set_offset(0);
  940. b1->set_size(5);
  941. peer_actor->forward(proto::message::Index(new proto::Index(index)));
  942. peer_actor->push_block("12345", 0, file->name());
  943. sup->do_process();
  944. auto folder_my = folder_1->get_folder_infos().by_device(*my_device);
  945. CHECK(folder_my->get_max_sequence() == 1ul);
  946. CHECK(!folder_my->get_folder()->is_synchronizing());
  947. auto f = folder_my->get_file_infos().by_name(file->name());
  948. REQUIRE(f);
  949. CHECK(f->get_size() == 5);
  950. CHECK(f->get_blocks().size() == 1);
  951. CHECK(f->is_locally_available());
  952. CHECK(!f->is_locked());
  953. }
  954. };
  955. F(false, 10, false).run();
  956. }
  957. void test_my_sharing() {
  958. struct F : fixture_t {
  959. using fixture_t::fixture_t;
  960. void main(diff_builder_t &) noexcept override {
  961. sup->do_process();
  962. auto cc = proto::ClusterConfig{};
  963. peer_actor->forward(proto::message::ClusterConfig(new proto::ClusterConfig(cc)));
  964. // nothing is shared
  965. sup->do_process();
  966. REQUIRE(static_cast<r::actor_base_t *>(target.get())->access<to::state>() == r::state_t::OPERATIONAL);
  967. REQUIRE(static_cast<r::actor_base_t *>(peer_actor.get())->access<to::state>() == r::state_t::OPERATIONAL);
  968. REQUIRE(peer_actor->messages.size() == 1);
  969. auto peer_msg = &peer_actor->messages.front()->payload;
  970. auto peer_cluster_msg = std::get_if<proto::message::ClusterConfig>(peer_msg);
  971. REQUIRE(peer_cluster_msg);
  972. REQUIRE(*peer_cluster_msg);
  973. REQUIRE((*peer_cluster_msg)->folders_size() == 0);
  974. // share folder_1
  975. peer_actor->messages.clear();
  976. auto sha256 = peer_device->device_id().get_sha256();
  977. diff_builder_t(*cluster).share_folder(sha256, folder_1->get_id()).apply(*sup);
  978. REQUIRE(static_cast<r::actor_base_t *>(target.get())->access<to::state>() == r::state_t::OPERATIONAL);
  979. REQUIRE(static_cast<r::actor_base_t *>(peer_actor.get())->access<to::state>() == r::state_t::OPERATIONAL);
  980. REQUIRE(peer_actor->messages.size() == 1);
  981. peer_msg = &peer_actor->messages.front()->payload;
  982. peer_cluster_msg = std::get_if<proto::message::ClusterConfig>(peer_msg);
  983. REQUIRE(peer_cluster_msg);
  984. REQUIRE(*peer_cluster_msg);
  985. REQUIRE((*peer_cluster_msg)->folders_size() == 1);
  986. // unshare folder_1
  987. auto peer_fi = folder_1->get_folder_infos().by_device(*peer_device);
  988. peer_actor->messages.clear();
  989. diff_builder_t(*cluster).unshare_folder(*peer_fi).apply(*sup);
  990. REQUIRE(static_cast<r::actor_base_t *>(target.get())->access<to::state>() == r::state_t::OPERATIONAL);
  991. REQUIRE(static_cast<r::actor_base_t *>(peer_actor.get())->access<to::state>() == r::state_t::OPERATIONAL);
  992. REQUIRE(peer_actor->messages.size() == 1);
  993. peer_msg = &peer_actor->messages.front()->payload;
  994. peer_cluster_msg = std::get_if<proto::message::ClusterConfig>(peer_msg);
  995. REQUIRE(peer_cluster_msg);
  996. REQUIRE(*peer_cluster_msg);
  997. REQUIRE((*peer_cluster_msg)->folders_size() == 0);
  998. }
  999. };
  1000. F(false, 10, false).run();
  1001. }
  1002. void test_sending_index_updates() {
  1003. struct F : fixture_t {
  1004. using fixture_t::fixture_t;
  1005. void main(diff_builder_t &) noexcept override {
  1006. auto &folder_infos = folder_1->get_folder_infos();
  1007. auto folder_my = folder_infos.by_device(*my_device);
  1008. auto cc = proto::ClusterConfig{};
  1009. auto folder = cc.add_folders();
  1010. folder->set_id(std::string(folder_1->get_id()));
  1011. auto d_peer = folder->add_devices();
  1012. d_peer->set_id(std::string(peer_device->device_id().get_sha256()));
  1013. d_peer->set_max_sequence(folder_1_peer->get_max_sequence());
  1014. d_peer->set_index_id(folder_1_peer->get_index());
  1015. auto d_my = folder->add_devices();
  1016. d_my->set_id(std::string(my_device->device_id().get_sha256()));
  1017. d_my->set_max_sequence(folder_my->get_max_sequence());
  1018. d_my->set_index_id(folder_my->get_index());
  1019. auto index = proto::Index{};
  1020. auto folder_id = std::string(folder_1->get_id());
  1021. index.set_folder(folder_id);
  1022. peer_actor->forward(proto::message::ClusterConfig(new proto::ClusterConfig(cc)));
  1023. peer_actor->forward(proto::message::Index(new proto::Index(index)));
  1024. sup->do_process();
  1025. auto builder = diff_builder_t(*cluster);
  1026. auto pr_file = proto::FileInfo();
  1027. pr_file.set_name("a.txt");
  1028. peer_actor->messages.clear();
  1029. builder.local_update(folder_id, pr_file).apply(*sup);
  1030. REQUIRE(peer_actor->messages.size() == 1);
  1031. auto &msg = peer_actor->messages.front();
  1032. auto &index_update = *std::get<proto::message::IndexUpdate>(msg->payload);
  1033. REQUIRE(index_update.files_size() == 1);
  1034. CHECK(index_update.files(0).name() == "a.txt");
  1035. }
  1036. };
  1037. F(true, 10).run();
  1038. }
  1039. void test_uploading() {
  1040. struct F : fixture_t {
  1041. using fixture_t::fixture_t;
  1042. void main(diff_builder_t &) noexcept override {
  1043. auto &folder_infos = folder_1->get_folder_infos();
  1044. auto folder_my = folder_infos.by_device(*my_device);
  1045. auto cc = proto::ClusterConfig{};
  1046. auto folder = cc.add_folders();
  1047. folder->set_id(std::string(folder_1->get_id()));
  1048. auto d_peer = folder->add_devices();
  1049. d_peer->set_id(std::string(peer_device->device_id().get_sha256()));
  1050. d_peer->set_max_sequence(folder_1_peer->get_max_sequence());
  1051. d_peer->set_index_id(folder_1_peer->get_index());
  1052. auto d_my = folder->add_devices();
  1053. d_my->set_id(std::string(my_device->device_id().get_sha256()));
  1054. d_my->set_max_sequence(folder_my->get_max_sequence());
  1055. d_my->set_index_id(folder_my->get_index());
  1056. auto pr_fi = proto::FileInfo{};
  1057. pr_fi.set_name("data.bin");
  1058. pr_fi.set_type(proto::FileInfoType::FILE);
  1059. pr_fi.set_sequence(folder_1_peer->get_max_sequence());
  1060. pr_fi.set_block_size(5);
  1061. pr_fi.set_size(5);
  1062. auto version = pr_fi.mutable_version();
  1063. auto counter = version->add_counters();
  1064. counter->set_id(1);
  1065. counter->set_value(my_device->as_uint());
  1066. auto b1 = pr_fi.add_blocks();
  1067. b1->set_hash(utils::sha256_digest("12345").value());
  1068. b1->set_offset(0);
  1069. b1->set_size(5);
  1070. auto b = model::block_info_t::create(*b1).value();
  1071. auto uuid = sup->sequencer->next_uuid();
  1072. auto file_info = model::file_info_t::create(uuid, pr_fi, folder_my).value();
  1073. file_info->assign_block(b, 0);
  1074. folder_my->add(file_info, true);
  1075. auto req = proto::Request();
  1076. req.set_id(1);
  1077. req.set_folder(std::string(folder_1->get_id()));
  1078. req.set_name("data.bin");
  1079. req.set_offset(0);
  1080. req.set_size(5);
  1081. peer_actor->forward(proto::message::ClusterConfig(new proto::ClusterConfig(cc)));
  1082. SECTION("upload regular file, no hash") {
  1083. peer_actor->forward(proto::message::Request(new proto::Request(req)));
  1084. auto req_ptr = proto::message::Request(new proto::Request(req));
  1085. auto res = r::make_message<fs::payload::block_response_t>(target->get_address(), std::move(req_ptr),
  1086. sys::error_code{}, std::string("12345"));
  1087. block_responses.push_back(res);
  1088. sup->do_process();
  1089. REQUIRE(block_requests.size() == 1);
  1090. CHECK(block_requests[0]->payload.remote_request->id() == 1);
  1091. CHECK(block_requests[0]->payload.remote_request->name() == "data.bin");
  1092. REQUIRE(peer_actor->uploaded_blocks.size() == 1);
  1093. auto &peer_res = *peer_actor->uploaded_blocks.front();
  1094. CHECK(peer_res.id() == 1);
  1095. CHECK(peer_res.code() == proto::ErrorCode::NO_BEP_ERROR);
  1096. CHECK(peer_res.data() == "12345");
  1097. }
  1098. }
  1099. };
  1100. F(true, 10).run();
  1101. }
  1102. void test_peer_removal() {
  1103. struct F : fixture_t {
  1104. using fixture_t::fixture_t;
  1105. void main(diff_builder_t &builder) noexcept override {
  1106. builder.remove_peer(*peer_device).apply(*sup);
  1107. CHECK(static_cast<r::actor_base_t *>(target.get())->access<to::state>() == r::state_t::SHUT_DOWN);
  1108. CHECK(static_cast<r::actor_base_t *>(peer_actor.get())->access<to::state>() == r::state_t::SHUT_DOWN);
  1109. CHECK(target->get_shutdown_reason()->root()->ec == utils::error_code_t::peer_has_been_removed);
  1110. }
  1111. };
  1112. F(true, 10).run();
  1113. }
  1114. int _init() {
  1115. REGISTER_TEST_CASE(test_startup, "test_startup", "[net]");
  1116. REGISTER_TEST_CASE(test_index_receiving, "test_index_receiving", "[net]");
  1117. REGISTER_TEST_CASE(test_index_sending, "test_index_sending", "[net]");
  1118. REGISTER_TEST_CASE(test_downloading, "test_downloading", "[net]");
  1119. REGISTER_TEST_CASE(test_downloading_errors, "test_downloading_errors", "[net]");
  1120. REGISTER_TEST_CASE(test_download_from_scratch, "test_download_from_scratch", "[net]");
  1121. REGISTER_TEST_CASE(test_my_sharing, "test_my_sharing", "[net]");
  1122. REGISTER_TEST_CASE(test_sending_index_updates, "test_sending_index_updates", "[net]");
  1123. REGISTER_TEST_CASE(test_uploading, "test_uploading", "[net]");
  1124. REGISTER_TEST_CASE(test_peer_removal, "test_peer_removal", "[net]");
  1125. return 1;
  1126. }
  1127. static int v = _init();