Teach each job about its parent

The parent of a job is the parent pipeline that executed the function or
block corresponding to this job. This will help simplify
process_mark_finished_children().
This commit is contained in:
ridiculousfish 2018-11-04 00:58:44 -07:00
parent 93aa95d8c4
commit 3770d9fb7a
8 changed files with 63 additions and 56 deletions

View File

@ -309,7 +309,7 @@ static bool io_transmogrify(const io_chain_t &in_chain, io_chain_t *out_chain,
/// \param ios the io redirections to be performed on this block
template <typename T>
void internal_exec_helper(parser_t &parser, parsed_source_ref_t parsed_source, tnode_t<T> node,
const io_chain_t &ios) {
const io_chain_t &ios, std::shared_ptr<job_t> parent_job) {
assert(parsed_source && node && "exec_helper missing source or without node");
io_chain_t morphed_chain;
@ -322,7 +322,7 @@ void internal_exec_helper(parser_t &parser, parsed_source_ref_t parsed_source, t
return;
}
parser.eval_node(parsed_source, node, morphed_chain, TOP);
parser.eval_node(parsed_source, node, morphed_chain, TOP, parent_job);
morphed_chain.clear();
io_cleanup_fds(opened_fds);
@ -336,7 +336,8 @@ void internal_exec_helper(parser_t &parser, parsed_source_ref_t parsed_source, t
// Furthermore, to avoid the race between the caller calling tcsetpgrp() and the client checking the
// foreground process group, we don't use posix_spawn if we're going to foreground the process. (If
// we use fork(), we can call tcsetpgrp after the fork, before the exec, and avoid the race).
static bool can_use_posix_spawn_for_job(const job_t *job, const process_t *process) {
static bool can_use_posix_spawn_for_job(const std::shared_ptr<job_t> &job,
const process_t *process) {
if (job->get_flag(job_flag_t::JOB_CONTROL)) { //!OCLINT(collapsible if statements)
// We are going to use job control; therefore when we launch this job it will get its own
// process group ID. But will it be foregrounded?
@ -387,7 +388,7 @@ void internal_exec(job_t *j, const io_chain_t &&all_ios) {
}
}
static void on_process_created(job_t *j, pid_t child_pid) {
static void on_process_created(const std::shared_ptr<job_t> &j, pid_t child_pid) {
// We only need to do this the first time a child is forked/spawned
if (j->pgid != INVALID_PID) {
return;
@ -402,15 +403,16 @@ static void on_process_created(job_t *j, pid_t child_pid) {
/// Call fork() as part of executing a process \p p in a job \j. Execute \p child_action in the
/// context of the child. Returns true if fork succeeded, false if fork failed.
static bool fork_child_for_process(job_t *j, process_t *p, const io_chain_t &io_chain,
bool drain_threads, const char *fork_type,
static bool fork_child_for_process(const std::shared_ptr<job_t> &job, process_t *p,
const io_chain_t &io_chain, bool drain_threads,
const char *fork_type,
const std::function<void()> &child_action) {
pid_t pid = execute_fork(drain_threads);
if (pid == 0) {
// This is the child process. Setup redirections, print correct output to
// stdout and stderr, and then exit.
p->pid = getpid();
child_set_group(j, p);
child_set_group(job.get(), p);
setup_child_process(p, io_chain);
child_action();
DIE("Child process returned control to fork_child lambda!");
@ -418,7 +420,7 @@ static bool fork_child_for_process(job_t *j, process_t *p, const io_chain_t &io_
if (pid < 0) {
debug(1, L"Failed to fork %s!\n", fork_type);
job_mark_process_as_failed(j, p);
job_mark_process_as_failed(job, p);
return false;
}
@ -427,9 +429,9 @@ static bool fork_child_for_process(job_t *j, process_t *p, const io_chain_t &io_
debug(4, L"Fork #%d, pid %d: %s for '%ls'", g_fork_count, pid, fork_type, p->argv0());
p->pid = pid;
on_process_created(j, p->pid);
set_child_group(j, p->pid);
maybe_assign_terminal(j);
on_process_created(job, p->pid);
set_child_group(job.get(), p->pid);
maybe_assign_terminal(job.get());
return true;
}
@ -437,9 +439,9 @@ static bool fork_child_for_process(job_t *j, process_t *p, const io_chain_t &io_
/// job corresponding to a builtin, execute the builtin with the given streams. If pipe_read is set,
/// assign stdin to it; otherwise infer stdin from the IO chain.
/// \return true on success, false if there is an exec error.
static bool exec_internal_builtin_proc(parser_t &parser, job_t *j, process_t *p,
const io_pipe_t *pipe_read, const io_chain_t &proc_io_chain,
io_streams_t &streams) {
static bool exec_internal_builtin_proc(parser_t &parser, const std::shared_ptr<job_t> &j,
process_t *p, const io_pipe_t *pipe_read,
const io_chain_t &proc_io_chain, io_streams_t &streams) {
assert(p->type == INTERNAL_BUILTIN && "Process must be a builtin");
int local_builtin_stdin = STDIN_FILENO;
bool close_stdin = false;
@ -544,8 +546,8 @@ static bool exec_internal_builtin_proc(parser_t &parser, job_t *j, process_t *p,
/// Handle output from a builtin, by printing the contents of builtin_io_streams to the redirections
/// given in io_chain.
static bool handle_builtin_output(job_t *j, process_t *p, io_chain_t *io_chain,
const io_streams_t &builtin_io_streams) {
static bool handle_builtin_output(const std::shared_ptr<job_t> &j, process_t *p,
io_chain_t *io_chain, const io_streams_t &builtin_io_streams) {
assert(p->type == INTERNAL_BUILTIN && "Process is not a builtin");
// Handle output from builtin commands. In the general case, this means forking of a
// worker process, that will write out the contents of the stdout and stderr buffers
@ -646,7 +648,8 @@ static bool handle_builtin_output(job_t *j, process_t *p, io_chain_t *io_chain,
/// Executes an external command.
/// \return true on success, false if there is an exec error.
static bool exec_external_command(job_t *j, process_t *p, const io_chain_t &proc_io_chain) {
static bool exec_external_command(const std::shared_ptr<job_t> &j, process_t *p,
const io_chain_t &proc_io_chain) {
assert(p->type == EXTERNAL && "Process is not external");
// Get argv and envv before we fork.
null_terminated_array_t<char> argv_array;
@ -675,7 +678,8 @@ static bool exec_external_command(job_t *j, process_t *p, const io_chain_t &proc
pid_t pid = 0;
posix_spawnattr_t attr = posix_spawnattr_t();
posix_spawn_file_actions_t actions = posix_spawn_file_actions_t();
bool made_it = fork_actions_make_spawn_properties(&attr, &actions, j, p, proc_io_chain);
bool made_it =
fork_actions_make_spawn_properties(&attr, &actions, j.get(), p, proc_io_chain);
if (made_it) {
// We successfully made the attributes and actions; actually call
// posix_spawn.
@ -722,7 +726,7 @@ static bool exec_external_command(job_t *j, process_t *p, const io_chain_t &proc
// https://github.com/Microsoft/WSL/issues/2997 And confirmation that this persists
// past glibc 2.24+ here: https://github.com/fish-shell/fish-shell/issues/4715
if (j->get_flag(job_flag_t::JOB_CONTROL) && getpgid(p->pid) != j->pgid) {
set_child_group(j, p->pid);
set_child_group(j.get(), p->pid);
}
#else
// In do_fork, the pid of the child process is used as the group leader if j->pgid
@ -733,7 +737,7 @@ static bool exec_external_command(job_t *j, process_t *p, const io_chain_t &proc
}
#endif
maybe_assign_terminal(j);
maybe_assign_terminal(j.get());
} else
#endif
{
@ -749,7 +753,7 @@ static bool exec_external_command(job_t *j, process_t *p, const io_chain_t &proc
/// Execute a block node or function "process".
/// \p user_ios contains the list of user-specified ios, used so we can avoid stomping on them with
/// our pipes. \return true on success, false on error.
static bool exec_block_or_func_process(parser_t &parser, job_t *j, process_t *p,
static bool exec_block_or_func_process(parser_t &parser, std::shared_ptr<job_t> j, process_t *p,
const io_chain_t &user_ios, io_chain_t io_chain) {
assert((p->type == INTERNAL_FUNCTION || p->type == INTERNAL_BLOCK_NODE) &&
"Unexpected process type");
@ -786,14 +790,14 @@ static bool exec_block_or_func_process(parser_t &parser, job_t *j, process_t *p,
function_prepare_environment(func_name, p->get_argv() + 1, inherit_vars);
parser.forbid_function(func_name);
internal_exec_helper(parser, props->parsed_source, props->body_node, io_chain);
internal_exec_helper(parser, props->parsed_source, props->body_node, io_chain, j);
parser.allow_function();
parser.pop_block(fb);
} else {
assert(p->type == INTERNAL_BLOCK_NODE);
assert(p->block_node_source && p->internal_block_node && "Process is missing node info");
internal_exec_helper(parser, p->block_node_source, p->internal_block_node, io_chain);
internal_exec_helper(parser, p->block_node_source, p->internal_block_node, io_chain, j);
}
int status = proc_get_last_status();
@ -840,7 +844,7 @@ static bool exec_block_or_func_process(parser_t &parser, job_t *j, process_t *p,
/// Executes a process \p in job \j, using the read pipe \p pipe_current_read.
/// If the process pipes to a command, the read end of the created pipe is returned in
/// out_pipe_next_read. \returns true on success, false on exec error.
static bool exec_process_in_job(parser_t &parser, process_t *p, job_t *j,
static bool exec_process_in_job(parser_t &parser, process_t *p, std::shared_ptr<job_t> j,
autoclose_fd_t pipe_current_read,
autoclose_fd_t *out_pipe_next_read, const io_chain_t &all_ios,
size_t stdout_read_limit) {
@ -1068,7 +1072,7 @@ void exec_job(parser_t &parser, shared_ptr<job_t> j) {
if (!io_buffer->avoid_conflicts_with_io_chain(all_ios)) {
// We could not avoid conflicts, probably due to fd exhaustion. Mark an error.
exec_error = true;
job_mark_process_as_failed(j.get(), j->processes.front().get());
job_mark_process_as_failed(j, j->processes.front().get());
break;
}
}
@ -1088,7 +1092,7 @@ void exec_job(parser_t &parser, shared_ptr<job_t> j) {
autoclose_fd_t pipe_next_read;
for (std::unique_ptr<process_t> &unique_p : j->processes) {
autoclose_fd_t current_read = std::move(pipe_next_read);
if (!exec_process_in_job(parser, unique_p.get(), j.get(), std::move(current_read),
if (!exec_process_in_job(parser, unique_p.get(), j, std::move(current_read),
&pipe_next_read, all_ios, stdout_read_limit)) {
exec_error = true;
break;

View File

@ -11,19 +11,7 @@
/// Pipe redirection error message.
#define PIPE_ERROR _(L"An error occurred while setting up pipe")
/// Execute the processes specified by j.
///
/// I've put a fair bit of work into making builtins behave like other programs as far as pipes are
/// concerned. Unlike i.e. bash, builtins can pipe to other builtins with arbitrary amounts of data,
/// and so on. To do this, after a builtin is run in the real process, it forks and a dummy process
/// is created, responsible for writing the output of the builtin. This is surprisingly cheap on my
/// computer, probably because of the marvels of copy on write forking.
///
/// This rule is short circuited in the case where a builtin does not output to a pipe and does in
/// fact not output anything. The speed improvement from this optimization is not noticable on a
/// normal computer/OS in regular use, but the promiscous amounts of forking that resulted was
/// responsible for a huge slowdown when using Valgrind as well as when doing complex
/// command-specific completions.
/// Execute the processes specified by \p j in the parser \p.
class job_t;
class parser_t;
void exec_job(parser_t &parser, std::shared_ptr<job_t> j);

View File

@ -79,8 +79,9 @@ static wcstring profiling_cmd_name_for_redirectable_block(const parse_node_t &no
return result;
}
parse_execution_context_t::parse_execution_context_t(parsed_source_ref_t pstree, parser_t *p)
: pstree(std::move(pstree)), parser(p) {}
parse_execution_context_t::parse_execution_context_t(parsed_source_ref_t pstree, parser_t *p,
std::shared_ptr<job_t> parent)
: pstree(std::move(pstree)), parser(p), parent_job(std::move(parent)) {}
// Utilities
@ -1182,7 +1183,7 @@ parse_execution_result_t parse_execution_context_t::run_1_job(tnode_t<g::job> jo
return result;
}
shared_ptr<job_t> job = std::make_shared<job_t>(acquire_job_id(), block_io);
shared_ptr<job_t> job = std::make_shared<job_t>(acquire_job_id(), block_io, parent_job);
job->tmodes = tmodes;
job->set_flag(job_flag_t::JOB_CONTROL,
(job_control_mode == JOB_CONTROL_ALL) ||

View File

@ -36,9 +36,11 @@ class parse_execution_context_t {
// Cached line number information.
size_t cached_lineno_offset = 0;
int cached_lineno_count = 0;
// The parent job for any jobs created by this context.
const std::shared_ptr<job_t> parent_job;
// No copying allowed.
parse_execution_context_t(const parse_execution_context_t &);
parse_execution_context_t &operator=(const parse_execution_context_t &);
parse_execution_context_t(const parse_execution_context_t &) = delete;
parse_execution_context_t &operator=(const parse_execution_context_t &) = delete;
// Should I cancel?
bool should_cancel_execution(const block_t *block) const;
@ -137,7 +139,8 @@ class parse_execution_context_t {
int line_offset_of_character_at_offset(size_t char_idx);
public:
parse_execution_context_t(parsed_source_ref_t pstree, parser_t *p);
parse_execution_context_t(parsed_source_ref_t pstree, parser_t *p,
std::shared_ptr<job_t> parent);
/// Returns the current line number, indexed from 1. Not const since it touches
/// cached_lineno_offset.

View File

@ -655,13 +655,13 @@ void parser_t::eval(parsed_source_ref_t ps, const io_chain_t &io, enum block_typ
if (!ps->tree.empty()) {
// Execute the first node.
tnode_t<grammar::job_list> start{&ps->tree, &ps->tree.front()};
this->eval_node(ps, start, io, block_type);
this->eval_node(ps, start, io, block_type, nullptr /* parent */);
}
}
template <typename T>
int parser_t::eval_node(parsed_source_ref_t ps, tnode_t<T> node, const io_chain_t &io,
enum block_type_t block_type) {
block_type_t block_type, std::shared_ptr<job_t> parent_job) {
static_assert(
std::is_same<T, grammar::statement>::value || std::is_same<T, grammar::job_list>::value,
"Unexpected node type");
@ -692,7 +692,7 @@ int parser_t::eval_node(parsed_source_ref_t ps, tnode_t<T> node, const io_chain_
// Create and set a new execution context.
using exc_ctx_ref_t = std::unique_ptr<parse_execution_context_t>;
scoped_push<exc_ctx_ref_t> exc(&execution_context,
make_unique<parse_execution_context_t>(ps, this));
make_unique<parse_execution_context_t>(ps, this, parent_job));
int result = execution_context->eval_node(node, scope_block, io);
exc.restore();
this->pop_block(scope_block);
@ -703,9 +703,11 @@ int parser_t::eval_node(parsed_source_ref_t ps, tnode_t<T> node, const io_chain_
// Explicit instantiations. TODO: use overloads instead?
template int parser_t::eval_node(parsed_source_ref_t, tnode_t<grammar::statement>,
const io_chain_t &, enum block_type_t);
const io_chain_t &, enum block_type_t,
std::shared_ptr<job_t> parent_job);
template int parser_t::eval_node(parsed_source_ref_t, tnode_t<grammar::job_list>,
const io_chain_t &, enum block_type_t);
const io_chain_t &, enum block_type_t,
std::shared_ptr<job_t> parent_job);
bool parser_t::detect_errors_in_argument_list(const wcstring &arg_list_src, wcstring *out,
const wchar_t *prefix) const {

View File

@ -236,7 +236,7 @@ class parser_t {
/// The node type must be grammar::statement or grammar::job_list.
template <typename T>
int eval_node(parsed_source_ref_t ps, tnode_t<T> node, const io_chain_t &io,
enum block_type_t block_type);
block_type_t block_type, std::shared_ptr<job_t> parent_job);
/// Evaluate line as a list of parameters, i.e. tokenize it and perform parameter expansion and
/// cmdsubst execution on the tokens. The output is inserted into output. Errors are ignored.

View File

@ -263,7 +263,7 @@ static void mark_process_status(process_t *p, int status) {
}
}
void job_mark_process_as_failed(job_t *job, const process_t *failed_proc) {
void job_mark_process_as_failed(const std::shared_ptr<job_t> &job, const process_t *failed_proc) {
// The given process failed to even lift off (e.g. posix_spawn failed) and so doesn't have a
// valid pid. Mark it and everything after it as dead.
bool found = false;
@ -319,8 +319,13 @@ static void handle_child_status(pid_t pid, int status) {
process_t::process_t() {}
job_t::job_t(job_id_t jobid, io_chain_t bio)
: block_io(std::move(bio)), pgid(INVALID_PID), tmodes(), job_id(jobid), flags{} {}
job_t::job_t(job_id_t jobid, io_chain_t bio, std::shared_ptr<job_t> parent)
: block_io(std::move(bio)),
parent_job(std::move(parent)),
pgid(INVALID_PID),
tmodes(),
job_id(jobid),
flags{} {}
job_t::~job_t() { release_job_id(job_id); }

View File

@ -180,12 +180,16 @@ class job_t {
// The IO chain associated with the block.
const io_chain_t block_io;
// The parent job. If we were created as a nested job due to execution of a block or function in
// a pipeline, then this refers to the job corresponding to that pipeline. Otherwise it is null.
const std::shared_ptr<job_t> parent_job;
// No copying.
job_t(const job_t &rhs) = delete;
void operator=(const job_t &) = delete;
public:
job_t(job_id_t jobid, io_chain_t bio);
job_t(job_id_t jobid, io_chain_t bio, std::shared_ptr<job_t> parent);
~job_t();
/// Returns whether the command is empty.
@ -352,7 +356,7 @@ int job_reap(bool interactive);
void job_handle_signal(int signal, siginfo_t *info, void *con);
/// Mark a process as failed to execute (and therefore completed).
void job_mark_process_as_failed(job_t *job, const process_t *p);
void job_mark_process_as_failed(const std::shared_ptr<job_t> &job, const process_t *p);
#ifdef HAVE__PROC_SELF_STAT
/// Use the procfs filesystem to look up how many jiffies of cpu time was used by this process. This