diff --git a/src/flog.h b/src/flog.h index 3e7838d65..94c2a236c 100644 --- a/src/flog.h +++ b/src/flog.h @@ -62,6 +62,8 @@ class category_list_t { category_t proc_internal_proc{L"proc-internal-proc", L"Internal (non-forked) process events"}; category_t env_locale{L"env-locale", L"Changes to locale variables"}; + + category_t topic_monitor{L"topic-monitor", L"Internal details of the topic monitor"}; }; /// The class responsible for logging. diff --git a/src/topic_monitor.cpp b/src/topic_monitor.cpp index a948f75bc..70d05b6a7 100644 --- a/src/topic_monitor.cpp +++ b/src/topic_monitor.cpp @@ -1,9 +1,11 @@ #include "config.h" // IWYU pragma: keep -#include "limits.h" +#include "flog.h" +#include "iothread.h" #include "topic_monitor.h" #include "wutil.h" +#include #include // Whoof. Thread Sanitizer swallows signals and replays them at its leisure, at the point where @@ -69,13 +71,44 @@ void topic_monitor_t::post(topic_t topic) { // Ignore EAGAIN and other errors (which conceivably could occur during shutdown). } +generation_list_t topic_monitor_t::updated_gens() { + auto current_gens = current_gen_.acquire(); + + // Atomically acquire the pending updates, swapping in 0. + // If there are no pending updates (likely), just return. + // Otherwise CAS in 0 and update our topics. + const auto relaxed = std::memory_order_relaxed; + topic_set_raw_t raw; + bool cas_success; + do { + raw = pending_updates_.load(relaxed); + if (raw == 0) return *current_gens; + cas_success = pending_updates_.compare_exchange_weak(raw, 0, relaxed, relaxed); + } while (!cas_success); + + // Update the current generation with our topics and return it. + auto topics = topic_set_t::from_raw(raw); + for (topic_t topic : topic_iter_t{}) { + if (topics.get(topic)) { + current_gens->at(topic) += 1; + FLOG(topic_monitor, "Updating topic", (int)topic, "to", current_gens->at(topic)); + } + } + return *current_gens; +} + void topic_monitor_t::await_metagen(generation_t mgen) { // Fast check of the metagen before taking the lock. If it's changed we're done. - if (mgen != current_metagen()) return; + generation_t current = current_metagen(); + FLOG(topic_monitor, "TID", thread_id(), "local mgen", mgen, ": current", current); + if (mgen != current) return; // Take the lock (which may take a long time) and then check again. std::unique_lock locker{wait_queue_lock_}; - if (mgen != current_metagen()) return; + current = current_metagen(); + FLOG(topic_monitor, "TID", thread_id(), "local mgen", mgen, ": current", current, + "acquired lock"); + if (mgen != current) return; // Our metagen hasn't changed. Push our metagen onto the queue, then wait until we're the // lowest. If multiple waiters are the lowest, then anyone can be the observer. @@ -84,6 +117,8 @@ void topic_monitor_t::await_metagen(generation_t mgen) { // (who has already seen the changes). wait_queue_.push(mgen); while (wait_queue_.top() != mgen) { + FLOG(topic_monitor, "TID", thread_id(), "local mgen", mgen, "releasing lock for", + wait_queue_.top()); wait_queue_notifier_.wait(locker); } wait_queue_.pop(); @@ -91,7 +126,10 @@ void topic_monitor_t::await_metagen(generation_t mgen) { // We now have the lowest metagen in the wait queue. Notice we still hold the lock. // Read until the metagen changes. It may already have changed. // Note because changes are coalesced, we can read a lot, potentially draining the pipe. - while (mgen == current_metagen()) { + current = current_metagen(); + FLOG(topic_monitor, "TID", thread_id(), "local mgen", mgen, "considering waiting for mgen", + current); + while (mgen == current) { int fd = pipes_.read.fd(); #if TOPIC_MONITOR_TSAN_WORKAROUND // Under tsan our notifying pipe is non-blocking, so we would busy-loop on the read() call @@ -104,9 +142,13 @@ void topic_monitor_t::await_metagen(generation_t mgen) { #endif uint8_t ignored[PIPE_BUF]; (void)read(fd, ignored, sizeof ignored); + current = current_metagen(); + FLOG(topic_monitor, "TID", thread_id(), "local mgen", mgen, + "read() complete, current mgen is", current); } // Release the lock and wake up the remaining waiters. + FLOG(topic_monitor, "TID", thread_id(), "local mgen", mgen, "awakening all waiters"); locker.unlock(); wait_queue_notifier_.notify_all(); } diff --git a/src/topic_monitor.h b/src/topic_monitor.h index 78498c4cc..574e72c21 100644 --- a/src/topic_monitor.h +++ b/src/topic_monitor.h @@ -109,28 +109,7 @@ class topic_monitor_t { void await_metagen(generation_t gen); /// Return the current generation list, opportunistically applying any pending updates. - generation_list_t updated_gens() { - auto current_gens = current_gen_.acquire(); - - // Atomically acquire the pending updates, swapping in 0. - // If there are no pending updates (likely), just return. - // Otherwise CAS in 0 and update our topics. - const auto relaxed = std::memory_order_relaxed; - topic_set_raw_t raw; - bool cas_success; - do { - raw = pending_updates_.load(relaxed); - if (raw == 0) return *current_gens; - cas_success = pending_updates_.compare_exchange_weak(raw, 0, relaxed, relaxed); - } while (!cas_success); - - // Update the current generation with our topics and return it. - auto topics = topic_set_t::from_raw(raw); - for (topic_t topic : topic_iter_t{}) { - current_gens->at(topic) += topics.get(topic) ? 1 : 0; - } - return *current_gens; - } + generation_list_t updated_gens(); /// \return the metagen for the current topic generation list. inline generation_t current_metagen() { return metagen_for(updated_gens()); }