From 65e1c42a2b5bd94f18346c80e1e151e82fc744d1 Mon Sep 17 00:00:00 2001 From: ridiculousfish Date: Thu, 20 Aug 2020 13:57:51 -0700 Subject: [PATCH] topic_monitor to use binary semaphore instead of self-pipe With the prior commit, the topic_monitor only writes to the pipe if a thread is known to be waiting. This is effectively a binary semaphore, and on systems that support anon semaphores (yes Linux, but not Mac) we can use them. These are more efficient than self-pipes. We add a binary_semaphore_t class which uses sem_t if sem_init succeeds, and a self-pipe if it fails. On Linux the seq_echo benchmark (run 1024 times) goes from 12.40 seconds to 11.59 seconds, about an 11% improvement. --- src/topic_monitor.cpp | 122 +++++++++++++++++++++++++++--------------- src/topic_monitor.h | 52 ++++++++++++++++-- 2 files changed, 125 insertions(+), 49 deletions(-) diff --git a/src/topic_monitor.cpp b/src/topic_monitor.cpp index e682acf7b..5bc8dacb3 100644 --- a/src/topic_monitor.cpp +++ b/src/topic_monitor.cpp @@ -37,6 +37,78 @@ wcstring generation_list_t::describe() const { return result; } +binary_semaphore_t::binary_semaphore_t() : sem_ok_(false) { + // sem_init always fails with ENOSYS on Mac and has an annoying deprecation warning. +#ifndef __APPLE__ + sem_ok_ = (0 == sem_init(&sem_, 0, 0)); +#endif + if (!sem_ok_) { + auto pipes = make_autoclose_pipes({}); + assert(pipes.has_value() && "Failed to make pubsub pipes"); + pipes_ = pipes.acquire(); + +#ifdef TOPIC_MONITOR_TSAN_WORKAROUND + DIE_ON_FAILURE(make_fd_nonblocking(pipes_.read.fd())); +#endif + } +} + +binary_semaphore_t::~binary_semaphore_t() { +#ifndef __APPLE__ + if (sem_ok_) (void)sem_destroy(&sem_); +#endif +} + +void binary_semaphore_t::die(const wchar_t *msg) const { + wperror(msg); + DIE("unexpected failure"); +} + +void binary_semaphore_t::post() { + if (sem_ok_) { + int res = sem_post(&sem_); + // sem_post is non-interruptible. + if (res < 0) die(L"sem_post"); + } else { + // Write exactly one byte. + ssize_t ret; + do { + const uint8_t v = 0; + ret = write(pipes_.write.fd(), &v, sizeof v); + } while (ret < 0 && errno == EINTR); + if (ret < 0) die(L"write"); + } +} + +void binary_semaphore_t::wait() { + if (sem_ok_) { + int res; + do { + res = sem_wait(&sem_); + } while (res < 0 && errno == EINTR); + // Other errors here are very unexpected. + if (res < 0) die(L"sem_wait"); + } else { + int fd = pipes_.read.fd(); +#ifdef TOPIC_MONITOR_TSAN_WORKAROUND + // Under tsan our notifying pipe is non-blocking, so we would busy-loop on the read() call + // until data is available (that is, fish would use 100% cpu while waiting for processes). + // The select prevents that. + fd_set fds; + FD_ZERO(&fds); + FD_SET(fd, &fds); + (void)select(fd + 1, &fds, nullptr, nullptr, nullptr /* timeout */); +#endif + // We must read exactly one byte. + for (;;) { + uint8_t ignored; + auto amt = read(fd, &ignored, sizeof ignored); + if (amt == 1) break; + if (amt < 0 && errno != EINTR) die(L"read"); + } + } +} + /// Implementation of the principal monitor. This uses new (and leaks) to avoid registering a /// pointless at-exit handler for the dtor. static topic_monitor_t *const s_principal = new topic_monitor_t(); @@ -47,21 +119,7 @@ topic_monitor_t &topic_monitor_t::principal() { return *s_principal; } -topic_monitor_t::topic_monitor_t() { - // Set up our pipes. Assert it succeeds. - auto pipes = make_autoclose_pipes({}); - assert(pipes.has_value() && "Failed to make pubsub pipes"); - pipes_ = pipes.acquire(); - - // Make sure that our write side doesn't block, else we risk hanging in a signal handler. - // The read end must block to avoid spinning in await. - DIE_ON_FAILURE(make_fd_nonblocking(pipes_.write.fd())); - -#ifdef TOPIC_MONITOR_TSAN_WORKAROUND - DIE_ON_FAILURE(make_fd_nonblocking(pipes_.read.fd())); -#endif -} - +topic_monitor_t::topic_monitor_t() = default; topic_monitor_t::~topic_monitor_t() = default; void topic_monitor_t::post(topic_t topic) { @@ -94,14 +152,7 @@ void topic_monitor_t::post(topic_t topic) { // Check if we should wake up a thread because it was waiting. if (oldstatus & STATUS_NEEDS_WAKEUP) { std::atomic_thread_fence(std::memory_order_release); - ssize_t ret; - do { - // We must write exactly one byte. - // write() is async signal safe. - const uint8_t v = 0; - ret = write(pipes_.write.fd(), &v, sizeof v); - } while (ret < 0 && errno == EINTR); - // Ignore EAGAIN and other errors (which conceivably could occur during shutdown). + sema_.post(); } } @@ -190,28 +241,11 @@ generation_list_t topic_monitor_t::await_gens(const generation_list_t &input_gen // Note we no longer hold the lock. assert(gens == input_gens && "Generations should not have changed if we are the reader."); - int fd = pipes_.read.fd(); -#ifdef TOPIC_MONITOR_TSAN_WORKAROUND - // Under tsan our notifying pipe is non-blocking, so we would busy-loop on the read() - // call until data is available (that is, fish would use 100% cpu while waiting for - // processes). The select prevents that. - fd_set fds; - FD_ZERO(&fds); - FD_SET(fd, &fds); - (void)select(fd + 1, &fds, nullptr, nullptr, nullptr /* timeout */); -#endif - // We must read exactly one byte. - for (;;) { - uint8_t ignored; - auto amt = read(fd, &ignored, sizeof ignored); - if (amt == 1) break; - if (amt < 0 && errno != EINTR && errno != EINTR) { - wperror(L"read"); - DIE("self-pipe read unexpected failure"); - } - } - // We are finished reading. We must stop being the reader, and post on the condition + // Wait to be woken up. + sema_.wait(); + + // We are finished waiting. We must stop being the reader, and post on the condition // variable to wake up any other threads waiting for us to finish reading. auto data = data_.acquire(); gens = data->current; diff --git a/src/topic_monitor.h b/src/topic_monitor.h index 76e6818f2..42d2dbe71 100644 --- a/src/topic_monitor.h +++ b/src/topic_monitor.h @@ -1,6 +1,8 @@ #ifndef FISH_TOPIC_MONITOR_H #define FISH_TOPIC_MONITOR_H +#include + #include #include #include @@ -127,8 +129,48 @@ class generation_list_t { : sighupint(sighupint), sigchld(sigchld), internal_exit(internal_exit) {} }; +/// A simple binary semaphore. +/// On systems that do not support unnamed semaphores (macOS in particular) this is built on top of +/// a self-pipe. Note that post() must be async-signal safe. +class binary_semaphore_t { + public: + binary_semaphore_t(); + ~binary_semaphore_t(); + + /// Release a waiting thread. + void post(); + + /// Wait for a post. + /// This loops on EINTR. + void wait(); + + private: + // Print a message and exit. + void die(const wchar_t *msg) const; + + // Whether our semaphore was successfully initialized. + bool sem_ok_{}; + + // The semaphore, if initalized. + sem_t sem_{}; + + // Pipes used to emulate a semaphore, if not initialized. + autoclose_pipes_t pipes_{}; +}; + /// The topic monitor class. This permits querying the current generation values for topics, /// optionally blocking until they increase. +/// What we would like to write is that we have a set of topics, and threads wait for changes on a +/// condition variable which is tickled in post(). But this can't work because post() may be called +/// from a signal handler and condition variables are not async-signal safe. +/// So instead the signal handler announces changes via a binary semaphore. +/// In the wait case, what generally happens is: +/// A thread fetches the generations, see they have not changed, and then decides to try to wait. +/// It does so by atomically swapping in STATUS_NEEDS_WAKEUP to the status bits. +/// If that succeeds, it waits on the binary semaphore. The post() call will then wake the thread +/// up. If if failed, then either a post() call updated the status values (so perhaps there is a +/// new topic post) or some other thread won the race and called wait() on the semaphore. Here our +/// thread will wait on the data_notifier_ queue. class topic_monitor_t { private: using topic_bitmask_t = uint8_t; @@ -139,6 +181,7 @@ class topic_monitor_t { generation_list_t current{}; /// A flag indicating that there is a current reader. + /// The 'reader' is responsible for calling sema_.wait(). bool has_reader{false}; }; owning_lock data_{}; @@ -160,11 +203,10 @@ class topic_monitor_t { /// Note it is an error for this bit to be set and also any topic bit. static constexpr uint8_t STATUS_NEEDS_WAKEUP = 128; - /// Self-pipes used to communicate changes. - /// The writer is a signal handler. - /// "The reader" refers to a thread that wants to wait for changes. Only one thread can be the - /// reader at a given time. - autoclose_pipes_t pipes_; + /// Binary semaphore used to communicate changes. + /// If status_ is STATUS_NEEDS_WAKEUP, then a thread has commited to call wait() on our sema and + /// this must be balanced by the next call to post(). Note only one thread may wait at a time. + binary_semaphore_t sema_{}; /// Apply any pending updates to the data. /// This accepts data because it must be locked.