Add a cant_wait parameter to iothread_perform

Sometimes we must spawn a new thread, to avoid the risk of deadlock.
Ensure we always spawn a thread in those cases. In particular this
includes the fillthread.
This commit is contained in:
ridiculousfish 2020-01-18 11:32:44 -08:00
parent d38db1bc61
commit c14d54032f
4 changed files with 18 additions and 9 deletions

View File

@ -301,7 +301,7 @@ static void run_internal_process(process_t *p, std::string &&outdata, std::strin
// builtin_run provide this directly, rather than setting it in the process.
f->success_status = p->status;
iothread_perform([f]() {
iothread_perform_cantwait([f]() {
proc_status_t status = f->success_status;
if (!f->skip_out()) {
ssize_t ret = write_loop(f->src_outfd, f->outdata.data(), f->outdata.size());

View File

@ -157,7 +157,7 @@ void io_buffer_t::begin_background_fillthread(autoclose_fd_t fd) {
// Run our function to read until the receiver is closed.
// It's OK to capture 'this' by value because 'this' owns the background thread and waits for it
// before dtor.
iothread_perform([this, promise, fdref]() {
iothread_perform_cantwait([this, promise, fdref]() {
this->run_background_fillthread(std::move(*fdref));
promise->set_value();
});

View File

@ -101,7 +101,9 @@ struct thread_pool_t {
/// Enqueue a new work item onto the thread pool.
/// The function \p func will execute in one of the pool's threads.
/// \p completion will run on the main thread, if it is not missing.
int perform(void_function_t &&func, void_function_t &&completion);
/// If \p cant_wait is set, disrespect the thread limit, because extant threads may
///want to wait for new threads.
int perform(void_function_t &&func, void_function_t &&completion, bool cant_wait);
private:
/// The worker loop for this thread.
@ -229,7 +231,7 @@ bool thread_pool_t::spawn() {
return make_detached_pthread(&run_trampoline, static_cast<void *>(this));
}
int thread_pool_t::perform(void_function_t &&func, void_function_t &&completion) {
int thread_pool_t::perform(void_function_t &&func, void_function_t &&completion, bool cant_wait) {
assert(func && "Missing function");
// Note we permit an empty completion.
struct work_request_t req(std::move(func), std::move(completion));
@ -247,8 +249,8 @@ int thread_pool_t::perform(void_function_t &&func, void_function_t &&completion)
} else if (data->waiting_threads >= data->request_queue.size()) {
// There's enough waiting threads, wake one up.
wakeup_thread = true;
} else if (data->total_threads < pool.max_threads) {
// No threads are waiting but we can spawn a new thread.
} else if (cant_wait || data->total_threads < pool.max_threads) {
// No threads are waiting but we can or must spawn a new thread.
data->total_threads += 1;
spawn_new_thread = true;
}
@ -274,10 +276,10 @@ int thread_pool_t::perform(void_function_t &&func, void_function_t &&completion)
return local_thread_count;
}
int iothread_perform_impl(void_function_t &&func, void_function_t &&completion) {
int iothread_perform_impl(void_function_t &&func, void_function_t &&completion, bool cant_wait) {
ASSERT_IS_MAIN_THREAD();
ASSERT_IS_NOT_FORKED_CHILD();
return s_io_thread_pool.perform(std::move(func), std::move(completion));
return s_io_thread_pool.perform(std::move(func), std::move(completion), cant_wait);
}
int iothread_port() { return get_notify_pipes().read; }

View File

@ -31,7 +31,7 @@ void iothread_service_completion(void);
int iothread_drain_all(void);
// Internal implementation
int iothread_perform_impl(std::function<void(void)> &&func, std::function<void(void)> &&completion);
int iothread_perform_impl(std::function<void(void)> &&func, std::function<void(void)> &&completion, bool cant_wait = false);
// Template helpers
// This is the glue part of the handler-completion handoff
@ -75,6 +75,13 @@ inline int iothread_perform(std::function<void(void)> &&func) {
return iothread_perform_impl(std::move(func), {});
}
/// Variant of iothread_perform that disrespects the thread limit.
/// It does its best to spawn a new thread if all other threads are occupied.
/// This is for cases where deferring a new thread might lead to deadlock.
inline int iothread_perform_cantwait(std::function<void(void)> &&func) {
return iothread_perform_impl(std::move(func), {}, true);
}
/// Performs a function on the main thread, blocking until it completes.
void iothread_perform_on_main(std::function<void(void)> &&func);