Adopt fd_monitor in bufferfill

This switches bufferfills from using an exclusively-owned thread, to
sharing an fd_monitor. This allows multiple bufferfills to all use the same
thread.
This commit is contained in:
ridiculousfish 2020-02-04 17:49:07 -08:00
parent 057c3a9e75
commit a765026c4c
2 changed files with 82 additions and 92 deletions

View File

@ -15,6 +15,7 @@
#include "common.h"
#include "exec.h"
#include "fallback.h" // IWYU pragma: keep
#include "fd_monitor.h"
#include "iothread.h"
#include "path.h"
#include "redirection.h"
@ -27,6 +28,13 @@
/// Base open mode to pass to calls to open.
#define OPEN_MASK 0666
/// Provide the fd monitor used for background fillthread operations.
static fd_monitor_t &fd_monitor() {
// Deliberately leaked to avoid shutdown dtors.
static fd_monitor_t *fdm = new fd_monitor_t();
return *fdm;
}
io_data_t::~io_data_t() = default;
io_pipe_t::~io_pipe_t() = default;
io_fd_t::~io_fd_t() = default;
@ -58,94 +66,43 @@ void io_buffer_t::append_from_stream(const output_stream_t &stream) {
buffer_.append_wide_buffer(input);
}
void io_buffer_t::run_background_fillthread(autoclose_fd_t readfd) {
// Here we are running the background fillthread, executing in a background thread.
// Our plan is:
// 1. poll via select() until the fd is readable.
// 2. Acquire the append lock.
// 3. read until EAGAIN (would block), appending
// 4. release the lock
// The purpose of holding the lock around the read calls is to ensure that data from background
// processes isn't weirdly interspersed with data directly transferred (from a builtin to a
// buffer).
ssize_t io_buffer_t::read_once(int fd) {
assert(fd >= 0 && "Invalid fd");
ASSERT_IS_LOCKED(append_lock_);
errno = 0;
char buff[4096 * 4];
const int fd = readfd.fd();
// 100 msec poll rate. Note that in most cases, the write end of the pipe will be closed so
// select() will return; the polling is important only for weird cases like a background process
// launched in a command substitution.
const long poll_timeout_usec = 100000;
struct timeval tv = {};
tv.tv_usec = poll_timeout_usec;
bool shutdown = false;
while (!shutdown) {
bool readable = false;
// Poll if our fd is readable.
// Do this even if the shutdown flag is set. It's important we wait for the fd at least
// once. For short-lived processes, it's possible for the process to execute, produce output
// (fits in the pipe buffer) and be reaped before we are even scheduled. So always wait at
// least once on the fd. Note that doesn't mean we will wait for the full poll duration;
// typically what will happen is our pipe will be widowed and so this will return quickly.
// It's only for weird cases (e.g. a background process launched inside a command
// substitution) that we'll wait out the entire poll time.
fd_set fds;
FD_ZERO(&fds);
FD_SET(fd, &fds);
int ret = select(fd + 1, &fds, nullptr, nullptr, &tv);
// select(2) is allowed to (and does) update `tv` to indicate how much time was left, so we
// need to restore the desired value each time.
tv.tv_usec = poll_timeout_usec;
readable = ret > 0;
if (ret < 0 && errno != EINTR) {
// Surprising error.
wperror(L"select");
return;
}
// Only check the shutdown flag if we timed out.
// It's important that if select() indicated we were readable, that we call select() again
// allowing it to time out. Note the typical case is that the fd will be closed, in which
// case select will return immediately.
if (!readable) {
shutdown = this->shutdown_fillthread_;
}
if (readable || shutdown) {
// Now either our fd is readable, or we have set the shutdown flag.
// Either way acquire the lock and read until we reach EOF, or EAGAIN / EINTR.
scoped_lock locker(append_lock_);
ssize_t ret;
do {
errno = 0;
char buff[4096];
ret = read(fd, buff, sizeof buff);
if (ret > 0) {
buffer_.append(&buff[0], &buff[ret]);
} else if (ret == 0) {
shutdown = true;
} else if (ret == -1 && errno == 0) {
// No specific error. We assume we just return,
// since that's what we do in read_blocked.
return;
} else if (errno != EINTR && errno != EAGAIN) {
wperror(L"read");
return;
}
} while (ret > 0);
}
// We want to swallow EINTR only; in particular EAGAIN needs to be returned back to the caller.
ssize_t ret;
do {
ret = read(fd, buff, sizeof buff);
} while (ret < 0 && errno == EINTR);
if (ret < 0 && errno != EAGAIN) {
wperror(L"read");
} else if (ret > 0) {
buffer_.append(&buff[0], &buff[ret]);
}
assert(shutdown && "Should only exit loop if shutdown flag is set");
return ret;
}
void io_buffer_t::begin_background_fillthread(autoclose_fd_t fd) {
void io_buffer_t::begin_filling(autoclose_fd_t fd) {
ASSERT_IS_MAIN_THREAD();
assert(!fillthread_running() && "Already have a fillthread");
// We want our background thread to own the fd but it's not easy to move into a std::function.
// Use a shared_ptr.
auto fdref = move_to_sharedptr(std::move(fd));
// We want to fill buffer_ by reading from fd. fd is the read end of a pipe; the write end is
// owned by another process, or something else writing in fish.
// Pass fd to an fd_monitor. It will add fd to its select() loop, and give us a callback when
// the fd is readable, or when our timeout is hit. The usual path is that we will get called
// back, read a bit from the fd, and append it to the buffer. Eventually the write end of the
// pipe will be closed - probably the other process exited - and fd will be widowed; read() will
// then return 0 and we will stop reading.
// In exotic circumstances the write end of the pipe will not be closed; this may happen in
// e.g.:
// cmd ( background & ; echo hi )
// Here the background process will inherit the write end of the pipe and hold onto it forever.
// In this case, we will hit the timeout on waiting for more data and notice that the shutdown
// flag is set (this indicates that the command substitution is done); in this case we will read
// until we get EAGAIN and then give up.
// Construct a promise that can go into our background thread.
auto promise = std::make_shared<std::promise<void>>();
@ -154,13 +111,45 @@ void io_buffer_t::begin_background_fillthread(autoclose_fd_t fd) {
// Note this should only ever be called once.
fillthread_waiter_ = promise->get_future();
// 100 msec poll rate. Note that in most cases, the write end of the pipe will be closed so
// select() will return; the polling is important only for weird cases like a background process
// launched in a command substitution.
constexpr uint64_t usec_per_msec = 1000;
uint64_t poll_usec = 100 * usec_per_msec;
// Run our function to read until the receiver is closed.
// It's OK to capture 'this' by value because 'this' owns the background thread and waits for it
// before dtor.
iothread_perform_cantwait([this, promise, fdref]() {
this->run_background_fillthread(std::move(*fdref));
promise->set_value();
});
// It's OK to capture 'this' by value because 'this' waits for the promise in its dtor.
fd_monitor_item_t item;
item.fd = std::move(fd);
item.timeout_usec = poll_usec;
item.callback = [this, promise](autoclose_fd_t &fd, bool timed_out) {
ASSERT_IS_BACKGROUND_THREAD();
// Only check the shutdown flag if we timed out.
// It's important that if select() indicated we were readable, that we call select() again
// allowing it to time out. Note the typical case is that the fd will be closed, in which
// case select will return immediately.
bool done = false;
if (!timed_out) {
// select() reported us as readable; read a bit.
scoped_lock locker(append_lock_);
ssize_t ret = read_once(fd.fd());
done = (ret == 0 || (ret < 0 && errno != EAGAIN));
} else if (shutdown_fillthread_) {
// Here our caller asked us to shut down; read while we keep getting data.
// This will stop when the fd is closed or if we get EAGAIN.
scoped_lock locker(append_lock_);
ssize_t ret;
do {
ret = read_once(fd.fd());
} while (ret > 0);
done = true;
}
if (done) {
fd.close();
promise->set_value();
}
};
fd_monitor().add(std::move(item));
}
void io_buffer_t::complete_background_fillthread() {
@ -191,7 +180,7 @@ shared_ptr<io_bufferfill_t> io_bufferfill_t::create(const fd_set_t &conflicts,
}
// Our fillthread gets the read end of the pipe; out_pipe gets the write end.
auto buffer = std::make_shared<io_buffer_t>(buffer_limit);
buffer->begin_background_fillthread(std::move(pipes->read));
buffer->begin_filling(std::move(pipes->read));
return std::make_shared<io_bufferfill_t>(std::move(pipes->write), buffer);
}

View File

@ -308,11 +308,12 @@ class io_buffer_t {
/// Lock for appending.
std::mutex append_lock_{};
/// Called in the background thread to run it.
void run_background_fillthread(autoclose_fd_t readfd);
/// Read a bit, filling the buffer. The append lock must be held.
/// \return positive on success, 0 if closed, -1 on error (in which case errno will be set).
ssize_t read_once(int fd);
/// Begin the background fillthread operation, reading from the given fd.
void begin_background_fillthread(autoclose_fd_t readfd);
/// Begin the fill operation, reading from the given fd in the background.
void begin_filling(autoclose_fd_t readfd);
/// End the background fillthread operation.
void complete_background_fillthread();