Refactor wait handles

In preparation for using wait handles in --on-process-exit events, factor
wait handles into their own wait handle store. Also switch them to
per-process instead of per-job, which is a simplification.
This commit is contained in:
ridiculousfish 2021-05-11 12:01:08 -07:00
parent b63b511b0a
commit 82fd8fe9fb
10 changed files with 242 additions and 97 deletions

View File

@ -128,7 +128,8 @@ set(FISH_SRCS
src/proc.cpp src/reader.cpp src/redirection.cpp src/sanity.cpp src/screen.cpp
src/signal.cpp src/termsize.cpp src/timer.cpp src/tinyexpr.cpp
src/tokenizer.cpp src/topic_monitor.cpp src/trace.cpp src/utf8.cpp src/util.cpp
src/wcstringutil.cpp src/wgetopt.cpp src/wildcard.cpp src/wutil.cpp src/fds.cpp
src/wait_handle.cpp src/wcstringutil.cpp src/wgetopt.cpp src/wildcard.cpp
src/wutil.cpp src/fds.cpp
)
# Header files are just globbed.

View File

@ -11,6 +11,7 @@
#include "parser.h"
#include "proc.h"
#include "signal.h"
#include "wait_handle.h"
#include "wgetopt.h"
#include "wutil.h"
@ -20,10 +21,11 @@ static bool can_wait_on_job(const std::shared_ptr<job_t> &j) {
}
/// \return true if a wait handle matches a pid or a process name. Exactly one should be passed.
/// For convenience, this returns false if the wait handle is null.
static bool wait_handle_matches(pid_t pid, const wchar_t *proc_name, const wait_handle_ref_t &wh) {
assert((pid > 0 || proc_name) && "Must specify either pid or proc_name");
return (pid > 0 && contains(wh->pids, pid)) ||
(proc_name && contains(wh->proc_base_names, proc_name));
if (!wh) return false;
return (pid > 0 && pid == wh->pid) || (proc_name && proc_name == wh->base_name);
}
/// Walk the list of jobs, looking for a process with \p pid (if nonzero) or \p proc_name (if not
@ -34,8 +36,9 @@ static bool find_wait_handles(pid_t pid, const wchar_t *proc_name, const parser_
assert((pid > 0 || proc_name) && "Must specify either pid or proc_name");
// Has a job already completed?
// TODO: we can avoid traversing this list if searching by pid.
bool matched = false;
for (const auto &wh : parser.get_recorded_wait_handles()) {
for (const auto &wh : parser.get_wait_handles().get_list()) {
if (wait_handle_matches(pid, proc_name, wh)) {
handles->push_back(wh);
matched = true;
@ -44,18 +47,13 @@ static bool find_wait_handles(pid_t pid, const wchar_t *proc_name, const parser_
// Is there a running job match?
for (const auto &j : parser.jobs()) {
if (can_wait_on_job(j) && wait_handle_matches(pid, proc_name, j->get_wait_handle())) {
handles->push_back(j->get_wait_handle());
matched = true;
}
}
if (!matched) {
// Maybe we could have matched, but a job was stopped or otherwise unwaitable.
for (const auto &j : parser.jobs()) {
if (wait_handle_matches(pid, proc_name, j->get_wait_handle())) {
// We want to set 'matched' to true if we could have matched, even if the job was stopped.
bool provide_handle = can_wait_on_job(j);
for (const auto &proc : j->processes) {
auto wh = proc->get_wait_handle();
if (wait_handle_matches(pid, proc_name, wh)) {
matched = true;
break;
if (provide_handle) handles->push_back(std::move(wh));
}
}
}
@ -65,11 +63,17 @@ static bool find_wait_handles(pid_t pid, const wchar_t *proc_name, const parser_
/// \return all wait handles for all jobs, current and already completed (!).
static std::vector<wait_handle_ref_t> get_all_wait_handles(const parser_t &parser) {
std::vector<wait_handle_ref_t> result;
const auto &whs = parser.get_recorded_wait_handles();
// Get wait handles for reaped jobs.
const auto &whs = parser.get_wait_handles().get_list();
result.insert(result.end(), whs.begin(), whs.end());
// Get wait handles for running jobs.
for (const auto &j : parser.jobs()) {
if (can_wait_on_job(j)) {
result.push_back(j->get_wait_handle());
if (!can_wait_on_job(j)) continue;
for (const auto &proc : j->processes) {
if (auto wh = proc->get_wait_handle()) {
result.push_back(std::move(wh));
}
}
}
return result;
@ -91,7 +95,7 @@ static int wait_for_completion(parser_t &parser, const std::vector<wait_handle_r
// Remove completed wait handles (at most 1 if any_flag is set).
for (const auto &wh : whs) {
if (is_completed(wh)) {
parser.wait_handle_remove(wh);
parser.get_wait_handles().remove(wh);
if (any_flag) break;
}
}

View File

@ -930,6 +930,9 @@ static launch_result_t exec_process_in_job(parser_t &parser, process_t *p,
launch_result_t::failed) {
return launch_result_t::failed;
}
// It's possible (though unlikely) that this is a background process which recycled a
// pid from another, previous background process. Forget any such old process.
parser.get_wait_handles().remove_by_pid(p->pid);
break;
}

View File

@ -3375,6 +3375,42 @@ static void test_1_completion(wcstring line, const wcstring &completion, complet
do_test(cursor_pos == out_cursor_pos);
}
static void test_wait_handles() {
say(L"Testing wait handles");
constexpr size_t limit = 4;
wait_handle_store_t whs(limit);
do_test(whs.size() == 0);
// Null handles ignored.
whs.add(wait_handle_ref_t{});
do_test(whs.size() == 0);
do_test(whs.get_by_pid(5) == nullptr);
// Duplicate pids drop oldest.
whs.add(std::make_shared<wait_handle_t>(5, L"first"));
whs.add(std::make_shared<wait_handle_t>(5, L"second"));
do_test(whs.size() == 1);
do_test(whs.get_by_pid(5)->base_name == L"second");
whs.remove_by_pid(123);
do_test(whs.size() == 1);
whs.remove_by_pid(5);
do_test(whs.size() == 0);
// Test evicting oldest.
whs.add(std::make_shared<wait_handle_t>(1, L"1"));
whs.add(std::make_shared<wait_handle_t>(2, L"2"));
whs.add(std::make_shared<wait_handle_t>(3, L"3"));
whs.add(std::make_shared<wait_handle_t>(4, L"4"));
whs.add(std::make_shared<wait_handle_t>(5, L"5"));
do_test(whs.size() == 4);
auto start = whs.get_list().begin();
do_test(std::next(start, 0)->get()->base_name == L"5");
do_test(std::next(start, 1)->get()->base_name == L"4");
do_test(std::next(start, 2)->get()->base_name == L"3");
do_test(std::next(start, 3)->get()->base_name == L"2");
}
static void test_completion_insertions() {
#define TEST_1_COMPLETION(a, b, c, d, e) test_1_completion(a, b, c, d, e, __LINE__)
say(L"Testing completion insertions");
@ -6612,6 +6648,7 @@ int main(int argc, char **argv) {
if (should_test_function("universal")) test_universal_formats();
if (should_test_function("universal")) test_universal_ok_to_save();
if (should_test_function("notifiers")) test_universal_notifiers();
if (should_test_function("wait_handles")) test_wait_handles();
if (should_test_function("completion_insertions")) test_completion_insertions();
if (should_test_function("autosuggestion_ignores")) test_autosuggestion_ignores();
if (should_test_function("autosuggestion_combining")) test_autosuggestion_combining();

View File

@ -553,32 +553,6 @@ void parser_t::job_add(shared_ptr<job_t> job) {
job_list.push_front(std::move(job));
}
void parser_t::save_wait_handle_for_completed_job(job_t *job) {
assert(job && job->is_completed() && "Job null or not completed");
// Are we a background job with an external process?
if (!job->is_foreground() && job->has_external_proc()) {
rec_wait_handles.push_front(job->get_wait_handle(true /* create */));
// Limit how many background jobs we will remember.
// This is CHILD_MAX (controlled by _SC_CHILD_MAX) but we just hard code it.
// 1024 is zsh's fallback.
while (rec_wait_handles.size() > 1024) rec_wait_handles.pop_back();
}
// Mark the job as complete in its wait handle (but don't create it just for this).
if (auto wh = job->get_wait_handle(false /* create */)) {
wh->completed = true;
}
}
void parser_t::wait_handle_remove(const wait_handle_ref_t &handle) {
// Note the handle may not be found, if we exceeded our wait handle limit.
auto iter = std::find(rec_wait_handles.begin(), rec_wait_handles.end(), handle);
if (iter != rec_wait_handles.end()) {
rec_wait_handles.erase(iter);
}
}
void parser_t::job_promote(job_t *job) {
job_list_t::iterator loc;
for (loc = job_list.begin(); loc != job_list.end(); ++loc) {

View File

@ -20,6 +20,7 @@
#include "parse_tree.h"
#include "proc.h"
#include "util.h"
#include "wait_handle.h"
class io_chain_t;
@ -252,9 +253,9 @@ class parser_t : public std::enable_shared_from_this<parser_t> {
/// The jobs associated with this parser.
job_list_t job_list;
/// The list of recorded wait-handles. These are jobs that finished in the background, and have
/// Our store of recorded wait-handles. These are jobs that finished in the background, and have
/// been reaped, but may still be wait'ed on.
std::deque<wait_handle_ref_t> rec_wait_handles;
wait_handle_store_t wait_handles;
/// The list of blocks. This is a deque because we give out raw pointers to callers, who hold
/// them across manipulating this stack.
@ -367,10 +368,9 @@ class parser_t : public std::enable_shared_from_this<parser_t> {
library_data_t &libdata() { return library_data; }
const library_data_t &libdata() const { return library_data; }
/// Access the list of wait handles for jobs that have finished in the background.
const std::deque<wait_handle_ref_t> &get_recorded_wait_handles() const {
return rec_wait_handles;
}
/// Get our wait handle store.
wait_handle_store_t &get_wait_handles() { return wait_handles; }
const wait_handle_store_t &get_wait_handles() const { return wait_handles; }
/// Get and set the last proc statuses.
int get_last_status() const { return vars().get_last_status(); }
@ -408,13 +408,6 @@ class parser_t : public std::enable_shared_from_this<parser_t> {
/// Returns the job with the given pid.
job_t *job_get_from_pid(pid_t pid) const;
/// Given that a job has completed, check if it may be wait'ed on; if so add it to our list of
/// wait handles.
void save_wait_handle_for_completed_job(job_t *job);
/// Remove a wait handle, if present in the list.
void wait_handle_remove(const wait_handle_ref_t &handle);
/// Returns a new profile item if profiling is active. The caller should fill it in.
/// The parser_t will deallocate it.
/// If profiling is not active, this returns nullptr.

View File

@ -31,6 +31,7 @@
#endif
#include <sys/time.h> // IWYU pragma: keep
#include <sys/types.h>
#include <sys/wait.h>
#include <algorithm> // IWYU pragma: keep
#include <memory>
@ -189,21 +190,6 @@ maybe_t<statuses_t> job_t::get_statuses() const {
return st;
}
wait_handle_ref_t job_t::get_wait_handle(bool create) {
if (!wait_handle && create) {
wait_handle = std::make_shared<wait_handle_t>();
for (const auto &proc : processes) {
// Only external processes may be wait'ed upon.
if (proc->type != process_type_t::external) continue;
if (proc->pid > 0) {
wait_handle->pids.push_back(proc->pid);
}
wait_handle->proc_base_names.push_back(wbasename(proc->actual_cmd));
}
}
return wait_handle;
}
void internal_proc_t::mark_exited(proc_status_t status) {
assert(!exited() && "Process is already exited");
status_.store(status, std::memory_order_relaxed);
@ -309,6 +295,16 @@ bool process_t::is_internal() const {
return true;
}
wait_handle_ref_t process_t::get_wait_handle(bool create) {
if (type != process_type_t::external || pid <= 0) {
return nullptr;
}
if (!wait_handle_ && create) {
wait_handle_ = std::make_shared<wait_handle_t>(this->pid, wbasename(this->actual_cmd));
}
return wait_handle_;
}
static uint64_t next_internal_job_id() {
static std::atomic<uint64_t> s_next{};
return ++s_next;
@ -610,6 +606,26 @@ static bool job_wants_message(const shared_ptr<job_t> &j) {
return true;
}
/// Given that a job has completed, check if it may be wait'ed on; if so add it to the wait handle
/// store. Then mark all wait handles as complete.
static void save_wait_handle_for_completed_job(const shared_ptr<job_t> &job,
wait_handle_store_t &store) {
assert(job && job->is_completed() && "Job null or not completed");
// Are we a background job?
if (!job->is_foreground()) {
for (auto &proc : job->processes) {
store.add(proc->get_wait_handle(true));
}
}
// Mark all wait handles as complete (but don't create just for this).
for (auto &proc : job->processes) {
if (wait_handle_ref_t wh = proc->get_wait_handle(false /* create */)) {
wh->completed = true;
}
}
}
/// Remove completed jobs from the job list, printing status messages as appropriate.
/// \return whether something was printed.
static bool process_clean_after_marking(parser_t &parser, bool allow_interactive) {
@ -688,7 +704,7 @@ static bool process_clean_after_marking(parser_t &parser, bool allow_interactive
const shared_ptr<job_t> &j = *iter;
if (should_process_job(j) && j->is_completed()) {
// If this job finished in the background, we have to remember to wait on it.
parser.save_wait_handle_for_completed_job(j.get());
save_wait_handle_for_completed_job(j, parser.get_wait_handles());
iter = jobs.erase(iter);
} else {
++iter;

View File

@ -8,7 +8,6 @@
#include <signal.h>
#include <stddef.h>
#include <sys/time.h> // IWYU pragma: keep
#include <sys/wait.h>
#include <unistd.h>
#include <deque>
@ -21,6 +20,7 @@
#include "io.h"
#include "parse_tree.h"
#include "topic_monitor.h"
#include "wait_handle.h"
/// Types of processes.
enum class process_type_t {
@ -248,6 +248,10 @@ class process_t {
/// \return whether this process type is internal (block, function, or builtin).
bool is_internal() const;
/// \return the wait handle for the process, creating it if \p create is set.
/// This will return nullptr if the process does not have a pid (i.e. is not external).
wait_handle_ref_t get_wait_handle(bool create = true);
/// Actual command to pass to exec in case of process_type_t::external or process_type_t::exec.
wcstring actual_cmd;
@ -285,6 +289,9 @@ class process_t {
private:
wcstring_list_t argv_;
redirection_spec_list_t proc_redirection_specs_;
// The wait handle. This is constructed lazily, and cached.
wait_handle_ref_t wait_handle_{};
};
using process_ptr_t = std::unique_ptr<process_t>;
@ -297,21 +304,6 @@ using job_id_t = int;
/// Every job has a unique positive value for this.
using internal_job_id_t = uint64_t;
/// The bits of a job necessary to support 'wait'.
/// This may outlive the job.
struct wait_handle_t {
/// The list of pids of the processes in this job.
std::vector<pid_t> pids{};
/// The list of "base names" of the processes from the job.
/// For example if the job is "/bin/sleep" then this will be 'sleep'.
wcstring_list_t proc_base_names{};
/// Set to true when the job is completed.
bool completed{false};
};
using wait_handle_ref_t = std::shared_ptr<wait_handle_t>;
/// A struct representing a job. A job is a pipeline of one or more processes.
class job_t {
public:
@ -394,10 +386,6 @@ class job_t {
// This is never null and not changed after construction.
job_group_ref_t group{};
// The wait handle. This is constructed lazily, and cached.
// Do not access this directly, use the get_wait_handle() function below.
wait_handle_ref_t wait_handle{};
/// \return the pgid for the job, based on the job group.
/// This may be none if the job consists of just internal fish functions or builtins.
/// This may also be fish itself.
@ -491,9 +479,6 @@ class job_t {
/// \returns the statuses for this job.
maybe_t<statuses_t> get_statuses() const;
/// \return the wait handle for the job, creating it if \p create is set.
wait_handle_ref_t get_wait_handle(bool create = true);
};
/// Whether this shell is attached to a tty.

47
src/wait_handle.cpp Normal file
View File

@ -0,0 +1,47 @@
#include "config.h"
#include "wait_handle.h"
#include "flog.h"
wait_handle_store_t::wait_handle_store_t(size_t limit) : limit_(limit) {}
void wait_handle_store_t::add(wait_handle_ref_t wh) {
if (!wh || wh->pid <= 0) return;
pid_t pid = wh->pid;
remove_by_pid(wh->pid);
handles_.push_front(std::move(wh));
handle_map_[pid] = std::begin(handles_);
// Remove oldest until we reach our limit.
while (handles_.size() > limit_) {
handle_map_.erase(handles_.back()->pid);
handles_.pop_back();
}
}
void wait_handle_store_t::remove(const wait_handle_ref_t &wh) {
// Note: this differs from remove_by_pid because we verify that the handle is the same.
if (!wh) return;
auto iter = handle_map_.find(wh->pid);
if (iter != handle_map_.end() && *iter->second == wh) {
// Note this may deallocate the wait handle, leaving it dangling.
handles_.erase(iter->second);
handle_map_.erase(iter);
}
}
void wait_handle_store_t::remove_by_pid(pid_t pid) {
auto iter = handle_map_.find(pid);
if (iter != handle_map_.end()) {
handles_.erase(iter->second);
handle_map_.erase(iter);
}
}
wait_handle_ref_t wait_handle_store_t::get_by_pid(pid_t pid) const {
auto iter = handle_map_.find(pid);
if (iter == handle_map_.end()) return nullptr;
return *iter->second;
}

85
src/wait_handle.h Normal file
View File

@ -0,0 +1,85 @@
// Support for handling pids that are no longer fish jobs.
// This includes pids that have been disowned ("forgotten") and background jobs which have finished,
// but may be wait'ed.
#ifndef FISH_WAIT_HANDLE_H
#define FISH_WAIT_HANDLE_H
#include "config.h" // IWYU pragma: keep
#include <unistd.h>
#include <list>
#include <unordered_map>
#include <vector>
#include "common.h"
/// The bits of a job necessary to support 'wait' and '--on-process-exit'.
/// This may outlive the job.
struct wait_handle_t {
/// Construct from a pid and base name.
wait_handle_t(pid_t pid, wcstring name) : pid(pid), base_name(std::move(name)) {}
/// The pid of this process.
pid_t pid{};
/// The "base name" of this process.
/// For example if the process is "/bin/sleep" then this will be 'sleep'.
wcstring base_name{};
/// Set to true when the process is completed.
bool completed{false};
};
using wait_handle_ref_t = std::shared_ptr<wait_handle_t>;
/// Support for storing a list of wait handles, with a max limit set at initialization.
/// Note this class is not safe for concurrent access.
class wait_handle_store_t {
public:
// Our wait handles are arranged in a linked list for its iterator invalidation semantics: we
// may remove one without needing to update the map from pid -> handle.
using wait_handle_list_t = std::list<wait_handle_ref_t>;
/// Construct with a max limit on the number of handles we will remember.
/// The default is 1024, which is zsh's default.
explicit wait_handle_store_t(size_t limit = 1024);
/// Add a wait handle to the store. This may remove the oldest handle, if our limit is exceeded.
/// It may also remove any existing handle with that pid.
/// For convenience, this does nothing if wh is null.
void add(wait_handle_ref_t wh);
/// \return the wait handle for a pid, or nullptr if there is none.
/// This is a fast lookup.
wait_handle_ref_t get_by_pid(pid_t pid) const;
/// Remove a given wait handle, if present in this store.
void remove(const wait_handle_ref_t &wh);
/// Remove the wait handle for a pid, if present in this store.
void remove_by_pid(pid_t pid);
/// Get the list of all wait handles.
const wait_handle_list_t &get_list() const { return handles_; }
/// Convenience to return the size, for testing.
size_t size() const { return handles_.size(); }
/// No copying allowed.
wait_handle_store_t(const wait_handle_store_t &) = delete;
void operator=(const wait_handle_store_t &) = delete;
private:
using list_node_t = typename wait_handle_list_t::iterator;
/// The list of all wait handles. New ones come on the front, the last one is oldest.
wait_handle_list_t handles_{};
/// Map from pid to the wait handle's position in the list.
std::unordered_map<pid_t, list_node_t> handle_map_{};
/// Max supported wait handles.
const size_t limit_;
};
#endif