From 0ace09076be9ce81f7e5f50015e83c82ac1d3d98 Mon Sep 17 00:00:00 2001 From: elestrias Date: Thu, 17 Feb 2022 16:36:33 +0300 Subject: [PATCH 1/9] Recconct socket && worker ping Signed-off-by: elestrias --- core/api/rpc/wsc.cpp | 22 ++++++++++++++++++- core/api/rpc/wsc.hpp | 13 +++++++++++ core/sector_storage/impl/local_worker.cpp | 3 +++ core/sector_storage/impl/local_worker.hpp | 2 ++ core/sector_storage/impl/remote_worker.cpp | 6 +++++ core/sector_storage/impl/remote_worker.hpp | 2 ++ core/sector_storage/impl/scheduler_impl.cpp | 18 ++++++++++++--- core/sector_storage/worker.hpp | 2 ++ .../mocks/sector_storage/worker_mock.hpp | 1 + 9 files changed, 65 insertions(+), 4 deletions(-) diff --git a/core/api/rpc/wsc.cpp b/core/api/rpc/wsc.cpp index 25a6e61f67..da000534ef 100644 --- a/core/api/rpc/wsc.cpp +++ b/core/api/rpc/wsc.cpp @@ -56,6 +56,7 @@ namespace fc::api::rpc { })); } socket.handshake(host, target, ec); + client_data = ClientData{host, port, target, token}; if (ec) { return ec; } @@ -87,10 +88,11 @@ namespace fc::api::rpc { } } chans.clear(); + reconnect(3, std::chrono::seconds(5)); } void Client::_flush() { - if (!writing && !write_queue.empty()) { + if (!writing && !write_queue.empty() && !reconnecting){ auto &[id, buffer] = write_queue.front(); writing = true; socket.async_write(boost::asio::buffer(buffer.data(), buffer.size()), @@ -185,4 +187,22 @@ namespace fc::api::rpc { } } } + + void Client::reconnect(int counter, std::chrono::milliseconds wait) { + if(reconnecting.exchange(true)) return; + logger_->info("Starting reconnect to {}:{}", client_data.host, client_data.port); + for(int i = 0; i < counter; i++){ + std::this_thread::sleep_for(wait*(i+1)); + auto res = connect(client_data.host, + client_data.port, + client_data.target, + client_data.token); + if(!res.has_error()) { + break; + } + } + reconnecting.store(false); + logger_->info("Reconnect to {}:{} was successful", client_data.host, client_data.port); + _flush(); + } } // namespace fc::api::rpc diff --git a/core/api/rpc/wsc.hpp b/core/api/rpc/wsc.hpp index 4c60cbf837..05e1e5429e 100644 --- a/core/api/rpc/wsc.hpp +++ b/core/api/rpc/wsc.hpp @@ -57,6 +57,18 @@ namespace fc::api::rpc { void setup(A &api) { visit(api, [&](auto &m) { _setup(*this, m); }); } + struct ClientData { + ClientData(std::string host, + std::string port, + std::string target, + std::string token) + : host(host), port(port), target(target), token(token){}; + ClientData() = default; + + std::string host, port, target, token; + }client_data; + + void reconnect(int counter, std::chrono::milliseconds wait); private: std::thread thread; @@ -72,6 +84,7 @@ namespace fc::api::rpc { std::map chans; std::queue> write_queue; bool writing{false}; + std::atomic reconnecting; template void _setup(Client &c, M &m); diff --git a/core/sector_storage/impl/local_worker.cpp b/core/sector_storage/impl/local_worker.cpp index 1ae299c4c7..8461c10c01 100644 --- a/core/sector_storage/impl/local_worker.cpp +++ b/core/sector_storage/impl/local_worker.cpp @@ -904,4 +904,7 @@ namespace fc::sector_storage { return call_id; } + void LocalWorker::ping(std::function cb) { + cb(true); + } } // namespace fc::sector_storage diff --git a/core/sector_storage/impl/local_worker.hpp b/core/sector_storage/impl/local_worker.hpp index d65c95811e..432fd5839c 100644 --- a/core/sector_storage/impl/local_worker.hpp +++ b/core/sector_storage/impl/local_worker.hpp @@ -99,6 +99,8 @@ namespace fc::sector_storage { outcome::result> getAccessiblePaths() override; + void ping(std::function cb) override; + private: template outcome::result asyncCall(const SectorRef §or, diff --git a/core/sector_storage/impl/remote_worker.cpp b/core/sector_storage/impl/remote_worker.cpp index a5d0e9063a..8254891312 100644 --- a/core/sector_storage/impl/remote_worker.cpp +++ b/core/sector_storage/impl/remote_worker.cpp @@ -254,4 +254,10 @@ namespace fc::sector_storage { AcquireMode mode) { return api_.Fetch(sector, file_type, path_type, mode); } + + void RemoteWorker::ping(std::function cb) { + api_.Version([=](auto res){ + cb(!res.has_error()); + }); + } } // namespace fc::sector_storage diff --git a/core/sector_storage/impl/remote_worker.hpp b/core/sector_storage/impl/remote_worker.hpp index 8cda77388c..53b3eeaa29 100644 --- a/core/sector_storage/impl/remote_worker.hpp +++ b/core/sector_storage/impl/remote_worker.hpp @@ -39,6 +39,8 @@ namespace fc::sector_storage { const SectorRef §or, const PreCommit1Output &pre_commit_1_output) override; + void ping(std::function cb) override; + outcome::result sealCommit1(const SectorRef §or, const SealRandomness &ticket, const InteractiveRandomness &seed, diff --git a/core/sector_storage/impl/scheduler_impl.cpp b/core/sector_storage/impl/scheduler_impl.cpp index 63337fbfce..3f651a9e3e 100644 --- a/core/sector_storage/impl/scheduler_impl.cpp +++ b/core/sector_storage/impl/scheduler_impl.cpp @@ -196,10 +196,22 @@ namespace fc::sector_storage { return SchedulerErrors::kCannotSelectWorker; } - WorkerID wid = acceptable[0]; - + std::promise wid_promise; + std::future wid_future = wid_promise.get_future(); + auto done = std::make_shared(); + for (const auto &cur : acceptable) { + workers_[cur]->worker->ping([&wid_promise, done, cur](const bool &resp) { + if (resp && !done->exchange(true)) { + wid_promise.set_value(cur); + } + }); + } + auto status = wid_future.wait_for(std::chrono::seconds(5)); + if(status == std::future_status::timeout){ + return false; + } + WorkerID wid = wid_future.get(); assignWorker(wid, workers_[wid], request); - return true; } diff --git a/core/sector_storage/worker.hpp b/core/sector_storage/worker.hpp index f1b005c441..04ea23b0b4 100644 --- a/core/sector_storage/worker.hpp +++ b/core/sector_storage/worker.hpp @@ -142,6 +142,8 @@ namespace fc::sector_storage { virtual outcome::result> getAccessiblePaths() = 0; + + virtual void ping(std::function cb) = 0; }; enum class CallErrorCode : uint64_t { diff --git a/test/testutil/mocks/sector_storage/worker_mock.hpp b/test/testutil/mocks/sector_storage/worker_mock.hpp index c5af9a565a..0af2e2454c 100644 --- a/test/testutil/mocks/sector_storage/worker_mock.hpp +++ b/test/testutil/mocks/sector_storage/worker_mock.hpp @@ -101,5 +101,6 @@ namespace fc::sector_storage { gsl::span, const UnpaddedPieceSize &, int)); + MOCK_METHOD1(ping, void(std::function)); }; } // namespace fc::sector_storage From 59e534cd6ef0b20269423dbeb4486dc57d1c81fb Mon Sep 17 00:00:00 2001 From: elestrias Date: Fri, 18 Feb 2022 15:47:14 +0300 Subject: [PATCH 2/9] workerfix Signed-off-by: elestrias --- test/core/sector_storage/scheduler_test.cpp | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/test/core/sector_storage/scheduler_test.cpp b/test/core/sector_storage/scheduler_test.cpp index f97895edca..fb9fdfc489 100644 --- a/test/core/sector_storage/scheduler_test.cpp +++ b/test/core/sector_storage/scheduler_test.cpp @@ -12,6 +12,7 @@ #include "storage/in_memory/in_memory_storage.hpp" #include "testutil/mocks/sector_storage/selector_mock.hpp" #include "testutil/outcome.hpp" +#include "testutil/mocks/sector_storage/worker_mock.hpp" namespace fc::sector_storage { using primitives::WorkerInfo; @@ -70,9 +71,12 @@ namespace fc::sector_storage { EXPECT_OUTCOME_TRUE(scheduler, SchedulerImpl::newScheduler(io_, kv_)); scheduler_ = scheduler; - + auto worker_test = std::make_shared(); + EXPECT_CALL(*worker_test, ping(_)).WillRepeatedly(testing::Invoke([](auto cb){ + cb(true); + })); std::unique_ptr worker = std::make_unique(); - + worker->worker = std::move(worker_test); worker_name_ = "worker"; worker->info = WorkerInfo{ From 16b8cd5c9be3a5f150fd61cdcd35bdbe2e28f179 Mon Sep 17 00:00:00 2001 From: elestrias Date: Sun, 20 Feb 2022 19:42:27 +0300 Subject: [PATCH 3/9] recoonect fix Signed-off-by: elestrias --- core/api/rpc/wsc.cpp | 21 ++++++++++++--------- core/api/rpc/wsc.hpp | 4 ++-- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/core/api/rpc/wsc.cpp b/core/api/rpc/wsc.cpp index da000534ef..019eab97c1 100644 --- a/core/api/rpc/wsc.cpp +++ b/core/api/rpc/wsc.cpp @@ -42,20 +42,20 @@ namespace fc::api::rpc { const std::string &target, const std::string &token) { boost::system::error_code ec; - socket.next_layer().connect({boost::asio::ip::make_address(host), + socket->next_layer().connect({boost::asio::ip::make_address(host), boost::lexical_cast(port)}, ec); if (ec) { return ec; } if (not token.empty()) { - socket.set_option( + socket->set_option( boost::beast::websocket::stream_base::decorator([&](auto &req) { req.set(boost::beast::http::field::authorization, "Bearer " + token); })); } - socket.handshake(host, target, ec); + socket->handshake(host, target, ec); client_data = ClientData{host, port, target, token}; if (ec) { return ec; @@ -92,10 +92,10 @@ namespace fc::api::rpc { } void Client::_flush() { - if (!writing && !write_queue.empty() && !reconnecting){ + if (!writing && !write_queue.empty() && not reconnecting){ auto &[id, buffer] = write_queue.front(); writing = true; - socket.async_write(boost::asio::buffer(buffer.data(), buffer.size()), + socket->async_write(boost::asio::buffer(buffer.data(), buffer.size()), [=](auto &&ec, auto) { std::lock_guard lock{mutex}; if (ec) { @@ -109,7 +109,7 @@ namespace fc::api::rpc { } void Client::_read() { - socket.async_read(buffer, [=](auto &&ec, auto) { + socket->async_read(buffer, [=](auto &&ec, auto) { if (ec) { std::lock_guard lock{mutex}; return _error(ec); @@ -189,10 +189,13 @@ namespace fc::api::rpc { } void Client::reconnect(int counter, std::chrono::milliseconds wait) { - if(reconnecting.exchange(true)) return; + if(reconnecting) return; + reconnecting = true; logger_->info("Starting reconnect to {}:{}", client_data.host, client_data.port); for(int i = 0; i < counter; i++){ - std::this_thread::sleep_for(wait*(i+1)); + std::this_thread::sleep_for(wait); + socket.reset(); + socket.emplace(io); auto res = connect(client_data.host, client_data.port, client_data.target, @@ -201,7 +204,7 @@ namespace fc::api::rpc { break; } } - reconnecting.store(false); + reconnecting = false; logger_->info("Reconnect to {}:{} was successful", client_data.host, client_data.port); _flush(); } diff --git a/core/api/rpc/wsc.hpp b/core/api/rpc/wsc.hpp index 05e1e5429e..6b143c18ac 100644 --- a/core/api/rpc/wsc.hpp +++ b/core/api/rpc/wsc.hpp @@ -76,7 +76,7 @@ namespace fc::api::rpc { io_context io; io_context &io2; boost::asio::executor_work_guard work_guard; - boost::beast::websocket::stream socket; + boost::optional> socket; boost::beast::flat_buffer buffer; std::mutex mutex; uint64_t next_req{}; @@ -84,7 +84,7 @@ namespace fc::api::rpc { std::map chans; std::queue> write_queue; bool writing{false}; - std::atomic reconnecting; + bool reconnecting{false}; template void _setup(Client &c, M &m); From 1e0881515153f776a8fb6db13d624503d09134c1 Mon Sep 17 00:00:00 2001 From: elestrias Date: Sun, 20 Feb 2022 19:58:26 +0300 Subject: [PATCH 4/9] pr fixes Signed-off-by: elestrias --- core/api/rpc/wsc.cpp | 55 ++++++++++++--------- core/sector_storage/impl/local_worker.cpp | 2 +- core/sector_storage/impl/local_worker.hpp | 2 +- core/sector_storage/impl/remote_worker.cpp | 6 +-- core/sector_storage/impl/remote_worker.hpp | 2 +- core/sector_storage/impl/scheduler_impl.cpp | 4 +- core/sector_storage/worker.hpp | 2 +- 7 files changed, 41 insertions(+), 32 deletions(-) diff --git a/core/api/rpc/wsc.cpp b/core/api/rpc/wsc.cpp index 019eab97c1..6709a5aac8 100644 --- a/core/api/rpc/wsc.cpp +++ b/core/api/rpc/wsc.cpp @@ -37,14 +37,14 @@ namespace fc::api::rpc { return connect(ip, port, target, token); } - outcome::result Client::connect(const std::string &host, - const std::string &port, - const std::string &target, - const std::string &token) { + outcome::result Client::connect(const std::string &host, + const std::string &port, + const std::string &target, + const std::string &token) { boost::system::error_code ec; socket->next_layer().connect({boost::asio::ip::make_address(host), - boost::lexical_cast(port)}, - ec); + boost::lexical_cast(port)}, + ec); if (ec) { return ec; } @@ -92,19 +92,19 @@ namespace fc::api::rpc { } void Client::_flush() { - if (!writing && !write_queue.empty() && not reconnecting){ + if (!writing && !write_queue.empty() && not reconnecting) { auto &[id, buffer] = write_queue.front(); writing = true; socket->async_write(boost::asio::buffer(buffer.data(), buffer.size()), - [=](auto &&ec, auto) { - std::lock_guard lock{mutex}; - if (ec) { - return _error(ec); - } - writing = false; - write_queue.pop(); - _flush(); - }); + [=](auto &&ec, auto) { + std::lock_guard lock{mutex}; + if (ec) { + return _error(ec); + } + writing = false; + write_queue.pop(); + _flush(); + }); } } @@ -189,10 +189,12 @@ namespace fc::api::rpc { } void Client::reconnect(int counter, std::chrono::milliseconds wait) { - if(reconnecting) return; + if (reconnecting) return; reconnecting = true; - logger_->info("Starting reconnect to {}:{}", client_data.host, client_data.port); - for(int i = 0; i < counter; i++){ + bool rec_status{false}; + logger_->info( + "Starting reconnect to {}:{}", client_data.host, client_data.port); + for (int i = 0; i < counter; i++) { std::this_thread::sleep_for(wait); socket.reset(); socket.emplace(io); @@ -200,12 +202,21 @@ namespace fc::api::rpc { client_data.port, client_data.target, client_data.token); - if(!res.has_error()) { + if (not res.has_error()) { + rec_status = true; break; } } reconnecting = false; - logger_->info("Reconnect to {}:{} was successful", client_data.host, client_data.port); - _flush(); + if (rec_status) { + logger_->info("Reconnect to {}:{} was successful", + client_data.host, + client_data.port); + _flush(); + } else { + logger_->error("Reconnect to {}:{} have been failed", + client_data.host, + client_data.port); + } } } // namespace fc::api::rpc diff --git a/core/sector_storage/impl/local_worker.cpp b/core/sector_storage/impl/local_worker.cpp index 8461c10c01..242ae45444 100644 --- a/core/sector_storage/impl/local_worker.cpp +++ b/core/sector_storage/impl/local_worker.cpp @@ -904,7 +904,7 @@ namespace fc::sector_storage { return call_id; } - void LocalWorker::ping(std::function cb) { + void LocalWorker::ping(std::function cb) { cb(true); } } // namespace fc::sector_storage diff --git a/core/sector_storage/impl/local_worker.hpp b/core/sector_storage/impl/local_worker.hpp index 432fd5839c..d527fa10bc 100644 --- a/core/sector_storage/impl/local_worker.hpp +++ b/core/sector_storage/impl/local_worker.hpp @@ -99,7 +99,7 @@ namespace fc::sector_storage { outcome::result> getAccessiblePaths() override; - void ping(std::function cb) override; + void ping(std::function cb) override; private: template diff --git a/core/sector_storage/impl/remote_worker.cpp b/core/sector_storage/impl/remote_worker.cpp index 8254891312..7847852999 100644 --- a/core/sector_storage/impl/remote_worker.cpp +++ b/core/sector_storage/impl/remote_worker.cpp @@ -255,9 +255,7 @@ namespace fc::sector_storage { return api_.Fetch(sector, file_type, path_type, mode); } - void RemoteWorker::ping(std::function cb) { - api_.Version([=](auto res){ - cb(!res.has_error()); - }); + void RemoteWorker::ping(std::function cb) { + api_.Version([=](auto res) { cb(!res.has_error()); }); } } // namespace fc::sector_storage diff --git a/core/sector_storage/impl/remote_worker.hpp b/core/sector_storage/impl/remote_worker.hpp index 53b3eeaa29..cf4257e1f3 100644 --- a/core/sector_storage/impl/remote_worker.hpp +++ b/core/sector_storage/impl/remote_worker.hpp @@ -39,7 +39,7 @@ namespace fc::sector_storage { const SectorRef §or, const PreCommit1Output &pre_commit_1_output) override; - void ping(std::function cb) override; + void ping(std::function cb) override; outcome::result sealCommit1(const SectorRef §or, const SealRandomness &ticket, diff --git a/core/sector_storage/impl/scheduler_impl.cpp b/core/sector_storage/impl/scheduler_impl.cpp index 3f651a9e3e..2242326702 100644 --- a/core/sector_storage/impl/scheduler_impl.cpp +++ b/core/sector_storage/impl/scheduler_impl.cpp @@ -200,14 +200,14 @@ namespace fc::sector_storage { std::future wid_future = wid_promise.get_future(); auto done = std::make_shared(); for (const auto &cur : acceptable) { - workers_[cur]->worker->ping([&wid_promise, done, cur](const bool &resp) { + workers_[cur]->worker->ping([&wid_promise, done, cur](bool resp) { if (resp && !done->exchange(true)) { wid_promise.set_value(cur); } }); } auto status = wid_future.wait_for(std::chrono::seconds(5)); - if(status == std::future_status::timeout){ + if (status == std::future_status::timeout) { return false; } WorkerID wid = wid_future.get(); diff --git a/core/sector_storage/worker.hpp b/core/sector_storage/worker.hpp index 04ea23b0b4..53ed51371d 100644 --- a/core/sector_storage/worker.hpp +++ b/core/sector_storage/worker.hpp @@ -143,7 +143,7 @@ namespace fc::sector_storage { virtual outcome::result> getAccessiblePaths() = 0; - virtual void ping(std::function cb) = 0; + virtual void ping(std::function cb) = 0; }; enum class CallErrorCode : uint64_t { From 5c5067a9c2c0890504431099c97eee2e4f2ce752 Mon Sep 17 00:00:00 2001 From: elestrias Date: Sun, 20 Feb 2022 22:23:03 +0300 Subject: [PATCH 5/9] duplicate worker fix Signed-off-by: elestrias --- core/api/rpc/wsc.hpp | 18 ++++++++---------- core/primitives/sector_file/sector_file.hpp | 10 +++++----- core/primitives/types.hpp | 10 +++++++++- core/sector_storage/impl/local_worker.cpp | 2 +- core/sector_storage/impl/local_worker.hpp | 2 +- core/sector_storage/impl/manager_impl.cpp | 1 - core/sector_storage/impl/remote_worker.cpp | 2 +- core/sector_storage/impl/remote_worker.hpp | 2 +- core/sector_storage/impl/scheduler_impl.cpp | 7 ++++++- core/sector_storage/selector.hpp | 5 +++++ core/sector_storage/worker.hpp | 2 +- .../mocks/sector_storage/worker_mock.hpp | 2 +- 12 files changed, 39 insertions(+), 24 deletions(-) diff --git a/core/api/rpc/wsc.hpp b/core/api/rpc/wsc.hpp index 6b143c18ac..32a1882ada 100644 --- a/core/api/rpc/wsc.hpp +++ b/core/api/rpc/wsc.hpp @@ -58,15 +58,11 @@ namespace fc::api::rpc { visit(api, [&](auto &m) { _setup(*this, m); }); } struct ClientData { - ClientData(std::string host, - std::string port, - std::string target, - std::string token) - : host(host), port(port), target(target), token(token){}; - ClientData() = default; - - std::string host, port, target, token; - }client_data; + std::string host; + std::string port; + std::string target; + std::string token; + } client_data; void reconnect(int counter, std::chrono::milliseconds wait); @@ -76,7 +72,9 @@ namespace fc::api::rpc { io_context io; io_context &io2; boost::asio::executor_work_guard work_guard; - boost::optional> socket; + boost::optional< + boost::beast::websocket::stream> + socket; boost::beast::flat_buffer buffer; std::mutex mutex; uint64_t next_req{}; diff --git a/core/primitives/sector_file/sector_file.hpp b/core/primitives/sector_file/sector_file.hpp index 8115f9f344..452ec74a29 100644 --- a/core/primitives/sector_file/sector_file.hpp +++ b/core/primitives/sector_file/sector_file.hpp @@ -83,11 +83,11 @@ namespace fc::primitives::sector_file { struct SectorPaths { public: SectorId id; - std::string unsealed; - std::string sealed; - std::string cache; - std::string update; - std::string update_cache; + std::string unsealed{}; + std::string sealed{}; + std::string cache{}; + std::string update{}; + std::string update_cache{}; void setPathByType(const SectorFileType &file_type, const std::string &path); diff --git a/core/primitives/types.hpp b/core/primitives/types.hpp index b3e9455949..6c190d6486 100644 --- a/core/primitives/types.hpp +++ b/core/primitives/types.hpp @@ -39,7 +39,7 @@ namespace fc::primitives { struct FsStat { uint64_t capacity = 0; uint64_t available = 0; - uint64_t fs_available = 0; // Available to use for sector storage + uint64_t fs_available = 0; // Available to use for sector storage uint64_t reserved = 0; uint64_t max = 0; uint64_t used = 0; @@ -91,6 +91,14 @@ namespace fc::primitives { std::vector gpus; }; + inline bool operator==(const WorkerResources &lhs, + const WorkerResources &rhs) { + return (lhs.physical_memory == rhs.physical_memory + && lhs.swap_memory == rhs.swap_memory + && lhs.reserved_memory == rhs.reserved_memory + && lhs.cpus == rhs.cpus && lhs.gpus == rhs.gpus); + } + struct WorkerInfo { std::string hostname; WorkerResources resources; diff --git a/core/sector_storage/impl/local_worker.cpp b/core/sector_storage/impl/local_worker.cpp index 242ae45444..c1f036bcbf 100644 --- a/core/sector_storage/impl/local_worker.cpp +++ b/core/sector_storage/impl/local_worker.cpp @@ -904,7 +904,7 @@ namespace fc::sector_storage { return call_id; } - void LocalWorker::ping(std::function cb) { + void LocalWorker::ping(std::function cb) { cb(true); } } // namespace fc::sector_storage diff --git a/core/sector_storage/impl/local_worker.hpp b/core/sector_storage/impl/local_worker.hpp index d527fa10bc..c5c8fb976e 100644 --- a/core/sector_storage/impl/local_worker.hpp +++ b/core/sector_storage/impl/local_worker.hpp @@ -99,7 +99,7 @@ namespace fc::sector_storage { outcome::result> getAccessiblePaths() override; - void ping(std::function cb) override; + void ping(std::function cb) override; private: template diff --git a/core/sector_storage/impl/manager_impl.cpp b/core/sector_storage/impl/manager_impl.cpp index 43b768f106..6a0eaaddbc 100644 --- a/core/sector_storage/impl/manager_impl.cpp +++ b/core/sector_storage/impl/manager_impl.cpp @@ -260,7 +260,6 @@ namespace fc::sector_storage { worker_handler->worker = std::move(worker); worker_handler->info = std::move(info); - scheduler_->newWorker(std::move(worker_handler)); return outcome::success(); diff --git a/core/sector_storage/impl/remote_worker.cpp b/core/sector_storage/impl/remote_worker.cpp index 7847852999..14ef6f4e5c 100644 --- a/core/sector_storage/impl/remote_worker.cpp +++ b/core/sector_storage/impl/remote_worker.cpp @@ -255,7 +255,7 @@ namespace fc::sector_storage { return api_.Fetch(sector, file_type, path_type, mode); } - void RemoteWorker::ping(std::function cb) { + void RemoteWorker::ping(std::function cb) { api_.Version([=](auto res) { cb(!res.has_error()); }); } } // namespace fc::sector_storage diff --git a/core/sector_storage/impl/remote_worker.hpp b/core/sector_storage/impl/remote_worker.hpp index cf4257e1f3..982a1797d9 100644 --- a/core/sector_storage/impl/remote_worker.hpp +++ b/core/sector_storage/impl/remote_worker.hpp @@ -39,7 +39,7 @@ namespace fc::sector_storage { const SectorRef §or, const PreCommit1Output &pre_commit_1_output) override; - void ping(std::function cb) override; + void ping(std::function cb) override; outcome::result sealCommit1(const SectorRef §or, const SealRandomness &ticket, diff --git a/core/sector_storage/impl/scheduler_impl.cpp b/core/sector_storage/impl/scheduler_impl.cpp index 2242326702..899cd27716 100644 --- a/core/sector_storage/impl/scheduler_impl.cpp +++ b/core/sector_storage/impl/scheduler_impl.cpp @@ -137,6 +137,11 @@ namespace fc::sector_storage { void SchedulerImpl::newWorker(std::unique_ptr worker) { std::unique_lock lock(workers_lock_); + for(const auto &[key, value] : workers_){ + if(*value == *worker){ + return; + } + } if (current_worker_id_ == std::numeric_limits::max()) { current_worker_id_ = 0; // TODO(ortyomka): maybe better mechanism } @@ -200,7 +205,7 @@ namespace fc::sector_storage { std::future wid_future = wid_promise.get_future(); auto done = std::make_shared(); for (const auto &cur : acceptable) { - workers_[cur]->worker->ping([&wid_promise, done, cur](bool resp) { + workers_[cur]->worker->ping([&wid_promise, done, cur](const bool resp) { if (resp && !done->exchange(true)) { wid_promise.set_value(cur); } diff --git a/core/sector_storage/selector.hpp b/core/sector_storage/selector.hpp index 1548db0e9a..cb7ef1880e 100644 --- a/core/sector_storage/selector.hpp +++ b/core/sector_storage/selector.hpp @@ -24,6 +24,11 @@ namespace fc::sector_storage { ActiveResources active; }; + inline bool operator==(const WorkerHandle &lhs, const WorkerHandle &rhs) { + return lhs.info.hostname == rhs.info.hostname + && lhs.info.resources == rhs.info.resources; + } + class WorkerSelector { public: virtual ~WorkerSelector() = default; diff --git a/core/sector_storage/worker.hpp b/core/sector_storage/worker.hpp index 53ed51371d..336fd68ff4 100644 --- a/core/sector_storage/worker.hpp +++ b/core/sector_storage/worker.hpp @@ -143,7 +143,7 @@ namespace fc::sector_storage { virtual outcome::result> getAccessiblePaths() = 0; - virtual void ping(std::function cb) = 0; + virtual void ping(std::function cb) = 0; }; enum class CallErrorCode : uint64_t { diff --git a/test/testutil/mocks/sector_storage/worker_mock.hpp b/test/testutil/mocks/sector_storage/worker_mock.hpp index 0af2e2454c..629d2c7950 100644 --- a/test/testutil/mocks/sector_storage/worker_mock.hpp +++ b/test/testutil/mocks/sector_storage/worker_mock.hpp @@ -101,6 +101,6 @@ namespace fc::sector_storage { gsl::span, const UnpaddedPieceSize &, int)); - MOCK_METHOD1(ping, void(std::function)); + MOCK_METHOD1(ping, void(std::function)); }; } // namespace fc::sector_storage From 566d6d260aaa57f1d463248a3ab65bef8898ea3e Mon Sep 17 00:00:00 2001 From: elestrias Date: Mon, 21 Feb 2022 14:56:47 +0300 Subject: [PATCH 6/9] Writing status move Signed-off-by: elestrias --- core/api/rpc/wsc.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/api/rpc/wsc.cpp b/core/api/rpc/wsc.cpp index 6709a5aac8..eb415cdf8b 100644 --- a/core/api/rpc/wsc.cpp +++ b/core/api/rpc/wsc.cpp @@ -98,10 +98,10 @@ namespace fc::api::rpc { socket->async_write(boost::asio::buffer(buffer.data(), buffer.size()), [=](auto &&ec, auto) { std::lock_guard lock{mutex}; + writing = false; if (ec) { return _error(ec); } - writing = false; write_queue.pop(); _flush(); }); From 3b1decb79821180374c431bde4f262e70cebc247 Mon Sep 17 00:00:00 2001 From: elestrias Date: Tue, 22 Feb 2022 20:37:56 +0300 Subject: [PATCH 7/9] check fixes Signed-off-by: elestrias --- core/api/rpc/wsc.cpp | 4 ++-- core/api/worker_api.hpp | 2 +- core/remote_worker/remote_worker_api.cpp | 3 ++- core/sector_storage/impl/manager_impl.cpp | 2 +- core/sector_storage/impl/remote_worker.cpp | 2 +- 5 files changed, 7 insertions(+), 6 deletions(-) diff --git a/core/api/rpc/wsc.cpp b/core/api/rpc/wsc.cpp index eb415cdf8b..8d3c6463a1 100644 --- a/core/api/rpc/wsc.cpp +++ b/core/api/rpc/wsc.cpp @@ -88,7 +88,7 @@ namespace fc::api::rpc { } } chans.clear(); - reconnect(3, std::chrono::seconds(5)); + reconnect(3, std::chrono::seconds(10)); } void Client::_flush() { @@ -195,7 +195,7 @@ namespace fc::api::rpc { logger_->info( "Starting reconnect to {}:{}", client_data.host, client_data.port); for (int i = 0; i < counter; i++) { - std::this_thread::sleep_for(wait); + std::this_thread::sleep_for(wait*(i+1)); socket.reset(); socket.emplace(io); auto res = connect(client_data.host, diff --git a/core/api/worker_api.hpp b/core/api/worker_api.hpp index 74b2e9b43a..17b6d4a418 100644 --- a/core/api/worker_api.hpp +++ b/core/api/worker_api.hpp @@ -126,7 +126,7 @@ namespace fc::api { const SealRandomness &, const CID &) - API_METHOD(Version, kAdminPermission, VersionResult) + API_METHOD(Version, kAdminPermission, ApiVersion) }; template diff --git a/core/remote_worker/remote_worker_api.cpp b/core/remote_worker/remote_worker_api.cpp index b1ce76e314..14a59a8e28 100644 --- a/core/remote_worker/remote_worker_api.cpp +++ b/core/remote_worker/remote_worker_api.cpp @@ -6,6 +6,7 @@ #include "remote_worker/remote_worker_api.hpp" namespace fc::remote_worker { + using api::ApiVersion; using api::VersionResult; using primitives::piece::PieceInfo; using primitives::piece::UnpaddedByteIndex; @@ -26,7 +27,7 @@ namespace fc::remote_worker { const std::shared_ptr &local_store, const std::shared_ptr &worker) { auto worker_api{std::make_shared()}; - worker_api->Version = []() { return VersionResult{"seal-worker", 0, 0}; }; + worker_api->Version = []() { return ApiVersion{0}; }; worker_api->StorageAddLocal = [=](const std::string &path) { return local_store->openPath(path); }; diff --git a/core/sector_storage/impl/manager_impl.cpp b/core/sector_storage/impl/manager_impl.cpp index 6a0eaaddbc..100a09cb44 100644 --- a/core/sector_storage/impl/manager_impl.cpp +++ b/core/sector_storage/impl/manager_impl.cpp @@ -412,7 +412,7 @@ namespace fc::sector_storage { remote, proofs); - OUTCOME_TRY(manager->addWorker(std::move(worker))); + //OUTCOME_TRY(manager->addWorker(std::move(worker))); return std::move(manager); } diff --git a/core/sector_storage/impl/remote_worker.cpp b/core/sector_storage/impl/remote_worker.cpp index 14ef6f4e5c..45f63e83af 100644 --- a/core/sector_storage/impl/remote_worker.cpp +++ b/core/sector_storage/impl/remote_worker.cpp @@ -256,6 +256,6 @@ namespace fc::sector_storage { } void RemoteWorker::ping(std::function cb) { - api_.Version([=](auto res) { cb(!res.has_error()); }); + api_.Version([=](auto res){cb(res.has_value());}); } } // namespace fc::sector_storage From 8e0d911f38c10a1a33d6aeca191d7e296dd589c5 Mon Sep 17 00:00:00 2001 From: Elestrias <45968619+Elestrias@users.noreply.github.com> Date: Tue, 22 Feb 2022 22:22:18 +0300 Subject: [PATCH 8/9] Update core/sector_storage/impl/manager_impl.cpp --- core/sector_storage/impl/manager_impl.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/sector_storage/impl/manager_impl.cpp b/core/sector_storage/impl/manager_impl.cpp index 689baf49af..459171f6a3 100644 --- a/core/sector_storage/impl/manager_impl.cpp +++ b/core/sector_storage/impl/manager_impl.cpp @@ -414,7 +414,7 @@ namespace fc::sector_storage { remote, proofs); - //OUTCOME_TRY(manager->addWorker(std::move(worker))); + OUTCOME_TRY(manager->addWorker(std::move(worker))); return std::move(manager); } From a73b6dcc59621653e93eb594d44b4c9074e3c488 Mon Sep 17 00:00:00 2001 From: elestrias Date: Tue, 22 Feb 2022 22:29:13 +0300 Subject: [PATCH 9/9] fix manager Signed-off-by: elestrias --- core/sector_storage/impl/manager_impl.cpp | 3 ++- core/sector_storage/impl/scheduler_impl.cpp | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/core/sector_storage/impl/manager_impl.cpp b/core/sector_storage/impl/manager_impl.cpp index 689baf49af..9050cd2a4f 100644 --- a/core/sector_storage/impl/manager_impl.cpp +++ b/core/sector_storage/impl/manager_impl.cpp @@ -262,6 +262,7 @@ namespace fc::sector_storage { worker_handler->worker = std::move(worker); worker_handler->info = std::move(info); + scheduler_->newWorker(std::move(worker_handler)); return outcome::success(); @@ -414,7 +415,7 @@ namespace fc::sector_storage { remote, proofs); - //OUTCOME_TRY(manager->addWorker(std::move(worker))); + OUTCOME_TRY(manager->addWorker(std::move(worker))); return std::move(manager); } diff --git a/core/sector_storage/impl/scheduler_impl.cpp b/core/sector_storage/impl/scheduler_impl.cpp index 899cd27716..764397b5e1 100644 --- a/core/sector_storage/impl/scheduler_impl.cpp +++ b/core/sector_storage/impl/scheduler_impl.cpp @@ -217,6 +217,7 @@ namespace fc::sector_storage { } WorkerID wid = wid_future.get(); assignWorker(wid, workers_[wid], request); + return true; }