Use fd_event_signaller_t in fd_monitor_t

fd_monitor_t allows observing a collection of fds. It also has its own
fd, which it uses to awaken itself when there are changes. Switch to
using fd_event_signaller_t instead of a pipe; this reduces the number of
file descriptors and is more efficient under Linux.
This commit is contained in:
ridiculousfish 2021-02-06 19:21:03 -08:00
parent e004930947
commit a942df3886
2 changed files with 37 additions and 69 deletions

View File

@ -11,45 +11,14 @@
static constexpr uint64_t kUsecPerMsec = 1000;
static constexpr uint64_t kUsecPerSec = 1000 * kUsecPerMsec;
fd_monitor_t::fd_monitor_t() {
auto self_pipe = make_autoclose_pipes();
if (!self_pipe) {
DIE("Unable to create pipes");
}
fd_monitor_t::fd_monitor_t() = default;
// Ensure the write side is nonblocking to avoid deadlock.
notify_write_fd_ = std::move(self_pipe->write);
if (make_fd_nonblocking(notify_write_fd_.fd())) {
wperror(L"fcntl");
}
// Add an item for ourselves.
// We don't need to go through 'pending' because we have not yet launched the thread, and don't
// want to yet.
auto callback = [this](const autoclose_fd_t &fd, item_wake_reason_t reason) {
ASSERT_IS_BACKGROUND_THREAD();
assert(reason == item_wake_reason_t::readable &&
"Should not be poked, or time out with kNoTimeout");
(void)reason;
// Read some to take data off of the notifier.
char buff[4096];
ssize_t amt = read(fd.fd(), buff, sizeof buff);
if (amt > 0) {
this->has_pending_or_pokes_ = true;
} else if (amt == 0) {
this->terminate_ = true;
} else {
wperror(L"read");
}
};
items_.push_back(fd_monitor_item_t(std::move(self_pipe->read), std::move(callback)));
}
// Extremely hacky destructor to clean up.
// This is used in the tests to not leave stale fds around.
// In fish shell, we never invoke the dtor so it doesn't matter that this is very dumb.
fd_monitor_t::~fd_monitor_t() {
notify_write_fd_.close();
// In orindary usage, we never invoke the dtor.
// This is used in the tests to not leave stale fds around.
// That is why this is very hacky!
data_.acquire()->terminate = true;
change_signaller_.post();
while (data_.acquire()->running) {
std::this_thread::sleep_for(std::chrono::milliseconds(5));
}
@ -87,26 +56,23 @@ fd_monitor_item_id_t fd_monitor_t::add(fd_monitor_item_t &&item) {
DIE("Unable to create a new pthread");
}
}
// Tickle our notifier.
char byte = 0;
(void)write_loop(notify_write_fd_.fd(), &byte, 1);
// Tickle our signaller.
change_signaller_.post();
return item_id;
}
void fd_monitor_t::poke_item(fd_monitor_item_id_t item_id) {
assert(item_id > 0 && "Invalid item ID");
bool needs_notifier_byte = false;
bool needs_notification = false;
{
auto data = data_.acquire();
needs_notifier_byte = data->pokelist.empty();
needs_notification = data->pokelist.empty();
// Insert it, sorted.
auto where = std::lower_bound(data->pokelist.begin(), data->pokelist.end(), item_id);
data->pokelist.insert(where, item_id);
}
if (needs_notifier_byte) {
// Tickle our notifier.
char byte = 0;
(void)write_loop(notify_write_fd_.fd(), &byte, 1);
if (needs_notification) {
change_signaller_.post();
}
}
@ -161,11 +127,16 @@ void fd_monitor_t::run_in_background() {
pokelist.clear();
}
uint64_t timeout_usec = fd_monitor_item_t::kNoTimeout;
int max_fd = -1;
fd_set fds;
FD_ZERO(&fds);
// Our change_signaller is special cased.
int change_signal_fd = change_signaller_.read_fd();
FD_SET(change_signal_fd, &fds);
int max_fd = change_signal_fd;
auto now = std::chrono::steady_clock::now();
uint64_t timeout_usec = fd_monitor_item_t::kNoTimeout;
for (auto &item : items_) {
FD_SET(item.fd.fd(), &fds);
@ -207,16 +178,13 @@ void fd_monitor_t::run_in_background() {
now = std::chrono::steady_clock::now();
items_.erase(std::remove_if(items_.begin(), items_.end(), servicer), items_.end());
if (terminate_) {
// Time to go.
data_.acquire()->running = false;
return;
}
// Maybe we got some new items. Check if our callback says so, or if this is the wait
// Handle any changes if the change signaller was set. Alternatively this may be the wait
// lap, in which case we might want to commit to exiting.
if (has_pending_or_pokes_ || is_wait_lap) {
if (FD_ISSET(change_signal_fd, &fds) || is_wait_lap) {
// Clear the change signaller before processing incoming changes.
change_signaller_.try_consume();
auto data = data_.acquire();
// Move from 'pending' to 'items'.
items_.insert(items_.end(), std::make_move_iterator(data->pending.begin()),
std::make_move_iterator(data->pending.end()));
@ -227,10 +195,9 @@ void fd_monitor_t::run_in_background() {
pokelist = std::move(data->pokelist);
data->pokelist.clear();
has_pending_or_pokes_ = false;
if (is_wait_lap && items_.size() == 1) {
// We had no items, waited a bit, and still have no items. We're going to shut down.
if (data->terminate || (is_wait_lap && items_.empty())) {
// Maybe terminate is set.
// Alternatively, maybe we had no items, waited a bit, and still have no items.
// It's important to do this while holding the lock, otherwise we race with new
// items being added.
assert(data->running && "Thread should be running because we're that thread");

View File

@ -103,6 +103,10 @@ class fd_monitor_t {
// The background thread runner.
void run_in_background();
// If our self-signaller is reported as ready, this reads from it and handles any changes.
// Called in the background thread.
void handle_self_signal_in_background();
// Poke items in the pokelist, removing any items that close their FD.
// The pokelist is consumed after this.
// This is only called in the background thread.
@ -111,13 +115,6 @@ class fd_monitor_t {
// The list of items to monitor. This is only accessed on the background thread.
item_list_t items_{};
// Set to true by the background thread when our self-pipe becomes readable.
bool has_pending_or_pokes_{false};
// Latched to true by the background thread if our self-pipe is closed, which indicates we are
// in the destructor and so should terminate.
bool terminate_{false};
struct data_t {
/// Pending items. This is set under the lock, then the background thread grabs them.
item_list_t pending{};
@ -130,11 +127,15 @@ class fd_monitor_t {
/// Whether the thread is running.
bool running{false};
// Set if we should terminate.
bool terminate{false};
};
owning_lock<data_t> data_;
/// The write end of our self-pipe.
autoclose_fd_t notify_write_fd_{};
/// Our self-signaller. When this is written to, it means there are new items pending, or new
/// items in the pokelist, or terminate is set.
fd_event_signaller_t change_signaller_;
};
#endif