Introduce topic monitoring

topic_monitor allows for querying changes posted to one or more topics,
initially sigchld. This will eventually replace the waitpid logic in
process_mark_finished_children().

Comment from the new header:

Topic monitoring support. Topics are conceptually "a thing that can
happen." For example, delivery of a SIGINT, a child process exits, etc. It
is possible to post to a topic, which means that that thing happened.

Associated with each topic is a current generation, which is a 64 bit
value. When you query a topic, you get back a generation. If on the next
query the generation has increased, then it indicates someone posted to
the topic.

For example, if you are monitoring a child process, you can query the
sigchld topic. If it has increased since your last query, it is possible
that your child process has exited.

Topic postings may be coalesced. That is there may be two posts to a given
topic, yet the generation only increases by 1. The only guarantee is that
after a topic post, the current generation value is larger than any value
previously queried.

Tying this all together is the topic_monitor_t. This provides the current
topic generations, and also provides the ability to perform a blocking
wait for any topic to change in a particular topic set. This is the real
power of topics: you can wait for a sigchld signal OR a thread exit.
This commit is contained in:
ridiculousfish 2019-02-02 15:39:04 -08:00
parent ccc45235b0
commit fc9d238642
8 changed files with 349 additions and 3 deletions

View File

@ -80,7 +80,7 @@ SET(FISH_SRCS
src/postfork.cpp src/proc.cpp src/reader.cpp src/sanity.cpp src/screen.cpp
src/signal.cpp src/tinyexpr.cpp src/tnode.cpp src/tokenizer.cpp src/utf8.cpp src/util.cpp
src/wcstringutil.cpp src/wgetopt.cpp src/wildcard.cpp src/wutil.cpp
src/future_feature_flags.cpp src/redirection.cpp
src/future_feature_flags.cpp src/redirection.cpp src/topic_monitor.cpp
)
# Header files are just globbed.

View File

@ -119,7 +119,7 @@ FISH_OBJS := obj/autoload.o obj/builtin.o obj/builtin_bg.o obj/builtin_bind.o ob
obj/parser_keywords.o obj/path.o obj/postfork.o obj/proc.o obj/reader.o \
obj/sanity.o obj/screen.o obj/signal.o obj/tinyexpr.o obj/tokenizer.o obj/tnode.o obj/utf8.o \
obj/util.o obj/wcstringutil.o obj/wgetopt.o obj/wildcard.o obj/wutil.o \
obj/future_feature_flags.o obj/redirection.o
obj/future_feature_flags.o obj/redirection.o obj/topic_monitor.o
FISH_INDENT_OBJS := obj/fish_indent.o obj/print_help.o $(FISH_OBJS)

View File

@ -29,7 +29,7 @@ class enum_set_t : private std::bitset<enum_count<T>()> {
public:
enum_set_t() = default;
explicit enum_set_t(T v) { set(v); }
/*implicit*/ enum_set_t(T v) { set(v); }
static enum_set_t from_raw(unsigned long v) { return enum_set_t{v}; }

View File

@ -25,6 +25,7 @@
#include <unistd.h>
#include <wchar.h>
#include <wctype.h>
#include <thread>
#include <algorithm>
#include <array>
@ -69,6 +70,7 @@
#include "signal.h"
#include "tnode.h"
#include "tokenizer.h"
#include "topic_monitor.h"
#include "utf8.h"
#include "util.h"
#include "wcstringutil.h"
@ -5073,6 +5075,66 @@ void test_normalize_path() {
do_test(path_normalize_for_cd(L"/abc/def/", L"../ghi/..") == L"/abc/ghi/..");
}
static void test_topic_monitor() {
say(L"Testing topic monitor");
topic_monitor_t monitor;
generation_list_t gens{};
constexpr auto t = topic_t::sigchld;
do_test(gens[t] == 0);
do_test(monitor.generation_for_topic(t) == 0);
auto changed = monitor.check(&gens, {t}, false /* wait */);
do_test(changed.none());
do_test(gens[t] == 0);
monitor.post(t);
changed = monitor.check(&gens, topic_set_t{t}, true /* wait */);
do_test(changed == topic_set_t{t});
do_test(gens[t] == 1);
do_test(monitor.generation_for_topic(t) == 1);
monitor.post(t);
do_test(monitor.generation_for_topic(t) == 2);
changed = monitor.check(&gens, topic_set_t{t}, true /* wait */);
do_test(changed == topic_set_t{t});
}
static void test_topic_monitor_torture() {
say(L"Torture-testing topic monitor");
topic_monitor_t monitor;
const size_t thread_count = 64;
constexpr auto t = topic_t::sigchld;
std::vector<generation_list_t> gens;
gens.resize(thread_count, generation_list_t{});
std::atomic<uint32_t> post_count{};
for (auto &gen : gens) {
gen = monitor.current_generations();
post_count += 1;
monitor.post(t);
}
std::atomic<uint32_t> completed{};
std::vector<std::thread> threads;
for (size_t i = 0; i < thread_count; i++) {
threads.emplace_back(
[&](size_t i) {
for (size_t j = 0; j < (1 << 11); j++) {
auto before = gens[i];
auto changed = monitor.check(&gens[i], topic_set_t{t}, true /* wait */);
do_test(before[t] < gens[i][t]);
do_test(gens[i][t] <= post_count);
}
auto amt = completed.fetch_add(1, std::memory_order_relaxed);
},
i);
}
while (completed.load(std::memory_order_relaxed) < thread_count) {
post_count += 1;
monitor.post(t);
}
for (auto &t : threads) t.join();
}
/// Main test.
int main(int argc, char **argv) {
UNUSED(argc);
@ -5192,6 +5254,8 @@ int main(int argc, char **argv) {
if (should_test_function("maybe")) test_maybe();
if (should_test_function("layout_cache")) test_layout_cache();
if (should_test_function("normalize")) test_normalize_path();
if (should_test_function("topics")) test_topic_monitor();
if (should_test_function("topics")) test_topic_monitor_torture();
// history_tests_t::test_history_speed();
say(L"Encountered %d errors in low-level tests", err_count);

View File

@ -2,6 +2,7 @@
#ifndef FISH_IOTHREAD_H
#define FISH_IOTHREAD_H
#include <pthread.h>
#include <functional>
#include <type_traits>

View File

@ -15,6 +15,7 @@
#include "parser.h"
#include "proc.h"
#include "reader.h"
#include "topic_monitor.h"
#include "wutil.h" // IWYU pragma: keep
/// Struct describing an entry for the lookup table used to convert between signal names and signal
@ -262,6 +263,7 @@ static void handle_chld(int sig, siginfo_t *info, void *context) {
if (reraise_if_forked_child(sig)) return;
job_handle_signal(sig, info, context);
default_handler(sig, info, context);
topic_monitor_t::principal().post(topic_t::sigchld);
}
// We have a sigalarm handler that does nothing. This is used in the signal torture test, to verify

114
src/topic_monitor.cpp Normal file
View File

@ -0,0 +1,114 @@
#include "config.h" // IWYU pragma: keep
#include "limits.h"
#include "topic_monitor.h"
#include "wutil.h"
#include <unistd.h>
/// Implementation of the principal monitor. This uses new (and leaks) to avoid registering a
/// pointless at-exit handler for the dtor.
static topic_monitor_t *const s_principal = new topic_monitor_t();
topic_monitor_t &topic_monitor_t::principal() {
// Do not attempt to move s_principal to a function-level static, it needs to be accessed from a
// signal handler so it must not be lazily created.
return *s_principal;
}
topic_monitor_t::topic_monitor_t() {
// Set up our pipes. Assert it succeeds.
auto pipes = make_autoclose_pipes({});
assert(pipes.has_value() && "Failed to make pubsub pipes");
pipes_ = pipes.acquire();
// Make sure that our write side doesn't block, else we risk hanging in a signal handler.
// The read end must block to avoid spinning in await.
DIE_ON_FAILURE(make_fd_nonblocking(pipes_.write.fd()));
}
topic_monitor_t::~topic_monitor_t() = default;
void topic_monitor_t::post(topic_t topic) {
// Beware, we may be in a signal handler!
// Atomically update the pending topics.
auto rawtopics = topic_set_t{topic}.to_raw();
auto oldtopics = pending_updates_.fetch_or(rawtopics, std::memory_order_relaxed);
if ((oldtopics & rawtopics) == rawtopics) {
// No new bits were set.
return;
}
// Ok, we changed one or more bits. Ensure the topic change is visible, and announce the change
// by writing a byte to the pipe.
std::atomic_thread_fence(std::memory_order_release);
ssize_t ret;
do {
// write() is async signal safe.
const uint8_t v = 0;
ret = write(pipes_.write.fd(), &v, sizeof v);
} while (ret < 0 && errno == EINTR);
// Ignore EAGAIN and other errors (which conceivably could occur during shutdown).
}
void topic_monitor_t::await_metagen(generation_t mgen) {
// Fast check of the metagen before taking the lock. If it's changed we're done.
if (mgen != current_metagen()) return;
// Take the lock (which may take a long time) and then check again.
std::unique_lock<std::mutex> locker{wait_queue_lock_};
if (mgen != current_metagen()) return;
// Our metagen hasn't changed. Push our metagen onto the queue, then wait until we're the
// lowest. If multiple waiters are the lowest, then anyone can be the observer.
// Note the reason for picking the lowest metagen is to avoid a priority inversion where a lower
// metagen (therefore someone who should see changes) is blocked waiting for a higher metagen
// (who has already seen the changes).
wait_queue_.push(mgen);
while (wait_queue_.top() != mgen) {
wait_queue_notifier_.wait(locker);
}
wait_queue_.pop();
// We now have the lowest metagen in the wait queue. Notice we still hold the lock.
// Read until the metagen changes. It may already have changed.
// Note because changes are coalesced, we can read a lot, potentially draining the pipe.
while (mgen == current_metagen()) {
uint8_t ignored[PIPE_BUF];
(void)read(pipes_.read.fd(), ignored, sizeof ignored);
}
// Release the lock and wake up the remaining waiters.
locker.unlock();
wait_queue_notifier_.notify_all();
}
topic_set_t topic_monitor_t::check(generation_list_t *gens, topic_set_t topics, bool wait) {
if (topics.none()) return topics;
topic_set_t changed{};
for (;;) {
// Load the topic list and see if anything has changed.
generation_list_t current = updated_gens();
for (topic_t topic : topic_iter_t{}) {
if (topics.get(topic)) {
assert(gens->at(topic) <= current.at(topic) &&
"Incoming gen count exceeded published count");
if (gens->at(topic) < current.at(topic)) {
gens->at(topic) = current.at(topic);
changed.set(topic);
}
}
}
// If we're not waiting, or something changed, then we're done.
if (!wait || changed.any()) {
break;
}
// Try again. Note that we use the metagen corresponding to the topic list we just
// inspected, not the current one (which may have updates since we checked).
await_metagen(metagen_for(current));
}
return changed;
}

165
src/topic_monitor.h Normal file
View File

@ -0,0 +1,165 @@
#ifndef FISH_TOPIC_MONITOR_H
#define FISH_TOPIC_MONITOR_H
#include "common.h"
#include "enum_set.h"
#include "io.h"
#include <array>
#include <atomic>
#include <bitset>
#include <condition_variable>
#include <limits>
#include <numeric>
#include <queue>
/** Topic monitoring support. Topics are conceptually "a thing that can happen." For example,
delivery of a SIGINT, a child process exits, etc. It is possible to post to a topic, which means
that that thing happened.
Associated with each topic is a current generation, which is a 64 bit value. When you query a
topic, you get back a generation. If on the next query the generation has increased, then it
indicates someone posted to the topic.
For example, if you are monitoring a child process, you can query the sigchld topic. If it has
increased since your last query, it is possible that your child process has exited.
Topic postings may be coalesced. That is there may be two posts to a given topic, yet the
generation only increases by 1. The only guarantee is that after a topic post, the current
generation value is larger than any value previously queried.
Tying this all together is the topic_monitor_t. This provides the current topic generations, and
also provides the ability to perform a blocking wait for any topic to change in a particular topic
set. This is the real power of topics: you can wait for a sigchld signal OR a thread exit.
*/
/// The list of topics that may be observed.
enum class topic_t : uint8_t {
sigchld, // Corresponds to SIGCHLD signal.
COUNT
};
/// Allow enum_iter to be used.
template <>
struct enum_info_t<topic_t> {
static constexpr auto count = topic_t::COUNT;
};
/// Set of topics.
using topic_set_t = enum_set_t<topic_t>;
/// Counting iterator for topics.
using topic_iter_t = enum_iter_t<topic_t>;
/// A generation is a counter incremented every time the value of a topic changes.
/// It is 64 bit so it will never wrap.
using generation_t = uint64_t;
/// A generation value which is guaranteed to never be set and be larger than any valid generation.
constexpr generation_t invalid_generation = std::numeric_limits<generation_t>::max();
/// List of generation values, indexed by topics.
/// The initial value of a generation is always 0.
using generation_list_t = enum_array_t<generation_t, topic_t>;
/// Teh topic monitor class. This permits querying the current generation values for topics,
/// optionally blocking until they increase.
class topic_monitor_t {
private:
using topic_set_raw_t = uint8_t;
static_assert(sizeof(topic_set_raw_t) * CHAR_BIT >= enum_count<topic_t>(),
"topic_set_raw is too small");
/// The current topic generation list, protected by a mutex. Note this may be opportunistically
/// updated at the point it is queried.
owning_lock<generation_list_t> current_gen_{{}};
/// The set of topics which have pending increments.
/// This is managed via atomics.
std::atomic<topic_set_raw_t> pending_updates_{};
/// When a topic set is queried in a blocking way, the waiters are put into a queue. The waiter
/// with the smallest metagen is responsible for announcing the change to the rest of the
/// waiters. (The metagen is just the sum of the current generations.) Note that this is a
/// max-heap that defaults to std::less; by using std::greater it becomes a min heap. This is
/// protected by wait_queue_lock_.
std::priority_queue<generation_t, std::vector<generation_t>, std::greater<generation_t>>
wait_queue_;
/// Mutex guarding the wait queue.
std::mutex wait_queue_lock_{};
/// Condition variable for broadcasting notifications.
std::condition_variable wait_queue_notifier_{};
/// Pipes used to communicate changes from the signal handler.
autoclose_pipes_t pipes_;
/// \return the metagen for a topic generation list.
/// The metagen is simply the sum of topic generations. Note it is monotone.
static inline generation_t metagen_for(const generation_list_t &lst) {
return std::accumulate(lst.begin(), lst.end(), generation_t{0});
}
/// Wait for the current metagen to become different from \p gen.
/// If it is already different, return immediately.
void await_metagen(generation_t gen);
/// Return the current generation list, opportunistically applying any pending updates.
generation_list_t updated_gens() {
auto current_gens = current_gen_.acquire();
// Atomically acquire the pending updates, swapping in 0.
// If there are no pending updates (likely), just return.
// Otherwise CAS in 0 and update our topics.
const auto relaxed = std::memory_order_relaxed;
topic_set_raw_t raw;
bool cas_success;
do {
raw = pending_updates_.load(relaxed);
if (raw == 0) return *current_gens;
cas_success = pending_updates_.compare_exchange_weak(raw, 0, relaxed, relaxed);
} while (!cas_success);
// Update the current generation with our topics and return it.
auto topics = topic_set_t::from_raw(raw);
for (topic_t topic : topic_iter_t{}) {
current_gens->at(topic) += topics.get(topic) ? 1 : 0;
}
return *current_gens;
}
/// \return the metagen for the current topic generation list.
inline generation_t current_metagen() { return metagen_for(updated_gens()); }
public:
topic_monitor_t();
~topic_monitor_t();
/// topic_monitors should not be copied, and there should be no reason to move one.
void operator=(const topic_monitor_t &) = delete;
topic_monitor_t(const topic_monitor_t &) = delete;
void operator=(topic_monitor_t &&) = delete;
topic_monitor_t(topic_monitor_t &&) = delete;
/// The principal topic_monitor. This may be fetched from a signal handler.
static topic_monitor_t &principal();
/// Post to a topic, potentially from a signal handler.
void post(topic_t topic);
/// Access the current generations.
generation_list_t current_generations() { return updated_gens(); }
/// Access the generation for a topic.
generation_t generation_for_topic(topic_t topic) { return current_generations().at(topic); }
/// See if for any topic (specified in \p topics) has changed from the values in the generation
/// list \p gens. If \p wait is set, then wait if there are no changes; otherwise return
/// immediately.
/// \return the set of topics that changed, updating the generation list \p gens.
topic_set_t check(generation_list_t *gens, topic_set_t topics, bool wait);
};
#endif