From bcfc54fdaa2f0f76a6d7e20cf7e49677b669a3ff Mon Sep 17 00:00:00 2001 From: ridiculousfish Date: Wed, 29 Jul 2020 16:03:29 -0700 Subject: [PATCH] Do not buffer builtin output if avoidable builtins output to stdout and stderr via io_streams_t. Prior to this fix, it contained an output_stream_t which just wraps a buffer. So all builtin output went to this buffer (except for eval). Switch output_stream_t to become a new abstract class which can output to a buffer, file descriptor, or nowhere. This allows for example `string` to stream its output as it is produced, instead of buffering it. --- CHANGELOG.rst | 1 + src/builtin_string.cpp | 21 ++++----- src/exec.cpp | 56 ++++++++++++++++++++---- src/fish_tests.cpp | 14 +++--- src/io.cpp | 31 ++++++++++++-- src/io.h | 95 +++++++++++++++++++++++++++++++---------- src/parse_execution.cpp | 7 ++- 7 files changed, 174 insertions(+), 51 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index b228588fa..ad2abf746 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -20,6 +20,7 @@ Notable improvements and fixes - A new ``fish_add_path`` helper function to add paths to $PATH without producing duplicates, to be used interactively or in ``config.fish`` (#6960). - ``fish_preexec`` and ``fish_postexec`` events are no longer triggered for empty commands. - The ``test`` builtin now better shows where an error occured (#6030). +- builtins may now output before all data is read. For example, `string replace` no longer has to read all of stdin before it can begin to output. Syntax changes and new commands ------------------------------- diff --git a/src/builtin_string.cpp b/src/builtin_string.cpp index db8de1f11..e4b3c9a44 100644 --- a/src/builtin_string.cpp +++ b/src/builtin_string.cpp @@ -1185,7 +1185,6 @@ static int string_split_maybe0(parser_t &parser, io_streams_t &streams, int argc // Remove the last element if it is empty. if (splits.back().empty()) splits.pop_back(); } - auto &buff = streams.out.buffer(); if (opts.fields.size() > 0) { // Print nothing and return error if any of the supplied // fields do not exist, unless `--allow-empty` is used. @@ -1199,12 +1198,13 @@ static int string_split_maybe0(parser_t &parser, io_streams_t &streams, int argc } for (const auto &field : opts.fields) { if (field - 1 < (long)splits.size()) { - buff.append(splits.at(field - 1), separation_type_t::explicitly); + streams.out.append_with_separation(splits.at(field - 1), + separation_type_t::explicitly); } } } else { for (const wcstring &split : splits) { - buff.append(split, separation_type_t::explicitly); + streams.out.append_with_separation(split, separation_type_t::explicitly); } } } @@ -1228,20 +1228,21 @@ static int string_collect(parser_t &parser, io_streams_t &streams, int argc, wch int retval = parse_opts(&opts, &optind, 0, argc, argv, parser, streams); if (retval != STATUS_CMD_OK) return retval; - auto &buff = streams.out.buffer(); arg_iterator_t aiter(argv, optind, streams, /* don't split */ false); + size_t appended = 0; while (const wcstring *arg = aiter.nextstr()) { - auto begin = arg->cbegin(), end = arg->cend(); + const wchar_t *s = arg->c_str(); + size_t len = arg->size(); if (!opts.no_trim_newlines) { - while (end > begin && *(end - 1) == L'\n') { - --end; + while (len > 0 && s[len - 1] == L'\n') { + len -= 1; } } - - buff.append(begin, end, separation_type_t::explicitly); + streams.out.append_with_separation(s, len, separation_type_t::explicitly); + appended += len; } - return buff.size() > 0 ? STATUS_CMD_OK : STATUS_CMD_ERROR; + return appended > 0 ? STATUS_CMD_OK : STATUS_CMD_ERROR; } // Helper function to abstract the repeat logic from string_repeat diff --git a/src/exec.cpp b/src/exec.cpp index 57393b24f..a17e59c25 100644 --- a/src/exec.cpp +++ b/src/exec.cpp @@ -419,17 +419,47 @@ static bool exec_internal_builtin_proc(parser_t &parser, process_t *p, const io_ return true; // "success" } +/// \return an newly allocated output stream for the given fd, which is typically stdout or stderr. +/// This inspects the io_chain and decides what sort of output stream to return. +static std::unique_ptr create_output_stream_for_builtin(const parser_t &parser, + const io_chain_t &io_chain, + int fd) { + const shared_ptr io = io_chain.io_for_fd(fd); + if (io == nullptr) { + // Common case of no redirections. + // Just write to the fd directly. + return make_unique(fd); + } + switch (io->io_mode) { + case io_mode_t::bufferfill: + // Write to a buffer. + return make_unique(parser.libdata().read_limit); + + case io_mode_t::close: + return make_unique(); + + // TODO: reconsider these. + case io_mode_t::file: + case io_mode_t::pipe: + case io_mode_t::fd: + return make_unique(parser.libdata().read_limit); + } + DIE("Unreachable"); +} + /// Handle output from a builtin, by printing the contents of builtin_io_streams to the redirections /// given in io_chain. static bool handle_builtin_output(parser_t &parser, const std::shared_ptr &j, process_t *p, io_chain_t *io_chain, const io_streams_t &builtin_io_streams) { assert(p->type == process_type_t::builtin && "Process is not a builtin"); - const output_stream_t &stdout_stream = builtin_io_streams.out; - const output_stream_t &stderr_stream = builtin_io_streams.err; + const separated_buffer_t *output_buffer = + builtin_io_streams.out.get_separated_buffer(); + const separated_buffer_t *errput_buffer = + builtin_io_streams.err.get_separated_buffer(); // Mark if we discarded output. - if (stdout_stream.buffer().discarded()) { + if (output_buffer && output_buffer->discarded()) { p->status = proc_status_t::from_exit_code(STATUS_READ_TOO_MUCH); } @@ -446,15 +476,21 @@ static bool handle_builtin_output(parser_t &parser, const std::shared_ptr // doesn't also produce stderr. Also note that we never send stderr to a buffer, so there's no // need for a similar check for stderr. bool stdout_done = false; - if (stdout_io && stdout_io->io_mode == io_mode_t::bufferfill) { + if (output_buffer && stdout_io && stdout_io->io_mode == io_mode_t::bufferfill) { auto stdout_buffer = dynamic_cast(stdout_io.get())->buffer(); - stdout_buffer->append_from_stream(stdout_stream); + stdout_buffer->append_from_wide_buffer(*output_buffer); stdout_done = true; } // Figure out any data remaining to write. We may have none in which case we can short-circuit. - std::string outbuff = stdout_done ? std::string{} : wcs2string(stdout_stream.contents()); - std::string errbuff = wcs2string(stderr_stream.contents()); + std::string outbuff; + if (output_buffer && !stdout_done) { + outbuff = wcs2string(output_buffer->newline_serialized()); + } + std::string errbuff; + if (errput_buffer) { + errbuff = wcs2string(errput_buffer->newline_serialized()); + } // If we have no redirections for stdout/stderr, just write them directly. if (!stdout_io && !stderr_io) { @@ -819,7 +855,11 @@ static bool exec_process_in_job(parser_t &parser, process_t *p, const std::share } case process_type_t::builtin: { - io_streams_t builtin_io_streams{parser.libdata().read_limit}; + std::unique_ptr output_stream = + create_output_stream_for_builtin(parser, process_net_io_chain, STDOUT_FILENO); + std::unique_ptr errput_stream = + create_output_stream_for_builtin(parser, process_net_io_chain, STDERR_FILENO); + io_streams_t builtin_io_streams{*output_stream, *errput_stream}; builtin_io_streams.job_group = j->group; if (!exec_internal_builtin_proc(parser, p, pipe_read.get(), process_net_io_chain, builtin_io_streams)) { diff --git a/src/fish_tests.cpp b/src/fish_tests.cpp index db558b8fe..34a783e3c 100644 --- a/src/fish_tests.cpp +++ b/src/fish_tests.cpp @@ -2533,7 +2533,8 @@ static bool run_one_test_test(int expected, wcstring_list_t &lst, bool bracket) i++; } argv[i + 1] = NULL; - io_streams_t streams(0); + null_output_stream_t null{}; + io_streams_t streams(null, null); int result = builtin_test(parser, streams, argv); if (expected != result) err(L"expected builtin_test() to return %d, got %d", expected, result); @@ -2565,7 +2566,8 @@ static bool run_test_test(int expected, const wcstring &str) { static void test_test_brackets() { // Ensure [ knows it needs a ]. parser_t &parser = parser_t::principal_parser(); - io_streams_t streams(0); + null_output_stream_t null{}; + io_streams_t streams(null, null); null_terminated_array_t args; @@ -5100,7 +5102,9 @@ int builtin_string(parser_t &parser, io_streams_t &streams, wchar_t **argv); static void run_one_string_test(const wchar_t *const *argv, int expected_rc, const wchar_t *expected_out) { parser_t &parser = parser_t::principal_parser(); - io_streams_t streams(0); + buffered_output_stream_t outs{0}; + null_output_stream_t errs{}; + io_streams_t streams(outs, errs); streams.stdin_is_directly_redirected = false; // read from argv instead of stdin int rc = builtin_string(parser, streams, const_cast(argv)); wcstring args; @@ -5111,10 +5115,10 @@ static void run_one_string_test(const wchar_t *const *argv, int expected_rc, if (rc != expected_rc) { err(L"Test failed on line %lu: [%ls]: expected return code %d but got %d", __LINE__, args.c_str(), expected_rc, rc); - } else if (streams.out.contents() != expected_out) { + } else if (outs.contents() != expected_out) { err(L"Test failed on line %lu: [%ls]: expected [%ls] but got [%ls]", __LINE__, args.c_str(), escape_string(expected_out, ESCAPE_ALL).c_str(), - escape_string(streams.out.contents(), ESCAPE_ALL).c_str()); + escape_string(outs.contents(), ESCAPE_ALL).c_str()); } } diff --git a/src/io.cpp b/src/io.cpp index 28186f1ae..584c10b61 100644 --- a/src/io.cpp +++ b/src/io.cpp @@ -57,8 +57,7 @@ void io_bufferfill_t::print() const { std::fwprintf(stderr, L"bufferfill %d -> %d\n", write_fd_.fd(), fd); } -void io_buffer_t::append_from_stream(const output_stream_t &stream) { - const separated_buffer_t &input = stream.buffer(); +void io_buffer_t::append_from_wide_buffer(const separated_buffer_t &input) { if (input.elements().empty() && !input.discarded()) return; scoped_lock locker(append_lock_); if (buffer_.discarded()) return; @@ -349,6 +348,32 @@ shared_ptr io_chain_t::io_for_fd(int fd) const { void output_stream_t::append_narrow_buffer(const separated_buffer_t &buffer) { for (const auto &rhs_elem : buffer.elements()) { - buffer_.append(str2wcstring(rhs_elem.contents), rhs_elem.separation); + append_with_separation(str2wcstring(rhs_elem.contents), rhs_elem.separation); } } + +void output_stream_t::append_with_separation(const wchar_t *s, size_t len, separation_type_t type) { + append(s, len); + if (type == separation_type_t::explicitly) { + append(L'\n'); + } +} + +void fd_output_stream_t::append(const wchar_t *s, size_t amt) { + if (errored_) return; + int res = wwrite_to_fd(s, amt, this->fd_); + if (res < 0) { + // TODO: this error is too aggressive, e.g. if we got SIGINT we should not complain. + wperror(L"write"); + errored_ = true; + } +} + +void null_output_stream_t::append(const wchar_t *, size_t) {} + +void buffered_output_stream_t::append(const wchar_t *s, size_t amt) { buffer_.append(s, s + amt); } + +void buffered_output_stream_t::append_with_separation(const wchar_t *s, size_t len, + separation_type_t type) { + buffer_.append(s, s + len, type); +} diff --git a/src/io.h b/src/io.h index d2c039589..f31c73e97 100644 --- a/src/io.h +++ b/src/io.h @@ -344,9 +344,9 @@ class io_buffer_t { buffer_.append(ptr, ptr + count); } - /// Appends data from a given output_stream_t. - /// Marks the receiver as discarded if the stream was discarded. - void append_from_stream(const output_stream_t &stream); + /// Appends data from a given separated buffer. + /// Marks the receiver as discarded if the buffer was discarded. + void append_from_wide_buffer(const separated_buffer_t &input); }; using io_data_ref_t = std::shared_ptr; @@ -398,36 +398,36 @@ maybe_t make_autoclose_pipes(const fd_set_t &fdset); /// cloexec. \returns invalid fd on failure (in which case the given fd is still closed). autoclose_fd_t move_fd_to_unused(autoclose_fd_t fd, const fd_set_t &fdset); -/// Class representing the output that a builtin can generate. +/// Base class representing the output that a builtin can generate. +/// This has various subclasses depending on the ultimate output destination. class output_stream_t { - private: - /// Storage for our data. - separated_buffer_t buffer_; - - // No copying. - output_stream_t(const output_stream_t &s) = delete; - void operator=(const output_stream_t &s) = delete; - public: - output_stream_t(size_t buffer_limit) : buffer_(buffer_limit) {} + /// Required override point. The output stream receives a string \p s with \p amt chars. + virtual void append(const wchar_t *s, size_t amt) = 0; - void append(const wcstring &s) { buffer_.append(s.begin(), s.end()); } + /// \return the separated buffer if this holds one, otherwise nullptr. + virtual const separated_buffer_t *get_separated_buffer() const { return nullptr; } - separated_buffer_t &buffer() { return buffer_; } + /// An optional override point. This is for explicit separation. + virtual void append_with_separation(const wchar_t *s, size_t len, separation_type_t type); - const separated_buffer_t &buffer() const { return buffer_; } + /// The following are all convenience overrides. + void append_with_separation(const wcstring &s, separation_type_t type) { + append_with_separation(s.data(), s.size(), type); + } + /// Append a string. + void append(const wcstring &s) { append(s.data(), s.size()); } void append(const wchar_t *s) { append(s, std::wcslen(s)); } + /// Append a char. void append(wchar_t s) { append(&s, 1); } - - void append(const wchar_t *s, size_t amt) { buffer_.append(s, s + amt); } + void push_back(wchar_t c) { append(c); } // Append data from a narrow buffer, widening it. void append_narrow_buffer(const separated_buffer_t &buffer); - void push_back(wchar_t c) { append(c); } - + /// Append a format string. void append_format(const wchar_t *format, ...) { va_list va; va_start(va, format); @@ -437,12 +437,61 @@ class output_stream_t { void append_formatv(const wchar_t *format, va_list va) { append(vformat_string(format, va)); } + // No copying. + output_stream_t(const output_stream_t &s) = delete; + void operator=(const output_stream_t &s) = delete; + + output_stream_t() = default; + virtual ~output_stream_t() = default; +}; + +/// A null output stream which ignores all writes. +class null_output_stream_t final : public output_stream_t { + virtual void append(const wchar_t *s, size_t amt) override; +}; + +/// An output stream for builtins which outputs to an fd. +/// Note the fd may be something like stdout; there is no ownership implied here. +class fd_output_stream_t final : public output_stream_t { + public: + /// Construct from a file descriptor, which must be nonegative. + explicit fd_output_stream_t(int fd) : fd_(fd) { assert(fd_ >= 0 && "Invalid fd"); } + + void append(const wchar_t *s, size_t amt) override; + + private: + /// The file descriptor to write to. + const int fd_; + + /// Whether we have received an error. + bool errored_{false}; +}; + +/// An output stream for builtins which buffers into a separated buffer. +class buffered_output_stream_t final : public output_stream_t { + public: + explicit buffered_output_stream_t(size_t buffer_limit) : buffer_(buffer_limit) {} + + void append(const wchar_t *s, size_t amt) override; + void append_with_separation(const wchar_t *s, size_t len, separation_type_t type) override; + wcstring contents() const { return buffer_.newline_serialized(); } + + /// Access the buffer. + separated_buffer_t &buffer() { return buffer_; } + const separated_buffer_t &buffer() const { return buffer_; } + + const separated_buffer_t *get_separated_buffer() const override { return &buffer_; } + + private: + /// Storage for our data. + separated_buffer_t buffer_; }; struct io_streams_t { - output_stream_t out; - output_stream_t err; + // Streams for out and err. + output_stream_t &out; + output_stream_t &err; // fd representing stdin. This is not closed by the destructor. int stdin_fd{-1}; @@ -473,7 +522,7 @@ struct io_streams_t { io_streams_t(const io_streams_t &) = delete; void operator=(const io_streams_t &) = delete; - explicit io_streams_t(size_t read_limit) : out(read_limit), err(read_limit), stdin_fd(-1) {} + io_streams_t(output_stream_t &out, output_stream_t &err) : out(out), err(err) {} }; #endif diff --git a/src/parse_execution.cpp b/src/parse_execution.cpp index cdc3495e0..5e383eb8f 100644 --- a/src/parse_execution.cpp +++ b/src/parse_execution.cpp @@ -388,11 +388,14 @@ end_execution_reason_t parse_execution_context_t::run_function_statement( return result; } trace_if_enabled(*parser, L"function", arguments); - io_streams_t streams(0); // no limit on the amount of output from builtin_function() + // no limit on the amount of output from builtin_function() + buffered_output_stream_t outs(0); + buffered_output_stream_t errs(0); + io_streams_t streams(outs, errs); int err = builtin_function(*parser, streams, arguments, pstree, statement); parser->set_last_statuses(statuses_t::just(err)); - wcstring errtext = streams.err.contents(); + wcstring errtext = errs.contents(); if (!errtext.empty()) { return this->report_error(err, header, L"%ls", errtext.c_str()); }