mirror of
https://github.com/fish-shell/fish-shell.git
synced 2025-02-21 01:04:10 +08:00
Correct a race in topic monitor
This fixes a race condition in the topic monitor. A thread may decide to enter the wait queue, but before it does so, the generation list changes; the thread then waits forever, resulting in a hang. This commit also simplifies the implementation of the topic monitor considerably: on reflection, the whole "metagen" mechanism isn't providing any value, and we should just compare generations directly. In the new design, we have a lock-protected list of current generations, along with a boolean indicating whether some thread is currently reading from the pipe. The reader (only one at a time) is responsible for broadcasting notifications via a condition variable.
This commit is contained in:
parent
d920a618de
commit
4e03d3c264
@ -635,6 +635,9 @@ class acquired_lock {
|
||||
/// Create from a global lock.
|
||||
/// This is used in weird cases where a global lock protects more than one piece of data.
|
||||
static acquired_lock from_global(std::mutex &lk, Data *v) { return acquired_lock{lk, v}; }
|
||||
|
||||
/// \return a reference to the lock, for use with a condition variable.
|
||||
std::unique_lock<std::mutex> &get_lock() { return lock; }
|
||||
};
|
||||
|
||||
// A lock that owns a piece of data
|
||||
|
@ -26,6 +26,12 @@
|
||||
/// pointless at-exit handler for the dtor.
|
||||
static topic_monitor_t *const s_principal = new topic_monitor_t();
|
||||
|
||||
/// \return the metagen for a topic generation list.
|
||||
/// The metagen is simply the sum of topic generations. Note it is monotone.
|
||||
static generation_t metagen_for(const generation_list_t &lst) {
|
||||
return std::accumulate(lst.begin(), lst.end(), generation_t{0});
|
||||
}
|
||||
|
||||
topic_monitor_t &topic_monitor_t::principal() {
|
||||
// Do not attempt to move s_principal to a function-level static, it needs to be accessed from a
|
||||
// signal handler so it must not be lazily created.
|
||||
@ -71,9 +77,7 @@ void topic_monitor_t::post(topic_t topic) {
|
||||
// Ignore EAGAIN and other errors (which conceivably could occur during shutdown).
|
||||
}
|
||||
|
||||
generation_list_t topic_monitor_t::updated_gens() {
|
||||
auto current_gens = current_gen_.acquire();
|
||||
|
||||
generation_list_t topic_monitor_t::updated_gens_in_data(acquired_lock<data_t> &data) {
|
||||
// Atomically acquire the pending updates, swapping in 0.
|
||||
// If there are no pending updates (likely), just return.
|
||||
// Otherwise CAS in 0 and update our topics.
|
||||
@ -82,7 +86,7 @@ generation_list_t topic_monitor_t::updated_gens() {
|
||||
bool cas_success;
|
||||
do {
|
||||
raw = pending_updates_.load(relaxed);
|
||||
if (raw == 0) return *current_gens;
|
||||
if (raw == 0) return data->current_gens;
|
||||
cas_success = pending_updates_.compare_exchange_weak(raw, 0, relaxed, relaxed);
|
||||
} while (!cas_success);
|
||||
|
||||
@ -90,76 +94,88 @@ generation_list_t topic_monitor_t::updated_gens() {
|
||||
auto topics = topic_set_t::from_raw(raw);
|
||||
for (topic_t topic : topic_iter_t{}) {
|
||||
if (topics.get(topic)) {
|
||||
current_gens->at(topic) += 1;
|
||||
FLOG(topic_monitor, "Updating topic", (int)topic, "to", current_gens->at(topic));
|
||||
data->current_gens.at(topic) += 1;
|
||||
FLOG(topic_monitor, "Updating topic", (int)topic, "to", data->current_gens.at(topic));
|
||||
}
|
||||
}
|
||||
return *current_gens;
|
||||
// Report our change.
|
||||
data_notifier_.notify_all();
|
||||
return data->current_gens;
|
||||
}
|
||||
|
||||
void topic_monitor_t::await_metagen(generation_t mgen) {
|
||||
// Fast check of the metagen before taking the lock. If it's changed we're done.
|
||||
generation_t current = current_metagen();
|
||||
FLOG(topic_monitor, "TID", thread_id(), "local mgen", mgen, ": current", current);
|
||||
if (mgen != current) return;
|
||||
/// Acquire the data and apply any pending updates to it.
/// \return the generation list produced by updated_gens_in_data().
generation_list_t topic_monitor_t::updated_gens() {
    // The acquired data stays alive (and is passed by reference) for the whole update.
    auto locked_data = data_.acquire();
    generation_list_t latest = updated_gens_in_data(locked_data);
    return latest;
}
|
||||
|
||||
// Take the lock (which may take a long time) and then check again.
|
||||
std::unique_lock<std::mutex> locker{wait_queue_lock_};
|
||||
current = current_metagen();
|
||||
FLOG(topic_monitor, "TID", thread_id(), "local mgen", mgen, ": current", current,
|
||||
"acquired lock");
|
||||
if (mgen != current) return;
|
||||
bool topic_monitor_t::try_update_gens_maybe_becoming_reader(generation_list_t *gens) {
|
||||
bool become_reader = false;
|
||||
auto data = data_.acquire();
|
||||
for (;;) {
|
||||
// See if the updated gen list has changed. If so we don't need to become the reader.
|
||||
auto current = updated_gens_in_data(data);
|
||||
FLOG(topic_monitor, "TID", thread_id(), "local mgen", metagen_for(*gens), ": current",
|
||||
metagen_for(current));
|
||||
if (*gens != current) {
|
||||
*gens = current;
|
||||
break;
|
||||
}
|
||||
|
||||
// Our metagen hasn't changed. Push our metagen onto the queue, then wait until we're the
|
||||
// lowest. If multiple waiters are the lowest, then anyone can be the observer.
|
||||
// Note the reason for picking the lowest metagen is to avoid a priority inversion where a lower
|
||||
// metagen (therefore someone who should see changes) is blocked waiting for a higher metagen
|
||||
// (who has already seen the changes).
|
||||
wait_queue_.push(mgen);
|
||||
while (wait_queue_.top() != mgen) {
|
||||
FLOG(topic_monitor, "TID", thread_id(), "local mgen", mgen, "releasing lock for",
|
||||
wait_queue_.top());
|
||||
wait_queue_notifier_.wait(locker);
|
||||
// The generations haven't changed. Perhaps we become the reader.
|
||||
if (!data->has_reader) {
|
||||
become_reader = true;
|
||||
data->has_reader = true;
|
||||
break;
|
||||
}
|
||||
// Not the reader, wait until the reader notifies us and loop again.
|
||||
data_notifier_.wait(data.get_lock());
|
||||
}
|
||||
wait_queue_.pop();
|
||||
return become_reader;
|
||||
}
|
||||
|
||||
// We now have the lowest metagen in the wait queue. Notice we still hold the lock.
|
||||
// Read until the metagen changes. It may already have changed.
|
||||
// Note because changes are coalesced, we can read a lot, potentially draining the pipe.
|
||||
current = current_metagen();
|
||||
FLOG(topic_monitor, "TID", thread_id(), "local mgen", mgen, "considering waiting for mgen",
|
||||
current);
|
||||
while (mgen == current) {
|
||||
int fd = pipes_.read.fd();
|
||||
generation_list_t topic_monitor_t::await_gens(const generation_list_t &input_gens) {
|
||||
generation_list_t gens = input_gens;
|
||||
while (gens == input_gens) {
|
||||
bool become_reader = try_update_gens_maybe_becoming_reader(&gens);
|
||||
if (become_reader) {
|
||||
// Now we are the reader. Read from the pipe, and then update with any changes.
|
||||
// Note we no longer hold the lock.
|
||||
assert(gens == input_gens &&
|
||||
"Generations should not have changed if we are the reader.");
|
||||
int fd = pipes_.read.fd();
|
||||
#if TOPIC_MONITOR_TSAN_WORKAROUND
|
||||
// Under tsan our notifying pipe is non-blocking, so we would busy-loop on the read() call
|
||||
// until data is available (that is, fish would use 100% cpu while waiting for processes).
|
||||
// The select prevents that.
|
||||
fd_set fds;
|
||||
FD_ZERO(&fds);
|
||||
FD_SET(fd, &fds);
|
||||
(void)select(fd + 1, &fds, nullptr, nullptr, nullptr /* timeout */);
|
||||
// Under tsan our notifying pipe is non-blocking, so we would busy-loop on the read()
|
||||
// call until data is available (that is, fish would use 100% cpu while waiting for
|
||||
// processes). The select prevents that.
|
||||
fd_set fds;
|
||||
FD_ZERO(&fds);
|
||||
FD_SET(fd, &fds);
|
||||
(void)select(fd + 1, &fds, nullptr, nullptr, nullptr /* timeout */);
|
||||
#endif
|
||||
uint8_t ignored[PIPE_BUF];
|
||||
(void)read(fd, ignored, sizeof ignored);
|
||||
current = current_metagen();
|
||||
FLOG(topic_monitor, "TID", thread_id(), "local mgen", mgen,
|
||||
"read() complete, current mgen is", current);
|
||||
}
|
||||
uint8_t ignored[PIPE_BUF];
|
||||
(void)read(fd, ignored, sizeof ignored);
|
||||
|
||||
// Release the lock and wake up the remaining waiters.
|
||||
FLOG(topic_monitor, "TID", thread_id(), "local mgen", mgen, "awakening all waiters");
|
||||
locker.unlock();
|
||||
wait_queue_notifier_.notify_all();
|
||||
// We are finished reading. We must stop being the reader, and post on the condition
|
||||
// variable to wake up any other threads waiting for us to finish reading.
|
||||
auto data = data_.acquire();
|
||||
gens = data->current_gens;
|
||||
FLOG(topic_monitor, "TID", thread_id(), "local mgen", metagen_for(input_gens),
|
||||
"read() complete, current mgen is", metagen_for(gens));
|
||||
assert(data->has_reader && "We should be the reader");
|
||||
data->has_reader = false;
|
||||
data_notifier_.notify_all();
|
||||
}
|
||||
}
|
||||
return gens;
|
||||
}
|
||||
|
||||
topic_set_t topic_monitor_t::check(generation_list_t *gens, topic_set_t topics, bool wait) {
|
||||
if (topics.none()) return topics;
|
||||
|
||||
generation_list_t current = updated_gens();
|
||||
topic_set_t changed{};
|
||||
for (;;) {
|
||||
// Load the topic list and see if anything has changed.
|
||||
generation_list_t current = updated_gens();
|
||||
for (topic_t topic : topic_iter_t{}) {
|
||||
if (topics.get(topic)) {
|
||||
assert(gens->at(topic) <= current.at(topic) &&
|
||||
@ -176,9 +192,8 @@ topic_set_t topic_monitor_t::check(generation_list_t *gens, topic_set_t topics,
|
||||
break;
|
||||
}
|
||||
|
||||
// Try again. Note that we use the metagen corresponding to the topic list we just
|
||||
// inspected, not the current one (which may have updates since we checked).
|
||||
await_metagen(metagen_for(current));
|
||||
// Wait until our gens change.
|
||||
current = await_gens(current);
|
||||
}
|
||||
return changed;
|
||||
}
|
||||
|
@ -11,7 +11,6 @@
|
||||
#include <condition_variable>
|
||||
#include <limits>
|
||||
#include <numeric>
|
||||
#include <queue>
|
||||
|
||||
/** Topic monitoring support. Topics are conceptually "a thing that can happen." For example,
|
||||
delivery of a SIGINT, a child process exits, etc. It is possible to post to a topic, which means
|
||||
@ -73,47 +72,52 @@ class topic_monitor_t {
|
||||
static_assert(sizeof(topic_set_raw_t) * CHAR_BIT >= enum_count<topic_t>(),
|
||||
"topic_set_raw is too small");
|
||||
|
||||
/// The current topic generation list, protected by a mutex. Note this may be opportunistically
|
||||
/// updated at the point it is queried.
|
||||
owning_lock<generation_list_t> current_gen_{{}};
|
||||
// Some stuff that needs to be protected by the same lock.
|
||||
struct data_t {
|
||||
/// The current generation list.
|
||||
generation_list_t current_gens{};
|
||||
|
||||
/// Whether there is a thread currently reading from the notifier pipe.
|
||||
bool has_reader{false};
|
||||
};
|
||||
owning_lock<data_t> data_{};
|
||||
|
||||
/// Condition variable for broadcasting notifications.
|
||||
/// This is associated with data_'s mutex.
|
||||
std::condition_variable data_notifier_{};
|
||||
|
||||
/// The set of topics which have pending increments.
|
||||
/// This is managed via atomics.
|
||||
std::atomic<topic_set_raw_t> pending_updates_{};
|
||||
|
||||
/// When a topic set is queried in a blocking way, the waiters are put into a queue. The waiter
|
||||
/// with the smallest metagen is responsible for announcing the change to the rest of the
|
||||
/// waiters. (The metagen is just the sum of the current generations.) Note that this is a
|
||||
/// max-heap that defaults to std::less; by using std::greater it becomes a min heap. This is
|
||||
/// protected by wait_queue_lock_.
|
||||
std::priority_queue<generation_t, std::vector<generation_t>, std::greater<generation_t>>
|
||||
wait_queue_;
|
||||
|
||||
/// Mutex guarding the wait queue.
|
||||
std::mutex wait_queue_lock_{};
|
||||
|
||||
/// Condition variable for broadcasting notifications.
|
||||
std::condition_variable wait_queue_notifier_{};
|
||||
|
||||
/// Pipes used to communicate changes from the signal handler.
|
||||
/// Self-pipes used to communicate changes.
|
||||
/// The writer is a signal handler.
|
||||
/// "The reader" refers to a thread that wants to wait for changes. Only one thread can be the
|
||||
/// reader at a given time.
|
||||
autoclose_pipes_t pipes_;
|
||||
|
||||
/// \return the metagen for a topic generation list.
|
||||
/// The metagen is simply the sum of topic generations. Note it is monotone.
|
||||
static inline generation_t metagen_for(const generation_list_t &lst) {
|
||||
return std::accumulate(lst.begin(), lst.end(), generation_t{0});
|
||||
}
|
||||
/// Apply any pending updates to the data.
|
||||
/// This takes the acquired data lock as a parameter, so the caller must already hold the lock.
|
||||
/// \return the updated generation list.
|
||||
generation_list_t updated_gens_in_data(acquired_lock<data_t> &data);
|
||||
|
||||
/// Wait for the current metagen to become different from \p gen.
|
||||
/// If it is already different, return immediately.
|
||||
void await_metagen(generation_t gen);
|
||||
/// Given a list of input generations, attempt to update them to something newer.
|
||||
/// If \p gens is older than the current generations, then store the current generations into
|
||||
/// \p gens and return false (not becoming the reader).
|
||||
/// If \p gens is current and there is not a reader, then do not update \p gens and return true,
|
||||
/// indicating we should become the reader. Now it is our responsibility to read from the pipes
|
||||
/// and notify on a change via the condition variable.
|
||||
/// If \p gens is current, and there is already a reader, then wait until the reader notifies us
|
||||
/// and try again.
|
||||
bool try_update_gens_maybe_becoming_reader(generation_list_t *gens);
|
||||
|
||||
/// Return the current generation list, opportunistically applying any pending updates.
|
||||
/// Wait for some entry in the list of generations to change.
|
||||
/// \return the new gens.
|
||||
generation_list_t await_gens(const generation_list_t &input_gens);
|
||||
|
||||
/// \return the current generation list, opportunistically applying any pending updates.
|
||||
generation_list_t updated_gens();
|
||||
|
||||
/// \return the metagen for the current topic generation list.
|
||||
inline generation_t current_metagen() { return metagen_for(updated_gens()); }
|
||||
|
||||
public:
|
||||
topic_monitor_t();
|
||||
~topic_monitor_t();
|
||||
|
Loading…
x
Reference in New Issue
Block a user