5 Комити 074dd076bf ... c311cf78e0

Аутор SHA1 Порука Датум
  Ivan Baidakou c311cf78e0 Merge branch 'controller-concurrency' into v0.4.3-dev пре 4 дана
  Ivan Baidakou bac1ae15b5 tests: add moar checks for concurrent downloading пре 4 дана
  Ivan Baidakou f4b54ae425 core: support concurrent blocks downloading пре 4 дана
  Ivan Baidakou c8ee8a4075 tests: add concurrency controllers test пре 4 дана
  Ivan Baidakou 790fc08ed4 core: add path_lock пре 4 дана

+ 15 - 1
src/model/cluster.cpp

@@ -3,11 +3,16 @@
 
 #include "cluster.h"
 #include "proto/proto-helpers.h"
-#include <spdlog/spdlog.h>
 
 using namespace syncspirit;
 using namespace syncspirit::model;
 
+path_guard_t::~path_guard_t() {
+    if (cluster) {
+        cluster->locked_paths.erase(path);
+    }
+}
+
 cluster_t::cluster_t(device_ptr_t device_, int32_t write_requests_) noexcept
     : device(device_), tainted{false}, write_requests{write_requests_} {
     path_cache.reset(new path_cache_t());
@@ -56,6 +61,15 @@ auto cluster_t::get_path_cache() noexcept -> path_cache_t & { return *path_cache
 
 int32_t cluster_t::get_write_requests() const noexcept { return write_requests; }
 
+bool cluster_t::is_locked(path_t *path) noexcept { return locked_paths.contains(path); }
+
+path_guard_t cluster_t::lock(path_t *path) noexcept {
+    assert(locked_paths.count(path) == 0);
+    auto guard = path_guard_t(this, path);
+    locked_paths.emplace(path);
+    return guard;
+}
+
 void cluster_t::modify_write_requests(int32_t delta) noexcept {
     write_requests += delta;
     assert(write_requests >= 0);

+ 28 - 1
src/model/cluster.h

@@ -12,11 +12,26 @@
 #include "pending_device.h"
 #include "pending_folder.h"
 #include "block_info.h"
+#include <unordered_set>
 
 namespace syncspirit::model {
 
-struct SYNCSPIRIT_API cluster_t final : arc_base_t<cluster_t> {
+struct SYNCSPIRIT_API path_guard_t {
+    path_guard_t() noexcept = default;
+    path_guard_t(cluster_t *cluster_, path_ptr_t path_) noexcept : path(path_), cluster{cluster_} {}
+    path_guard_t(path_guard_t &&) = default;
+    path_guard_t(const path_guard_t &) = delete;
+    ~path_guard_t();
+
+    path_guard_t &operator=(path_guard_t &&) noexcept = default;
+
+    inline operator bool() const noexcept { return path.get(); }
+
+    path_ptr_t path = {};
+    cluster_t *cluster = {};
+};
 
+struct SYNCSPIRIT_API cluster_t final : arc_base_t<cluster_t> {
     cluster_t(device_ptr_t device_, int32_t write_requests) noexcept;
     cluster_t(const cluster_t &) = delete;
 
@@ -41,8 +56,17 @@ struct SYNCSPIRIT_API cluster_t final : arc_base_t<cluster_t> {
     inline void mark_tainted() noexcept { tainted = true; }
     int32_t get_write_requests() const noexcept;
     void modify_write_requests(int32_t delta) noexcept;
+    path_guard_t lock(path_t *path) noexcept;
+    bool is_locked(path_t *path) noexcept;
 
   private:
+    struct path_hasher_t {
+        using is_transparent = void;
+        inline size_t operator()(const path_ptr_t &path) const noexcept { return reinterpret_cast<size_t>(path.get()); }
+        inline size_t operator()(const path_t *file) const noexcept { return reinterpret_cast<size_t>(file); }
+    };
+
+    using locked_paths_t = std::unordered_set<path_ptr_t, path_hasher_t>;
     device_ptr_t device;
     folders_map_t folders;
     block_infos_map_t blocks;
@@ -52,8 +76,11 @@ struct SYNCSPIRIT_API cluster_t final : arc_base_t<cluster_t> {
     pending_folder_map_t pending_folders;
     pending_devices_map_t pending_devices;
     path_cache_ptr_t path_cache;
+    locked_paths_t locked_paths;
     bool tainted = false;
     int32_t write_requests;
+
+    friend struct path_guard_t;
 };
 
 using cluster_ptr_t = intrusive_ptr_t<cluster_t>;

+ 1 - 1
src/model/diff/peer/update_folder.cpp

@@ -22,7 +22,7 @@ update_folder_t::update_folder_t(std::string_view folder_id_, utils::bytes_view_
                                  uuids_t uuids, blocks_t blocks, orphaned_blocks_t::set_t removed_blocks) noexcept
     : folder_id{std::string(folder_id_)}, peer_id{peer_id_.begin(), peer_id_.end()}, files(std::move(files_)),
       uuids{std::move(uuids)} {
-    LOG_DEBUG(log, "update_folder_t, folder = {}", folder_id);
+    LOG_DEBUG(log, "update_folder_t, folder = {}, files: {}, blocks = {}", folder_id, files.size(), blocks.size());
     auto current = (cluster_diff_t *)(nullptr);
     if (!blocks.empty()) {
         current = assign_child(new modify::add_blocks_t(std::move(blocks)));

+ 3 - 0
src/model/file_info.cpp

@@ -89,6 +89,9 @@ static void fill(unsigned char *key, const bu::uuid &uuid, const folder_info_ptr
 file_info_t::guard_t::guard_t(file_info_t &file_, const folder_info_t *folder_info_) noexcept
     : file{&file_}, folder_info{folder_info_} {
     file_.synchronizing_lock();
+    auto cluster = folder_info->get_folder()->get_cluster();
+    path_guard = std::make_unique<path_guard_t>(cluster->lock(file->get_name().get()));
+    assert(path_guard && *path_guard);
 }
 
 file_info_t::guard_t::~guard_t() {

+ 3 - 1
src/model/file_info.h

@@ -4,7 +4,6 @@
 #pragma once
 
 #include <cstdint>
-#include <vector>
 #include <unordered_set>
 #include <filesystem>
 #include <boost/outcome.hpp>
@@ -34,6 +33,7 @@ struct file_info_t;
 using file_info_ptr_t = intrusive_ptr_t<file_info_t>;
 
 struct path_cache_t;
+struct path_guard_t;
 
 struct SYNCSPIRIT_API file_info_t {
 
@@ -63,6 +63,7 @@ struct SYNCSPIRIT_API file_info_t {
     };
 
     struct guard_t {
+        using path_guard_ptr_t = std::unique_ptr<path_guard_t>;
         guard_t() noexcept = default;
         guard_t(file_info_t &file, const folder_info_t *folder_info) noexcept;
         guard_t(const guard_t &) = delete;
@@ -72,6 +73,7 @@ struct SYNCSPIRIT_API file_info_t {
         guard_t &operator=(guard_t &&) noexcept = default;
 
         file_info_ptr_t file;
+        path_guard_ptr_t path_guard;
         const folder_info_t *folder_info;
     };
 

+ 10 - 5
src/model/misc/file_iterator.cpp

@@ -116,11 +116,16 @@ auto file_iterator_t::next() noexcept -> result_t {
             auto it = queue->begin();
             while (it != queue->end()) {
                 auto &file = **it;
-                it = queue->erase(it);
-                auto local_file = local_files.by_name(file.get_name()->get_full_name());
-                auto action = resolve(file, local_file.get(), local_folder);
-                if (action != advance_action_t::ignore) {
-                    return std::make_tuple(&file, &peer_folder, action);
+                if (cluster.is_locked(file.get_name().get())) {
+                    ++it;
+                    continue;
+                } else {
+                    it = queue->erase(it);
+                    auto local_file = local_files.by_name(file.get_name()->get_full_name());
+                    auto action = resolve(file, local_file.get(), local_folder);
+                    if (action != advance_action_t::ignore) {
+                        return std::make_tuple(&file, &peer_folder, action);
+                    }
                 }
             }
         }

+ 8 - 6
src/net/controller_actor.cpp

@@ -219,13 +219,15 @@ void controller_actor_t::on_tx_signal(message::tx_signal_t &) noexcept {
 }
 
 void controller_actor_t::on_peer_down(message::peer_down_t &message) noexcept {
-    if (resources->has(resource::peer)) {
-        resources->release(resource::peer);
-        peer_address.reset();
+    if (message.payload.peer == peer_address) {
+        if (resources->has(resource::peer)) {
+            resources->release(resource::peer);
+            peer_address.reset();
+        }
+        auto &ee = message.payload.ee;
+        LOG_TRACE(log, "on_peer_down reason: {}", ee->message());
+        do_shutdown(ee);
     }
-    auto &ee = message.payload.ee;
-    LOG_TRACE(log, "on_peer_down reason: {}", ee->message());
-    do_shutdown(ee);
 }
 
 void controller_actor_t::push_pending() noexcept {

+ 1 - 0
src/net/messages.h

@@ -138,6 +138,7 @@ struct controller_predown_t {
 };
 
 struct peer_down_t {
+    r::address_ptr_t peer;
     r::extended_error_ptr_t ee;
 };
 

+ 2 - 2
src/net/peer_actor.cpp

@@ -259,7 +259,7 @@ void peer_actor_t::shutdown_start() noexcept {
         r::actor_base_t::cancel_timer(*rx_timer_request);
     }
     if (controller) {
-        send<payload::peer_down_t>(controller, shutdown_reason);
+        send<payload::peer_down_t>(controller, address, shutdown_reason);
     }
 
     auto timeout = shutdown_timeout * 8 / 9;
@@ -284,7 +284,7 @@ void peer_actor_t::shutdown_finish() noexcept {
     }
     block_requests.clear();
     if (controller) {
-        send<payload::peer_down_t>(controller, shutdown_reason);
+        send<payload::peer_down_t>(controller, address, shutdown_reason);
     }
     r::actor_base_t::shutdown_finish();
     auto sha256 = peer_device_id.get_sha256();

+ 0 - 0
tests/031-diff-cluster_update.cpp


Неке датотеке нису приказане због велике количине промена