diff --git a/src/fish_tests.cpp b/src/fish_tests.cpp index 602c3fa53..ad28ba3d3 100644 --- a/src/fish_tests.cpp +++ b/src/fish_tests.cpp @@ -971,7 +971,7 @@ static void test_debounce() { // Wait until the last completion is done. while (!completion_ran.back()) { - iothread_service_completion(); + iothread_service_main(); } iothread_drain_all(); diff --git a/src/input_common.cpp b/src/input_common.cpp index 2f1bef66d..4c39f884b 100644 --- a/src/input_common.cpp +++ b/src/input_common.cpp @@ -124,7 +124,7 @@ char_event_t input_event_queue_t::readb() { // Check for iothread completions only if there is no data to be read from the stdin. // This gives priority to the foreground. if (ioport > 0 && FD_ISSET(ioport, &fdset)) { - iothread_service_completion(); + iothread_service_main(); if (auto mc = pop_discard_timeouts()) { return *mc; } diff --git a/src/iothread.cpp b/src/iothread.cpp index 3e862e354..b5e145db4 100644 --- a/src/iothread.cpp +++ b/src/iothread.cpp @@ -31,16 +31,9 @@ // which is too low, even tho the system can handle more than 64 threads. #define IO_MAX_THREADS 1024 -// Values for the wakeup bytes sent to the ioport. -#define IO_SERVICE_MAIN_THREAD_REQUEST_QUEUE 99 -#define IO_SERVICE_RESULT_QUEUE 100 - // The amount of time an IO thread many hang around to service requests, in milliseconds. #define IO_WAIT_FOR_WORK_DURATION_MS 500 -static void iothread_service_main_thread_requests(); -static void iothread_service_result_queue(); - using void_function_t = std::function; struct work_request_t { @@ -140,32 +133,38 @@ struct thread_pool_t { /// Leaked to avoid shutdown dtor registration (including tsan). static thread_pool_t &s_io_thread_pool = *(new thread_pool_t(1, IO_MAX_THREADS)); -static owning_lock> s_result_queue; +/// A queue of "things to do on the main thread." +struct main_thread_queue_t { + // Functions to invoke as the completion callback from iothread_perform. + std::vector completions; -// "Do on main thread" support. -// The queue of main thread requests. This queue contains pointers to structs that are -// stack-allocated on the requesting thread. -static owning_lock> s_main_thread_request_queue; + // iothread_perform_on_main requests. + // Note this contains pointers to structs that are stack-allocated on the requesting thread. + std::vector requests; -// Pipes used for notifying. -struct notify_pipes_t { - int read; - int write; + /// Transfer ownership of ourselves to a new queue and return it. + /// 'this' is left empty. + main_thread_queue_t take() { + main_thread_queue_t result; + std::swap(result.completions, this->completions); + std::swap(result.requests, this->requests); + return result; + } + + // Moving is allowed, but not copying. + main_thread_queue_t() = default; + main_thread_queue_t(main_thread_queue_t &&) = default; + main_thread_queue_t &operator=(main_thread_queue_t &&) = default; + main_thread_queue_t(const main_thread_queue_t &) = delete; + void operator=(const main_thread_queue_t &) = delete; }; +static owning_lock s_main_thread_queue; -/// \return the (immortal) set of pipes used for notifying of completions. -static const notify_pipes_t &get_notify_pipes() { - static const notify_pipes_t s_notify_pipes = [] { - auto pipes = make_autoclose_pipes(); - if (!pipes) { - DIE_WITH_ERRNO("Unable to create iothread notify pipes"); - } - // Mark both ends as non-blocking. - if (make_fd_nonblocking(pipes->read.fd())) wperror(L"fcntl"); - if (make_fd_nonblocking(pipes->write.fd())) wperror(L"fcntl"); - return notify_pipes_t{pipes->read.acquire(), pipes->write.acquire()}; - }(); - return s_notify_pipes; +/// \return the signaller for completions and main thread requests. +static fd_event_signaller_t &get_notify_signaller() { + // Leaked to avoid shutdown dtors. + static fd_event_signaller_t *s_signaller = new fd_event_signaller_t(); + return *s_signaller; } /// Dequeue a work item (perhaps waiting on the condition variable), or commit to exiting by @@ -198,10 +197,8 @@ maybe_t thread_pool_t::dequeue_work_or_commit_to_exit() { } static void enqueue_thread_result(void_function_t req) { - s_result_queue.acquire()->push_back(std::move(req)); - const char wakeup_byte = IO_SERVICE_RESULT_QUEUE; - int notify_fd = get_notify_pipes().write; - assert_with_errno(write_loop(notify_fd, &wakeup_byte, sizeof wakeup_byte) != -1); + s_main_thread_queue.acquire()->completions.push_back(std::move(req)); + get_notify_signaller().post(); } static void *this_thread() { return (void *)(intptr_t)pthread_self(); } @@ -285,23 +282,9 @@ void iothread_perform_impl(void_function_t &&func, void_function_t &&completion, s_io_thread_pool.perform(std::move(func), std::move(completion), cant_wait); } -int iothread_port() { return get_notify_pipes().read; } +int iothread_port() { return get_notify_signaller().read_fd(); } -void iothread_service_completion() { - ASSERT_IS_MAIN_THREAD(); - // Drain the read buffer, and then service completions. - // The order is important. - int port = iothread_port(); - char buff[256]; - while (read(port, buff, sizeof buff) > 0) { - // pass - } - - iothread_service_main_thread_requests(); - iothread_service_result_queue(); -} - -static bool iothread_wait_for_pending_completions(long timeout_usec) { +static bool iothread_wait_for_main_requests(long timeout_usec) { const long usec_per_sec = 1000000; struct timeval tv; tv.tv_sec = timeout_usec / usec_per_sec; @@ -314,9 +297,9 @@ static bool iothread_wait_for_pending_completions(long timeout_usec) { return ret > 0; } -void iothread_service_completion_with_timeout(long timeout_usec) { - if (iothread_wait_for_pending_completions(timeout_usec)) { - iothread_service_completion(); +void iothread_service_main_with_timeout(long timeout_usec) { + if (iothread_wait_for_main_requests(timeout_usec)) { + iothread_service_main(); } } @@ -344,9 +327,7 @@ int iothread_drain_all() { // Nasty polling via select(). while (pool.req_data.acquire()->total_threads > 0) { - if (iothread_wait_for_pending_completions(1000)) { - iothread_service_completion(); - } + iothread_service_main_with_timeout(1000); } // Clear the drain flag. @@ -364,33 +345,29 @@ int iothread_drain_all() { return thread_count; } -/// "Do on main thread" support. -static void iothread_service_main_thread_requests() { +// Service the main thread queue, by invoking any functions enqueued for the main thread. +void iothread_service_main() { ASSERT_IS_MAIN_THREAD(); + // Note the order here is important: we must consume events before handling requests, as posting + // uses the opposite order. + (void)get_notify_signaller().try_consume(); // Move the queue to a local variable. - std::vector request_queue; - s_main_thread_request_queue.acquire()->swap(request_queue); + // Note the s_main_thread_queue lock is not held after this. + main_thread_queue_t queue = s_main_thread_queue.acquire()->take(); - // Perform each of the functions. Note we are NOT responsible for deleting these. They are - // stack allocated in their respective threads! - for (main_thread_request_t *req : request_queue) { - req->func(); - req->done.set_value(); - } -} - -// Service the queue of results -static void iothread_service_result_queue() { - // Move the queue to a local variable. - std::vector result_queue; - s_result_queue.acquire()->swap(result_queue); - - // Perform each completion in order - for (const auto &func : result_queue) { + // Perform each completion in order. + for (const void_function_t &func : queue.completions) { // ensure we don't invoke empty functions, that raises an exception if (func) func(); } + + // Perform each main thread request. Note we are NOT responsible for deleting these. They are + // stack allocated in their respective threads! + for (main_thread_request_t *req : queue.requests) { + req->func(); + req->done.set_value(); + } } void iothread_perform_on_main(void_function_t &&func) { @@ -403,12 +380,10 @@ void iothread_perform_on_main(void_function_t &&func) { main_thread_request_t req(std::move(func)); // Append it. Ensure we don't hold the lock after. - s_main_thread_request_queue.acquire()->push_back(&req); + s_main_thread_queue.acquire()->requests.push_back(&req); - // Tell the pipe and then wait until our future is set. - const char wakeup_byte = IO_SERVICE_MAIN_THREAD_REQUEST_QUEUE; - int notify_fd = get_notify_pipes().write; - assert_with_errno(write_loop(notify_fd, &wakeup_byte, sizeof wakeup_byte) != -1); + // Tell the signaller and then wait until our future is set. + get_notify_signaller().post(); req.done.get_future().wait(); } diff --git a/src/iothread.h b/src/iothread.h index c68af8b6b..22edd2a74 100644 --- a/src/iothread.h +++ b/src/iothread.h @@ -14,11 +14,12 @@ /// \return the fd on which to listen for completion callbacks. int iothread_port(); -/// Services iothread completion callbacks. -void iothread_service_completion(); +/// Services iothread main thread completions and requests. +/// This does not block. +void iothread_service_main(); -/// Services completions, except does not wait more than \p timeout_usec. -void iothread_service_completion_with_timeout(long timeout_usec); +// Services any main thread requests. Does not wait more than \p timeout_usec. +void iothread_service_main_with_timeout(long timeout_usec); /// Waits for all iothreads to terminate. /// \return the number of threads that were running. diff --git a/src/reader.cpp b/src/reader.cpp index dc7c27b4f..7180cdeee 100644 --- a/src/reader.cpp +++ b/src/reader.cpp @@ -2480,9 +2480,9 @@ void reader_data_t::finish_highlighting_before_exec() { auto deadline = now + sc::milliseconds(kHighlightTimeoutForExecutionMs); while (now < deadline) { long timeout_usec = sc::duration_cast(deadline - now).count(); - iothread_service_completion_with_timeout(timeout_usec); + iothread_service_main_with_timeout(timeout_usec); - // Note iothread_service_completion_with_timeout will reentrantly modify us, + // Note iothread_service_main_with_timeout will reentrantly modify us, // by invoking a completion. if (in_flight_highlight_request.empty()) break; now = sc::steady_clock::now();