diff --git a/src/fd_monitor.rs b/src/fd_monitor.rs index b9fb1df8b..698974029 100644 --- a/src/fd_monitor.rs +++ b/src/fd_monitor.rs @@ -192,10 +192,10 @@ impl From for FdMonitorItemId { } /// The callback type used by [`FdMonitorItem`]. It is passed a mutable reference to the -/// `FdMonitorItem`'s [`FdMonitorItem::fd`] and [the reason](ItemWakeupReason) for the wakeup. The -/// callback may close the fd, in which case the `FdMonitorItem` is removed from [`FdMonitor`]'s -/// set. -pub type Callback = Box; +/// `FdMonitorItem`'s [`FdMonitorItem::fd`] and [the reason](ItemWakeupReason) for the wakeup. +/// It should return an [`ItemAction`] to indicate whether the item should be removed from the +/// [`FdMonitor`] set. +pub type Callback = Box ItemAction + Send + Sync>; /// An item containing an fd and callback, which can be monitored to watch when it becomes readable /// and invoke the callback. @@ -214,11 +214,9 @@ pub struct FdMonitorItem { item_id: FdMonitorItemId, } -/// Unlike C++, rust's `Vec` has `Vec::retain()` instead of `std::remove_if(...)` with the inverse -/// logic. It's hard to keep track of which bool means what across the different layers, so be more -/// explicit. +/// A value returned by the callback to indicate what to do with the item. #[derive(PartialEq, Eq)] -enum ItemAction { +pub enum ItemAction { Remove, Retain, } @@ -257,12 +255,9 @@ impl FdMonitorItem { } else { ItemWakeReason::Timeout }; - (self.callback)(&mut self.fd, reason); - if !self.fd.is_valid() { - result = ItemAction::Remove; - } + result = (self.callback)(&mut self.fd, reason); } - return result; + result } /// Invoke this item's callback with a poke, if its id is present in the sorted poke list. @@ -272,12 +267,7 @@ impl FdMonitorItem { return ItemAction::Retain; } - (self.callback)(&mut self.fd, ItemWakeReason::Poke); - // Return `ItemAction::Remove` if the callback closed the fd - match self.fd.is_valid() { - true => ItemAction::Retain, - false => ItemAction::Remove, - } + (self.callback)(&mut self.fd, ItemWakeReason::Poke) } pub fn new(fd: AutoCloseFd, timeout: Option, callback: Callback) -> Self { @@ -483,11 +473,11 @@ impl BackgroundFdMonitor { // A predicate which services each item in turn, returning true if it should be removed let servicer = |item: &mut FdMonitorItem| { let fd = item.fd.as_raw_fd(); - if item.service_item(&fds, &now) == ItemAction::Remove { + let action = item.service_item(&fds, &now); + if action == ItemAction::Remove { FLOG!(fd_monitor, "Removing fd", fd); - return ItemAction::Remove; } - return ItemAction::Retain; + action }; // Service all items that are either readable or have timed out, and remove any which diff --git a/src/io.rs b/src/io.rs index 5d3be4a28..f6ed43d09 100644 --- a/src/io.rs +++ b/src/io.rs @@ -1,6 +1,8 @@ use crate::builtins::shared::{STATUS_CMD_ERROR, STATUS_CMD_OK, STATUS_READ_TOO_MUCH}; use crate::common::{str2wcstring, wcs2string, EMPTY_STRING}; -use crate::fd_monitor::{Callback, FdMonitor, FdMonitorItem, FdMonitorItemId, ItemWakeReason}; +use crate::fd_monitor::{ + Callback, FdMonitor, FdMonitorItem, FdMonitorItemId, ItemAction, ItemWakeReason, +}; use crate::fds::{ make_autoclose_pipes, make_fd_nonblocking, wopen_cloexec, AutoCloseFd, PIPE_ERROR, }; @@ -580,7 +582,9 @@ fn begin_filling(iobuffer: &Arc, fd: AutoCloseFd) { } done = true; } - if done { + if !done { + ItemAction::Retain + } else { fd.close(); let (mutex, condvar) = &*promise; { @@ -588,6 +592,7 @@ fn begin_filling(iobuffer: &Arc, fd: AutoCloseFd) { *done = true; } condvar.notify_one(); + ItemAction::Remove } }) }; diff --git a/src/tests/fd_monitor.rs b/src/tests/fd_monitor.rs index f6b16309a..8712a955f 100644 --- a/src/tests/fd_monitor.rs +++ b/src/tests/fd_monitor.rs @@ -5,7 +5,7 @@ use std::sync::{Arc, Mutex}; use std::time::Duration; use crate::fd_monitor::{ - FdEventSignaller, FdMonitor, FdMonitorItem, FdMonitorItemId, ItemWakeReason, + FdEventSignaller, FdMonitor, FdMonitorItem, FdMonitorItemId, ItemAction, ItemWakeReason, }; use crate::fds::{make_autoclose_pipes, AutoCloseFd}; use crate::tests::prelude::*; @@ -51,9 +51,7 @@ impl ItemMaker { let result = Arc::new(result); let callback = { let result = Arc::clone(&result); - move |fd: &mut AutoCloseFd, reason: ItemWakeReason| { - result.callback(fd, reason); - } + move |fd: &mut AutoCloseFd, reason: ItemWakeReason| result.callback(fd, reason) }; let item = FdMonitorItem::new(pipes.read, timeout, Box::new(callback)); let item_id = monitor.add(item); @@ -66,7 +64,7 @@ impl ItemMaker { self.item_id.load(Ordering::Relaxed).into() } - fn callback(&self, fd: &mut AutoCloseFd, reason: ItemWakeReason) { + fn callback(&self, fd: &mut AutoCloseFd, reason: ItemWakeReason) -> ItemAction { let mut was_closed = false; match reason { @@ -89,6 +87,9 @@ impl ItemMaker { self.total_calls.fetch_add(1, Ordering::Relaxed); if self.always_exit || was_closed { fd.close(); + ItemAction::Remove + } else { + ItemAction::Retain } }