diff --git a/src/exec.cpp b/src/exec.cpp index 0d509a64f..304b5a287 100644 --- a/src/exec.cpp +++ b/src/exec.cpp @@ -301,7 +301,7 @@ static void run_internal_process(process_t *p, std::string &&outdata, std::strin // builtin_run provide this directly, rather than setting it in the process. f->success_status = p->status; - iothread_perform([f]() { + iothread_perform_cantwait([f]() { proc_status_t status = f->success_status; if (!f->skip_out()) { ssize_t ret = write_loop(f->src_outfd, f->outdata.data(), f->outdata.size()); diff --git a/src/io.cpp b/src/io.cpp index 38c088f67..a28f61010 100644 --- a/src/io.cpp +++ b/src/io.cpp @@ -157,7 +157,7 @@ void io_buffer_t::begin_background_fillthread(autoclose_fd_t fd) { // Run our function to read until the receiver is closed. // It's OK to capture 'this' by value because 'this' owns the background thread and waits for it // before dtor. - iothread_perform([this, promise, fdref]() { + iothread_perform_cantwait([this, promise, fdref]() { this->run_background_fillthread(std::move(*fdref)); promise->set_value(); }); diff --git a/src/iothread.cpp b/src/iothread.cpp index df015681d..5f40fe5bc 100644 --- a/src/iothread.cpp +++ b/src/iothread.cpp @@ -101,7 +101,9 @@ struct thread_pool_t { /// Enqueue a new work item onto the thread pool. /// The function \p func will execute in one of the pool's threads. /// \p completion will run on the main thread, if it is not missing. - int perform(void_function_t &&func, void_function_t &&completion); + /// If \p cant_wait is set, disrespect the thread limit, because extant threads may + ///want to wait for new threads. + int perform(void_function_t &&func, void_function_t &&completion, bool cant_wait); private: /// The worker loop for this thread. @@ -229,7 +231,7 @@ bool thread_pool_t::spawn() { return make_detached_pthread(&run_trampoline, static_cast(this)); } -int thread_pool_t::perform(void_function_t &&func, void_function_t &&completion) { +int thread_pool_t::perform(void_function_t &&func, void_function_t &&completion, bool cant_wait) { assert(func && "Missing function"); // Note we permit an empty completion. struct work_request_t req(std::move(func), std::move(completion)); @@ -247,8 +249,8 @@ int thread_pool_t::perform(void_function_t &&func, void_function_t &&completion) } else if (data->waiting_threads >= data->request_queue.size()) { // There's enough waiting threads, wake one up. wakeup_thread = true; - } else if (data->total_threads < pool.max_threads) { - // No threads are waiting but we can spawn a new thread. + } else if (cant_wait || data->total_threads < pool.max_threads) { + // No threads are waiting but we can or must spawn a new thread. data->total_threads += 1; spawn_new_thread = true; } @@ -274,10 +276,10 @@ int thread_pool_t::perform(void_function_t &&func, void_function_t &&completion) return local_thread_count; } -int iothread_perform_impl(void_function_t &&func, void_function_t &&completion) { +int iothread_perform_impl(void_function_t &&func, void_function_t &&completion, bool cant_wait) { ASSERT_IS_MAIN_THREAD(); ASSERT_IS_NOT_FORKED_CHILD(); - return s_io_thread_pool.perform(std::move(func), std::move(completion)); + return s_io_thread_pool.perform(std::move(func), std::move(completion), cant_wait); } int iothread_port() { return get_notify_pipes().read; } diff --git a/src/iothread.h b/src/iothread.h index ad55b0996..4dd646a78 100644 --- a/src/iothread.h +++ b/src/iothread.h @@ -31,7 +31,7 @@ void iothread_service_completion(void); int iothread_drain_all(void); // Internal implementation -int iothread_perform_impl(std::function &&func, std::function &&completion); +int iothread_perform_impl(std::function &&func, std::function &&completion, bool cant_wait = false); // Template helpers // This is the glue part of the handler-completion handoff @@ -75,6 +75,13 @@ inline int iothread_perform(std::function &&func) { return iothread_perform_impl(std::move(func), {}); } +/// Variant of iothread_perform that disrespects the thread limit. +/// It does its best to spawn a new thread if all other threads are occupied. +/// This is for cases where deferring a new thread might lead to deadlock. +inline int iothread_perform_cantwait(std::function &&func) { + return iothread_perform_impl(std::move(func), {}, true); +} + /// Performs a function on the main thread, blocking until it completes. void iothread_perform_on_main(std::function &&func);