Use iothread pool for background fillthreads

Background fillthreads are used when we want to populate a buffer from an
external command. The most common is command substitution.

Prior to this commit, fish would spin up a fillthread whenever required.
This ended up being quite expensive.

Switch to using the iothread pool instead. This enables reusing the same
thread(s), which prevents needing to spawn new threads. This shows a big
perf win on the alias benchmark (766 -> 378 ms).
This commit is contained in:
ridiculousfish 2019-11-23 15:52:53 -08:00
parent 106af5f56a
commit b5d0075406
2 changed files with 29 additions and 21 deletions

View File

@ -127,34 +127,37 @@ void io_buffer_t::run_background_fillthread(autoclose_fd_t readfd) {
void io_buffer_t::begin_background_fillthread(autoclose_fd_t fd) { void io_buffer_t::begin_background_fillthread(autoclose_fd_t fd) {
ASSERT_IS_MAIN_THREAD(); ASSERT_IS_MAIN_THREAD();
assert(!fillthread_ && "Already have a fillthread"); assert(!fillthread_running() && "Already have a fillthread");
// We want our background thread to own the fd but it's not easy to move into a std::function. // We want our background thread to own the fd but it's not easy to move into a std::function.
// Use a shared_ptr. // Use a shared_ptr.
auto fdref = move_to_sharedptr(std::move(fd)); auto fdref = move_to_sharedptr(std::move(fd));
// Our function to read until the receiver is closed. // Construct a promise that can go into our background thread.
// It's OK to capture 'this' by value because 'this' owns the background thread and joins it auto promise = std::make_shared<std::promise<void>>();
// before dtor.
std::function<void(void)> func = [this, fdref]() {
this->run_background_fillthread(std::move(*fdref));
};
pthread_t fillthread{}; // Get the future associated with our promise.
if (!make_pthread(&fillthread, std::move(func))) { // Note this should only ever be called once.
wperror(L"make_pthread"); fillthread_waiter_ = promise->get_future();
}
fillthread_ = fillthread; // Run our function to read until the receiver is closed.
// It's OK to capture 'this' by value because 'this' owns the background thread and waits for it
// before dtor.
iothread_perform([this, promise, fdref]() {
this->run_background_fillthread(std::move(*fdref));
promise->set_value();
});
} }
void io_buffer_t::complete_background_fillthread() { void io_buffer_t::complete_background_fillthread() {
ASSERT_IS_MAIN_THREAD(); ASSERT_IS_MAIN_THREAD();
assert(fillthread_ && "Should have a fillthread"); assert(fillthread_running() && "Should have a fillthread");
shutdown_fillthread_ = true; shutdown_fillthread_ = true;
void *ignored = nullptr;
int err = pthread_join(*fillthread_, &ignored); // Wait for the fillthread to fulfill its promise, and then clear the future so we know we no
DIE_ON_FAILURE(err); // longer have one.
fillthread_.reset(); fillthread_waiter_.wait();
fillthread_waiter_ = {};
} }
shared_ptr<io_bufferfill_t> io_bufferfill_t::create(const io_chain_t &conflicts, shared_ptr<io_bufferfill_t> io_bufferfill_t::create(const io_chain_t &conflicts,
@ -195,7 +198,7 @@ io_pipe_t::~io_pipe_t() = default;
io_bufferfill_t::~io_bufferfill_t() = default; io_bufferfill_t::~io_bufferfill_t() = default;
io_buffer_t::~io_buffer_t() { io_buffer_t::~io_buffer_t() {
assert(!fillthread_ && "io_buffer_t destroyed with outstanding fillthread"); assert(!fillthread_running() && "io_buffer_t destroyed with outstanding fillthread");
} }
void io_chain_t::remove(const shared_ptr<const io_data_t> &element) { void io_chain_t::remove(const shared_ptr<const io_data_t> &element) {

View File

@ -7,6 +7,7 @@
#include <stdlib.h> #include <stdlib.h>
#include <atomic> #include <atomic>
#include <future>
#include <memory> #include <memory>
#include <mutex> #include <mutex>
#include <vector> #include <vector>
@ -279,8 +280,9 @@ class io_buffer_t {
/// Atomic flag indicating our fillthread should shut down. /// Atomic flag indicating our fillthread should shut down.
relaxed_atomic_bool_t shutdown_fillthread_{false}; relaxed_atomic_bool_t shutdown_fillthread_{false};
/// The background fillthread itself, if any. /// The future allowing synchronization with the background fillthread, if the fillthread is
maybe_t<pthread_t> fillthread_{}; /// running. The fillthread fulfills the corresponding promise when it exits.
std::future<void> fillthread_waiter_{};
/// Read limit of the buffer. /// Read limit of the buffer.
const size_t read_limit_; const size_t read_limit_;
@ -297,6 +299,9 @@ class io_buffer_t {
/// End the background fillthread operation. /// End the background fillthread operation.
void complete_background_fillthread(); void complete_background_fillthread();
/// Helper to return whether the fillthread is running.
bool fillthread_running() const { return fillthread_waiter_.valid(); }
public: public:
explicit io_buffer_t(size_t limit) : buffer_(limit), read_limit_(limit) { explicit io_buffer_t(size_t limit) : buffer_(limit), read_limit_(limit) {
// Explicitly reset the discard flag because we share this buffer. // Explicitly reset the discard flag because we share this buffer.
@ -308,7 +313,7 @@ class io_buffer_t {
/// Access the underlying buffer. /// Access the underlying buffer.
/// This requires that the background fillthread be none. /// This requires that the background fillthread be none.
const separated_buffer_t<std::string> &buffer() const { const separated_buffer_t<std::string> &buffer() const {
assert(!fillthread_ && "Cannot access buffer during background fill"); assert(!fillthread_running() && "Cannot access buffer during background fill");
return buffer_; return buffer_;
} }