mirror of
https://github.com/fish-shell/fish-shell.git
synced 2024-11-26 19:03:38 +08:00
Migrate some iothread functions into member functions of thread_pool_t
This reintroduces commits22230a1a0d
and9d7d70c204
, now with the bug fixed. The problem was when there was one thread waiting in the pool. We enqueue an item onto the pool and attempt to wake up the thread. But before the thread runs, we enqueue another item - this second enqueue will see the thread waiting and attempt to wake it up as well. If the two work items were dependent (reader/writer) then we would have a deadlock. The fix is to check if the number of waiting threads is at least as large as the queue. If the number of enqueued items exceeds the number of waiting threads, then spawn a new thread always.
This commit is contained in:
parent
83f153eb4c
commit
106af5f56a
139
src/iothread.cpp
139
src/iothread.cpp
|
@ -76,17 +76,17 @@ struct thread_pool_t {
|
|||
std::queue<work_request_t> request_queue{};
|
||||
|
||||
/// The number of threads that exist in the pool.
|
||||
int total_threads{0};
|
||||
size_t total_threads{0};
|
||||
|
||||
/// The number of threads which are waiting for more work.
|
||||
int waiting_threads{0};
|
||||
size_t waiting_threads{0};
|
||||
|
||||
/// A flag indicating we should not process new requests.
|
||||
bool drain{false};
|
||||
};
|
||||
|
||||
/// Data which needs to be atomically accessed.
|
||||
owning_lock<data_t> data{};
|
||||
owning_lock<data_t> req_data{};
|
||||
|
||||
/// The condition variable used to wake up waiting threads.
|
||||
/// Note this is tied to data's lock.
|
||||
|
@ -95,14 +95,38 @@ struct thread_pool_t {
|
|||
/// The minimum and maximum number of threads.
|
||||
/// Here "minimum" means threads that are kept waiting in the pool.
|
||||
/// Note that the pool is initially empty and threads may decide to exit based on a time wait.
|
||||
const int soft_min_threads;
|
||||
const int max_threads;
|
||||
const size_t soft_min_threads;
|
||||
const size_t max_threads;
|
||||
|
||||
thread_pool_t(int soft_min_threads, int max_threads)
|
||||
: soft_min_threads(soft_min_threads), max_threads(max_threads) {
|
||||
assert(soft_min_threads >= 0 && max_threads >= 1 && soft_min_threads <= max_threads &&
|
||||
"Invalid thread min and max");
|
||||
}
|
||||
/// Construct with a soft minimum and maximum thread count.
|
||||
thread_pool_t(size_t soft_min_threads, size_t max_threads)
|
||||
: soft_min_threads(soft_min_threads), max_threads(max_threads) {}
|
||||
|
||||
/// Enqueue a new work item onto the thread pool.
|
||||
/// The function \p func will execute in one of the pool's threads.
|
||||
/// \p completion will run on the main thread, if it is not missing.
|
||||
int perform(void_function_t &&func, void_function_t &&completion);
|
||||
|
||||
private:
|
||||
/// The worker loop for this thread.
|
||||
void *run();
|
||||
|
||||
/// Dequeue a work item (perhaps waiting on the condition variable), or commit to exiting by
|
||||
/// reducing the active thread count.
|
||||
/// This runs in the background thread.
|
||||
maybe_t<work_request_t> dequeue_work_or_commit_to_exit();
|
||||
|
||||
/// Trampoline function for pthread_spawn compatibility.
|
||||
static void *run_trampoline(void *vpool);
|
||||
|
||||
/// Attempt to spawn a new pthread.
|
||||
pthread_t spawn();
|
||||
|
||||
/// No copying or moving.
|
||||
thread_pool_t(const thread_pool_t &) = delete;
|
||||
thread_pool_t(thread_pool_t &&) = delete;
|
||||
void operator=(const thread_pool_t &) = delete;
|
||||
void operator=(thread_pool_t &&) = delete;
|
||||
};
|
||||
|
||||
/// The thread pool for "iothreads" which are used to lift I/O off of the main thread.
|
||||
|
@ -145,30 +169,30 @@ static const notify_pipes_t &get_notify_pipes() {
|
|||
|
||||
/// Dequeue a work item (perhaps waiting on the condition variable), or commit to exiting by
|
||||
/// reducing the active thread count.
|
||||
static maybe_t<work_request_t> iothread_dequeue_work_or_commit_to_exit() {
|
||||
auto &pool = s_io_thread_pool;
|
||||
auto data = pool.data.acquire();
|
||||
|
||||
while (data->request_queue.empty()) {
|
||||
bool give_up = true;
|
||||
if (!data->drain && data->total_threads == pool.soft_min_threads) {
|
||||
// If we exit, it will drop below the soft min. So wait for a while before giving up.
|
||||
data->waiting_threads += 1;
|
||||
auto wait_ret = pool.queue_cond.wait_for(
|
||||
data.get_lock(), std::chrono::milliseconds(IO_WAIT_FOR_WORK_DURATION_MS));
|
||||
data->waiting_threads -= 1;
|
||||
give_up = (wait_ret == std::cv_status::timeout);
|
||||
}
|
||||
if (give_up) {
|
||||
// Balance the total_threads count from when we were spawned.
|
||||
data->total_threads -= 1;
|
||||
return none();
|
||||
}
|
||||
maybe_t<work_request_t> thread_pool_t::dequeue_work_or_commit_to_exit() {
|
||||
auto data = this->req_data.acquire();
|
||||
// If the queue is empty, check to see if we should wait.
|
||||
// We should wait if our exiting would drop us below the soft min.
|
||||
if (data->request_queue.empty() && data->total_threads == this->soft_min_threads) {
|
||||
data->waiting_threads += 1;
|
||||
this->queue_cond.wait_for(data.get_lock(),
|
||||
std::chrono::milliseconds(IO_WAIT_FOR_WORK_DURATION_MS));
|
||||
data->waiting_threads -= 1;
|
||||
}
|
||||
|
||||
// Oh! The queue has work for us!
|
||||
maybe_t<work_request_t> result = std::move(data->request_queue.front());
|
||||
data->request_queue.pop();
|
||||
// Now that we've perhaps waited, see if there's something on the queue.
|
||||
maybe_t<work_request_t> result{};
|
||||
if (!data->request_queue.empty()) {
|
||||
result = std::move(data->request_queue.front());
|
||||
data->request_queue.pop();
|
||||
}
|
||||
// If we are returning none, then ensure we balance the thread count increment from when we were
|
||||
// created. This has to be done here in this awkward place because we've already committed to
|
||||
// exiting - we will never pick up more work. So we need to ensure we decrement the thread count
|
||||
// while holding the lock as we are effectively exited.
|
||||
if (!result) {
|
||||
data->total_threads -= 1;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
|
@ -178,10 +202,8 @@ static void enqueue_thread_result(work_request_t req) {
|
|||
|
||||
static void *this_thread() { return (void *)(intptr_t)pthread_self(); }
|
||||
|
||||
/// The function that does thread work.
|
||||
static void *iothread_worker(void *unused) {
|
||||
UNUSED(unused);
|
||||
while (auto req = iothread_dequeue_work_or_commit_to_exit()) {
|
||||
void *thread_pool_t::run() {
|
||||
while (auto req = dequeue_work_or_commit_to_exit()) {
|
||||
FLOGF(iothread, L"pthread %p got work", this_thread());
|
||||
|
||||
// Perform the work
|
||||
|
@ -201,24 +223,27 @@ static void *iothread_worker(void *unused) {
|
|||
return nullptr;
|
||||
}
|
||||
|
||||
void *thread_pool_t::run_trampoline(void *pool) {
|
||||
assert(pool && "No thread pool given");
|
||||
return static_cast<thread_pool_t *>(pool)->run();
|
||||
}
|
||||
|
||||
/// Spawn another thread. No lock is held when this is called.
|
||||
static pthread_t iothread_spawn() {
|
||||
pthread_t thread_pool_t::spawn() {
|
||||
// Spawn a thread. If this fails, it means there's already a bunch of threads; it is very
|
||||
// unlikely that they are all on the verge of exiting, so one is likely to be ready to handle
|
||||
// extant requests. So we can ignore failure with some confidence.
|
||||
pthread_t thread = 0;
|
||||
if (make_pthread(&thread, iothread_worker, nullptr)) {
|
||||
if (make_pthread(&thread, run_trampoline, static_cast<void *>(this))) {
|
||||
// We will never join this thread.
|
||||
DIE_ON_FAILURE(pthread_detach(thread));
|
||||
}
|
||||
return thread;
|
||||
}
|
||||
|
||||
int iothread_perform_impl(void_function_t &&func, void_function_t &&completion) {
|
||||
ASSERT_IS_MAIN_THREAD();
|
||||
ASSERT_IS_NOT_FORKED_CHILD();
|
||||
assert(func && "Null function");
|
||||
|
||||
int thread_pool_t::perform(void_function_t &&func, void_function_t &&completion) {
|
||||
assert(func && "Missing function");
|
||||
// Note we permit an empty completion.
|
||||
struct work_request_t req(std::move(func), std::move(completion));
|
||||
int local_thread_count = -1;
|
||||
auto &pool = s_io_thread_pool;
|
||||
|
@ -226,12 +251,13 @@ int iothread_perform_impl(void_function_t &&func, void_function_t &&completion)
|
|||
bool wakeup_thread = false;
|
||||
{
|
||||
// Lock around a local region.
|
||||
auto data = pool.data.acquire();
|
||||
auto data = pool.req_data.acquire();
|
||||
data->request_queue.push(std::move(req));
|
||||
FLOGF(iothread, L"enqueuing work item (count is %lu)", data->request_queue.size());
|
||||
if (data->drain) {
|
||||
// Do nothing here.
|
||||
} else if (data->waiting_threads > 0) {
|
||||
// At least one thread is waiting, we will wake it up.
|
||||
} else if (data->waiting_threads >= data->request_queue.size()) {
|
||||
// There's enough waiting threads, wake one up.
|
||||
wakeup_thread = true;
|
||||
} else if (data->total_threads < pool.max_threads) {
|
||||
// No threads are waiting but we can spawn a new thread.
|
||||
|
@ -243,15 +269,26 @@ int iothread_perform_impl(void_function_t &&func, void_function_t &&completion)
|
|||
|
||||
// Kick off the thread if we decided to do so.
|
||||
if (wakeup_thread) {
|
||||
FLOGF(iothread, L"notifying a thread", this_thread());
|
||||
pool.queue_cond.notify_one();
|
||||
}
|
||||
if (spawn_new_thread) {
|
||||
pthread_t pt = iothread_spawn();
|
||||
FLOGF(iothread, L"pthread %p spawned", pt);
|
||||
if (pthread_t pt = this->spawn()) {
|
||||
FLOGF(iothread, L"pthread %p spawned", pt);
|
||||
} else {
|
||||
// We failed to spawn a thread; decrement the thread count.
|
||||
pool.req_data.acquire()->total_threads -= 1;
|
||||
}
|
||||
}
|
||||
return local_thread_count;
|
||||
}
|
||||
|
||||
int iothread_perform_impl(void_function_t &&func, void_function_t &&completion) {
|
||||
ASSERT_IS_MAIN_THREAD();
|
||||
ASSERT_IS_NOT_FORKED_CHILD();
|
||||
return s_io_thread_pool.perform(std::move(func), std::move(completion));
|
||||
}
|
||||
|
||||
int iothread_port() { return get_notify_pipes().read; }
|
||||
|
||||
void iothread_service_completion() {
|
||||
|
@ -292,7 +329,7 @@ int iothread_drain_all() {
|
|||
auto &pool = s_io_thread_pool;
|
||||
// Set the drain flag.
|
||||
{
|
||||
auto data = pool.data.acquire();
|
||||
auto data = pool.req_data.acquire();
|
||||
assert(!data->drain && "Should not be draining already");
|
||||
data->drain = true;
|
||||
thread_count = data->total_threads;
|
||||
|
@ -304,7 +341,7 @@ int iothread_drain_all() {
|
|||
double now = timef();
|
||||
|
||||
// Nasty polling via select().
|
||||
while (pool.data.acquire()->total_threads > 0) {
|
||||
while (pool.req_data.acquire()->total_threads > 0) {
|
||||
if (iothread_wait_for_pending_completions(1000)) {
|
||||
iothread_service_completion();
|
||||
}
|
||||
|
@ -314,7 +351,7 @@ int iothread_drain_all() {
|
|||
// Even though we released the lock, nobody should have added a new thread while the drain flag
|
||||
// is set.
|
||||
{
|
||||
auto data = pool.data.acquire();
|
||||
auto data = pool.req_data.acquire();
|
||||
assert(data->total_threads == 0 && "Should be no threads");
|
||||
assert(data->drain && "Should be draining");
|
||||
data->drain = false;
|
||||
|
|
Loading…
Reference in New Issue
Block a user