diff --git a/src/iothread.cpp b/src/iothread.cpp index 6d83e9897..d8d6771aa 100644 --- a/src/iothread.cpp +++ b/src/iothread.cpp @@ -31,6 +31,9 @@ #define IO_MAX_THREADS 64 #endif +// The minimum number of threads kept waiting in the pool. +#define IO_MIN_THREADS 1 + // Values for the wakeup bytes sent to the ioport. #define IO_SERVICE_MAIN_THREAD_REQUEST_QUEUE 99 #define IO_SERVICE_RESULT_QUEUE 100 @@ -40,19 +43,17 @@ static void iothread_service_result_queue(); typedef std::function void_function_t; -struct spawn_request_t { +struct work_request_t { void_function_t handler; void_function_t completion; - spawn_request_t() {} - - spawn_request_t(void_function_t &&f, void_function_t &&comp) : handler(f), completion(comp) {} + work_request_t(void_function_t &&f, void_function_t &&comp) : handler(f), completion(comp) {} // Move-only - spawn_request_t &operator=(const spawn_request_t &) = delete; - spawn_request_t &operator=(spawn_request_t &&) = default; - spawn_request_t(const spawn_request_t &) = delete; - spawn_request_t(spawn_request_t &&) = default; + work_request_t &operator=(const work_request_t &) = delete; + work_request_t &operator=(work_request_t &&) = default; + work_request_t(const work_request_t &) = delete; + work_request_t(work_request_t &&) = default; }; struct main_thread_request_t { @@ -68,13 +69,17 @@ struct main_thread_request_t { main_thread_request_t(main_thread_request_t &&) = delete; }; -// Spawn support. Requests are allocated and come in on request_queue and go out on result_queue -struct thread_data_t { - std::queue request_queue; +/// Data about the current set of IO threads and requests. +struct thread_pool_data_t { + /// The queue of outstanding, unclaimed requests. + std::queue request_queue; + + /// The number of extant threads which are able to run requests. int thread_count = 0; }; -static owning_lock s_spawn_requests; -static owning_lock> s_result_queue; +static owning_lock s_thread_pool; + +static owning_lock> s_result_queue; // "Do on main thread" support. static std::mutex s_main_thread_performer_lock; // protects the main thread requests @@ -108,17 +113,17 @@ static const notify_pipes_t &get_notify_pipes() { return s_notify_pipes; } -static bool dequeue_spawn_request(spawn_request_t *result) { - auto requests = s_spawn_requests.acquire(); +static maybe_t dequeue_spawn_request() { + maybe_t result{}; + auto requests = s_thread_pool.acquire(); if (!requests->request_queue.empty()) { - *result = std::move(requests->request_queue.front()); + result = std::move(requests->request_queue.front()); requests->request_queue.pop(); - return true; } - return false; + return result; } -static void enqueue_thread_result(spawn_request_t req) { +static void enqueue_thread_result(work_request_t req) { s_result_queue.acquire()->push(std::move(req)); } @@ -127,18 +132,17 @@ static void *this_thread() { return (void *)(intptr_t)pthread_self(); } /// The function that does thread work. static void *iothread_worker(void *unused) { UNUSED(unused); - struct spawn_request_t req; - while (dequeue_spawn_request(&req)) { + while (auto req = dequeue_spawn_request()) { debug(5, "pthread %p dequeued", this_thread()); // Perform the work - req.handler(); + req->handler(); // If there's a completion handler, we have to enqueue it on the result queue. // Note we're using std::function's weirdo operator== here - if (req.completion != nullptr) { + if (req->completion != nullptr) { // Enqueue the result, and tell the main thread about it. - enqueue_thread_result(std::move(req)); + enqueue_thread_result(req.acquire()); const char wakeup_byte = IO_SERVICE_RESULT_QUEUE; int notify_fd = get_notify_pipes().write; assert_with_errno(write_loop(notify_fd, &wakeup_byte, sizeof wakeup_byte) != -1); @@ -150,9 +154,9 @@ static void *iothread_worker(void *unused) { // it's possible that the main thread saw that thread_count is full, and decided to not // spawn a new thread, trusting in one of the existing threads to handle it. But we've already // committed to not handling anything else. Therefore, we have to decrement - // the thread count under the lock, which we still hold. Likewise, the main thread must - // check the value under the lock. - int new_thread_count = --s_spawn_requests.acquire()->thread_count; + // the thread count under the lock. Likewise, the main thread must check the value under the + // lock. + int new_thread_count = --s_thread_pool.acquire()->thread_count; assert(new_thread_count >= 0); debug(5, "pthread %p exiting", this_thread()); @@ -176,12 +180,12 @@ int iothread_perform_impl(void_function_t &&func, void_function_t &&completion) ASSERT_IS_MAIN_THREAD(); ASSERT_IS_NOT_FORKED_CHILD(); - struct spawn_request_t req(std::move(func), std::move(completion)); + struct work_request_t req(std::move(func), std::move(completion)); int local_thread_count = -1; bool spawn_new_thread = false; { // Lock around a local region. - auto spawn_reqs = s_spawn_requests.acquire(); + auto spawn_reqs = s_thread_pool.acquire(); spawn_reqs->request_queue.push(std::move(req)); if (spawn_reqs->thread_count < IO_MAX_THREADS) { spawn_reqs->thread_count++; @@ -246,7 +250,7 @@ void iothread_drain_all() { #endif // Nasty polling via select(). - while (s_spawn_requests.acquire()->thread_count > 0) { + while (s_thread_pool.acquire()->thread_count > 0) { if (iothread_wait_for_pending_completions(1000)) { iothread_service_completion(); } @@ -294,12 +298,12 @@ static void iothread_service_main_thread_requests() { // Service the queue of results static void iothread_service_result_queue() { // Move the queue to a local variable. - std::queue result_queue; + std::queue result_queue; s_result_queue.acquire()->swap(result_queue); // Perform each completion in order while (!result_queue.empty()) { - spawn_request_t req(std::move(result_queue.front())); + work_request_t req(std::move(result_queue.front())); result_queue.pop(); // ensure we don't invoke empty functions, that raises an exception if (req.completion != nullptr) {