checkpoint

This commit is contained in:
Antonio SJ Musumeci 2023-04-25 22:36:56 -04:00
parent c96bac9cb8
commit 8e83821f32
5 changed files with 139 additions and 8 deletions

View File

@ -13,13 +13,14 @@ public:
explicit
BoundedQueue(std::size_t max_size_,
bool block_ = true)
: _block(block),
: _block(block_),
_max_size(max_size_)
{
if(_max_size == 0)
_max_size = 1;
}
BoundedQueue(const BoundedQueue&) = delete;
BoundedQueue(BoundedQueue&&) = default;
bool
push(const T& item_)
{

View File

@ -0,0 +1,130 @@
#pragma once
#include "bounded_queue.hpp"
#include <tuple>
#include <atomic>
#include <vector>
#include <thread>
#include <memory>
#include <future>
#include <utility>
#include <functional>
#include <type_traits>
class BoundedThreadPool
{
private:
using Proc = std::function<void(void)>;
using Queue = BoundedQueue<Proc>;
using Queues = std::vector<std::shared_ptr<Queue>>;
public:
explicit
BoundedThreadPool(const std::size_t thread_count_ = std::thread::hardware_concurrency())
: _queues(),
_count(thread_count_)
{
printf("threads: %d\n",thread_count_);
for(std::size_t i = 0; i < thread_count_; i++)
_queues.emplace_back(std::make_shared<Queue>(1));
auto worker = [this](std::size_t i)
{
while(true)
{
Proc f;
for(std::size_t n = 0; n < (_count * K); ++n)
{
if(_queues[(i + n) % _count]->pop(f))
break;
}
if(!f && !_queues[i]->pop(f))
break;
f();
}
};
_threads.reserve(thread_count_);
for(std::size_t i = 0; i < thread_count_; ++i)
_threads.emplace_back(worker, i);
}
~BoundedThreadPool()
{
for(auto& queue : _queues)
queue->unblock();
for(auto& thread : _threads)
thread.join();
}
template<typename F>
void
enqueue_work(F&& f_)
{
auto i = _index++;
for(std::size_t n = 0; n < (_count * K); ++n)
{
if(_queues[(i + n) % _count]->push(f_))
return;
}
_queues[i % _count]->push(std::move(f_));
}
template<typename F>
[[nodiscard]]
std::future<typename std::result_of<F()>::type>
enqueue_task(F&& f_)
{
using TaskReturnType = typename std::result_of<F()>::type;
using Promise = std::promise<TaskReturnType>;
auto i = _index++;
auto promise = std::make_shared<Promise>();
auto future = promise->get_future();
auto work = [=]() {
auto rv = f_();
promise->set_value(rv);
};
for(std::size_t n = 0; n < (_count * K); ++n)
{
if(_queues[(i + n) % _count]->push(work))
return future;
}
_queues[i % _count]->push(std::move(work));
return future;
}
public:
std::vector<pthread_t>
threads()
{
std::vector<pthread_t> rv;
for(auto &thread : _threads)
rv.push_back(thread.native_handle());
return rv;
}
private:
Queues _queues;
private:
std::vector<std::thread> _threads;
private:
const std::size_t _count;
std::atomic_uint _index;
static const unsigned int K = 2;
};

View File

@ -3831,7 +3831,7 @@ fuse_maintenance_loop(void *fuse_)
gc = lfmp_gc(&f->node_fmp);
}
msgbuf_gc();
//msgbuf_gc();
if(g_LOG_METRICS)
metrics_log_nodes_info_to_tmp_dir(f);

View File

@ -2,7 +2,7 @@
#define _GNU_SOURCE
#endif
#include "thread_pool.hpp"
#include "bounded_thread_pool.hpp"
#include "cpu.hpp"
#include "fmt/core.h"
@ -33,7 +33,7 @@ struct fuse_worker_data_t
struct fuse_session *se;
sem_t finished;
std::function<void(fuse_worker_data_t*,fuse_msgbuf_t*)> msgbuf_processor;
std::shared_ptr<ThreadPool> tp;
std::shared_ptr<BoundedThreadPool> tp;
};
class WorkerCleanup
@ -444,7 +444,7 @@ fuse_session_loop_mt(struct fuse_session *se_,
if(process_thread_count > 0)
{
wd.tp = std::make_shared<ThreadPool>(process_thread_count);
wd.tp = std::make_shared<BoundedThreadPool>(process_thread_count);
wd.msgbuf_processor = process_msgbuf_async;
process_threads = wd.tp->threads();
}

View File

@ -51,7 +51,7 @@ __attribute__((destructor))
void
msgbuf_destroy()
{
msgbuf_gc();
}
uint32_t