diff --git a/CMakeLists.txt b/CMakeLists.txt index 32dee2061..eb7949f7e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -80,7 +80,7 @@ SET(FISH_SRCS src/postfork.cpp src/proc.cpp src/reader.cpp src/sanity.cpp src/screen.cpp src/signal.cpp src/tinyexpr.cpp src/tnode.cpp src/tokenizer.cpp src/utf8.cpp src/util.cpp src/wcstringutil.cpp src/wgetopt.cpp src/wildcard.cpp src/wutil.cpp - src/future_feature_flags.cpp src/redirection.cpp + src/future_feature_flags.cpp src/redirection.cpp src/topic_monitor.cpp ) # Header files are just globbed. diff --git a/Makefile.in b/Makefile.in index eba0b41be..beb3b9253 100644 --- a/Makefile.in +++ b/Makefile.in @@ -119,7 +119,7 @@ FISH_OBJS := obj/autoload.o obj/builtin.o obj/builtin_bg.o obj/builtin_bind.o ob obj/parser_keywords.o obj/path.o obj/postfork.o obj/proc.o obj/reader.o \ obj/sanity.o obj/screen.o obj/signal.o obj/tinyexpr.o obj/tokenizer.o obj/tnode.o obj/utf8.o \ obj/util.o obj/wcstringutil.o obj/wgetopt.o obj/wildcard.o obj/wutil.o \ - obj/future_feature_flags.o obj/redirection.o + obj/future_feature_flags.o obj/redirection.o obj/topic_monitor.o FISH_INDENT_OBJS := obj/fish_indent.o obj/print_help.o $(FISH_OBJS) diff --git a/src/enum_set.h b/src/enum_set.h index ea6e90b40..22eb8c66d 100644 --- a/src/enum_set.h +++ b/src/enum_set.h @@ -29,7 +29,7 @@ class enum_set_t : private std::bitset()> { public: enum_set_t() = default; - explicit enum_set_t(T v) { set(v); } + /*implicit*/ enum_set_t(T v) { set(v); } static enum_set_t from_raw(unsigned long v) { return enum_set_t{v}; } diff --git a/src/fish_tests.cpp b/src/fish_tests.cpp index 7b02115c0..dc2522784 100644 --- a/src/fish_tests.cpp +++ b/src/fish_tests.cpp @@ -25,6 +25,7 @@ #include #include #include +#include #include #include @@ -69,6 +70,7 @@ #include "signal.h" #include "tnode.h" #include "tokenizer.h" +#include "topic_monitor.h" #include "utf8.h" #include "util.h" #include "wcstringutil.h" @@ -5073,6 +5075,66 @@ void test_normalize_path() { 
do_test(path_normalize_for_cd(L"/abc/def/", L"../ghi/..") == L"/abc/ghi/.."); } +static void test_topic_monitor() { + say(L"Testing topic monitor"); + topic_monitor_t monitor; + generation_list_t gens{}; + constexpr auto t = topic_t::sigchld; + do_test(gens[t] == 0); + do_test(monitor.generation_for_topic(t) == 0); + auto changed = monitor.check(&gens, {t}, false /* wait */); + do_test(changed.none()); + do_test(gens[t] == 0); + + monitor.post(t); + changed = monitor.check(&gens, topic_set_t{t}, true /* wait */); + do_test(changed == topic_set_t{t}); + do_test(gens[t] == 1); + do_test(monitor.generation_for_topic(t) == 1); + + monitor.post(t); + do_test(monitor.generation_for_topic(t) == 2); + changed = monitor.check(&gens, topic_set_t{t}, true /* wait */); + do_test(changed == topic_set_t{t}); +} + +static void test_topic_monitor_torture() { + say(L"Torture-testing topic monitor"); + topic_monitor_t monitor; + const size_t thread_count = 64; + constexpr auto t = topic_t::sigchld; + std::vector gens; + gens.resize(thread_count, generation_list_t{}); + std::atomic post_count{}; + for (auto &gen : gens) { + gen = monitor.current_generations(); + post_count += 1; + monitor.post(t); + } + + std::atomic completed{}; + std::vector threads; + for (size_t i = 0; i < thread_count; i++) { + threads.emplace_back( + [&](size_t i) { + for (size_t j = 0; j < (1 << 11); j++) { + auto before = gens[i]; + auto changed = monitor.check(&gens[i], topic_set_t{t}, true /* wait */); + do_test(before[t] < gens[i][t]); + do_test(gens[i][t] <= post_count); + } + auto amt = completed.fetch_add(1, std::memory_order_relaxed); + }, + i); + } + + while (completed.load(std::memory_order_relaxed) < thread_count) { + post_count += 1; + monitor.post(t); + } + for (auto &t : threads) t.join(); +} + /// Main test. 
int main(int argc, char **argv) { UNUSED(argc); @@ -5192,6 +5254,8 @@ int main(int argc, char **argv) { if (should_test_function("maybe")) test_maybe(); if (should_test_function("layout_cache")) test_layout_cache(); if (should_test_function("normalize")) test_normalize_path(); + if (should_test_function("topics")) test_topic_monitor(); + if (should_test_function("topics")) test_topic_monitor_torture(); // history_tests_t::test_history_speed(); say(L"Encountered %d errors in low-level tests", err_count); diff --git a/src/iothread.h b/src/iothread.h index 5e677b639..39d99e539 100644 --- a/src/iothread.h +++ b/src/iothread.h @@ -2,6 +2,7 @@ #ifndef FISH_IOTHREAD_H #define FISH_IOTHREAD_H +#include #include #include diff --git a/src/signal.cpp b/src/signal.cpp index f1ab10581..8ca202330 100644 --- a/src/signal.cpp +++ b/src/signal.cpp @@ -15,6 +15,7 @@ #include "parser.h" #include "proc.h" #include "reader.h" +#include "topic_monitor.h" #include "wutil.h" // IWYU pragma: keep /// Struct describing an entry for the lookup table used to convert between signal names and signal @@ -262,6 +263,7 @@ static void handle_chld(int sig, siginfo_t *info, void *context) { if (reraise_if_forked_child(sig)) return; job_handle_signal(sig, info, context); default_handler(sig, info, context); + topic_monitor_t::principal().post(topic_t::sigchld); } // We have a sigalarm handler that does nothing. This is used in the signal torture test, to verify diff --git a/src/topic_monitor.cpp b/src/topic_monitor.cpp new file mode 100644 index 000000000..2c28a7517 --- /dev/null +++ b/src/topic_monitor.cpp @@ -0,0 +1,114 @@ +#include "config.h" // IWYU pragma: keep + +#include "limits.h" +#include "topic_monitor.h" +#include "wutil.h" + +#include + +/// Implementation of the principal monitor. This uses new (and leaks) to avoid registering a +/// pointless at-exit handler for the dtor. 
+static topic_monitor_t *const s_principal = new topic_monitor_t(); + +topic_monitor_t &topic_monitor_t::principal() { + // Do not attempt to move s_principal to a function-level static, it needs to be accessed from a + // signal handler so it must not be lazily created. + return *s_principal; +} + +topic_monitor_t::topic_monitor_t() { + // Set up our pipes. Assert it succeeds. + auto pipes = make_autoclose_pipes({}); + assert(pipes.has_value() && "Failed to make pubsub pipes"); + pipes_ = pipes.acquire(); + + // Make sure that our write side doesn't block, else we risk hanging in a signal handler. + // The read end must block to avoid spinning in await. + DIE_ON_FAILURE(make_fd_nonblocking(pipes_.write.fd())); +} + +topic_monitor_t::~topic_monitor_t() = default; + +void topic_monitor_t::post(topic_t topic) { + // Beware, we may be in a signal handler! + // Atomically update the pending topics. + auto rawtopics = topic_set_t{topic}.to_raw(); + auto oldtopics = pending_updates_.fetch_or(rawtopics, std::memory_order_relaxed); + if ((oldtopics & rawtopics) == rawtopics) { + // No new bits were set. + return; + } + + // Ok, we changed one or more bits. Ensure the topic change is visible, and announce the change + // by writing a byte to the pipe. + std::atomic_thread_fence(std::memory_order_release); + ssize_t ret; + do { + // write() is async signal safe. + const uint8_t v = 0; + ret = write(pipes_.write.fd(), &v, sizeof v); + } while (ret < 0 && errno == EINTR); + // Ignore EAGAIN and other errors (which conceivably could occur during shutdown). +} + +void topic_monitor_t::await_metagen(generation_t mgen) { + // Fast check of the metagen before taking the lock. If it's changed we're done. + if (mgen != current_metagen()) return; + + // Take the lock (which may take a long time) and then check again. + std::unique_lock locker{wait_queue_lock_}; + if (mgen != current_metagen()) return; + + // Our metagen hasn't changed. 
Push our metagen onto the queue, then wait until we're the + // lowest. If multiple waiters are the lowest, then anyone can be the observer. + // Note the reason for picking the lowest metagen is to avoid a priority inversion where a lower + // metagen (therefore someone who should see changes) is blocked waiting for a higher metagen + // (who has already seen the changes). + wait_queue_.push(mgen); + while (wait_queue_.top() != mgen) { + wait_queue_notifier_.wait(locker); + } + wait_queue_.pop(); + + // We now have the lowest metagen in the wait queue. Notice we still hold the lock. + // Read until the metagen changes. It may already have changed. + // Note because changes are coalesced, we can read a lot, potentially draining the pipe. + while (mgen == current_metagen()) { + uint8_t ignored[PIPE_BUF]; + (void)read(pipes_.read.fd(), ignored, sizeof ignored); + } + + // Release the lock and wake up the remaining waiters. + locker.unlock(); + wait_queue_notifier_.notify_all(); +} + +topic_set_t topic_monitor_t::check(generation_list_t *gens, topic_set_t topics, bool wait) { + if (topics.none()) return topics; + + topic_set_t changed{}; + for (;;) { + // Load the topic list and see if anything has changed. + generation_list_t current = updated_gens(); + for (topic_t topic : topic_iter_t{}) { + if (topics.get(topic)) { + assert(gens->at(topic) <= current.at(topic) && + "Incoming gen count exceeded published count"); + if (gens->at(topic) < current.at(topic)) { + gens->at(topic) = current.at(topic); + changed.set(topic); + } + } + } + + // If we're not waiting, or something changed, then we're done. + if (!wait || changed.any()) { + break; + } + + // Try again. Note that we use the metagen corresponding to the topic list we just + // inspected, not the current one (which may have updates since we checked). 
+ await_metagen(metagen_for(current)); + } + return changed; +} diff --git a/src/topic_monitor.h b/src/topic_monitor.h new file mode 100644 index 000000000..fb61f06e5 --- /dev/null +++ b/src/topic_monitor.h @@ -0,0 +1,165 @@ +#ifndef FISH_TOPIC_MONITOR_H +#define FISH_TOPIC_MONITOR_H + +#include "common.h" +#include "enum_set.h" +#include "io.h" + +#include +#include +#include +#include +#include +#include +#include + +/** Topic monitoring support. Topics are conceptually "a thing that can happen." For example, + delivery of a SIGINT, a child process exits, etc. It is possible to post to a topic, which means + that that thing happened. + + Associated with each topic is a current generation, which is a 64 bit value. When you query a + topic, you get back a generation. If on the next query the generation has increased, then it + indicates someone posted to the topic. + + For example, if you are monitoring a child process, you can query the sigchld topic. If it has + increased since your last query, it is possible that your child process has exited. + + Topic postings may be coalesced. That is there may be two posts to a given topic, yet the + generation only increases by 1. The only guarantee is that after a topic post, the current + generation value is larger than any value previously queried. + + Tying this all together is the topic_monitor_t. This provides the current topic generations, and + also provides the ability to perform a blocking wait for any topic to change in a particular topic + set. This is the real power of topics: you can wait for a sigchld signal OR a thread exit. + */ + +/// The list of topics that may be observed. +enum class topic_t : uint8_t { + sigchld, // Corresponds to SIGCHLD signal. + COUNT +}; + +/// Allow enum_iter to be used. +template <> +struct enum_info_t { + static constexpr auto count = topic_t::COUNT; +}; + +/// Set of topics. +using topic_set_t = enum_set_t; + +/// Counting iterator for topics. 
+using topic_iter_t = enum_iter_t; + +/// A generation is a counter incremented every time the value of a topic changes. +/// It is 64 bit so it will never wrap. +using generation_t = uint64_t; + +/// A generation value which is guaranteed to never be set and be larger than any valid generation. +constexpr generation_t invalid_generation = std::numeric_limits::max(); + +/// List of generation values, indexed by topics. +/// The initial value of a generation is always 0. +using generation_list_t = enum_array_t; + +/// The topic monitor class. This permits querying the current generation values for topics, +/// optionally blocking until they increase. +class topic_monitor_t { + private: + using topic_set_raw_t = uint8_t; + + static_assert(sizeof(topic_set_raw_t) * CHAR_BIT >= enum_count(), + "topic_set_raw is too small"); + + /// The current topic generation list, protected by a mutex. Note this may be opportunistically + /// updated at the point it is queried. + owning_lock current_gen_{{}}; + + /// The set of topics which have pending increments. + /// This is managed via atomics. + std::atomic pending_updates_{}; + + /// When a topic set is queried in a blocking way, the waiters are put into a queue. The waiter + /// with the smallest metagen is responsible for announcing the change to the rest of the + /// waiters. (The metagen is just the sum of the current generations.) Note that this is a + /// max-heap that defaults to std::less; by using std::greater it becomes a min heap. This is + /// protected by wait_queue_lock_. + std::priority_queue, std::greater> + wait_queue_; + + /// Mutex guarding the wait queue. + std::mutex wait_queue_lock_{}; + + /// Condition variable for broadcasting notifications. + std::condition_variable wait_queue_notifier_{}; + + /// Pipes used to communicate changes from the signal handler. + autoclose_pipes_t pipes_; + + /// \return the metagen for a topic generation list. + /// The metagen is simply the sum of topic generations. 
Note it is monotone. + static inline generation_t metagen_for(const generation_list_t &lst) { + return std::accumulate(lst.begin(), lst.end(), generation_t{0}); + } + + /// Wait for the current metagen to become different from \p gen. + /// If it is already different, return immediately. + void await_metagen(generation_t gen); + + /// Return the current generation list, opportunistically applying any pending updates. + generation_list_t updated_gens() { + auto current_gens = current_gen_.acquire(); + + // Atomically acquire the pending updates, swapping in 0. + // If there are no pending updates (likely), just return. + // Otherwise CAS in 0 and update our topics. + const auto relaxed = std::memory_order_relaxed; + topic_set_raw_t raw; + bool cas_success; + do { + raw = pending_updates_.load(relaxed); + if (raw == 0) return *current_gens; + cas_success = pending_updates_.compare_exchange_weak(raw, 0, relaxed, relaxed); + } while (!cas_success); + + // Update the current generation with our topics and return it. + auto topics = topic_set_t::from_raw(raw); + for (topic_t topic : topic_iter_t{}) { + current_gens->at(topic) += topics.get(topic) ? 1 : 0; + } + return *current_gens; + } + + /// \return the metagen for the current topic generation list. + inline generation_t current_metagen() { return metagen_for(updated_gens()); } + + public: + topic_monitor_t(); + ~topic_monitor_t(); + + /// topic_monitors should not be copied, and there should be no reason to move one. + void operator=(const topic_monitor_t &) = delete; + topic_monitor_t(const topic_monitor_t &) = delete; + void operator=(topic_monitor_t &&) = delete; + topic_monitor_t(topic_monitor_t &&) = delete; + + /// The principal topic_monitor. This may be fetched from a signal handler. + static topic_monitor_t &principal(); + + /// Post to a topic, potentially from a signal handler. + void post(topic_t topic); + + /// Access the current generations. 
+ generation_list_t current_generations() { return updated_gens(); } + + /// Access the generation for a topic. + generation_t generation_for_topic(topic_t topic) { return current_generations().at(topic); } + + /// See if any topic (specified in \p topics) has changed from the values in the generation + /// list \p gens. If \p wait is set, then wait if there are no changes; otherwise return + /// immediately. + /// \return the set of topics that changed, updating the generation list \p gens. + topic_set_t check(generation_list_t *gens, topic_set_t topics, bool wait); +}; + +#endif