diff --git a/src/builtin_eval.cpp b/src/builtin_eval.cpp index d095cf61b..e9b0512b3 100644 --- a/src/builtin_eval.cpp +++ b/src/builtin_eval.cpp @@ -75,11 +75,11 @@ maybe_t builtin_eval(parser_t &parser, io_streams_t &streams, wchar_t **arg ios.clear(); if (stdout_fill) { std::shared_ptr output = io_bufferfill_t::finish(std::move(stdout_fill)); - streams.out.append_narrow_buffer(output->buffer()); + streams.out.append_narrow_buffer(output->take_buffer()); } if (stderr_fill) { std::shared_ptr errput = io_bufferfill_t::finish(std::move(stderr_fill)); - streams.err.append_narrow_buffer(errput->buffer()); + streams.err.append_narrow_buffer(errput->take_buffer()); } return status; } diff --git a/src/exec.cpp b/src/exec.cpp index 3cee39824..9677fb591 100644 --- a/src/exec.cpp +++ b/src/exec.cpp @@ -691,7 +691,7 @@ static launch_result_t exec_block_or_func_process(parser_t &parser, const std::s // claimed it (background write) or another process has inherited it. io_chain.remove(block_output_bufferfill); auto block_output_buffer = io_bufferfill_t::finish(std::move(block_output_bufferfill)); - buffer_contents = block_output_buffer->buffer().newline_serialized(); + buffer_contents = block_output_buffer->take_buffer().newline_serialized(); } run_internal_process_or_short_circuit(parser, j, p, std::move(buffer_contents), @@ -1036,9 +1036,10 @@ bool exec_job(parser_t &parser, const shared_ptr &j, const io_chain_t &bl } /// Populate \p lst with the output of \p buffer, perhaps splitting lines according to \p split. -static void populate_subshell_output(wcstring_list_t *lst, const io_buffer_t &buffer, bool split) { +static void populate_subshell_output(wcstring_list_t *lst, const separated_buffer_t &buffer, + bool split) { // Walk over all the elements. - for (const auto &elem : buffer.buffer().elements()) { + for (const auto &elem : buffer.elements()) { if (elem.is_explicitly_separated()) { // Just append this one. lst->push_back(str2wcstring(elem.contents)); @@ -1109,8 +1110,8 @@ static int exec_subshell_internal(const wcstring &cmd, parser_t &parser, return STATUS_CMD_ERROR; } eval_res_t eval_res = parser.eval(cmd, io_chain_t{bufferfill}, job_group, block_type_t::subst); - std::shared_ptr buffer = io_bufferfill_t::finish(std::move(bufferfill)); - if (buffer->buffer().discarded()) { + separated_buffer_t buffer = io_bufferfill_t::finish(std::move(bufferfill))->take_buffer(); + if (buffer.discarded()) { *break_expand = true; return STATUS_READ_TOO_MUCH; } @@ -1121,7 +1122,7 @@ static int exec_subshell_internal(const wcstring &cmd, parser_t &parser, } if (lst) { - populate_subshell_output(lst, *buffer, split_output); + populate_subshell_output(lst, buffer, split_output); } *break_expand = false; return eval_res.status.status_value(); diff --git a/src/fish_tests.cpp b/src/fish_tests.cpp index fcd1b7c70..7bab36104 100644 --- a/src/fish_tests.cpp +++ b/src/fish_tests.cpp @@ -1302,10 +1302,10 @@ static void test_1_cancellation(const wchar_t *src) { pthread_kill(thread, SIGINT); }); eval_res_t res = parser_t::principal_parser().eval(src, io_chain_t{filler}); - auto buffer = io_bufferfill_t::finish(std::move(filler)); - if (buffer->buffer().size() != 0) { + auto buffer = io_bufferfill_t::finish(std::move(filler))->take_buffer(); + if (buffer.size() != 0) { err(L"Expected 0 bytes in out_buff, but instead found %lu bytes, for command %ls\n", - buffer->buffer().size(), src); + buffer.size(), src); } do_test(res.status.signal_exited() && res.status.signal_code() == SIGINT); iothread_drain_all(); diff --git a/src/io.cpp b/src/io.cpp index 933a0e856..afbca1d91 100644 --- a/src/io.cpp +++ b/src/io.cpp @@ -57,23 +57,22 @@ void io_bufferfill_t::print() const { std::fwprintf(stderr, L"bufferfill %d -> %d\n", write_fd_.fd(), fd); } -ssize_t io_buffer_t::read_once(int fd) { +ssize_t io_buffer_t::read_once(int fd, acquired_lock &buffer) { assert(fd >= 0 && "Invalid fd"); - ASSERT_IS_LOCKED(append_lock_); errno = 0; - char buff[4096 * 4]; + char bytes[4096 * 4]; // We want to swallow EINTR only; in particular EAGAIN needs to be returned back to the caller. - ssize_t ret; + ssize_t amt; do { - ret = read(fd, buff, sizeof buff); - } while (ret < 0 && errno == EINTR); - if (ret < 0 && errno != EAGAIN) { + amt = read(fd, bytes, sizeof bytes); + } while (amt < 0 && errno == EINTR); + if (amt < 0 && errno != EAGAIN) { wperror(L"read"); - } else if (ret > 0) { - buffer_.append(buff, static_cast(ret)); + } else if (amt > 0) { + buffer->append(bytes, static_cast(amt)); } - return ret; + return amt; } void io_buffer_t::begin_filling(autoclose_fd_t fd) { @@ -116,16 +115,16 @@ void io_buffer_t::begin_filling(autoclose_fd_t fd) { bool done = false; if (reason == item_wake_reason_t::readable) { // select() reported us as readable; read a bit. - scoped_lock locker(append_lock_); - ssize_t ret = read_once(fd.fd()); + auto buffer = buffer_.acquire(); + ssize_t ret = read_once(fd.fd(), buffer); done = (ret == 0 || (ret < 0 && errno != EAGAIN)); } else if (shutdown_fillthread_) { // Here our caller asked us to shut down; read while we keep getting data. // This will stop when the fd is closed or if we get EAGAIN. - scoped_lock locker(append_lock_); + auto buffer = buffer_.acquire(); ssize_t ret; do { - ret = read_once(fd.fd()); + ret = read_once(fd.fd(), buffer); } while (ret > 0); done = true; } diff --git a/src/io.h b/src/io.h index b69be48c0..59807cd81 100644 --- a/src/io.h +++ b/src/io.h @@ -74,6 +74,11 @@ class separated_buffer_t { separated_buffer_t(const separated_buffer_t &) = delete; void operator=(const separated_buffer_t &) = delete; + /// We may be moved. + /// Note this leaves the moved-from value in a bogus state, until clear() is called on it. + separated_buffer_t(separated_buffer_t &&rhs) = default; + separated_buffer_t &operator=(separated_buffer_t &&) = default; + /// Construct a separated_buffer_t with the given buffer limit \p limit, or 0 for no limit. separated_buffer_t(size_t limit) : buffer_limit_(limit) {} @@ -125,6 +130,13 @@ class separated_buffer_t { } } + /// Remove all elements and unset the discard flag. + void clear() { + elements_.clear(); + contents_size_ = 0; + discard_ = false; + } + private: /// \return true if our last element has an inferred separation type. bool last_inferred() const { @@ -146,8 +158,7 @@ class separated_buffer_t { if (discard_) return false; size_t proposed_size = contents_size_ + delta; if ((proposed_size < delta) || (buffer_limit_ > 0 && proposed_size > buffer_limit_)) { - elements_.clear(); - contents_size_ = 0; + clear(); discard_ = true; return false; } @@ -305,29 +316,29 @@ public: ~io_buffer_t(); - /// Access the underlying buffer. - /// This requires that the background fillthread be none. - const separated_buffer_t &buffer() const { + /// Take the underlying buffer, transferring ownership to the caller. + /// This should only be called after the fillthread operation is complete. + separated_buffer_t take_buffer() { assert(!fillthread_running() && "Cannot access buffer during background fill"); - return buffer_; + auto locked_buff = buffer_.acquire(); + separated_buffer_t result = std::move(*locked_buff); + locked_buff->clear(); + return result; } /// Append a string to the buffer. void append(std::string &&str, separation_type_t type = separation_type_t::inferred) { - scoped_lock locker(append_lock_); - buffer_.append(std::move(str), type); + buffer_.acquire()->append(std::move(str), type); } /// \return true if output was discarded due to exceeding the read limit. - bool discarded() const { - scoped_lock locker(append_lock_); - return buffer_.discarded(); - } + bool discarded() { return buffer_.acquire()->discarded(); } private: - /// Read some, filling the buffer. The append lock must be held. - /// \return positive on success, 0 if closed, -1 on error (in which case errno will be set). - ssize_t read_once(int fd); + /// Read some, filling the buffer. The buffer is passed in to enforce that the append lock is + /// held. \return positive on success, 0 if closed, -1 on error (in which case errno will be + /// set). + ssize_t read_once(int fd, acquired_lock &buff); /// Begin the fill operation, reading from the given fd in the background. void begin_filling(autoclose_fd_t readfd); @@ -338,10 +349,8 @@ public: /// Helper to return whether the fillthread is running. bool fillthread_running() const { return fillthread_waiter_.valid(); } - friend io_bufferfill_t; - /// Buffer storing what we have read. - separated_buffer_t buffer_; + owning_lock buffer_; /// Atomic flag indicating our fillthread should shut down. relaxed_atomic_bool_t shutdown_fillthread_{false}; @@ -353,8 +362,7 @@ public: /// The item id of our background fillthread fd monitor item. uint64_t item_id_{0}; - /// Lock for appending. Mutable since we take it in const functions. - mutable std::mutex append_lock_{}; + friend io_bufferfill_t; }; using io_data_ref_t = std::shared_ptr;