diff --git a/libfuse/lib/bounded_queue.hpp b/libfuse/lib/bounded_queue.hpp index 5e5fa261..5b9d5d42 100644 --- a/libfuse/lib/bounded_queue.hpp +++ b/libfuse/lib/bounded_queue.hpp @@ -13,13 +13,14 @@ public: explicit BoundedQueue(std::size_t max_size_, bool block_ = true) - : _block(block), + : _block(block_), _max_size(max_size_) { - if(_max_size == 0) - _max_size = 1; } + BoundedQueue(const BoundedQueue&) = delete; + BoundedQueue(BoundedQueue&&) = default; + bool push(const T& item_) { diff --git a/libfuse/lib/bounded_thread_pool.hpp b/libfuse/lib/bounded_thread_pool.hpp new file mode 100644 index 00000000..f44cef21 --- /dev/null +++ b/libfuse/lib/bounded_thread_pool.hpp @@ -0,0 +1,130 @@ +#pragma once + +#include "bounded_queue.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +class BoundedThreadPool +{ +private: + using Proc = std::function; + using Queue = BoundedQueue; + using Queues = std::vector>; + +public: + explicit + BoundedThreadPool(const std::size_t thread_count_ = std::thread::hardware_concurrency()) + : _queues(), + _count(thread_count_) + { + printf("threads: %d\n",thread_count_); + for(std::size_t i = 0; i < thread_count_; i++) + _queues.emplace_back(std::make_shared(1)); + + auto worker = [this](std::size_t i) + { + while(true) + { + Proc f; + + for(std::size_t n = 0; n < (_count * K); ++n) + { + if(_queues[(i + n) % _count]->pop(f)) + break; + } + + if(!f && !_queues[i]->pop(f)) + break; + + f(); + } + }; + + _threads.reserve(thread_count_); + for(std::size_t i = 0; i < thread_count_; ++i) + _threads.emplace_back(worker, i); + } + + ~BoundedThreadPool() + { + for(auto& queue : _queues) + queue->unblock(); + for(auto& thread : _threads) + thread.join(); + } + + template + void + enqueue_work(F&& f_) + { + auto i = _index++; + + for(std::size_t n = 0; n < (_count * K); ++n) + { + if(_queues[(i + n) % _count]->push(f_)) + return; + } + + _queues[i % _count]->push(std::move(f_)); + } + + template + [[nodiscard]] + std::future::type> + enqueue_task(F&& f_) + { + using TaskReturnType = typename std::result_of::type; + using Promise = std::promise; + + auto i = _index++; + auto promise = std::make_shared(); + auto future = promise->get_future(); + auto work = [=]() { + auto rv = f_(); + promise->set_value(rv); + }; + + for(std::size_t n = 0; n < (_count * K); ++n) + { + if(_queues[(i + n) % _count]->push(work)) + return future; + } + + _queues[i % _count]->push(std::move(work)); + + return future; + } + +public: + std::vector + threads() + { + std::vector rv; + + for(auto &thread : _threads) + rv.push_back(thread.native_handle()); + + return rv; + } + +private: + Queues _queues; + +private: + std::vector _threads; + +private: + const std::size_t _count; + std::atomic_uint _index; + + static const unsigned int K = 2; +}; diff --git a/libfuse/lib/fuse.c b/libfuse/lib/fuse.c index f5729c17..6a134df3 100644 --- a/libfuse/lib/fuse.c +++ b/libfuse/lib/fuse.c @@ -3831,7 +3831,7 @@ fuse_maintenance_loop(void *fuse_) gc = lfmp_gc(&f->node_fmp); } - msgbuf_gc(); + //msgbuf_gc(); if(g_LOG_METRICS) metrics_log_nodes_info_to_tmp_dir(f); diff --git a/libfuse/lib/fuse_loop_mt.cpp b/libfuse/lib/fuse_loop_mt.cpp index dc726ff4..8440d8de 100644 --- a/libfuse/lib/fuse_loop_mt.cpp +++ b/libfuse/lib/fuse_loop_mt.cpp @@ -2,7 +2,7 @@ #define _GNU_SOURCE #endif -#include "thread_pool.hpp" +#include "bounded_thread_pool.hpp" #include "cpu.hpp" #include "fmt/core.h" @@ -33,7 +33,7 @@ struct fuse_worker_data_t struct fuse_session *se; sem_t finished; std::function msgbuf_processor; - std::shared_ptr tp; + std::shared_ptr tp; }; class WorkerCleanup @@ -444,7 +444,7 @@ fuse_session_loop_mt(struct fuse_session *se_, if(process_thread_count > 0) { - wd.tp = std::make_shared(process_thread_count); + wd.tp = std::make_shared(process_thread_count); wd.msgbuf_processor = process_msgbuf_async; process_threads = wd.tp->threads(); } diff --git a/libfuse/lib/fuse_msgbuf.cpp b/libfuse/lib/fuse_msgbuf.cpp index 60bff4f1..48514bd1 100644 --- a/libfuse/lib/fuse_msgbuf.cpp +++ b/libfuse/lib/fuse_msgbuf.cpp @@ -51,7 +51,7 @@ __attribute__((destructor)) void msgbuf_destroy() { - msgbuf_gc(); + } uint32_t