Add FLOG logging to the topic monitor

This commit is contained in:
ridiculousfish 2019-05-29 14:20:07 -07:00
parent b20bdcebfa
commit d920a618de
3 changed files with 49 additions and 26 deletions

View File

@ -62,6 +62,8 @@ class category_list_t {
category_t proc_internal_proc{L"proc-internal-proc", L"Internal (non-forked) process events"};
category_t env_locale{L"env-locale", L"Changes to locale variables"};
category_t topic_monitor{L"topic-monitor", L"Internal details of the topic monitor"};
};
/// The class responsible for logging.

View File

@ -1,9 +1,11 @@
#include "config.h" // IWYU pragma: keep
#include "limits.h"
#include "flog.h"
#include "iothread.h"
#include "topic_monitor.h"
#include "wutil.h"
#include <limits.h>
#include <unistd.h>
// Whoof. Thread Sanitizer swallows signals and replays them at its leisure, at the point where
@ -69,13 +71,44 @@ void topic_monitor_t::post(topic_t topic) {
// Ignore EAGAIN and other errors (which conceivably could occur during shutdown).
}
generation_list_t topic_monitor_t::updated_gens() {
auto current_gens = current_gen_.acquire();
// Atomically acquire the pending updates, swapping in 0.
// If there are no pending updates (likely), just return.
// Otherwise CAS in 0 and update our topics.
const auto relaxed = std::memory_order_relaxed;
topic_set_raw_t raw;
bool cas_success;
do {
raw = pending_updates_.load(relaxed);
if (raw == 0) return *current_gens;
cas_success = pending_updates_.compare_exchange_weak(raw, 0, relaxed, relaxed);
} while (!cas_success);
// Update the current generation with our topics and return it.
auto topics = topic_set_t::from_raw(raw);
for (topic_t topic : topic_iter_t{}) {
if (topics.get(topic)) {
current_gens->at(topic) += 1;
FLOG(topic_monitor, "Updating topic", (int)topic, "to", current_gens->at(topic));
}
}
return *current_gens;
}
void topic_monitor_t::await_metagen(generation_t mgen) {
// Fast check of the metagen before taking the lock. If it's changed we're done.
if (mgen != current_metagen()) return;
generation_t current = current_metagen();
FLOG(topic_monitor, "TID", thread_id(), "local mgen", mgen, ": current", current);
if (mgen != current) return;
// Take the lock (which may take a long time) and then check again.
std::unique_lock<std::mutex> locker{wait_queue_lock_};
if (mgen != current_metagen()) return;
current = current_metagen();
FLOG(topic_monitor, "TID", thread_id(), "local mgen", mgen, ": current", current,
"acquired lock");
if (mgen != current) return;
// Our metagen hasn't changed. Push our metagen onto the queue, then wait until we're the
// lowest. If multiple waiters are the lowest, then anyone can be the observer.
@ -84,6 +117,8 @@ void topic_monitor_t::await_metagen(generation_t mgen) {
// (who has already seen the changes).
wait_queue_.push(mgen);
while (wait_queue_.top() != mgen) {
FLOG(topic_monitor, "TID", thread_id(), "local mgen", mgen, "releasing lock for",
wait_queue_.top());
wait_queue_notifier_.wait(locker);
}
wait_queue_.pop();
@ -91,7 +126,10 @@ void topic_monitor_t::await_metagen(generation_t mgen) {
// We now have the lowest metagen in the wait queue. Notice we still hold the lock.
// Read until the metagen changes. It may already have changed.
// Note because changes are coalesced, we can read a lot, potentially draining the pipe.
while (mgen == current_metagen()) {
current = current_metagen();
FLOG(topic_monitor, "TID", thread_id(), "local mgen", mgen, "considering waiting for mgen",
current);
while (mgen == current) {
int fd = pipes_.read.fd();
#if TOPIC_MONITOR_TSAN_WORKAROUND
// Under tsan our notifying pipe is non-blocking, so we would busy-loop on the read() call
@ -104,9 +142,13 @@ void topic_monitor_t::await_metagen(generation_t mgen) {
#endif
uint8_t ignored[PIPE_BUF];
(void)read(fd, ignored, sizeof ignored);
current = current_metagen();
FLOG(topic_monitor, "TID", thread_id(), "local mgen", mgen,
"read() complete, current mgen is", current);
}
// Release the lock and wake up the remaining waiters.
FLOG(topic_monitor, "TID", thread_id(), "local mgen", mgen, "awakening all waiters");
locker.unlock();
wait_queue_notifier_.notify_all();
}

View File

@ -109,28 +109,7 @@ class topic_monitor_t {
void await_metagen(generation_t gen);
/// Return the current generation list, opportunistically applying any pending updates.
generation_list_t updated_gens() {
auto current_gens = current_gen_.acquire();
// Atomically acquire the pending updates, swapping in 0.
// If there are no pending updates (likely), just return.
// Otherwise CAS in 0 and update our topics.
const auto relaxed = std::memory_order_relaxed;
topic_set_raw_t raw;
bool cas_success;
do {
raw = pending_updates_.load(relaxed);
if (raw == 0) return *current_gens;
cas_success = pending_updates_.compare_exchange_weak(raw, 0, relaxed, relaxed);
} while (!cas_success);
// Update the current generation with our topics and return it.
auto topics = topic_set_t::from_raw(raw);
for (topic_t topic : topic_iter_t{}) {
current_gens->at(topic) += topics.get(topic) ? 1 : 0;
}
return *current_gens;
}
generation_list_t updated_gens();
/// \return the metagen for the current topic generation list.
inline generation_t current_metagen() { return metagen_for(updated_gens()); }