From 6f682c84052a8bd52f14122a36e878037cbb868e Mon Sep 17 00:00:00 2001 From: ridiculousfish Date: Fri, 1 Feb 2019 01:58:06 -0800 Subject: [PATCH] Fill io_buffer via background thread This is a large change to how io_buffers are filled. The essential problem comes about with code like (example): echo ( /bin/pwd ) The output of /bin/pwd must go to fish, not the tty. To arrange for this, fish does the following: 1. Invoke pipe() to create a pipe. 2. Add an io_bufferfill_t redirection that owns the write end of the pipe. 3. After fork (or equiv), call dup2() to replace pwd's stdout with this pipe. Now when /bin/pwd writes, it will send output to the read end of the pipe. But who reads it? Prior to this fix, fish would do the following in a loop: 1. select() on the pipe with a 10 msec timeout 2. waitpid(WNOHANG) on the pwd proc This polling is ugly and confusing and is what is replaced here. With this new change, fish now reads from the pipe via a background thread: 1. Spawn a background pthread, which select()s on the pipe's read end with a long (100 msec) timeout. 2. In the foreground, waitpid() (allowing hanging) on the pwd proc. The big win here is a major simplification of job_t::continue_job() since it no longer has to worry about filling buffers. This will make things easier for concurrent execution. It may not be obvious why the background thread still needs a poll (100 msec). The answer is for cases where the write end of the fd escapes, in particular background processes invoked inside command substitutions. psub is perhaps the only important case of this (other shells typically just hang here). --- src/exec.cpp | 12 +--- src/fish_tests.cpp | 4 +- src/io.cpp | 170 +++++++++++++++++++++++++++++++-------------- src/io.h | 76 ++++++++++++-------- src/proc.cpp | 85 +---------------------- tests/count.err | 3 + tests/count.in | 12 ++++ tests/count.out | 3 + 8 files changed, 191 insertions(+), 174 deletions(-) diff --git a/src/exec.cpp b/src/exec.cpp index 7de91f5be..eb1a88259 100644 --- a/src/exec.cpp +++ b/src/exec.cpp @@ -782,12 +782,8 @@ static bool exec_block_or_func_process(parser_t &parser, std::shared_ptr // Remove our write pipe and forget it. This may close the pipe, unless another thread has // claimed it (background write) or another process has inherited it. - auto block_output_buffer = block_output_bufferfill->buffer(); io_chain.remove(block_output_bufferfill); - block_output_bufferfill.reset(); - - // Make the buffer populate itself from whatever was written to the write pipe. - block_output_buffer->read_to_wouldblock(); + auto block_output_buffer = io_bufferfill_t::finish(std::move(block_output_bufferfill)); // Resolve our IO chain to a sequence of dup2s. auto dup2s = dup2_list_t::resolve_chain(io_chain); @@ -969,7 +965,7 @@ bool exec_job(parser_t &parser, shared_ptr j) { if ((io->io_mode == io_mode_t::bufferfill)) { // The read limit is dictated by the last bufferfill. const auto *bf = static_cast(io.get()); - stdout_read_limit = bf->buffer()->buffer().limit(); + stdout_read_limit = bf->buffer()->read_limit(); } } @@ -1040,9 +1036,7 @@ static int exec_subshell_internal(const wcstring &cmd, parser_t &parser, wcstrin if (parser.eval(cmd, io_chain_t{bufferfill}, SUBST) == 0) { subcommand_status = proc_get_last_status(); } - buffer = bufferfill->buffer(); - bufferfill.reset(); - buffer->read_to_wouldblock(); + buffer = io_bufferfill_t::finish(std::move(bufferfill)); } if (buffer && buffer->buffer().discarded()) subcommand_status = STATUS_READ_TOO_MUCH; diff --git a/src/fish_tests.cpp b/src/fish_tests.cpp index e45f22e34..cecb566f3 100644 --- a/src/fish_tests.cpp +++ b/src/fish_tests.cpp @@ -943,9 +943,7 @@ static void test_1_cancellation(const wchar_t *src) { pthread_kill(thread, SIGINT); }); parser_t::principal_parser().eval(src, io_chain_t{filler}, TOP); - auto buffer = filler->buffer(); - filler.reset(); - buffer->read_to_wouldblock(); + auto buffer = io_bufferfill_t::finish(std::move(filler)); if (buffer->buffer().size() != 0) { err(L"Expected 0 bytes in out_buff, but instead found %lu bytes\n", buffer->buffer().size()); diff --git a/src/io.cpp b/src/io.cpp index 32731776c..713bc1432 100644 --- a/src/io.cpp +++ b/src/io.cpp @@ -11,6 +11,7 @@ #include "exec.h" #include "fallback.h" // IWYU pragma: keep #include "io.h" +#include "iothread.h" #include "wutil.h" // IWYU pragma: keep io_data_t::~io_data_t() = default; @@ -28,6 +29,7 @@ void io_pipe_t::print() const { void io_bufferfill_t::print() const { fwprintf(stderr, L"bufferfill {%d}\n", write_fd_.fd()); } void io_buffer_t::append_from_stream(const output_stream_t &stream) { + scoped_lock locker(append_lock_); if (buffer_.discarded()) return; if (stream.buffer().discarded()) { buffer_.set_discard(); @@ -36,54 +38,108 @@ void io_buffer_t::append_from_stream(const output_stream_t &stream) { buffer_.append_wide_buffer(stream.buffer()); } -long io_buffer_t::read_some() { - int fd = read_.fd(); - assert(fd >= 0 && "Should have a valid fd"); - debug(4, L"io_buffer_t::read: blocking read on fd %d", fd); - long len; - char b[4096]; - do { - len = read(fd, b, sizeof b); - } while (len < 0 && errno == EINTR); - if (len > 0) { - buffer_.append(&b[0], &b[len]); - } - return len; -} +void io_buffer_t::run_background_fillthread(autoclose_fd_t readfd) { + // Here we are running the background fillthread, executing in a background thread. + // Our plan is: + // 1. poll via select() until the fd is readable. + // 2. Acquire the append lock. + // 3. read until EAGAIN (would block), appending + // 4. release the lock + // The purpose of holding the lock around the read calls is to ensure that data from background + // processes isn't weirdly interspersed with data directly transferred (from a builtin to a buffer). -void io_buffer_t::read_to_wouldblock() { - long len; - do { - len = read_some(); - } while (len > 0); - if (len < 0 && errno != EAGAIN) { - debug(1, _(L"An error occured while reading output from code block on fd %d"), read_.fd()); - wperror(L"io_buffer_t::read"); - } -} + const int fd = readfd.fd(); -bool io_buffer_t::try_read(unsigned long timeout_usec) { - struct timeval timeout; - timeout.tv_sec = 0; - timeout.tv_usec = timeout_usec; - int fd = read_.fd(); - assert(fd >= 0 && "Should have a valid fd"); - fd_set fds; - FD_ZERO(&fds); - FD_SET(fd, &fds); - int ret = select(fd + 1, &fds, nullptr, nullptr, &timeout); - if (ret < 0) { - // Treat EINTR as timeout. - if (errno != EINTR) { - debug(1, _(L"An error occured inside select on fd %d"), fd); - wperror(L"io_buffer_t::try_read"); + // 100 msec poll rate. Note that in most cases, the write end of the pipe will be closed so + // select() will return; the polling is important only for weird cases like a background process + // launched in a command substitution. + const long poll_timeout_usec = 100000; + struct timeval tv = {}; + tv.tv_usec = poll_timeout_usec; + + bool shutdown = false; + while (!shutdown) { + bool readable = false; + + // Poll if our fd is readable. + // Do this even if the shutdown flag is set. It's important we wait for the fd at least + // once. For short-lived processes, it's possible for the process to execute, produce output + // (fits in the pipe buffer) and be reaped before we are even scheduled. So always wait at + // least once on the fd. Note that doesn't mean we will wait for the full poll duration; + // typically what will happen is our pipe will be widowed and so this will return quickly. + // It's only for weird cases (e.g. a background process launched inside a command + // substitution) that we'll wait out the entire poll time. + fd_set fds; + FD_ZERO(&fds); + FD_SET(fd, &fds); + int ret = select(fd + 1, &fds, NULL, NULL, &tv); + readable = ret > 0; + if (ret < 0 && errno != EINTR) { + // Surprising error. + wperror(L"select"); + return; + } + + // Only check the shutdown flag if we timed out. + // It's important that if select() indicated we were readable, that we call select() again + // allowing it to time out. Note the typical case is that the fd will be closed, in which + // case select will return immediately. + if (! readable) { + shutdown = this->shutdown_fillthread_.load(std::memory_order_relaxed); + } + + if (readable || shutdown) { + // Now either our fd is readable, or we have set the shutdown flag. + // Either way acquire the lock and read until we reach EOF, or EAGAIN / EINTR. + scoped_lock locker(append_lock_); + ssize_t ret; + do { + char buff[4096]; + ret = read(fd, buff, sizeof buff); + if (ret > 0) { + buffer_.append(&buff[0], &buff[ret]); + } else if (ret == 0) { + shutdown = true; + } else if (errno != EINTR && errno != EAGAIN) { + wperror(L"read"); + return; + } + } while (ret > 0); } - return false; } - if (ret > 0) { - read_some(); + assert(shutdown && "Should only exit loop if shutdown flag is set"); +} + +void io_buffer_t::begin_background_fillthread(autoclose_fd_t fd) { + ASSERT_IS_MAIN_THREAD(); + assert(!fillthread_ && "Already have a fillthread"); + + // We want our background thread to own the fd but it's not easy to move into a std::function. + // Use a shared_ptr. + auto fdref = std::make_shared(std::move(fd)); + + // Our function to read until the receiver is closed. + // It's OK to capture 'this' by value because 'this' owns the background thread and joins it + // before dtor. + std::function func = [this, fdref]() { + this->run_background_fillthread(std::move(*fdref)); + }; + + pthread_t fillthread{}; + if (!make_pthread(&fillthread, std::move(func))) { + wperror(L"make_pthread"); } - return ret > 0; + fillthread_ = fillthread; +} + +void io_buffer_t::complete_background_fillthread() { + ASSERT_IS_MAIN_THREAD(); + assert(fillthread_ && "Should have a fillthread"); + shutdown_fillthread_.store(true, std::memory_order_relaxed); + void *ignored = nullptr; + int err = pthread_join(*fillthread_, &ignored); + DIE_ON_FAILURE(err); + fillthread_.reset(); } shared_ptr io_bufferfill_t::create(const io_chain_t &conflicts, @@ -93,27 +149,39 @@ shared_ptr io_bufferfill_t::create(const io_chain_t &conflicts, if (!pipes) { return nullptr; } - // Our buffer will read from the read end of the pipe. This end must be non-blocking. This is - // because we retain the write end of the pipe in this process (even after handing it off to a - // child process); therefore a read on the pipe may block forever. What we should do is arrange - // for the write end of the pipe to be closed at the right time; then the read could just block. + // because our fillthread needs to poll to decide if it should shut down, and also accept input + // from direct buffer transfers. if (make_fd_nonblocking(pipes->read.fd())) { debug(1, PIPE_ERROR); wperror(L"fcntl"); return nullptr; } - - // Our buffer gets the read end of the pipe; out_pipe gets the write end. - auto buffer = std::make_shared(std::move(pipes->read), buffer_limit); + // Our fillthread gets the read end of the pipe; out_pipe gets the write end. + auto buffer = std::make_shared(buffer_limit); + buffer->begin_background_fillthread(std::move(pipes->read)); return std::make_shared(std::move(pipes->write), buffer); } +std::shared_ptr io_bufferfill_t::finish(std::shared_ptr &&filler) { + // The io filler is passed in. This typically holds the only instance of the write side of the + // pipe used by the buffer's fillthread (except for that side held by other processes). Get the + // buffer out of the bufferfill and clear the shared_ptr; this will typically widow the pipe. + // Then allow the buffer to finish. + assert(filler && "Null pointer in finish"); + auto buffer = filler->buffer(); + filler.reset(); + buffer->complete_background_fillthread(); + return buffer; +} + io_pipe_t::~io_pipe_t() = default; io_bufferfill_t::~io_bufferfill_t() = default; -io_buffer_t::~io_buffer_t() = default; +io_buffer_t::~io_buffer_t() { + assert(! fillthread_ && "io_buffer_t destroyed with outstanding fillthread"); +} void io_chain_t::remove(const shared_ptr &element) { // See if you can guess why std::find doesn't work here. diff --git a/src/io.h b/src/io.h index 6599790ff..fbc44c4fb 100644 --- a/src/io.h +++ b/src/io.h @@ -1,26 +1,21 @@ #ifndef FISH_IO_H #define FISH_IO_H +#include #include #include #include -#include -// Note that we have to include something to get any _LIBCPP_VERSION defined so we can detect libc++ -// So it's key that vector go above. If we didn't need vector for other reasons, we might include -// ciso646, which does nothing -#if defined(_LIBCPP_VERSION) || __cplusplus > 199711L -// C++11 or libc++ (which is a C++11-only library, but the memory header works OK in C++03) +#include #include -using std::shared_ptr; -#else -// C++03 or libstdc++ -#include -using std::tr1::shared_ptr; -#endif +#include +#include #include "common.h" #include "env.h" +#include "maybe.h" + +using std::shared_ptr; /// separated_buffer_t is composed of a sequence of elements, some of which may be explicitly /// separated (e.g. through string spit0) and some of which the separation is inferred. This enum @@ -244,6 +239,8 @@ class io_bufferfill_t : public io_data_t { public: void print() const override; + // The ctor is public to support make_shared() in the static create function below. + // Do not invoke this directly. io_bufferfill_t(autoclose_fd_t write_fd, std::shared_ptr buffer) : io_data_t(io_mode_t::bufferfill, STDOUT_FILENO), write_fd_(std::move(write_fd)), @@ -253,14 +250,19 @@ class io_bufferfill_t : public io_data_t { std::shared_ptr buffer() const { return buffer_; } + /// \return the fd that, when written to, fills the buffer. int write_fd() const { return write_fd_.fd(); } - /// Create an io_bufferfill_t which, when written from, populates a buffer (also created). + /// Create an io_bufferfill_t which, when written from, fills a buffer with the contents. /// \returns nullptr on failure, e.g. too many open fds. /// /// \param conflicts A set of IO redirections. The function ensures that any pipe it makes does /// not conflict with an fd redirection in this list. static shared_ptr create(const io_chain_t &conflicts, size_t buffer_limit = 0); + + /// Reset the receiver (possibly closing the write end of the pipe), and complete the fillthread + /// of the buffer. \return the buffer. + static std::shared_ptr finish(std::shared_ptr &&filler); }; class output_stream_t; @@ -269,19 +271,34 @@ class output_stream_t; /// It is not an io_data_t. class io_buffer_t { private: - /// fd from which to read. - autoclose_fd_t read_; + friend io_bufferfill_t; /// Buffer storing what we have read. separated_buffer_t buffer_; - /// Read some. Append it to our buffer. - /// \return positive if we read, 0 on EOF, -1 on error. - long read_some(); + /// Atomic flag indicating our fillthread should shut down. + std::atomic shutdown_fillthread_; + + /// The background fillthread itself, if any. + maybe_t fillthread_{}; + + /// Read limit of the buffer. + const size_t read_limit_; + + /// Lock for appending. + std::mutex append_lock_{}; + + /// Called in the background thread to run it. + void run_background_fillthread(autoclose_fd_t readfd); + + /// Begin the background fillthread operation, reading from the given fd. + void begin_background_fillthread(autoclose_fd_t readfd); + + /// End the background fillthread operation. + void complete_background_fillthread(); public: - explicit io_buffer_t(autoclose_fd_t read, size_t limit) - : read_(std::move(read)), buffer_(limit) { + explicit io_buffer_t(size_t limit) : buffer_(limit), read_limit_(limit) { // Explicitly reset the discard flag because we share this buffer. buffer_.reset_discard(); } @@ -289,17 +306,20 @@ class io_buffer_t { ~io_buffer_t(); /// Access the underlying buffer. - const separated_buffer_t &buffer() const { return buffer_; } + /// This requires that the background fillthread be none. + const separated_buffer_t &buffer() const { + assert(!fillthread_ && "Cannot access buffer during background fill"); + return buffer_; + } /// Function to append to the buffer. - void append(const char *ptr, size_t count) { buffer_.append(ptr, ptr + count); } + void append(const char *ptr, size_t count) { + scoped_lock locker(append_lock_); + buffer_.append(ptr, ptr + count); + } - /// Read from input pipe until EOF or EAGAIN (i.e. would block). - void read_to_wouldblock(); - - /// Read a bit, if our fd is readable, with the given timeout. - /// \return true if we read some, false on timeout. - bool try_read(unsigned long timeout_usec); + /// \return the read limit. + size_t read_limit() const { return read_limit_; } /// Appends data from a given output_stream_t. /// Marks the receiver as discarded if the stream was discarded. diff --git a/src/proc.cpp b/src/proc.cpp index a58217f66..a440d1cda 100644 --- a/src/proc.cpp +++ b/src/proc.cpp @@ -836,27 +836,6 @@ void proc_update_jiffies() { #endif -/// The return value of select_try(), indicating IO readiness or an error -enum class block_receive_try_t { - /// There is no buffer to select on. - NO_BUFFER, - /// We have a block buffer, and we read some. - DATA_READ, - /// We have a block buffer, but we were unable to read any. - TIMEOUT, -}; - -/// \return the last IO buffer in job j, or nullptr if none. -std::shared_ptr last_buffer(job_t *j) { - std::shared_ptr buff{}; - for (const auto &io : j->all_io_redirections()) { - if (io->io_mode == io_mode_t::bufferfill) { - buff = static_cast(io.get())->buffer(); - } - } - return buff; -} - // Return control of the terminal to a job's process group. restore_attrs is true if we are restoring // a previously-stopped job, in which case we need to restore terminal attributes. bool terminal_give_to_job(const job_t *j, bool restore_attrs) { @@ -1038,7 +1017,6 @@ void job_t::continue_job(bool send_sigcont) { } }); - bool read_attempted = false; if (!is_completed()) { if (get_flag(job_flag_t::TERMINAL) && is_foreground()) { // Put the job into the foreground and give it control of the terminal. @@ -1071,74 +1049,15 @@ void job_t::continue_job(bool send_sigcont) { } if (is_foreground()) { - // This is an optimization to not call select_try() in case a process has exited. While - // it may seem silly, unless there is IO (and there usually isn't in terms of total CPU - // time), select_try() will wait for 10ms (our timeout) before returning. If during - // these 10ms a process exited, the shell will basically hang until the timeout happens - // and we are free to call `process_mark_finished_children()` to discover that fact. By - // calling it here before calling `select_try()` below, shell responsiveness can be - // dramatically improved (noticably so, not just "theoretically speaking" per the - // discussion in #5219). - process_mark_finished_children(false); - - // If this is a child job and the parent job is still under construction (i.e. job1 | - // some_func), we can't block on execution of the nested job for `some_func`. Doing - // so can cause hangs if job1 emits more data than fits in the OS pipe buffer. - // The solution is to to not block on fg from the initial call in exec_job(), which - // is also the only place that send_sigcont is false. parent_job.is_constructed() - // must also be true, which coincides with WAIT_BY_PROCESS (which will have to do - // since we don't store a reference to the parent job in the job_t structure). - bool block_on_fg = send_sigcont && job_chain_is_fully_constructed(); - - // Wait for data to become available or the status of our own job to change + // Wait for the status of our own job to change. while (!reader_exit_forced() && !is_stopped() && !is_completed()) { - read_attempted = true; - auto read_result = block_receive_try_t::NO_BUFFER; - if (auto buff = last_buffer(this)) { - const unsigned long SELECT_TIMEOUT_USEC = 10000; - bool did_read = buff->try_read(SELECT_TIMEOUT_USEC); - read_result = did_read ? block_receive_try_t::DATA_READ : block_receive_try_t::TIMEOUT; - } - switch (read_result) { - case block_receive_try_t::DATA_READ: - // We read some data. - process_mark_finished_children(false); - break; - - case block_receive_try_t::TIMEOUT: - // We read some data or timed out. Poll for finished processes. - debug(4, L"select_try: no fds returned valid data within the timeout" ); - process_mark_finished_children(block_on_fg); - break; - - case block_receive_try_t::NO_BUFFER: - // We are not populating a buffer. - debug(4, L"select_try: no IO fds" ); - process_mark_finished_children(true); - - // If it turns out that we encountered this because the file descriptor we were - // reading from has died, process_mark_finished_children() should take care of - // changing the status of our is_completed() (assuming it is appropriate to do - // so), in which case we will break out of this loop. - break; - } + process_mark_finished_children(true); } } } if (is_foreground()) { if (is_completed()) { - // It's possible that the job will produce output and exit before we've even read from - // it. In that case, make sure we read that output now, before we've executed any - // subsequent calls. This is why prompt colors were getting screwed up - the builtin - // `echo` calls were sometimes having their output combined with the `set_color` calls - // in the wrong order! - if (!read_attempted) { - if (auto buff = last_buffer(this)) { - buff->read_to_wouldblock(); - } - } - // Set $status only if we are in the foreground and the last process in the job has // finished and is not a short-circuited builtin. auto &p = processes.back(); diff --git a/tests/count.err b/tests/count.err index a4d49599c..b9b02b750 100644 --- a/tests/count.err +++ b/tests/count.err @@ -10,3 +10,6 @@ #################### # args that look like flags or are otherwise special + +#################### +# big counts diff --git a/tests/count.in b/tests/count.in index ede959942..81b54ce7d 100644 --- a/tests/count.in +++ b/tests/count.in @@ -15,3 +15,15 @@ count --help count -- count -- abc count def -- abc + +logmsg big counts + +# See #5611 +for i in seq 500 + set c1 (count (seq 1 10000)) + test $c1 -eq 10000 + or begin + echo "Count $c1 is not 10000" + break + end +end diff --git a/tests/count.out b/tests/count.out index a81214195..f3c2bb10e 100644 --- a/tests/count.out +++ b/tests/count.out @@ -18,3 +18,6 @@ 1 2 3 + +#################### +# big counts