From 8a263952adb3275be54cd59c04a657109fe341b7 Mon Sep 17 00:00:00 2001 From: ridiculousfish Date: Mon, 5 May 2014 23:33:05 -0700 Subject: [PATCH] Fix named pipe universal notifier. No more threads. Tests now pass. --- env_universal_common.cpp | 226 +++++++++++++++++++++++++-------------- env_universal_common.h | 4 +- fish_tests.cpp | 41 ++++--- input_common.cpp | 2 +- 4 files changed, 172 insertions(+), 101 deletions(-) diff --git a/env_universal_common.cpp b/env_universal_common.cpp index f64753c95..f165f7204 100644 --- a/env_universal_common.cpp +++ b/env_universal_common.cpp @@ -1472,11 +1472,6 @@ class universal_notifier_shmem_poller_t : public universal_notifier_t } } - bool needs_polling() const - { - return true; - } - bool poll() { bool result = false; @@ -1502,7 +1497,7 @@ class universal_notifier_shmem_poller_t : public universal_notifier_t unsigned long usec_per_sec = 1000000; if (get_time() - last_change_time < 5LL * usec_per_sec) { - return usec_per_sec / 25; //10 times a second + return usec_per_sec / 10; //10 times a second } else { @@ -1560,7 +1555,7 @@ public: return notify_fd; } - bool drain_notification_fd(int fd) + bool notification_fd_became_readable(int fd) { /* notifyd notifications come in as 32 bit values. We don't care about the value. We set ourselves as non-blocking, so just read until we can't read any more. */ assert(fd == notify_fd); @@ -1656,7 +1651,7 @@ public: return watch_fd; } - bool drain_notification_fd(int fd) + bool notification_fd_became_readable(int fd) { assert(fd == watch_fd); bool result = false; @@ -1689,14 +1684,18 @@ public: } }; +#define NAMED_PIPE_FLASH_DURATION_USEC (1000000 / 10) + +#define SUSTAINED_READABILITY_CLEANUP_DURATION_USEC (1000000 * 5) + class universal_notifier_named_pipe_t : public universal_notifier_t { int pipe_fd; + long long readback_time_usec; + size_t readback_amount; - // Remaining variables are protected by this lock - lock_t lock; - unsigned notification_seed; - bool notifier_thread_running; + bool is_readable; + long long drain_if_still_readable_time_usec; void make_pipe(const wchar_t *test_path) { @@ -1727,101 +1726,162 @@ class universal_notifier_named_pipe_t : public universal_notifier_t } } - static int notify_in_background(universal_notifier_named_pipe_t *self) + void drain_excessive_data() { - // We need to write some data (any data) to the pipe, then sleep for a while, then read it back. - // Nobody is expected to read it except us. - // For debugging, we write our pid. - // Because we are in a background thread with all signals masked, we do not expect to get EINTR - const int pid_nbo = htonl(getpid()); - scoped_lock locker(self->lock); - assert(self->notifier_thread_running); - for (;;) - { - // Determine the seed at the time we post our request - const unsigned initial_seed = self->notification_seed; - - // Perform a notification for that seed - locker.unlock(); - errno = 0; - ssize_t amt_written = write(self->pipe_fd, &pid_nbo, sizeof pid_nbo); - bool wrote_all = (amt_written == sizeof pid_nbo); - int err = errno; - - if (! wrote_all) - { - // Paranoia. If for some reason our pipe is filled up, then we drain it. - // This might happen if there's a bug, or if the user manually redirects something into our pipe - bool wrote_partial = (amt_written >= 0 && amt_written < sizeof pid_nbo); - bool pipe_full = (wrote_partial || err == EWOULDBLOCK || err == EAGAIN); - if (pipe_full) - { - // Drain the pipe - unsigned char buff[256]; - while (read(pid_nbo, buff, sizeof buff) > 0) - { - // Keep reading - } - } - } - - // Now sleep a little - const long useconds_per_second = 1000000; - usleep(useconds_per_second / 25); - - // Read back what we we wrote - int read_back; - read_ignore(self->pipe_fd, &read_back, sizeof read_back); - - // See if we need to go around again - locker.lock(); - if (initial_seed == self->notification_seed) - { - // No more notifications came in, we're done - break; - } - } - // Now we're done - // Note that we're still locked, so it's safe to manipulate this variable - self->notifier_thread_running = false; - return 0; + // The pipe seems to have data on it, that won't go away + // Read a big chunk out of it. + // We don't read until it's exhausted, because if someone were to pipe say /dev/null, that would cause us to hang! + size_t read_amt = 64 * 1024; + void *buff = malloc(read_amt); + read_ignore(this->pipe_fd, buff, read_amt); + free(buff); } public: - universal_notifier_named_pipe_t(const wchar_t *test_path) : pipe_fd(-1), notification_seed(0), notifier_thread_running(false) + universal_notifier_named_pipe_t(const wchar_t *test_path) : pipe_fd(-1), readback_time_usec(0), readback_amount(0), is_readable(false), drain_if_still_readable_time_usec(0) { make_pipe(test_path); } - int notification_fd() + ~universal_notifier_named_pipe_t() { - return pipe_fd; + if (pipe_fd >= 0) + { + close(pipe_fd); + } } - bool drain_notification_fd(int fd) + int notification_fd() { - // We deliberately do nothing here - return false; + int result = -1; + if (! is_readable) + { + result = pipe_fd; + } + return result; + } + + bool notification_fd_became_readable(int fd) + { + // Our fd is readable. We deliberately do not read anything out of it: if we did, other sessions may miss the notification. + // Instead, we go into "polling mode:" we do not select() on our fd for a while, and then sync in the future. + // However, if we remain readable for too long, we'll read out data + if (readback_time_usec > 0) + { + is_readable = true; + drain_if_still_readable_time_usec = get_time() + SUSTAINED_READABILITY_CLEANUP_DURATION_USEC; + } + return true; } void post_notification() { if (pipe_fd >= 0) { - scoped_lock locker(lock); - notification_seed++; - if (! notifier_thread_running) + // We need to write some data (any data) to the pipe, then wait for a while, then read it back. + // Nobody is expected to read it except us. + int pid_nbo = htonl(getpid()); + ssize_t amt_written = write(this->pipe_fd, &pid_nbo, sizeof pid_nbo); + if (amt_written < 0) { - // Need to kick it off - notifier_thread_running = true; - iothread_perform(notify_in_background, this); + if (errno == EWOULDBLOCK || errno == EAGAIN) + { + // Very unsual: the pipe is full! + drain_excessive_data(); + } + } + + // Now schedule a read for some time in the future + readback_time_usec = get_time() + NAMED_PIPE_FLASH_DURATION_USEC; + readback_amount += sizeof pid_nbo; + } + } + + unsigned long usec_delay_between_polls() const + { + unsigned long result = 0; + if (this->readback_time_usec > 0) + { + // How long until the readback? + long long now = get_time(); + if (now >= this->readback_time_usec) + { + // Oops, it already passed! Return something tiny. + result = 1000; + } + else + { + result = (unsigned long)(this->readback_time_usec - now); } } + + if (is_readable) + { + // We're in polling mode + // Don't return a value less than our polling interval + if (result == 0 || result > NAMED_PIPE_FLASH_DURATION_USEC) + { + result = NAMED_PIPE_FLASH_DURATION_USEC; + } + } + + return result; + } + + bool poll() + { + bool result = false; + + // Check if we are past the readback time + if (this->readback_time_usec > 0 && get_time() >= this->readback_time_usec) + { + // Read back what we wrote. We do nothing with the value. + while (this->readback_amount > 0) + { + char buff[64]; + size_t amt_to_read = mini(this->readback_amount, sizeof buff); + read_ignore(this->pipe_fd, buff, amt_to_read); + this->readback_amount -= amt_to_read; + } + assert(this->readback_amount == 0); + this->readback_time_usec = 0; + } + + // Check to see if we are doing readability polling + if (is_readable && pipe_fd >= 0) + { + // See if this is still readable + fd_set fds; + FD_ZERO(&fds); + FD_SET(this->pipe_fd, &fds); + struct timeval timeout = {}; + select(this->pipe_fd + 1, &fds, NULL, NULL, &timeout); + if (! FD_ISSET(this->pipe_fd, &fds)) + { + // No longer readable + is_readable = false; + drain_if_still_readable_time_usec = 0; + // Sync with the file to pick up any changes + result = true; + } + else + { + // Still readable. If it's been readable for a long time, there is probably lingering data on the pipe + if (get_time() >= drain_if_still_readable_time_usec) + { + drain_excessive_data(); + } + } + } + + // We use poll() as just a way to clean up our own messes. The real magic happens in notification_fd_became_readable + return false; } }; universal_notifier_t::notifier_strategy_t universal_notifier_t::resolve_default_strategy() { + return strategy_named_pipe; #if FISH_NOTIFYD_AVAILABLE return strategy_notifyd; #else @@ -1886,7 +1946,7 @@ bool universal_notifier_t::poll() bool universal_notifier_t::needs_polling() const { - return false; + return this->usec_delay_between_polls() > 0; } unsigned long universal_notifier_t::usec_delay_between_polls() const @@ -1894,7 +1954,7 @@ unsigned long universal_notifier_t::usec_delay_between_polls() const return 0; } -bool universal_notifier_t::drain_notification_fd(int fd) +bool universal_notifier_t::notification_fd_became_readable(int fd) { return false; } diff --git a/env_universal_common.h b/env_universal_common.h index 2c7fef1f0..05e9fc76e 100644 --- a/env_universal_common.h +++ b/env_universal_common.h @@ -308,7 +308,7 @@ public: virtual bool poll(); /* Indicates whether this notifier requires polling. */ - virtual bool needs_polling() const; + bool needs_polling() const; /* Triggers a notification */ virtual void post_notification(); @@ -320,7 +320,7 @@ public: virtual int notification_fd(); /* The notification_fd is readable; drain it. Returns true if a notification is considered to have been posted. */ - virtual bool drain_notification_fd(int fd); + virtual bool notification_fd_became_readable(int fd); }; std::string get_machine_identifier(); diff --git a/fish_tests.cpp b/fish_tests.cpp index f6feda773..828b0c4b2 100644 --- a/fish_tests.cpp +++ b/fish_tests.cpp @@ -2252,19 +2252,17 @@ bool poll_notifier(universal_notifier_t *note) { result = note->poll(); } - else + + int fd = note->notification_fd(); + if (fd >= 0) { - int fd = note->notification_fd(); - if (fd > 0) + fd_set fds; + FD_ZERO(&fds); + FD_SET(fd, &fds); + struct timeval tv = {0, 0}; + if (select(fd + 1, &fds, NULL, NULL, &tv) > 0 && FD_ISSET(fd, &fds)) { - fd_set fds; - FD_ZERO(&fds); - FD_SET(fd, &fds); - struct timeval tv = {0, 0}; - if (select(fd + 1, &fds, NULL, NULL, &tv) > 0 && FD_ISSET(fd, &fds)) - { - result = note->drain_notification_fd(fd); - } + result = note->notification_fd_became_readable(fd); } } return result; @@ -2288,6 +2286,9 @@ static void trigger_or_wait_for_notification(universal_notifier_t *notifier, uni usleep(1000000 / 25); break; + case universal_notifier_t::strategy_named_pipe: + break; + case universal_notifier_t::strategy_inotify: { // Hacktastic. Replace the file, then wait @@ -2342,7 +2343,17 @@ static void test_notifiers_with_strategy(universal_notifier_t::notifier_strategy if (! poll_notifier(notifiers[i])) { - err(L"Universal variable notifier polled failed to notice changes, with strategy %d", (int)strategy); + err(L"Universal variable notifier (%lu) %p polled failed to notice changes, with strategy %d", i, notifiers[i], (int)strategy); + } + } + + // Named pipes have special cleanup requirements + if (strategy == universal_notifier_t::strategy_named_pipe) + { + usleep(1000000 / 10); + for (size_t i=0; i < notifier_count; i++) + { + poll_notifier(notifiers[i]); } } } @@ -2361,20 +2372,20 @@ static void test_notifiers_with_strategy(universal_notifier_t::notifier_strategy { delete notifiers[i]; } - } static void test_universal_notifiers() { + if (system("mkdir -p /tmp/fish_uvars_test/ && touch /tmp/fish_uvars_test/varsfile.txt")) err(L"mkdir failed"); test_notifiers_with_strategy(universal_notifier_t::strategy_shmem_polling); + test_notifiers_with_strategy(universal_notifier_t::strategy_named_pipe); #if __APPLE__ test_notifiers_with_strategy(universal_notifier_t::strategy_notifyd); #endif #if __linux || linux - if (system("mkdir -p /tmp/fish_uvars_test/ && touch /tmp/fish_uvars_test/varsfile.txt")) err(L"mkdir failed"); test_notifiers_with_strategy(universal_notifier_t::strategy_inotify); - if (system("rm -Rf /tmp/fish_uvars_test/")) err(L"rm failed"); #endif + if (system("rm -Rf /tmp/fish_uvars_test/")) err(L"rm failed"); } class history_tests_t diff --git a/input_common.cpp b/input_common.cpp index a81c58153..570930cbc 100644 --- a/input_common.cpp +++ b/input_common.cpp @@ -192,7 +192,7 @@ static wint_t readb() if (notifier_fd > 0 && FD_ISSET(notifier_fd, &fdset)) { - bool notified = notifier.drain_notification_fd(notifier_fd); + bool notified = notifier.notification_fd_became_readable(notifier_fd); if (notified) { env_universal_barrier();