From 80a4898e7592d372c1216a1e05d08d15dbdbec6d Mon Sep 17 00:00:00 2001 From: ridiculousfish Date: Sat, 23 Nov 2019 23:35:34 -0800 Subject: [PATCH] Revert "Migrate some iothread functions into member functions of thread_pool_t" This reverts commit 22230a1a0ddddc62732418773f3741073988df96. Also 9d7d70c204073fb7fcfc393b8b78821b905e6817 There's some subtle bug here, needs to be tracked down and tested. --- src/iothread.cpp | 83 ++++++++++++++---------------------------------- 1 file changed, 24 insertions(+), 59 deletions(-) diff --git a/src/iothread.cpp b/src/iothread.cpp index 035c5c482..e13af9900 100644 --- a/src/iothread.cpp +++ b/src/iothread.cpp @@ -86,7 +86,7 @@ struct thread_pool_t { }; /// Data which needs to be atomically accessed. - owning_lock req_data{}; + owning_lock data{}; /// The condition variable used to wake up waiting threads. /// Note this is tied to data's lock. @@ -103,32 +103,6 @@ struct thread_pool_t { assert(soft_min_threads >= 0 && max_threads >= 1 && soft_min_threads <= max_threads && "Invalid thread min and max"); } - - /// Enqueue a new work item onto the thread pool. - /// The function \p func will execute in one of the pool's threads. - /// \p completion will run on the main thread, if it is not missing. - int perform(void_function_t &&func, void_function_t &&completion); - - private: - /// The worker loop for this thread. - void *run(); - - /// Dequeue a work item (perhaps waiting on the condition variable), or commit to exiting by - /// reducing the active thread count. - /// This runs in the background thread. - maybe_t dequeue_work_or_commit_to_exit(); - - /// Trampoline function for pthread_spawn compatibility. - static void *run_trampoline(void *vpool); - - /// Attempt to spawn a new pthread. - pthread_t spawn(); - - /// No copying or moving. - thread_pool_t(const thread_pool_t &) = delete; - thread_pool_t(thread_pool_t &&) = delete; - void operator=(const thread_pool_t &) = delete; - void operator=(thread_pool_t &&) = delete; }; /// The thread pool for "iothreads" which are used to lift I/O off of the main thread. @@ -171,14 +145,16 @@ static const notify_pipes_t &get_notify_pipes() { /// Dequeue a work item (perhaps waiting on the condition variable), or commit to exiting by /// reducing the active thread count. -maybe_t thread_pool_t::dequeue_work_or_commit_to_exit() { - auto data = this->req_data.acquire(); +static maybe_t iothread_dequeue_work_or_commit_to_exit() { + auto &pool = s_io_thread_pool; + auto data = pool.data.acquire(); + while (data->request_queue.empty()) { bool give_up = true; - if (!data->drain && data->total_threads == this->soft_min_threads) { + if (!data->drain && data->total_threads == pool.soft_min_threads) { // If we exit, it will drop below the soft min. So wait for a while before giving up. data->waiting_threads += 1; - auto wait_ret = this->queue_cond.wait_for( + auto wait_ret = pool.queue_cond.wait_for( data.get_lock(), std::chrono::milliseconds(IO_WAIT_FOR_WORK_DURATION_MS)); data->waiting_threads -= 1; give_up = (wait_ret == std::cv_status::timeout); @@ -202,8 +178,10 @@ static void enqueue_thread_result(work_request_t req) { static void *this_thread() { return (void *)(intptr_t)pthread_self(); } -void *thread_pool_t::run() { - while (auto req = dequeue_work_or_commit_to_exit()) { +/// The function that does thread work. +static void *iothread_worker(void *unused) { + UNUSED(unused); + while (auto req = iothread_dequeue_work_or_commit_to_exit()) { FLOGF(iothread, L"pthread %p got work", this_thread()); // Perform the work @@ -223,27 +201,24 @@ void *thread_pool_t::run() { return nullptr; } -void *thread_pool_t::run_trampoline(void *pool) { - assert(pool && "No thread pool given"); - return static_cast(pool)->run(); -} - /// Spawn another thread. No lock is held when this is called. -pthread_t thread_pool_t::spawn() { +static pthread_t iothread_spawn() { // Spawn a thread. If this fails, it means there's already a bunch of threads; it is very // unlikely that they are all on the verge of exiting, so one is likely to be ready to handle // extant requests. So we can ignore failure with some confidence. pthread_t thread = 0; - if (make_pthread(&thread, run_trampoline, static_cast(this))) { + if (make_pthread(&thread, iothread_worker, nullptr)) { // We will never join this thread. DIE_ON_FAILURE(pthread_detach(thread)); } return thread; } -int thread_pool_t::perform(void_function_t &&func, void_function_t &&completion) { - assert(func && "Missing function"); - // Note we permit an empty completion. +int iothread_perform_impl(void_function_t &&func, void_function_t &&completion) { + ASSERT_IS_MAIN_THREAD(); + ASSERT_IS_NOT_FORKED_CHILD(); + assert(func && "Null function"); + struct work_request_t req(std::move(func), std::move(completion)); int local_thread_count = -1; auto &pool = s_io_thread_pool; @@ -251,7 +226,7 @@ int thread_pool_t::perform(void_function_t &&func, void_function_t &&completion) bool wakeup_thread = false; { // Lock around a local region. - auto data = pool.req_data.acquire(); + auto data = pool.data.acquire(); data->request_queue.push(std::move(req)); if (data->drain) { // Do nothing here. @@ -271,22 +246,12 @@ int thread_pool_t::perform(void_function_t &&func, void_function_t &&completion) pool.queue_cond.notify_one(); } if (spawn_new_thread) { - if (pthread_t pt = this->spawn()) { - FLOGF(iothread, L"pthread %p spawned", pt); - } else { - // We failed to spawn a thread; decrement the thread count. - pool.req_data.acquire()->total_threads -= 1; - } + pthread_t pt = iothread_spawn(); + FLOGF(iothread, L"pthread %p spawned", pt); } return local_thread_count; } -int iothread_perform_impl(void_function_t &&func, void_function_t &&completion) { - ASSERT_IS_MAIN_THREAD(); - ASSERT_IS_NOT_FORKED_CHILD(); - return s_io_thread_pool.perform(std::move(func), std::move(completion)); -} - int iothread_port() { return get_notify_pipes().read; } void iothread_service_completion() { @@ -327,7 +292,7 @@ int iothread_drain_all() { auto &pool = s_io_thread_pool; // Set the drain flag. { - auto data = pool.req_data.acquire(); + auto data = pool.data.acquire(); assert(!data->drain && "Should not be draining already"); data->drain = true; thread_count = data->total_threads; @@ -339,7 +304,7 @@ int iothread_drain_all() { double now = timef(); // Nasty polling via select(). - while (pool.req_data.acquire()->total_threads > 0) { + while (pool.data.acquire()->total_threads > 0) { if (iothread_wait_for_pending_completions(1000)) { iothread_service_completion(); } @@ -349,7 +314,7 @@ int iothread_drain_all() { // Even though we released the lock, nobody should have added a new thread while the drain flag // is set. { - auto data = pool.req_data.acquire(); + auto data = pool.data.acquire(); assert(data->total_threads == 0 && "Should be no threads"); assert(data->drain && "Should be draining"); data->drain = false;