Improve the iothread port structure

Mark both fds in the ioport as non-blocking, and allow bulk reads.
This commit is contained in:
ridiculousfish 2019-06-03 16:42:51 -07:00
parent b478f877ee
commit d1fc8d5f71

View File

@ -96,6 +96,12 @@ static const notify_pipes_t &get_notify_pipes() {
assert_with_errno(pipe(pipes) != -1); assert_with_errno(pipe(pipes) != -1);
set_cloexec(pipes[0]); set_cloexec(pipes[0]);
set_cloexec(pipes[1]); set_cloexec(pipes[1]);
// Mark both ends as non-blocking.
for (int fd : pipes) {
if (make_fd_nonblocking(fd)) {
wperror(L"fcntl");
}
}
return notify_pipes_t{pipes[0], pipes[1]}; return notify_pipes_t{pipes[0], pipes[1]};
}(); }();
return s_notify_pipes; return s_notify_pipes;
@ -194,16 +200,16 @@ int iothread_port() { return get_notify_pipes().read; }
void iothread_service_completion() { void iothread_service_completion() {
ASSERT_IS_MAIN_THREAD(); ASSERT_IS_MAIN_THREAD();
char wakeup_byte; // Drain the read buffer, and then service completions.
// The order is important.
assert_with_errno(read_loop(iothread_port(), &wakeup_byte, sizeof wakeup_byte) == 1); int port = iothread_port();
if (wakeup_byte == IO_SERVICE_MAIN_THREAD_REQUEST_QUEUE) { char buff[256];
iothread_service_main_thread_requests(); while (read(port, buff, sizeof buff) > 0) {
} else if (wakeup_byte == IO_SERVICE_RESULT_QUEUE) { // pass
iothread_service_result_queue();
} else {
FLOGF(error, L"Unknown wakeup byte %02x in %s", wakeup_byte, __FUNCTION__);
} }
iothread_service_main_thread_requests();
iothread_service_result_queue();
} }
static bool iothread_wait_for_pending_completions(long timeout_usec) { static bool iothread_wait_for_pending_completions(long timeout_usec) {
@ -278,6 +284,7 @@ static void iothread_service_main_thread_requests() {
// //
// Because the waiting thread performs step 1 under the lock, if we take the lock, we avoid // Because the waiting thread performs step 1 under the lock, if we take the lock, we avoid
// posting before the waiting thread is waiting. // posting before the waiting thread is waiting.
// TODO: revisit this logic, this feels sketchy.
scoped_lock broadcast_lock(s_main_thread_performer_lock); scoped_lock broadcast_lock(s_main_thread_performer_lock);
s_main_thread_performer_cond.notify_all(); s_main_thread_performer_cond.notify_all();
} }
@ -287,7 +294,7 @@ static void iothread_service_main_thread_requests() {
static void iothread_service_result_queue() { static void iothread_service_result_queue() {
// Move the queue to a local variable. // Move the queue to a local variable.
std::queue<spawn_request_t> result_queue; std::queue<spawn_request_t> result_queue;
(*s_result_queue.acquire()).swap(result_queue); s_result_queue.acquire()->swap(result_queue);
// Perform each completion in order // Perform each completion in order
while (!result_queue.empty()) { while (!result_queue.empty()) {