mirror of
https://github.com/trapexit/mergerfs.git
synced 2024-11-25 09:41:43 +08:00
Initialize readdir threadpool after daemonizing
This commit is contained in:
parent
82781b6ff8
commit
0c555e71a0
2
Makefile
2
Makefile
|
@ -42,7 +42,7 @@ UGID_USE_RWLOCK = 0
|
||||||
ifeq ($(DEBUG),1)
|
ifeq ($(DEBUG),1)
|
||||||
OPT_FLAGS := -O0 -g -fsanitize=undefined
|
OPT_FLAGS := -O0 -g -fsanitize=undefined
|
||||||
else
|
else
|
||||||
OPT_FLAGS := -O2
|
OPT_FLAGS := -O2 -DNDEBUG
|
||||||
endif
|
endif
|
||||||
|
|
||||||
ifeq ($(STATIC),1)
|
ifeq ($(STATIC),1)
|
||||||
|
|
|
@ -391,7 +391,9 @@ to enable page caching for when \f[C]cache.files=per-process\f[R].
|
||||||
.IP \[bu] 2
|
.IP \[bu] 2
|
||||||
\f[B]parallel-direct-writes=BOOL\f[R]: Allow the kernel to dispatch
|
\f[B]parallel-direct-writes=BOOL\f[R]: Allow the kernel to dispatch
|
||||||
multiple, parallel (non-extending) write requests for files opened with
|
multiple, parallel (non-extending) write requests for files opened with
|
||||||
\f[C]direct_io=true\f[R] (if supported by the kernel)
|
\f[C]cache.files=per-process\f[R] (if the process is not in
|
||||||
|
\f[C]process-names\f[R]) or \f[C]cache.files=off\f[R].
|
||||||
|
(This requires kernel support, and was added in v6.2)
|
||||||
.IP \[bu] 2
|
.IP \[bu] 2
|
||||||
\f[B]direct_io\f[R]: deprecated - Bypass page cache.
|
\f[B]direct_io\f[R]: deprecated - Bypass page cache.
|
||||||
Use \f[C]cache.files=off\f[R] instead.
|
Use \f[C]cache.files=off\f[R] instead.
|
||||||
|
@ -859,8 +861,8 @@ calculations.
|
||||||
The reason is that it doesn\[cq]t really work for non-path preserving
|
The reason is that it doesn\[cq]t really work for non-path preserving
|
||||||
policies and can lead to non-obvious behaviors.
|
policies and can lead to non-obvious behaviors.
|
||||||
.PP
|
.PP
|
||||||
NOTE: While any policy can be assigned to a function or category though
|
NOTE: While any policy can be assigned to a function or category, some
|
||||||
some may not be very useful in practice.
|
may not be very useful in practice.
|
||||||
For instance: \f[B]rand\f[R] (random) may be useful for file creation
|
For instance: \f[B]rand\f[R] (random) may be useful for file creation
|
||||||
(create) but could lead to very odd behavior if used for \f[C]chmod\f[R]
|
(create) but could lead to very odd behavior if used for \f[C]chmod\f[R]
|
||||||
if there were more than one copy of the file.
|
if there were more than one copy of the file.
|
||||||
|
@ -1753,9 +1755,12 @@ size are unchanged since previous open.
|
||||||
cache.files=libfuse: follow traditional libfuse \f[C]direct_io\f[R],
|
cache.files=libfuse: follow traditional libfuse \f[C]direct_io\f[R],
|
||||||
\f[C]kernel_cache\f[R], and \f[C]auto_cache\f[R] arguments.
|
\f[C]kernel_cache\f[R], and \f[C]auto_cache\f[R] arguments.
|
||||||
.IP \[bu] 2
|
.IP \[bu] 2
|
||||||
cache.files=per-process: Enable page caching only for processes which
|
cache.files=per-process: Enable page caching (equivalent to
|
||||||
`comm' name matches one of the values defined in
|
\f[C]cache.files=partial\f[R]) only for processes whose `comm' name
|
||||||
|
matches one of the values defined in
|
||||||
\f[C]cache.files.process-names\f[R].
|
\f[C]cache.files.process-names\f[R].
|
||||||
|
If the name does not match the file open is equivalent to
|
||||||
|
\f[C]cache.files=off\f[R].
|
||||||
.PP
|
.PP
|
||||||
FUSE, which mergerfs uses, offers a number of page caching modes.
|
FUSE, which mergerfs uses, offers a number of page caching modes.
|
||||||
mergerfs tries to simplify their use via the \f[C]cache.files\f[R]
|
mergerfs tries to simplify their use via the \f[C]cache.files\f[R]
|
||||||
|
|
|
@ -137,6 +137,8 @@ namespace FUSE
|
||||||
|
|
||||||
ugid::init();
|
ugid::init();
|
||||||
|
|
||||||
|
cfg->readdir.initialize();
|
||||||
|
|
||||||
l::want_if_capable(conn_,FUSE_CAP_ASYNC_DIO);
|
l::want_if_capable(conn_,FUSE_CAP_ASYNC_DIO);
|
||||||
l::want_if_capable(conn_,FUSE_CAP_ASYNC_READ,&cfg->async_read);
|
l::want_if_capable(conn_,FUSE_CAP_ASYNC_READ,&cfg->async_read);
|
||||||
l::want_if_capable(conn_,FUSE_CAP_ATOMIC_O_TRUNC);
|
l::want_if_capable(conn_,FUSE_CAP_ATOMIC_O_TRUNC);
|
||||||
|
|
|
@ -17,10 +17,15 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "fuse_readdir.hpp"
|
#include "fuse_readdir.hpp"
|
||||||
#include "fuse_readdir_factory.hpp"
|
|
||||||
|
|
||||||
#include "config.hpp"
|
#include "config.hpp"
|
||||||
|
#include "fuse_readdir_factory.hpp"
|
||||||
|
|
||||||
|
/*
|
||||||
|
The _initialized stuff is not pretty but easiest way to deal with
|
||||||
|
the fact that mergerfs is doing arg parsing and setting up of things
|
||||||
|
(including thread pools) before the daemonizing
|
||||||
|
*/
|
||||||
|
|
||||||
int
|
int
|
||||||
FUSE::readdir(const fuse_file_info_t *ffi_,
|
FUSE::readdir(const fuse_file_info_t *ffi_,
|
||||||
|
@ -32,9 +37,11 @@ FUSE::readdir(const fuse_file_info_t *ffi_,
|
||||||
}
|
}
|
||||||
|
|
||||||
FUSE::ReadDir::ReadDir(std::string const s_)
|
FUSE::ReadDir::ReadDir(std::string const s_)
|
||||||
|
: _initialized(false)
|
||||||
{
|
{
|
||||||
from_string(s_);
|
from_string(s_);
|
||||||
assert(_readdir);
|
if(_initialized)
|
||||||
|
assert(_readdir);
|
||||||
}
|
}
|
||||||
|
|
||||||
std::string
|
std::string
|
||||||
|
@ -45,25 +52,46 @@ FUSE::ReadDir::to_string() const
|
||||||
return _type;
|
return _type;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
FUSE::ReadDir::initialize()
|
||||||
|
{
|
||||||
|
_initialized = true;
|
||||||
|
from_string(_type);
|
||||||
|
}
|
||||||
|
|
||||||
int
|
int
|
||||||
FUSE::ReadDir::from_string(std::string const &str_)
|
FUSE::ReadDir::from_string(std::string const &str_)
|
||||||
{
|
{
|
||||||
std::shared_ptr<FUSE::ReadDirBase> tmp;
|
if(_initialized)
|
||||||
|
{
|
||||||
|
std::shared_ptr<FUSE::ReadDirBase> tmp;
|
||||||
|
|
||||||
tmp = FUSE::ReadDirFactory::make(str_);
|
tmp = FUSE::ReadDirFactory::make(str_);
|
||||||
if(!tmp)
|
if(!tmp)
|
||||||
return -EINVAL;
|
return -EINVAL;
|
||||||
|
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> lg(_mutex);
|
std::lock_guard<std::mutex> lg(_mutex);
|
||||||
|
|
||||||
_type = str_;
|
_type = str_;
|
||||||
_readdir = tmp;
|
std::swap(_readdir,tmp);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lg(_mutex);
|
||||||
|
|
||||||
|
_type = str_;
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
Yeah... if not initialized it will crash... ¯\_(ツ)_/¯
|
||||||
|
This will be resolved once initialization of internal objects and
|
||||||
|
handling of input is better seperated.
|
||||||
|
*/
|
||||||
int
|
int
|
||||||
FUSE::ReadDir::operator()(fuse_file_info_t const *ffi_,
|
FUSE::ReadDir::operator()(fuse_file_info_t const *ffi_,
|
||||||
fuse_dirents_t *buf_)
|
fuse_dirents_t *buf_)
|
||||||
|
|
|
@ -25,7 +25,10 @@
|
||||||
|
|
||||||
#include <assert.h>
|
#include <assert.h>
|
||||||
|
|
||||||
|
// The initialization behavior is not pretty but for the moment
|
||||||
|
// needed due to the daemonizing function of the libfuse library when
|
||||||
|
// not using foreground mode. The threads need to be created after the
|
||||||
|
// fork, not before.
|
||||||
namespace FUSE
|
namespace FUSE
|
||||||
{
|
{
|
||||||
int readdir(fuse_file_info_t const *ffi,
|
int readdir(fuse_file_info_t const *ffi,
|
||||||
|
@ -44,10 +47,14 @@ namespace FUSE
|
||||||
int operator()(fuse_file_info_t const *ffi,
|
int operator()(fuse_file_info_t const *ffi,
|
||||||
fuse_dirents_t *buf);
|
fuse_dirents_t *buf);
|
||||||
|
|
||||||
|
public:
|
||||||
|
void initialize();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
mutable std::mutex _mutex;
|
mutable std::mutex _mutex;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
bool _initialized;
|
||||||
std::string _type;
|
std::string _type;
|
||||||
std::shared_ptr<FUSE::ReadDirBase> _readdir;
|
std::shared_ptr<FUSE::ReadDirBase> _readdir;
|
||||||
};
|
};
|
||||||
|
|
|
@ -2,6 +2,8 @@
|
||||||
|
|
||||||
#include "unbounded_queue.hpp"
|
#include "unbounded_queue.hpp"
|
||||||
|
|
||||||
|
#include "syslog.hpp"
|
||||||
|
|
||||||
#include <atomic>
|
#include <atomic>
|
||||||
#include <csignal>
|
#include <csignal>
|
||||||
#include <functional>
|
#include <functional>
|
||||||
|
@ -21,8 +23,13 @@ public:
|
||||||
ThreadPool(std::size_t const thread_count_ = std::thread::hardware_concurrency(),
|
ThreadPool(std::size_t const thread_count_ = std::thread::hardware_concurrency(),
|
||||||
std::string const name_ = {})
|
std::string const name_ = {})
|
||||||
: _queues(thread_count_),
|
: _queues(thread_count_),
|
||||||
_count(thread_count_)
|
_count(thread_count_),
|
||||||
|
_name(name_)
|
||||||
{
|
{
|
||||||
|
syslog_info("threadpool: spawning %zu threads named '%s'",
|
||||||
|
_count,
|
||||||
|
_name.c_str());
|
||||||
|
|
||||||
auto worker = [this](std::size_t i)
|
auto worker = [this](std::size_t i)
|
||||||
{
|
{
|
||||||
while(true)
|
while(true)
|
||||||
|
@ -51,10 +58,10 @@ public:
|
||||||
_threads.reserve(thread_count_);
|
_threads.reserve(thread_count_);
|
||||||
for(std::size_t i = 0; i < thread_count_; ++i)
|
for(std::size_t i = 0; i < thread_count_; ++i)
|
||||||
_threads.emplace_back(worker, i);
|
_threads.emplace_back(worker, i);
|
||||||
if(!name_.empty())
|
if(!_name.empty())
|
||||||
{
|
{
|
||||||
for(auto &t : _threads)
|
for(auto &t : _threads)
|
||||||
pthread_setname_np(t.native_handle(),name_.c_str());
|
pthread_setname_np(t.native_handle(),_name.c_str());
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_sigmask(SIG_SETMASK,&oldset,NULL);
|
pthread_sigmask(SIG_SETMASK,&oldset,NULL);
|
||||||
|
@ -62,6 +69,10 @@ public:
|
||||||
|
|
||||||
~ThreadPool()
|
~ThreadPool()
|
||||||
{
|
{
|
||||||
|
syslog_info("threadpool: destroying %zu threads named '%s'",
|
||||||
|
_count,
|
||||||
|
_name.c_str());
|
||||||
|
|
||||||
for(auto& queue : _queues)
|
for(auto& queue : _queues)
|
||||||
queue.unblock();
|
queue.unblock();
|
||||||
for(auto& thread : _threads)
|
for(auto& thread : _threads)
|
||||||
|
@ -133,8 +144,9 @@ private:
|
||||||
std::vector<std::thread> _threads;
|
std::vector<std::thread> _threads;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
const std::size_t _count;
|
std::size_t const _count;
|
||||||
std::atomic_uint _index;
|
std::atomic_uint _index;
|
||||||
|
std::string const _name;
|
||||||
|
|
||||||
static const unsigned int K = 2;
|
static const unsigned int K = 2;
|
||||||
};
|
};
|
||||||
|
|
Loading…
Reference in New Issue
Block a user