From d1fc8d5f7108a46f21eb09562af97ec774c6ee3a Mon Sep 17 00:00:00 2001 From: ridiculousfish Date: Mon, 3 Jun 2019 16:42:51 -0700 Subject: [PATCH] Improve the iothread port structure Mark both fds in the ioport as non-blocking, and allow bulk reads. --- src/iothread.cpp | 27 +++++++++++++++++---------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/src/iothread.cpp b/src/iothread.cpp index fe0c05049..f2ed12d52 100644 --- a/src/iothread.cpp +++ b/src/iothread.cpp @@ -96,6 +96,12 @@ static const notify_pipes_t &get_notify_pipes() { assert_with_errno(pipe(pipes) != -1); set_cloexec(pipes[0]); set_cloexec(pipes[1]); + // Mark both ends as non-blocking. + for (int fd : pipes) { + if (make_fd_nonblocking(fd)) { + wperror(L"fcntl"); + } + } return notify_pipes_t{pipes[0], pipes[1]}; }(); return s_notify_pipes; @@ -194,16 +200,16 @@ int iothread_port() { return get_notify_pipes().read; } void iothread_service_completion() { ASSERT_IS_MAIN_THREAD(); - char wakeup_byte; - - assert_with_errno(read_loop(iothread_port(), &wakeup_byte, sizeof wakeup_byte) == 1); - if (wakeup_byte == IO_SERVICE_MAIN_THREAD_REQUEST_QUEUE) { - iothread_service_main_thread_requests(); - } else if (wakeup_byte == IO_SERVICE_RESULT_QUEUE) { - iothread_service_result_queue(); - } else { - FLOGF(error, L"Unknown wakeup byte %02x in %s", wakeup_byte, __FUNCTION__); + // Drain the read buffer, and then service completions. + // The order is important. + int port = iothread_port(); + char buff[256]; + while (read(port, buff, sizeof buff) > 0) { + // pass } + + iothread_service_main_thread_requests(); + iothread_service_result_queue(); } static bool iothread_wait_for_pending_completions(long timeout_usec) { @@ -278,6 +284,7 @@ static void iothread_service_main_thread_requests() { // // Because the waiting thread performs step 1 under the lock, if we take the lock, we avoid // posting before the waiting thread is waiting. + // TODO: revisit this logic, this feels sketchy. scoped_lock broadcast_lock(s_main_thread_performer_lock); s_main_thread_performer_cond.notify_all(); } @@ -287,7 +294,7 @@ static void iothread_service_main_thread_requests() { static void iothread_service_result_queue() { // Move the queue to a local variable. std::queue result_queue; - (*s_result_queue.acquire()).swap(result_queue); + s_result_queue.acquire()->swap(result_queue); // Perform each completion in order while (!result_queue.empty()) {