Rewrite wait handles and wait handle store in Rust

This commit is contained in:
ridiculousfish 2023-03-13 19:23:31 -07:00
parent 14d6b1c3de
commit 57f4571a01
17 changed files with 441 additions and 257 deletions

View File

@ -125,7 +125,7 @@ set(FISH_SRCS
src/proc.cpp src/re.cpp src/reader.cpp src/screen.cpp src/proc.cpp src/re.cpp src/reader.cpp src/screen.cpp
src/signals.cpp src/termsize.cpp src/tinyexpr.cpp src/signals.cpp src/termsize.cpp src/tinyexpr.cpp
src/trace.cpp src/utf8.cpp src/trace.cpp src/utf8.cpp
src/wait_handle.cpp src/wcstringutil.cpp src/wgetopt.cpp src/wildcard.cpp src/wcstringutil.cpp src/wgetopt.cpp src/wildcard.cpp
src/wutil.cpp src/fds.cpp src/rustffi.cpp src/wutil.cpp src/fds.cpp src/rustffi.cpp
) )

36
fish-rust/Cargo.lock generated
View File

@ -28,6 +28,17 @@ dependencies = [
"version_check", "version_check",
] ]
[[package]]
name = "ahash"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2c99f64d1e06488f620f932677e24bc6e2897582980441ae90a671415bd7ec2f"
dependencies = [
"cfg-if",
"once_cell",
"version_check",
]
[[package]] [[package]]
name = "aho-corasick" name = "aho-corasick"
version = "0.7.20" version = "0.7.20"
@ -355,6 +366,7 @@ dependencies = [
"errno", "errno",
"inventory", "inventory",
"libc", "libc",
"lru",
"miette", "miette",
"nix", "nix",
"num-traits", "num-traits",
@ -405,7 +417,16 @@ version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
dependencies = [ dependencies = [
"ahash", "ahash 0.7.6",
]
[[package]]
name = "hashbrown"
version = "0.13.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e"
dependencies = [
"ahash 0.8.3",
] ]
[[package]] [[package]]
@ -436,7 +457,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1885e79c1fc4b10f0e172c475f458b7f7b93061064d98c3293e98c5ba0c8b399" checksum = "1885e79c1fc4b10f0e172c475f458b7f7b93061064d98c3293e98c5ba0c8b399"
dependencies = [ dependencies = [
"autocfg", "autocfg",
"hashbrown", "hashbrown 0.12.3",
"serde", "serde",
] ]
@ -541,6 +562,15 @@ dependencies = [
"cfg-if", "cfg-if",
] ]
[[package]]
name = "lru"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03f1160296536f10c833a82dca22267d5486734230d47bf00bf435885814ba1e"
dependencies = [
"hashbrown 0.13.2",
]
[[package]] [[package]]
name = "memchr" name = "memchr"
version = "2.5.0" version = "2.5.0"
@ -984,7 +1014,7 @@ version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c5faade31a542b8b35855fff6e8def199853b2da8da256da52f52f1316ee3137" checksum = "c5faade31a542b8b35855fff6e8def199853b2da8da256da52f52f1316ee3137"
dependencies = [ dependencies = [
"hashbrown", "hashbrown 0.12.3",
"regex", "regex",
] ]

View File

@ -20,6 +20,7 @@ once_cell = "1.17.0"
rand = { version = "0.8.5", features = ["small_rng"] } rand = { version = "0.8.5", features = ["small_rng"] }
unixstring = "0.2.7" unixstring = "0.2.7"
widestring = "1.0.2" widestring = "1.0.2"
lru = "0.10.0"
[build-dependencies] [build-dependencies]
autocxx-build = "0.23.1" autocxx-build = "0.23.1"

View File

@ -35,6 +35,7 @@ fn main() -> miette::Result<()> {
"src/tokenizer.rs", "src/tokenizer.rs",
"src/topic_monitor.rs", "src/topic_monitor.rs",
"src/util.rs", "src/util.rs",
"src/wait_handle.rs",
"src/builtins/shared.rs", "src/builtins/shared.rs",
]; ];
cxx_build::bridges(&source_files) cxx_build::bridges(&source_files)

View File

@ -4,8 +4,9 @@ use crate::builtins::shared::{
builtin_missing_argument, builtin_print_help, builtin_unknown_option, io_streams_t, builtin_missing_argument, builtin_print_help, builtin_unknown_option, io_streams_t,
STATUS_CMD_OK, STATUS_INVALID_ARGS, STATUS_CMD_OK, STATUS_INVALID_ARGS,
}; };
use crate::ffi::{job_t, parser_t, proc_wait_any, wait_handle_ref_t, Repin}; use crate::ffi::{job_t, parser_t, proc_wait_any, Repin};
use crate::signal::sigchecker_t; use crate::signal::sigchecker_t;
use crate::wait_handle::{WaitHandleRef, WaitHandleStore};
use crate::wchar::{widestrs, wstr}; use crate::wchar::{widestrs, wstr};
use crate::wgetopt::{wgetopter_t, wopt, woption, woption_argument_t}; use crate::wgetopt::{wgetopter_t, wopt, woption, woption_argument_t};
use crate::wutil::{self, fish_wcstoi, wgettext_fmt}; use crate::wutil::{self, fish_wcstoi, wgettext_fmt};
@ -16,14 +17,10 @@ fn can_wait_on_job(j: &cxx::SharedPtr<job_t>) -> bool {
} }
/// \return true if a wait handle matches a pid or a process name. /// \return true if a wait handle matches a pid or a process name.
/// For convenience, this returns false if the wait handle is null. fn wait_handle_matches(query: WaitHandleQuery, wh: &WaitHandleRef) -> bool {
fn wait_handle_matches(query: WaitHandleQuery, wh: &wait_handle_ref_t) -> bool {
if wh.is_null() {
return false;
}
match query { match query {
WaitHandleQuery::Pid(pid) => wh.get_pid().0 == pid, WaitHandleQuery::Pid(pid) => wh.pid == pid,
WaitHandleQuery::ProcName(proc_name) => proc_name == wh.get_base_name(), WaitHandleQuery::ProcName(proc_name) => proc_name == wh.base_name,
} }
} }
@ -32,16 +29,6 @@ fn iswnumeric(s: &wstr) -> bool {
s.chars().all(|c| c.is_ascii_digit()) s.chars().all(|c| c.is_ascii_digit())
} }
// Hack to copy wait handles into a vector.
fn get_wait_handle_list(parser: &parser_t) -> Vec<wait_handle_ref_t> {
let mut handles = Vec::new();
let whs = parser.get_wait_handles1();
for idx in 0..whs.size() {
handles.push(whs.get(idx));
}
handles
}
#[derive(Copy, Clone)] #[derive(Copy, Clone)]
enum WaitHandleQuery<'a> { enum WaitHandleQuery<'a> {
Pid(pid_t), Pid(pid_t),
@ -53,15 +40,16 @@ enum WaitHandleQuery<'a> {
/// \return true if we found a matching job (even if not waitable), false if not. /// \return true if we found a matching job (even if not waitable), false if not.
fn find_wait_handles( fn find_wait_handles(
query: WaitHandleQuery<'_>, query: WaitHandleQuery<'_>,
parser: &parser_t, parser: &mut parser_t,
handles: &mut Vec<wait_handle_ref_t>, handles: &mut Vec<WaitHandleRef>,
) -> bool { ) -> bool {
// Has a job already completed? // Has a job already completed?
// TODO: we can avoid traversing this list if searching by pid. // TODO: we can avoid traversing this list if searching by pid.
let mut matched = false; let mut matched = false;
for wh in get_wait_handle_list(parser) { let wait_handles: &mut WaitHandleStore = parser.get_wait_handles_mut();
if wait_handle_matches(query, &wh) { for wh in wait_handles.iter() {
handles.push(wh); if wait_handle_matches(query, wh) {
handles.push(wh.clone());
matched = true; matched = true;
} }
} }
@ -71,11 +59,17 @@ fn find_wait_handles(
// We want to set 'matched' to true if we could have matched, even if the job was stopped. // We want to set 'matched' to true if we could have matched, even if the job was stopped.
let provide_handle = can_wait_on_job(j); let provide_handle = can_wait_on_job(j);
for proc in j.get_procs() { for proc in j.get_procs() {
let wh = proc.pin_mut().make_wait_handle(j.get_internal_job_id()); let wh = proc
.pin_mut()
.unpin()
.make_wait_handle(j.get_internal_job_id());
let Some(wh) = wh else {
continue;
};
if wait_handle_matches(query, &wh) { if wait_handle_matches(query, &wh) {
matched = true; matched = true;
if provide_handle { if provide_handle {
handles.push(wh); handles.push(wh.clone());
} }
} }
} }
@ -83,13 +77,9 @@ fn find_wait_handles(
matched matched
} }
fn get_all_wait_handles(parser: &parser_t) -> Vec<wait_handle_ref_t> { fn get_all_wait_handles(parser: &parser_t) -> Vec<WaitHandleRef> {
let mut result = Vec::new();
// Get wait handles for reaped jobs. // Get wait handles for reaped jobs.
let wait_handles = parser.get_wait_handles1(); let mut result = parser.get_wait_handles().get_list();
for idx in 0..wait_handles.size() {
result.push(wait_handles.get(idx));
}
// Get wait handles for running jobs. // Get wait handles for running jobs.
for j in parser.get_jobs() { for j in parser.get_jobs() {
@ -97,9 +87,8 @@ fn get_all_wait_handles(parser: &parser_t) -> Vec<wait_handle_ref_t> {
continue; continue;
} }
for proc_ptr in j.get_procs().iter_mut() { for proc_ptr in j.get_procs().iter_mut() {
let proc = proc_ptr.pin_mut(); let proc = proc_ptr.pin_mut().unpin();
let wh = proc.make_wait_handle(j.get_internal_job_id()); if let Some(wh) = proc.make_wait_handle(j.get_internal_job_id()) {
if !wh.is_null() {
result.push(wh); result.push(wh);
} }
} }
@ -107,7 +96,7 @@ fn get_all_wait_handles(parser: &parser_t) -> Vec<wait_handle_ref_t> {
result result
} }
fn is_completed(wh: &wait_handle_ref_t) -> bool { fn is_completed(wh: &WaitHandleRef) -> bool {
wh.is_completed() wh.is_completed()
} }
@ -116,7 +105,7 @@ fn is_completed(wh: &wait_handle_ref_t) -> bool {
/// \return a status code. /// \return a status code.
fn wait_for_completion( fn wait_for_completion(
parser: &mut parser_t, parser: &mut parser_t,
whs: &[wait_handle_ref_t], whs: &[WaitHandleRef],
any_flag: bool, any_flag: bool,
) -> Option<c_int> { ) -> Option<c_int> {
if whs.is_empty() { if whs.is_empty() {
@ -135,7 +124,7 @@ fn wait_for_completion(
// Remove completed wait handles (at most 1 if any_flag is set). // Remove completed wait handles (at most 1 if any_flag is set).
for wh in whs { for wh in whs {
if is_completed(wh) { if is_completed(wh) {
parser.pin().get_wait_handles().remove(wh); parser.get_wait_handles_mut().remove(wh);
if any_flag { if any_flag {
break; break;
} }
@ -203,7 +192,7 @@ pub fn wait(
} }
// Get the list of wait handles for our waiting. // Get the list of wait handles for our waiting.
let mut wait_handles: Vec<wait_handle_ref_t> = Vec::new(); let mut wait_handles: Vec<WaitHandleRef> = Vec::new();
for i in w.woptind..argc { for i in w.woptind..argc {
if iswnumeric(argv[i]) { if iswnumeric(argv[i]) {
// argument is pid // argument is pid

View File

@ -6,6 +6,9 @@ use ::std::fmt::{self, Debug, Formatter};
use ::std::pin::Pin; use ::std::pin::Pin;
#[rustfmt::skip] #[rustfmt::skip]
use ::std::slice; use ::std::slice;
pub use crate::wait_handle::{
WaitHandleRef, WaitHandleRefFFI, WaitHandleStore, WaitHandleStoreFFI,
};
use crate::wchar::wstr; use crate::wchar::wstr;
use autocxx::prelude::*; use autocxx::prelude::*;
use cxx::SharedPtr; use cxx::SharedPtr;
@ -33,6 +36,10 @@ include_cpp! {
#include "wutil.h" #include "wutil.h"
#include "termsize.h" #include "termsize.h"
// We need to block these types so when exposing C++ to Rust.
block!("WaitHandleStoreFFI")
block!("WaitHandleRefFFI")
safety!(unsafe_ffi) safety!(unsafe_ffi)
generate_pod!("wcharz_t") generate_pod!("wcharz_t")
@ -57,6 +64,7 @@ include_cpp! {
generate!("block_t") generate!("block_t")
generate!("parser_t") generate!("parser_t")
generate!("job_t") generate!("job_t")
generate!("process_t") generate!("process_t")
generate!("library_data_t") generate!("library_data_t")
@ -76,9 +84,6 @@ include_cpp! {
generate!("builtin_print_help") generate!("builtin_print_help")
generate!("builtin_print_error_trailer") generate!("builtin_print_error_trailer")
generate!("wait_handle_t")
generate!("wait_handle_store_t")
generate!("escape_string") generate!("escape_string")
generate!("sig2wcs") generate!("sig2wcs")
generate!("wcs2sig") generate!("wcs2sig")
@ -107,6 +112,18 @@ include_cpp! {
} }
impl parser_t { impl parser_t {
pub fn get_wait_handles_mut(&mut self) -> &mut WaitHandleStore {
let ptr = self.get_wait_handles_void() as *mut Box<WaitHandleStoreFFI>;
assert!(!ptr.is_null());
unsafe { (*ptr).from_ffi_mut() }
}
pub fn get_wait_handles(&self) -> &WaitHandleStore {
let ptr = self.get_wait_handles_void() as *const Box<WaitHandleStoreFFI>;
assert!(!ptr.is_null());
unsafe { (*ptr).from_ffi() }
}
pub fn get_block_at_index(&self, i: usize) -> Option<&block_t> { pub fn get_block_at_index(&self, i: usize) -> Option<&block_t> {
let b = self.block_at_index(i); let b = self.block_at_index(i);
unsafe { b.as_ref() } unsafe { b.as_ref() }
@ -145,6 +162,30 @@ impl job_t {
} }
} }
impl process_t {
/// \return the wait handle for the process, if it exists.
pub fn get_wait_handle(&self) -> Option<WaitHandleRef> {
let handle_ptr = self.get_wait_handle_void() as *const Box<WaitHandleRefFFI>;
if handle_ptr.is_null() {
None
} else {
let handle: &WaitHandleRefFFI = unsafe { &*handle_ptr };
Some(handle.from_ffi().clone())
}
}
/// \return the wait handle for the process, creating it if necessary.
pub fn make_wait_handle(&mut self, jid: u64) -> Option<WaitHandleRef> {
let handle_ref = self.pin().make_wait_handle_void(jid) as *const Box<WaitHandleRefFFI>;
if handle_ref.is_null() {
None
} else {
let handle: &WaitHandleRefFFI = unsafe { &*handle_ref };
Some(handle.from_ffi().clone())
}
}
}
/// Allow wcharz_t to be "into" wstr. /// Allow wcharz_t to be "into" wstr.
impl From<wcharz_t> for &wchar::wstr { impl From<wcharz_t> for &wchar::wstr {
fn from(w: wcharz_t) -> Self { fn from(w: wcharz_t) -> Self {

View File

@ -35,6 +35,7 @@ mod timer;
mod tokenizer; mod tokenizer;
mod topic_monitor; mod topic_monitor;
mod util; mod util;
mod wait_handle;
mod wchar; mod wchar;
mod wchar_ext; mod wchar_ext;
mod wchar_ffi; mod wchar_ffi;

View File

@ -0,0 +1,270 @@
use crate::wchar::WString;
use crate::wchar_ffi::WCharFromFFI;
use cxx::CxxWString;
use libc::pid_t;
use std::cell::Cell;
use std::rc::Rc;
#[cxx::bridge]
mod wait_handle_ffi {
extern "Rust" {
type WaitHandleRefFFI;
fn new_wait_handle_ffi(
pid: i32,
internal_job_id: u64,
base_name: &CxxWString,
) -> Box<WaitHandleRefFFI>;
fn set_status_and_complete(self: &mut WaitHandleRefFFI, status: i32);
type WaitHandleStoreFFI;
fn new_wait_handle_store_ffi() -> Box<WaitHandleStoreFFI>;
fn remove_by_pid(self: &mut WaitHandleStoreFFI, pid: i32);
fn get_job_id_by_pid(self: &WaitHandleStoreFFI, pid: i32) -> u64;
fn try_get_status_and_job_id(
self: &WaitHandleStoreFFI,
pid: i32,
only_if_complete: bool,
status: &mut i32,
job_id: &mut u64,
) -> bool;
fn add(self: &mut WaitHandleStoreFFI, wh: *const Box<WaitHandleRefFFI>);
}
}
pub struct WaitHandleRefFFI(WaitHandleRef);
impl WaitHandleRefFFI {
#[allow(clippy::wrong_self_convention)]
pub fn from_ffi(&self) -> &WaitHandleRef {
&self.0
}
#[allow(clippy::wrong_self_convention)]
pub fn from_ffi_mut(&mut self) -> &mut WaitHandleRef {
&mut self.0
}
fn set_status_and_complete(self: &mut WaitHandleRefFFI, status: i32) {
let wh = self.from_ffi();
assert!(!wh.is_completed(), "wait handle already completed");
wh.status.set(Some(status));
}
}
pub struct WaitHandleStoreFFI(WaitHandleStore);
impl WaitHandleStoreFFI {
#[allow(clippy::wrong_self_convention)]
pub fn from_ffi_mut(&mut self) -> &mut WaitHandleStore {
&mut self.0
}
#[allow(clippy::wrong_self_convention)]
pub fn from_ffi(&self) -> &WaitHandleStore {
&self.0
}
/// \return the job ID for a pid, or 0 if None.
fn get_job_id_by_pid(&self, pid: i32) -> u64 {
self.from_ffi()
.get_by_pid(pid)
.map(|wh| wh.internal_job_id)
.unwrap_or(0)
}
/// Try getting the status and job ID of a job.
/// \return true if the job was found.
/// If only_if_complete is true, then only return true if the job is completed.
fn try_get_status_and_job_id(
self: &WaitHandleStoreFFI,
pid: i32,
only_if_complete: bool,
status: &mut i32,
job_id: &mut u64,
) -> bool {
let whs = self.from_ffi();
let Some(wh) = whs.get_by_pid(pid) else {
return false;
};
if only_if_complete && !wh.is_completed() {
return false;
}
*status = wh.status.get().unwrap_or(0);
*job_id = wh.internal_job_id;
true
}
/// Remove the wait handle for a pid, if present in this store.
fn remove_by_pid(&mut self, pid: i32) {
self.from_ffi_mut().remove_by_pid(pid);
}
fn add(self: &mut WaitHandleStoreFFI, wh: *const Box<WaitHandleRefFFI>) {
if wh.is_null() {
return;
}
let wh = unsafe { (*wh).from_ffi() };
self.from_ffi_mut().add(wh.clone());
}
}
fn new_wait_handle_store_ffi() -> Box<WaitHandleStoreFFI> {
Box::new(WaitHandleStoreFFI(WaitHandleStore::new()))
}
fn new_wait_handle_ffi(
pid: i32,
internal_job_id: u64,
base_name: &CxxWString,
) -> Box<WaitHandleRefFFI> {
Box::new(WaitHandleRefFFI(WaitHandle::new(
pid as pid_t,
internal_job_id,
base_name.from_ffi(),
)))
}
pub type InternalJobId = u64;
/// The bits of a job necessary to support 'wait' and '--on-process-exit'.
/// This may outlive the job.
pub struct WaitHandle {
/// The pid of this process.
pub pid: pid_t,
/// The internal job id of the job which contained this process.
pub internal_job_id: InternalJobId,
/// The "base name" of this process.
/// For example if the process is "/bin/sleep" then this will be 'sleep'.
pub base_name: WString,
/// The status, if completed; None if not completed.
status: Cell<Option<i32>>,
}
impl WaitHandle {
/// \return true if this wait handle is completed.
pub fn is_completed(&self) -> bool {
self.status.get().is_some()
}
}
impl WaitHandle {
/// Construct from a pid, job id, and base name.
pub fn new(pid: pid_t, internal_job_id: InternalJobId, base_name: WString) -> WaitHandleRef {
Rc::new(WaitHandle {
pid,
internal_job_id,
base_name,
status: Default::default(),
})
}
}
pub type WaitHandleRef = Rc<WaitHandle>;
const WAIT_HANDLE_STORE_DEFAULT_LIMIT: usize = 1024;
/// Support for storing a list of wait handles, with a max limit set at initialization.
/// Note this class is not safe for concurrent access.
pub struct WaitHandleStore {
// Map from pid to wait handles.
cache: lru::LruCache<pid_t, WaitHandleRef>,
}
impl WaitHandleStore {
/// Construct with the default capacity.
pub fn new() -> WaitHandleStore {
Self::new_with_capacity(WAIT_HANDLE_STORE_DEFAULT_LIMIT)
}
pub fn new_with_capacity(capacity: usize) -> WaitHandleStore {
let capacity = std::num::NonZeroUsize::new(capacity).unwrap();
WaitHandleStore {
cache: lru::LruCache::new(capacity),
}
}
/// Add a wait handle to the store. This may remove the oldest handle, if our limit is exceeded.
/// It may also remove any existing handle with that pid.
pub fn add(&mut self, wh: WaitHandleRef) {
self.cache.put(wh.pid, wh);
}
/// \return the wait handle for a pid, or None if there is none.
/// This is a fast lookup.
pub fn get_by_pid(&self, pid: pid_t) -> Option<WaitHandleRef> {
self.cache.peek(&pid).cloned()
}
/// Remove a given wait handle, if present in this store.
pub fn remove(&mut self, wh: &WaitHandleRef) {
// Note: this differs from remove_by_pid because we verify that the handle is the same.
if let Some(key) = self.cache.peek(&wh.pid) {
if Rc::ptr_eq(key, wh) {
self.cache.pop(&wh.pid);
}
}
}
/// Remove the wait handle for a pid, if present in this store.
pub fn remove_by_pid(&mut self, pid: pid_t) {
self.cache.pop(&pid);
}
/// Iterate over wait handles.
pub fn iter(&self) -> impl Iterator<Item = &WaitHandleRef> {
self.cache.iter().map(|(_, wh)| wh)
}
/// Copy out the list of all wait handles, returning the most-recently-used first.
pub fn get_list(&self) -> Vec<WaitHandleRef> {
self.cache.iter().map(|(_, wh)| wh.clone()).collect()
}
/// Convenience to return the size, for testing.
pub fn size(&self) -> usize {
self.cache.len()
}
}
#[test]
fn test_wait_handles() {
use crate::wchar::L;
let limit: usize = 4;
let mut whs = WaitHandleStore::new_with_capacity(limit);
assert_eq!(whs.size(), 0);
assert!(whs.get_by_pid(5).is_none());
// Duplicate pids drop oldest.
whs.add(WaitHandle::new(5, 0, L!("first").to_owned()));
whs.add(WaitHandle::new(5, 0, L!("second").to_owned()));
assert_eq!(whs.size(), 1);
assert_eq!(whs.get_by_pid(5).unwrap().base_name, "second");
whs.remove_by_pid(123);
assert_eq!(whs.size(), 1);
whs.remove_by_pid(5);
assert_eq!(whs.size(), 0);
// Test evicting oldest.
whs.add(WaitHandle::new(1, 0, L!("1").to_owned()));
whs.add(WaitHandle::new(2, 0, L!("2").to_owned()));
whs.add(WaitHandle::new(3, 0, L!("3").to_owned()));
whs.add(WaitHandle::new(4, 0, L!("4").to_owned()));
whs.add(WaitHandle::new(5, 0, L!("5").to_owned()));
assert_eq!(whs.size(), 4);
let entries = whs.get_list();
let mut iter = entries.iter();
assert_eq!(iter.next().unwrap().base_name, "5");
assert_eq!(iter.next().unwrap().base_name, "4");
assert_eq!(iter.next().unwrap().base_name, "3");
assert_eq!(iter.next().unwrap().base_name, "2");
assert!(iter.next().is_none());
}

View File

@ -28,9 +28,9 @@
#include "../parser_keywords.h" #include "../parser_keywords.h"
#include "../proc.h" #include "../proc.h"
#include "../signals.h" #include "../signals.h"
#include "../wait_handle.h"
#include "../wgetopt.h" #include "../wgetopt.h"
#include "../wutil.h" // IWYU pragma: keep #include "../wutil.h" // IWYU pragma: keep
#include "cxx.h"
namespace { namespace {
struct function_cmd_opts_t { struct function_cmd_opts_t {
@ -66,10 +66,7 @@ static internal_job_id_t job_id_for_pid(pid_t pid, parser_t &parser) {
if (const auto *job = parser.job_get_from_pid(pid)) { if (const auto *job = parser.job_get_from_pid(pid)) {
return job->internal_job_id; return job->internal_job_id;
} }
if (wait_handle_ref_t wh = parser.get_wait_handles().get_by_pid(pid)) { return parser.get_wait_handles_ffi()->get_job_id_by_pid(pid);
return wh->internal_job_id;
}
return 0;
} }
static int parse_cmd_opts(function_cmd_opts_t &opts, int *optind, //!OCLINT(high ncss method) static int parse_cmd_opts(function_cmd_opts_t &opts, int *optind, //!OCLINT(high ncss method)
@ -314,16 +311,20 @@ int builtin_function(parser_t &parser, io_streams_t &streams, const wcstring_lis
if (ed.typ == event_type_t::process_exit) { if (ed.typ == event_type_t::process_exit) {
pid_t pid = ed.pid; pid_t pid = ed.pid;
if (pid == EVENT_ANY_PID) continue; if (pid == EVENT_ANY_PID) continue;
wait_handle_ref_t wh = parser.get_wait_handles().get_by_pid(pid); int status{};
if (wh && wh->completed) { uint64_t internal_job_id{};
event_fire(parser, *new_event_process_exit(pid, wh->status)); if (parser.get_wait_handles_ffi()->try_get_status_and_job_id(pid, true, status,
internal_job_id)) {
event_fire(parser, *new_event_process_exit(pid, status));
} }
} else if (ed.typ == event_type_t::job_exit) { } else if (ed.typ == event_type_t::job_exit) {
pid_t pid = ed.pid; pid_t pid = ed.pid;
if (pid == EVENT_ANY_PID) continue; if (pid == EVENT_ANY_PID) continue;
wait_handle_ref_t wh = parser.get_wait_handles().get_by_pid(pid); int status{};
if (wh && wh->completed) { uint64_t internal_job_id{};
event_fire(parser, *new_event_job_exit(pid, wh->internal_job_id)); if (parser.get_wait_handles_ffi()->try_get_status_and_job_id(pid, true, status,
internal_job_id)) {
event_fire(parser, *new_event_job_exit(pid, internal_job_id));
} }
} }
} }

View File

@ -49,7 +49,6 @@
#include "redirection.h" #include "redirection.h"
#include "timer.rs.h" #include "timer.rs.h"
#include "trace.h" #include "trace.h"
#include "wait_handle.h"
#include "wcstringutil.h" #include "wcstringutil.h"
#include "wutil.h" // IWYU pragma: keep #include "wutil.h" // IWYU pragma: keep
@ -917,7 +916,7 @@ static launch_result_t exec_process_in_job(parser_t &parser, process_t *p,
} }
// It's possible (though unlikely) that this is a background process which recycled a // It's possible (though unlikely) that this is a background process which recycled a
// pid from another, previous background process. Forget any such old process. // pid from another, previous background process. Forget any such old process.
parser.get_wait_handles().remove_by_pid(p->pid); parser.get_wait_handles_ffi()->remove_by_pid(p->pid);
break; break;
} }

View File

@ -99,7 +99,6 @@
#include "topic_monitor.h" #include "topic_monitor.h"
#include "utf8.h" #include "utf8.h"
#include "util.h" #include "util.h"
#include "wait_handle.h"
#include "wcstringutil.h" #include "wcstringutil.h"
#include "wgetopt.h" #include "wgetopt.h"
#include "wildcard.h" #include "wildcard.h"
@ -3407,42 +3406,6 @@ static void test_1_completion(wcstring line, const wcstring &completion, complet
do_test(cursor_pos == out_cursor_pos); do_test(cursor_pos == out_cursor_pos);
} }
static void test_wait_handles() {
say(L"Testing wait handles");
constexpr size_t limit = 4;
wait_handle_store_t whs(limit);
do_test(whs.size() == 0);
// Null handles ignored.
whs.add(wait_handle_ref_t{});
do_test(whs.size() == 0);
do_test(whs.get_by_pid(5) == nullptr);
// Duplicate pids drop oldest.
whs.add(std::make_shared<wait_handle_t>(5, 0, L"first"));
whs.add(std::make_shared<wait_handle_t>(5, 0, L"second"));
do_test(whs.size() == 1);
do_test(whs.get_by_pid(5)->base_name == L"second");
whs.remove_by_pid(123);
do_test(whs.size() == 1);
whs.remove_by_pid(5);
do_test(whs.size() == 0);
// Test evicting oldest.
whs.add(std::make_shared<wait_handle_t>(1, 0, L"1"));
whs.add(std::make_shared<wait_handle_t>(2, 0, L"2"));
whs.add(std::make_shared<wait_handle_t>(3, 0, L"3"));
whs.add(std::make_shared<wait_handle_t>(4, 0, L"4"));
whs.add(std::make_shared<wait_handle_t>(5, 0, L"5"));
do_test(whs.size() == 4);
auto start = whs.get_list().begin();
do_test(std::next(start, 0)->get()->base_name == L"5");
do_test(std::next(start, 1)->get()->base_name == L"4");
do_test(std::next(start, 2)->get()->base_name == L"3");
do_test(std::next(start, 3)->get()->base_name == L"2");
}
static void test_completion_insertions() { static void test_completion_insertions() {
#define TEST_1_COMPLETION(a, b, c, d, e) test_1_completion(a, b, c, d, e, __LINE__) #define TEST_1_COMPLETION(a, b, c, d, e) test_1_completion(a, b, c, d, e, __LINE__)
say(L"Testing completion insertions"); say(L"Testing completion insertions");
@ -6955,7 +6918,6 @@ static const test_t s_tests[]{
{TEST_GROUP("universal"), test_universal_formats}, {TEST_GROUP("universal"), test_universal_formats},
{TEST_GROUP("universal"), test_universal_ok_to_save}, {TEST_GROUP("universal"), test_universal_ok_to_save},
{TEST_GROUP("universal"), test_universal_notifiers}, {TEST_GROUP("universal"), test_universal_notifiers},
{TEST_GROUP("wait_handles"), test_wait_handles},
{TEST_GROUP("completion_insertions"), test_completion_insertions}, {TEST_GROUP("completion_insertions"), test_completion_insertions},
{TEST_GROUP("autosuggestion_ignores"), test_autosuggestion_ignores}, {TEST_GROUP("autosuggestion_ignores"), test_autosuggestion_ignores},
{TEST_GROUP("autosuggestion_combining"), test_autosuggestion_combining}, {TEST_GROUP("autosuggestion_combining"), test_autosuggestion_combining},

View File

@ -40,7 +40,9 @@ static wcstring user_presentable_path(const wcstring &path, const environment_t
} }
parser_t::parser_t(std::shared_ptr<env_stack_t> vars, bool is_principal) parser_t::parser_t(std::shared_ptr<env_stack_t> vars, bool is_principal)
: variables(std::move(vars)), is_principal_(is_principal) { : wait_handles(new_wait_handle_store_ffi()),
variables(std::move(vars)),
is_principal_(is_principal) {
assert(variables.get() && "Null variables in parser initializer"); assert(variables.get() && "Null variables in parser initializer");
int cwd = open_cloexec(".", O_RDONLY); int cwd = open_cloexec(".", O_RDONLY);
if (cwd < 0) { if (cwd < 0) {
@ -62,6 +64,10 @@ parser_t &parser_t::principal_parser() {
void parser_t::assert_can_execute() const { ASSERT_IS_MAIN_THREAD(); } void parser_t::assert_can_execute() const { ASSERT_IS_MAIN_THREAD(); }
rust::Box<WaitHandleStoreFFI> &parser_t::get_wait_handles_ffi() { return wait_handles; }
const rust::Box<WaitHandleStoreFFI> &parser_t::get_wait_handles_ffi() const { return wait_handles; }
int parser_t::set_var_and_fire(const wcstring &key, env_mode_flags_t mode, wcstring_list_t vals) { int parser_t::set_var_and_fire(const wcstring &key, env_mode_flags_t mode, wcstring_list_t vals) {
int res = vars().set(key, mode, std::move(vals)); int res = vars().set(key, mode, std::move(vals));
if (res == ENV_OK) { if (res == ENV_OK) {

View File

@ -265,7 +265,7 @@ class parser_t : public std::enable_shared_from_this<parser_t> {
/// Our store of recorded wait-handles. These are jobs that finished in the background, and have /// Our store of recorded wait-handles. These are jobs that finished in the background, and have
/// been reaped, but may still be wait'ed on. /// been reaped, but may still be wait'ed on.
wait_handle_store_t wait_handles; rust::Box<WaitHandleStoreFFI> wait_handles;
/// The list of blocks. This is a deque because we give out raw pointers to callers, who hold /// The list of blocks. This is a deque because we give out raw pointers to callers, who hold
/// them across manipulating this stack. /// them across manipulating this stack.
@ -395,8 +395,14 @@ class parser_t : public std::enable_shared_from_this<parser_t> {
const library_data_t &libdata() const { return library_data; } const library_data_t &libdata() const { return library_data; }
/// Get our wait handle store. /// Get our wait handle store.
wait_handle_store_t &get_wait_handles() { return wait_handles; } rust::Box<WaitHandleStoreFFI> &get_wait_handles_ffi();
const wait_handle_store_t &get_wait_handles() const { return wait_handles; } const rust::Box<WaitHandleStoreFFI> &get_wait_handles_ffi() const;
/// As get_wait_handles(), but void* pointer-to-Box to satisfy autocxx.
void *get_wait_handles_void() const {
const void *ptr = &get_wait_handles_ffi();
return const_cast<void *>(ptr);
}
/// Get and set the last proc statuses. /// Get and set the last proc statuses.
int get_last_status() const { return vars().get_last_status(); } int get_last_status() const { return vars().get_last_status(); }

View File

@ -290,17 +290,23 @@ bool process_t::is_internal() const {
return true; return true;
} }
wait_handle_ref_t process_t::make_wait_handle(internal_job_id_t jid) { rust::Box<WaitHandleRefFFI> *process_t::get_wait_handle_ffi() const { return wait_handle_.get(); }
rust::Box<WaitHandleRefFFI> *process_t::make_wait_handle_ffi(internal_job_id_t jid) {
if (type != process_type_t::external || pid <= 0) { if (type != process_type_t::external || pid <= 0) {
// Not waitable. // Not waitable.
return nullptr; return nullptr;
} }
if (!wait_handle_) { if (!wait_handle_) {
wait_handle_ = std::make_shared<wait_handle_t>(this->pid, jid, wbasename(this->actual_cmd)); wait_handle_ = make_unique<rust::Box<WaitHandleRefFFI>>(
new_wait_handle_ffi(this->pid, jid, wbasename(this->actual_cmd)));
} }
return wait_handle_; return wait_handle_.get();
} }
void *process_t::get_wait_handle_void() const { return get_wait_handle_ffi(); }
void *process_t::make_wait_handle_void(internal_job_id_t jid) { return make_wait_handle_ffi(jid); }
static uint64_t next_internal_job_id() { static uint64_t next_internal_job_id() {
static std::atomic<uint64_t> s_next{}; static std::atomic<uint64_t> s_next{};
return ++s_next; return ++s_next;
@ -632,20 +638,19 @@ static void remove_disowned_jobs(job_list_t &jobs) {
/// Given that a job has completed, check if it may be wait'ed on; if so add it to the wait handle /// Given that a job has completed, check if it may be wait'ed on; if so add it to the wait handle
/// store. Then mark all wait handles as complete. /// store. Then mark all wait handles as complete.
static void save_wait_handle_for_completed_job(const shared_ptr<job_t> &job, static void save_wait_handle_for_completed_job(const shared_ptr<job_t> &job,
wait_handle_store_t &store) { WaitHandleStoreFFI &store) {
assert(job && job->is_completed() && "Job null or not completed"); assert(job && job->is_completed() && "Job null or not completed");
// Are we a background job? // Are we a background job?
if (!job->is_foreground()) { if (!job->is_foreground()) {
for (auto &proc : job->processes) { for (auto &proc : job->processes) {
store.add(proc->make_wait_handle(job->internal_job_id)); store.add(proc->make_wait_handle_ffi(job->internal_job_id));
} }
} }
// Mark all wait handles as complete (but don't create just for this). // Mark all wait handles as complete (but don't create just for this).
for (auto &proc : job->processes) { for (auto &proc : job->processes) {
if (wait_handle_ref_t wh = proc->get_wait_handle()) { if (auto *wh = proc->get_wait_handle_ffi()) {
wh->status = proc->status.status_value(); (*wh)->set_status_and_complete(proc->status.status_value());
wh->completed = true;
} }
} }
} }
@ -712,7 +717,7 @@ static bool process_clean_after_marking(parser_t &parser, bool allow_interactive
// finished in the background. // finished in the background.
if (job_or_proc_wants_summary(j)) jobs_to_summarize.push_back(j); if (job_or_proc_wants_summary(j)) jobs_to_summarize.push_back(j);
generate_job_exit_events(j, &exit_events); generate_job_exit_events(j, &exit_events);
save_wait_handle_for_completed_job(j, parser.get_wait_handles()); save_wait_handle_for_completed_job(j, *parser.get_wait_handles_ffi());
// Remove it. // Remove it.
iter = parser.jobs().erase(iter); iter = parser.jobs().erase(iter);

View File

@ -242,7 +242,7 @@ class tty_transfer_t : nonmovable_t, noncopyable_t {
/// ///
/// If the process is of type process_type_t::function, argv is the argument vector, and argv[0] is /// If the process is of type process_type_t::function, argv is the argument vector, and argv[0] is
/// the name of the shellscript function. /// the name of the shellscript function.
class process_t : noncopyable_t { class process_t {
public: public:
process_t(); process_t();
@ -294,12 +294,16 @@ class process_t : noncopyable_t {
bool is_internal() const; bool is_internal() const;
/// \return the wait handle for the process, if it exists. /// \return the wait handle for the process, if it exists.
wait_handle_ref_t get_wait_handle() { return wait_handle_; } rust::Box<WaitHandleRefFFI> *get_wait_handle_ffi() const;
/// Create a wait handle for the process. /// Create a wait handle for the process.
/// As a process does not know its job id, we pass it in. /// As a process does not know its job id, we pass it in.
/// Note this will return null if the process is not waitable (has no pid). /// Note this will return null if the process is not waitable (has no pid).
wait_handle_ref_t make_wait_handle(internal_job_id_t jid); rust::Box<WaitHandleRefFFI> *make_wait_handle_ffi(internal_job_id_t jid);
/// Variants of get and make that return void*, to satisfy autocxx.
void *get_wait_handle_void() const;
void *make_wait_handle_void(internal_job_id_t jid);
/// Actual command to pass to exec in case of process_type_t::external or process_type_t::exec. /// Actual command to pass to exec in case of process_type_t::external or process_type_t::exec.
wcstring actual_cmd; wcstring actual_cmd;
@ -338,12 +342,18 @@ class process_t : noncopyable_t {
/// Number of jiffies spent in process at last cpu time check. /// Number of jiffies spent in process at last cpu time check.
clock_ticks_t last_jiffies{0}; clock_ticks_t last_jiffies{0};
process_t(process_t &&) = delete;
process_t &operator=(process_t &&) = delete;
process_t(const process_t &) = delete;
process_t &operator=(const process_t &) = delete;
private: private:
wcstring_list_t argv_; wcstring_list_t argv_;
rust::Box<redirection_spec_list_t> proc_redirection_specs_; rust::Box<redirection_spec_list_t> proc_redirection_specs_;
// The wait handle. This is constructed lazily, and cached. // The wait handle. This is constructed lazily, and cached.
wait_handle_ref_t wait_handle_{}; // This may be null.
std::unique_ptr<rust::Box<WaitHandleRefFFI>> wait_handle_;
}; };
using process_ptr_t = std::unique_ptr<process_t>; using process_ptr_t = std::unique_ptr<process_t>;

View File

@ -1,53 +0,0 @@
#include "config.h" // IWYU pragma: keep
#include "wait_handle.h"
#include <iterator>
wait_handle_store_t::wait_handle_store_t(size_t limit) : limit_(limit) {}
void wait_handle_store_t::add(wait_handle_ref_t wh) {
if (!wh || wh->pid <= 0) return;
pid_t pid = wh->pid;
remove_by_pid(wh->pid);
handles_.push_front(std::move(wh));
handle_map_[pid] = std::begin(handles_);
// Remove oldest until we reach our limit.
while (handles_.size() > limit_) {
handle_map_.erase(handles_.back()->pid);
handles_.pop_back();
}
}
void wait_handle_store_t::remove(const wait_handle_ref_t &wh) {
// Note: this differs from remove_by_pid because we verify that the handle is the same.
if (!wh) return;
auto iter = handle_map_.find(wh->pid);
if (iter != handle_map_.end() && *iter->second == wh) {
// Note this may deallocate the wait handle, leaving it dangling.
handles_.erase(iter->second);
handle_map_.erase(iter);
}
}
void wait_handle_store_t::remove_by_pid(pid_t pid) {
auto iter = handle_map_.find(pid);
if (iter != handle_map_.end()) {
handles_.erase(iter->second);
handle_map_.erase(iter);
}
}
wait_handle_ref_t wait_handle_store_t::get(size_t idx) const {
// TODO: this is O(N)!
assert(idx < handles_.size() && "index out of range");
return *std::next(std::begin(handles_), idx);
}
wait_handle_ref_t wait_handle_store_t::get_by_pid(pid_t pid) const {
auto iter = handle_map_.find(pid);
if (iter == handle_map_.end()) return nullptr;
return *iter->second;
}

View File

@ -1,97 +1,12 @@
// Support for handling pids that are no longer fish jobs.
// This includes pids that have been disowned ("forgotten") and background jobs which have finished,
// but may be wait'ed.
#ifndef FISH_WAIT_HANDLE_H #ifndef FISH_WAIT_HANDLE_H
#define FISH_WAIT_HANDLE_H #define FISH_WAIT_HANDLE_H
#include "config.h" // IWYU pragma: keep // Hacks to allow us to compile without Rust headers.
struct WaitHandleStoreFFI;
struct WaitHandleRefFFI;
#include <unistd.h> #if INCLUDE_RUST_HEADERS
#include "wait_handle.rs.h"
#include <list> #endif
#include <memory>
#include <unordered_map>
#include <utility>
#include "common.h"
/// The bits of a job necessary to support 'wait' and '--on-process-exit'.
/// This may outlive the job.
struct wait_handle_t {
/// Construct from a pid, job id, and base name.
wait_handle_t(pid_t pid, internal_job_id_t jid, wcstring name)
: pid(pid), internal_job_id(jid), base_name(std::move(name)) {}
/// The pid of this process.
const pid_t pid{};
/// The internal job id of the job which contained this process.
const internal_job_id_t internal_job_id{};
/// The "base name" of this process.
/// For example if the process is "/bin/sleep" then this will be 'sleep'.
const wcstring base_name{};
/// The value appropriate for populating $status, if completed.
int status{0};
/// Set to true when the process is completed.
bool completed{false};
/// Autocxx junk.
bool is_completed() const { return completed; }
int get_pid() const { return pid; }
const wcstring &get_base_name() const { return base_name; }
};
using wait_handle_ref_t = std::shared_ptr<wait_handle_t>;
/// Support for storing a list of wait handles, with a max limit set at initialization.
/// Note this class is not safe for concurrent access.
class wait_handle_store_t : noncopyable_t {
public:
// Our wait handles are arranged in a linked list for its iterator invalidation semantics: we
// may remove one without needing to update the map from pid -> handle.
using wait_handle_list_t = std::list<wait_handle_ref_t>;
/// Construct with a max limit on the number of handles we will remember.
/// The default is 1024, which is zsh's default.
explicit wait_handle_store_t(size_t limit = 1024);
/// Add a wait handle to the store. This may remove the oldest handle, if our limit is exceeded.
/// It may also remove any existing handle with that pid.
/// For convenience, this does nothing if wh is null.
void add(wait_handle_ref_t wh);
/// \return the wait handle for a pid, or nullptr if there is none.
/// This is a fast lookup.
wait_handle_ref_t get_by_pid(pid_t pid) const;
/// Remove a given wait handle, if present in this store.
void remove(const wait_handle_ref_t &wh);
/// Remove the wait handle for a pid, if present in this store.
void remove_by_pid(pid_t pid);
/// Get the list of all wait handles.
const wait_handle_list_t &get_list() const { return handles_; }
/// autocxx does not support std::list so allow accessing by index.
wait_handle_ref_t get(size_t idx) const;
/// Convenience to return the size, for testing.
size_t size() const { return handles_.size(); }
private:
using list_node_t = typename wait_handle_list_t::iterator;
/// The list of all wait handles. New ones come on the front, the last one is oldest.
wait_handle_list_t handles_{};
/// Map from pid to the wait handle's position in the list.
std::unordered_map<pid_t, list_node_t> handle_map_{};
/// Max supported wait handles.
const size_t limit_;
};
#endif #endif