Use "internal" processes to write buffered output

This introduces "internal processes" which are backed by a pthread instead
of a normal process. Internal processes are reaped using the topic
machinery, plugging in neatly alongside the sigchld topic; this means that
process_mark_finished_children() can wait for internal and external
processes simultaneously.

Initially internal processes replace the forked process that fish uses to
write out the output of blocks and functions.
This commit is contained in:
ridiculousfish 2019-02-13 15:17:07 -08:00
parent 061f8f49c6
commit ada8ea954e
7 changed files with 171 additions and 55 deletions

View File

@ -307,6 +307,12 @@ void vec_append(std::vector<T> &receiver, std::vector<T> &&donator) {
std::make_move_iterator(donator.end()));
}
/// Transfer \p v into a newly allocated shared_ptr by move-constructing the managed object from
/// it. Useful for handing a move-only value to a copyable closure.
template <typename T>
std::shared_ptr<T> move_to_sharedptr(T &&v) {
    std::shared_ptr<T> owner = std::make_shared<T>(std::move(v));
    return owner;
}
/// Print a stack trace to stderr.
void show_stackframe(const wchar_t msg_level, int frame_count = 100, int skip_levels = 0);

View File

@ -34,6 +34,7 @@
#include "fallback.h" // IWYU pragma: keep
#include "function.h"
#include "io.h"
#include "iothread.h"
#include "parse_tree.h"
#include "parser.h"
#include "postfork.h"
@ -55,16 +56,6 @@
/// Base open mode to pass to calls to open.
#define OPEN_MASK 0666
/// Called in a forked child. Writes \p count bytes from \p buff to \p fd, reports a failed write,
/// and then exits with \p status regardless of whether the write succeeded.
static void exec_write_and_exit(int fd, const char *buff, size_t count, int status) {
    const bool write_failed = (write_loop(fd, buff, count) == -1);
    if (write_failed) {
        debug(0, WRITE_ERROR);
        wperror(L"write");
    }
    // Both the success and the failure path terminate with the same status.
    exit_without_destructors(status);
}
void exec_close(int fd) {
ASSERT_IS_MAIN_THREAD();
@ -361,6 +352,80 @@ static void on_process_created(const std::shared_ptr<job_t> &j, pid_t child_pid)
}
}
/// Construct an internal process for the process \p p. In the background, write the data
/// \p outdata to stdout, respecting the io chain \p ios. For example if there is a dup2 3->1, then
/// writing to stdout means we need to write to fd 3. Then exit the internal process.
/// \return true if the internal process was launched (or completed immediately because there was
/// nothing to write), false if the io chain could not be resolved.
static bool run_internal_process(process_t *p, std::string outdata, io_chain_t ios) {
    p->check_generations_before_launch();

    // We want both the dup2s and the io_chain_ts to be kept alive by the background thread,
    // because they may own an fd that we want to write to. Move them all to a shared_ptr. The
    // strings as well (they may be long).
    // Construct a little helper struct to make it simpler to move into our closure without
    // copying.
    struct write_fields_t {
        int src_outfd{-1};
        std::string outdata{};
        io_chain_t ios{};
        maybe_t<dup2_list_t> dup2s{};
        std::shared_ptr<internal_proc_t> internal_proc{};
        int success_status{};

        /// \return true if there is no data to write or no fd to write it to.
        bool skip_out() const { return outdata.empty() || src_outfd < 0; }
    };

    auto f = std::make_shared<write_fields_t>();
    f->outdata = std::move(outdata);

    // Construct and assign the internal process to the real process.
    // NOTE(review): if resolving the io chain fails below, p keeps an internal_proc_ that is never
    // marked exited - confirm that callers discard the process in that case.
    p->internal_proc_ = std::make_shared<internal_proc_t>();
    f->internal_proc = p->internal_proc_;

    // Resolve the IO chain.
    // Note it's important we do this even if we have no out or err data, because we may have been
    // asked to truncate a file (e.g. `echo -n '' > /tmp/truncateme.txt'). The open() in the dup2
    // list resolution will ensure this happens.
    f->dup2s = dup2_list_t::resolve_chain(ios);
    if (!f->dup2s) {
        return false;
    }

    // Figure out which source fds to write to. If they are closed (unlikely) we just exit
    // successfully.
    f->src_outfd = f->dup2s->fd_for_target_fd(STDOUT_FILENO);

    // If we have nothing to write we can elide the thread.
    // TODO: support eliding output to /dev/null.
    if (f->skip_out()) {
        f->internal_proc->mark_exited(EXIT_SUCCESS);
        return true;
    }

    // Ensure that ios stays alive, it may own fds. We took it by value and no longer need our own
    // copy, so hand it over instead of copying it.
    f->ios = std::move(ios);

    // If our process is a builtin, it will have already set its status value. Make sure we
    // propagate that if our I/O succeeds and don't read it on a background thread. TODO: have
    // builtin_run provide this directly, rather than setting it in the process.
    f->success_status = p->status;

    iothread_perform([f]() {
        int status = f->success_status;
        // Already checked before scheduling; kept as a guard so the write never sees a bad fd.
        if (!f->skip_out()) {
            ssize_t ret = write_loop(f->src_outfd, f->outdata.data(), f->outdata.size());
            if (ret < 0) {
                // A closed pipe is an ordinary way for the reader to go away; don't report it.
                if (errno != EPIPE) {
                    wperror(L"write");
                }
                // Turn a successful status into a failure, but keep an existing failure code.
                if (!status) status = 1;
            }
        }
        f->internal_proc->mark_exited(status);
    });
    return true;
}
/// Call fork() as part of executing a process \p p in a job \p job. Execute \p child_action in the
/// context of the child. Returns true if fork succeeded, false if fork failed.
static bool fork_child_for_process(const std::shared_ptr<job_t> &job, process_t *p,
@ -784,24 +849,9 @@ static bool exec_block_or_func_process(parser_t &parser, std::shared_ptr<job_t>
io_chain.remove(block_output_bufferfill);
auto block_output_buffer = io_bufferfill_t::finish(std::move(block_output_bufferfill));
// Resolve our IO chain to a sequence of dup2s.
auto dup2s = dup2_list_t::resolve_chain(io_chain);
if (!dup2s) {
return false;
}
const std::string buffer_contents = block_output_buffer->buffer().newline_serialized();
const char *buffer = buffer_contents.data();
size_t count = buffer_contents.size();
if (count > 0) {
// We don't have to drain threads here because our child process is simple.
const char *fork_reason =
p->type == INTERNAL_BLOCK_NODE ? "internal block io" : "internal function io";
if (!fork_child_for_process(j, p, *dup2s, false, fork_reason, [&] {
exec_write_and_exit(STDOUT_FILENO, buffer, count, status);
})) {
return false;
}
std::string buffer_contents = block_output_buffer->buffer().newline_serialized();
if (!buffer_contents.empty()) {
return run_internal_process(p, std::move(buffer_contents), io_chain);
} else {
if (p->is_last_in_job) {
proc_set_last_status(j->get_flag(job_flag_t::NEGATE) ? (!status) : status);

View File

@ -5106,7 +5106,7 @@ static void test_topic_monitor() {
constexpr auto t = topic_t::sigchld;
do_test(gens[t] == 0);
do_test(monitor.generation_for_topic(t) == 0);
auto changed = monitor.check(&gens, {t}, false /* wait */);
auto changed = monitor.check(&gens, topic_set_t{t}, false /* wait */);
do_test(changed.none());
do_test(gens[t] == 0);

View File

@ -13,6 +13,7 @@
#include "fallback.h" // IWYU pragma: keep
#include "io.h"
#include "iothread.h"
#include "redirection.h"
#include "wutil.h" // IWYU pragma: keep
io_data_t::~io_data_t() = default;
@ -122,7 +123,7 @@ void io_buffer_t::begin_background_fillthread(autoclose_fd_t fd) {
// We want our background thread to own the fd but it's not easy to move into a std::function.
// Use a shared_ptr.
auto fdref = std::make_shared<autoclose_fd_t>(std::move(fd));
auto fdref = move_to_sharedptr(std::move(fd));
// Our function to read until the receiver is closed.
// It's OK to capture 'this' by value because 'this' owns the background thread and joins it

View File

@ -247,6 +247,13 @@ bool job_t::signal(int signal) {
return true;
}
/// Mark this internal process as exited with the given \p status, then post the internal_exit
/// topic. May be called at most once per process.
void internal_proc_t::mark_exited(int status) {
    assert(!exited() && "Process is already exited");
    // Store the status BEFORE publishing the exited flag. If the flag were set first, another
    // thread could observe exited() == true and read the status before it has been written.
    status_.store(status, std::memory_order_release);
    exited_.store(true, std::memory_order_release);
    topic_monitor_t::principal().post(topic_t::internal_exit);
}
static void mark_job_complete(const job_t *j) {
for (auto &p : j->processes) {
p->completed = 1;
@ -374,48 +381,63 @@ static void process_mark_finished_children(bool block_ok) {
// Get the exit and signal generations of all reapable processes.
// The exit generation tells us if we have an exit; the signal generation allows for detecting
// SIGHUP and SIGINT.
// Get the gen count of all reapable processes.
topic_set_t reaptopics{};
generation_list_t gens{};
gens.fill(invalid_generation);
job_iterator_t jobs;
while (auto *j = jobs.next()) {
for (const auto &proc : j->processes) {
if (j->can_reap(proc.get())) {
gens[topic_t::sigchld] =
std::min(gens[topic_t::sigchld], proc->gens_[topic_t::sigchld]);
if (auto mtopic = j->reap_topic_for_process(proc.get())) {
topic_t topic = *mtopic;
reaptopics.set(topic);
gens[topic] = std::min(gens[topic], proc->gens_[topic]);
reaptopics.set(topic_t::sighupint);
gens[topic_t::sighupint] =
std::min(gens[topic_t::sighupint], proc->gens_[topic_t::sighupint]);
}
}
}
if (gens[topic_t::sigchld] == invalid_generation) {
if (reaptopics.none()) {
// No reapable processes, nothing to wait for.
return;
}
// Now check for changes, optionally waiting.
topic_set_t topics{{topic_t::sigchld, topic_t::sighupint}};
auto changed_topics = topic_monitor_t::principal().check(&gens, topics, block_ok);
auto changed_topics = topic_monitor_t::principal().check(&gens, reaptopics, block_ok);
if (changed_topics.none()) return;
// We got some changes. Since we last checked we received SIGCHLD and/or HUP/INT.
// Update the hup/int generations and reap any reapable processes.
jobs.reset();
while (auto *j = jobs.next()) {
for (auto &proc : j->processes) {
// Update the signalhupint generation so we don't break on old sighupints.
proc->gens_[topic_t::sighupint] = gens[topic_t::sighupint];
for (const auto &proc : j->processes) {
if (auto mtopic = j->reap_topic_for_process(proc.get())) {
// Update the signal hup/int gen.
proc->gens_[topic_t::sighupint] = gens[topic_t::sighupint];
// Try reaping processes whose sigchld count is below what was returned.
if (changed_topics.get(topic_t::sigchld)) {
if (j->can_reap(proc.get()) &&
proc->gens_[topic_t::sigchld] < gens[topic_t::sigchld]) {
proc->gens_[topic_t::sigchld] = gens[topic_t::sigchld];
int status = 0;
auto pid = waitpid(proc->pid, &status, WNOHANG | WUNTRACED);
if (pid > 0) {
debug(4, "Reaped PID %d", pid);
handle_child_status(pid, status);
if (proc->gens_[*mtopic] < gens[*mtopic]) {
// Potentially reapable. Update its gen count and try reaping it.
proc->gens_[*mtopic] = gens[*mtopic];
if (proc->internal_proc_) {
// Try reaping an internal process.
if (proc->internal_proc_->exited()) {
proc->status = proc->internal_proc_->get_status();
proc->completed = true;
}
} else if (proc->pid > 0) {
// Try reaping an external process.
int status = -1;
auto pid = waitpid(proc->pid, &status, WNOHANG | WUNTRACED);
if (pid > 0) {
assert(pid == proc->pid && "Unexpcted waitpid() return");
debug(4, "Reaped PID %d", pid);
handle_child_status(pid, status);
}
} else {
assert(0 && "Don't know how to reap this process");
}
}
}

View File

@ -42,6 +42,28 @@ enum {
JOB_CONTROL_NONE,
};
/// A structure representing a "process" internal to fish. This is backed by a pthread instead of a
/// separate process.
class internal_proc_t {
    /// Whether the process has exited.
    std::atomic<bool> exited_{};

    /// If the process has exited, its status code.
    std::atomic<int> status_{};

   public:
    /// \return if this process has exited.
    /// Uses an acquire load so that a get_status() call made after observing true cannot have its
    /// read of the status reordered ahead of this check.
    bool exited() const { return exited_.load(std::memory_order_acquire); }

    /// Mark this process as exited, with the given status.
    void mark_exited(int status);

    /// \return the status code of the process. It is an error to call this before the process has
    /// exited.
    int get_status() const {
        assert(exited() && "Process is not exited");
        return status_.load(std::memory_order_acquire);
    }
};
/// A structure representing a single fish process. Contains variables for tracking process state
/// and the process argument list. Actually, a fish process can be either a regular external
/// process, an internal builtin which may or may not spawn a fake IO process during execution, a
@ -126,6 +148,10 @@ class process_t {
/// Process ID
pid_t pid{0};
/// If we are an "internal process," that process.
std::shared_ptr<internal_proc_t> internal_proc_{};
/// File descriptor that pipe output should bind to.
int pipe_write_fd{0};
/// True if process has completed.
@ -214,15 +240,25 @@ class job_t {
/// process if it is the group leader and the job is not yet constructed, because then we might
/// also reap the process group and then we cannot add new processes to the group.
bool can_reap(const process_t *p) const {
if (p->pid <= 0) {
// Internal processes can always be reaped.
if (p->internal_proc_) {
return true;
} else if (p->pid <= 0) {
// Can't reap without a pid.
return false;
}
if (!is_constructed() && pgid > 0 && p->pid == pgid) {
} else if (!is_constructed() && pgid > 0 && p->pid == pgid) {
// p is the group leader in an under-construction job.
return false;
} else {
return true;
}
return true;
}
/// \returns the topic to watch in order to reap process \p p: internal_exit for internal
/// processes, sigchld otherwise. Returns none if \p p is already completed or cannot be reaped
/// right now, meaning don't reap it, or perhaps defer reaping.
maybe_t<topic_t> reap_topic_for_process(const process_t *p) const {
    if (p->completed) return none();
    if (!can_reap(p)) return none();
    if (p->internal_proc_) {
        return topic_t::internal_exit;
    }
    return topic_t::sigchld;
}
/// Returns a truncated version of the job string. Used when a message has already been emitted

View File

@ -35,8 +35,9 @@
/// The list of topics that may be observed.
enum class topic_t : uint8_t {
sigchld, // Corresponds to SIGCHLD signal.
sighupint, // Corresponds to both SIGHUP and SIGINT signals.
sigchld, // Corresponds to SIGCHLD signal.
sighupint, // Corresponds to both SIGHUP and SIGINT signals.
internal_exit, // Corresponds to an internal process exit.
COUNT
};