Revert "Migrate some iothread functions into member functions of thread_pool_t"

This reverts commit 22230a1a0ddddc62732418773f3741073988df96.
Also 9d7d70c204073fb7fcfc393b8b78821b905e6817

There's some subtle bug here, needs to be tracked down and tested.
This commit is contained in:
ridiculousfish 2019-11-23 23:35:34 -08:00
parent 9023c2187f
commit 80a4898e75

View File

@ -86,7 +86,7 @@ struct thread_pool_t {
};
/// Data which needs to be atomically accessed.
owning_lock<data_t> req_data{};
owning_lock<data_t> data{};
/// The condition variable used to wake up waiting threads.
/// Note this is tied to data's lock.
@ -103,32 +103,6 @@ struct thread_pool_t {
assert(soft_min_threads >= 0 && max_threads >= 1 && soft_min_threads <= max_threads &&
"Invalid thread min and max");
}
/// Enqueue a new work item onto the thread pool.
/// The function \p func will execute in one of the pool's threads.
/// \p completion will run on the main thread, if it is not missing.
int perform(void_function_t &&func, void_function_t &&completion);
private:
/// The worker loop for this thread.
void *run();
/// Dequeue a work item (perhaps waiting on the condition variable), or commit to exiting by
/// reducing the active thread count.
/// This runs in the background thread.
maybe_t<work_request_t> dequeue_work_or_commit_to_exit();
/// Trampoline function for pthread_spawn compatibility.
static void *run_trampoline(void *vpool);
/// Attempt to spawn a new pthread.
pthread_t spawn();
/// No copying or moving.
thread_pool_t(const thread_pool_t &) = delete;
thread_pool_t(thread_pool_t &&) = delete;
void operator=(const thread_pool_t &) = delete;
void operator=(thread_pool_t &&) = delete;
};
/// The thread pool for "iothreads" which are used to lift I/O off of the main thread.
@ -171,14 +145,16 @@ static const notify_pipes_t &get_notify_pipes() {
/// Dequeue a work item (perhaps waiting on the condition variable), or commit to exiting by
/// reducing the active thread count.
maybe_t<work_request_t> thread_pool_t::dequeue_work_or_commit_to_exit() {
auto data = this->req_data.acquire();
static maybe_t<work_request_t> iothread_dequeue_work_or_commit_to_exit() {
auto &pool = s_io_thread_pool;
auto data = pool.data.acquire();
while (data->request_queue.empty()) {
bool give_up = true;
if (!data->drain && data->total_threads == this->soft_min_threads) {
if (!data->drain && data->total_threads == pool.soft_min_threads) {
// If we exit, it will drop below the soft min. So wait for a while before giving up.
data->waiting_threads += 1;
auto wait_ret = this->queue_cond.wait_for(
auto wait_ret = pool.queue_cond.wait_for(
data.get_lock(), std::chrono::milliseconds(IO_WAIT_FOR_WORK_DURATION_MS));
data->waiting_threads -= 1;
give_up = (wait_ret == std::cv_status::timeout);
@ -202,8 +178,10 @@ static void enqueue_thread_result(work_request_t req) {
static void *this_thread() { return (void *)(intptr_t)pthread_self(); }
void *thread_pool_t::run() {
while (auto req = dequeue_work_or_commit_to_exit()) {
/// The function that does thread work.
static void *iothread_worker(void *unused) {
UNUSED(unused);
while (auto req = iothread_dequeue_work_or_commit_to_exit()) {
FLOGF(iothread, L"pthread %p got work", this_thread());
// Perform the work
@ -223,27 +201,24 @@ void *thread_pool_t::run() {
return nullptr;
}
void *thread_pool_t::run_trampoline(void *pool) {
assert(pool && "No thread pool given");
return static_cast<thread_pool_t *>(pool)->run();
}
/// Spawn another thread. No lock is held when this is called.
pthread_t thread_pool_t::spawn() {
static pthread_t iothread_spawn() {
// Spawn a thread. If this fails, it means there's already a bunch of threads; it is very
// unlikely that they are all on the verge of exiting, so one is likely to be ready to handle
// extant requests. So we can ignore failure with some confidence.
pthread_t thread = 0;
if (make_pthread(&thread, run_trampoline, static_cast<void *>(this))) {
if (make_pthread(&thread, iothread_worker, nullptr)) {
// We will never join this thread.
DIE_ON_FAILURE(pthread_detach(thread));
}
return thread;
}
int thread_pool_t::perform(void_function_t &&func, void_function_t &&completion) {
assert(func && "Missing function");
// Note we permit an empty completion.
int iothread_perform_impl(void_function_t &&func, void_function_t &&completion) {
ASSERT_IS_MAIN_THREAD();
ASSERT_IS_NOT_FORKED_CHILD();
assert(func && "Null function");
struct work_request_t req(std::move(func), std::move(completion));
int local_thread_count = -1;
auto &pool = s_io_thread_pool;
@ -251,7 +226,7 @@ int thread_pool_t::perform(void_function_t &&func, void_function_t &&completion)
bool wakeup_thread = false;
{
// Lock around a local region.
auto data = pool.req_data.acquire();
auto data = pool.data.acquire();
data->request_queue.push(std::move(req));
if (data->drain) {
// Do nothing here.
@ -271,22 +246,12 @@ int thread_pool_t::perform(void_function_t &&func, void_function_t &&completion)
pool.queue_cond.notify_one();
}
if (spawn_new_thread) {
if (pthread_t pt = this->spawn()) {
FLOGF(iothread, L"pthread %p spawned", pt);
} else {
// We failed to spawn a thread; decrement the thread count.
pool.req_data.acquire()->total_threads -= 1;
}
pthread_t pt = iothread_spawn();
FLOGF(iothread, L"pthread %p spawned", pt);
}
return local_thread_count;
}
int iothread_perform_impl(void_function_t &&func, void_function_t &&completion) {
ASSERT_IS_MAIN_THREAD();
ASSERT_IS_NOT_FORKED_CHILD();
return s_io_thread_pool.perform(std::move(func), std::move(completion));
}
int iothread_port() { return get_notify_pipes().read; }
void iothread_service_completion() {
@ -327,7 +292,7 @@ int iothread_drain_all() {
auto &pool = s_io_thread_pool;
// Set the drain flag.
{
auto data = pool.req_data.acquire();
auto data = pool.data.acquire();
assert(!data->drain && "Should not be draining already");
data->drain = true;
thread_count = data->total_threads;
@ -339,7 +304,7 @@ int iothread_drain_all() {
double now = timef();
// Nasty polling via select().
while (pool.req_data.acquire()->total_threads > 0) {
while (pool.data.acquire()->total_threads > 0) {
if (iothread_wait_for_pending_completions(1000)) {
iothread_service_completion();
}
@ -349,7 +314,7 @@ int iothread_drain_all() {
// Even though we released the lock, nobody should have added a new thread while the drain flag
// is set.
{
auto data = pool.req_data.acquire();
auto data = pool.data.acquire();
assert(data->total_threads == 0 && "Should be no threads");
assert(data->drain && "Should be draining");
data->drain = false;