Remove some FFI bits from FdMonitor

This commit is contained in:
ridiculousfish 2024-01-13 12:10:24 -08:00
parent 30f70f02de
commit d8da79717e
3 changed files with 40 additions and 86 deletions

View File

@ -191,31 +191,20 @@ impl From<u64> for FdMonitorItemId {
}
}
pub type NativeCallback = Box<dyn Fn(&mut AutoCloseFd, ItemWakeReason) + Send + Sync>;
/// The callback type used by [`FdMonitorItem`]. It is passed a mutable reference to the
/// `FdMonitorItem`'s [`FdMonitorItem::fd`] and [the reason](ItemWakeupReason) for the wakeup. The
/// callback may close the fd, in which case the `FdMonitorItem` is removed from [`FdMonitor`]'s
/// set.
///
/// As capturing C++ closures can't be safely used via ffi interop and cxx bridge doesn't support
/// passing typed `fn(...)` pointers from C++ to rust, we have a separate variant of the type that
/// uses the C abi to invoke a callback. This will be removed when the dependent C++ code (currently
/// only `src/io.cpp`) is ported to rust
enum FdMonitorCallback {
None,
Native(NativeCallback),
}
pub type Callback = Box<dyn Fn(&mut AutoCloseFd, ItemWakeReason) + Send + Sync>;
/// An item containing an fd and callback, which can be monitored to watch when it becomes readable
/// and invoke the callback.
pub struct FdMonitorItem {
/// The fd to monitor
fd: AutoCloseFd,
/// A callback to be invoked when the fd is readable, or when we are timed out. If we time out,
/// then timed_out will be true. If the fd is invalid on return from the function, then the item
/// is removed from the [`FdMonitor`] set.
callback: FdMonitorCallback,
/// A callback to be invoked when the fd is readable, or for another reason given by the wake reason.
/// If the fd is invalid on return from the function, then the item is removed from the [`FdMonitor`] set.
callback: Callback,
/// The timeout associated with waiting on this item or `None` to wait indefinitely. A timeout
/// of `0` is not supported.
timeout: Option<Duration>,
@ -268,10 +257,7 @@ impl FdMonitorItem {
} else {
ItemWakeReason::Timeout
};
match &self.callback {
FdMonitorCallback::None => panic!("Callback not assigned!"),
FdMonitorCallback::Native(callback) => (callback)(&mut self.fd, reason),
}
(self.callback)(&mut self.fd, reason);
if !self.fd.is_valid() {
result = ItemAction::Remove;
}
@ -286,10 +272,7 @@ impl FdMonitorItem {
return ItemAction::Retain;
}
match &self.callback {
FdMonitorCallback::None => panic!("Callback not assigned!"),
FdMonitorCallback::Native(callback) => (callback)(&mut self.fd, ItemWakeReason::Poke),
}
(self.callback)(&mut self.fd, ItemWakeReason::Poke);
// Return `ItemAction::Remove` if the callback closed the fd
match self.fd.is_valid() {
true => ItemAction::Retain,
@ -297,38 +280,15 @@ impl FdMonitorItem {
}
}
pub fn new(
fd: AutoCloseFd,
timeout: Option<Duration>,
callback: Option<NativeCallback>,
) -> Self {
pub fn new(fd: AutoCloseFd, timeout: Option<Duration>, callback: Callback) -> Self {
FdMonitorItem {
fd,
timeout,
callback: match callback {
Some(callback) => FdMonitorCallback::Native(callback),
None => FdMonitorCallback::None,
},
callback,
item_id: FdMonitorItemId(0),
last_time: None,
}
}
pub fn set_callback(&mut self, callback: NativeCallback) {
self.callback = FdMonitorCallback::Native(callback);
}
}
impl Default for FdMonitorItem {
fn default() -> Self {
Self {
callback: FdMonitorCallback::None,
fd: AutoCloseFd::empty(),
timeout: None,
last_time: None,
item_id: FdMonitorItemId(0),
}
}
}
/// A thread-safe class which can monitor a set of fds, invoking a callback when any becomes

View File

@ -1,8 +1,6 @@
use crate::builtins::shared::{STATUS_CMD_ERROR, STATUS_CMD_OK, STATUS_READ_TOO_MUCH};
use crate::common::{str2wcstring, wcs2string, EMPTY_STRING};
use crate::fd_monitor::{
FdMonitor, FdMonitorItem, FdMonitorItemId, ItemWakeReason, NativeCallback,
};
use crate::fd_monitor::{Callback, FdMonitor, FdMonitorItem, FdMonitorItemId, ItemWakeReason};
use crate::fds::{
make_autoclose_pipes, make_fd_nonblocking, wopen_cloexec, AutoCloseFd, PIPE_ERROR,
};
@ -557,44 +555,41 @@ fn begin_filling(iobuffer: &Arc<IoBuffer>, fd: AutoCloseFd) {
iobuffer.fill_waiter.replace(Some(promise.clone()));
// Run our function to read until the receiver is closed.
// It's OK to capture 'buffer' because 'this' waits for the promise in its dtor.
let item_callback: Option<NativeCallback> = {
let item_callback: Callback = {
let iobuffer = iobuffer.clone();
Some(Box::new(
move |fd: &mut AutoCloseFd, reason: ItemWakeReason| {
// Only check the shutdown flag if we timed out or were poked.
// It's important that if select() indicated we were readable, that we call select() again
// allowing it to time out. Note the typical case is that the fd will be closed, in which
// case select will return immediately.
let mut done = false;
if reason == ItemWakeReason::Readable {
// select() reported us as readable; read a bit.
let mut buf = iobuffer.buffer.lock().unwrap();
Box::new(move |fd: &mut AutoCloseFd, reason: ItemWakeReason| {
// Only check the shutdown flag if we timed out or were poked.
// It's important that if select() indicated we were readable, that we call select() again
// allowing it to time out. Note the typical case is that the fd will be closed, in which
// case select will return immediately.
let mut done = false;
if reason == ItemWakeReason::Readable {
// select() reported us as readable; read a bit.
let mut buf = iobuffer.buffer.lock().unwrap();
let ret = IoBuffer::read_once(fd.fd(), &mut buf);
done = ret == 0 || (ret < 0 && ![EAGAIN, EWOULDBLOCK].contains(&errno::errno().0));
} else if iobuffer.shutdown_fillthread.load() {
// Here our caller asked us to shut down; read while we keep getting data.
// This will stop when the fd is closed or if we get EAGAIN.
let mut buf = iobuffer.buffer.lock().unwrap();
loop {
let ret = IoBuffer::read_once(fd.fd(), &mut buf);
done =
ret == 0 || (ret < 0 && ![EAGAIN, EWOULDBLOCK].contains(&errno::errno().0));
} else if iobuffer.shutdown_fillthread.load() {
// Here our caller asked us to shut down; read while we keep getting data.
// This will stop when the fd is closed or if we get EAGAIN.
let mut buf = iobuffer.buffer.lock().unwrap();
loop {
let ret = IoBuffer::read_once(fd.fd(), &mut buf);
if ret <= 0 {
break;
}
if ret <= 0 {
break;
}
done = true;
}
if done {
fd.close();
let (mutex, condvar) = &*promise;
{
let mut done = mutex.lock().unwrap();
*done = true;
}
condvar.notify_one();
done = true;
}
if done {
fd.close();
let (mutex, condvar) = &*promise;
{
let mut done = mutex.lock().unwrap();
*done = true;
}
},
))
condvar.notify_one();
}
})
};
let item_id = fd_monitor().add(FdMonitorItem::new(fd, None, item_callback));

View File

@ -35,7 +35,6 @@ impl ItemMaker {
config: F,
) -> Arc<Self> {
let pipes = make_autoclose_pipes().expect("fds exhausted!");
let mut item = FdMonitorItem::new(pipes.read, timeout, None);
let mut result = ItemMaker {
did_timeout: false.into(),
@ -56,7 +55,7 @@ impl ItemMaker {
result.callback(fd, reason);
}
};
item.set_callback(Box::new(callback));
let item = FdMonitorItem::new(pipes.read, timeout, Box::new(callback));
let item_id = monitor.add(item);
result.item_id.store(u64::from(item_id), Ordering::Relaxed);