Port fd_monitor tests to rust

This shows some of the ugliness of the rust borrow checker when it comes to
safely implementing any sort of recursive access and the need to be overly
explicit about which types are actually used across threads and which aren't.

We're forced to use an `Arc` for `ItemMaker` (née `item_maker_t`) because
there's no other way to make it clear that its lifetime will last longer than
the FdMonitor's. But once even a single weak reference to an `Arc<T>` has been
created, we can no longer call `Arc::get_mut()` to get an `&mut T` (because that
weak ref could be upgraded to a strong ref at any time). This means we need to
finish configuring any non-atomic properties
(such as `ItemMaker::always_exit`) before we initialize the callback (which
needs an `Arc<ItemMaker>` to do its thing).

Because rust doesn't like self-referential types, and because we now need to
create both the `ItemMaker` and the `FdMonitorItem` separately before we set the
callback (at which point it becomes impossible to get a mutable reference to the
`ItemMaker`), `ItemMaker::item` is dropped from the struct. Instead, the
"constructor" for `ItemMaker` takes a reference to an `FdMonitor` instance and
directly adds itself to the monitor's set, meaning we don't need to move the
item out of the `ItemMaker` in order to add it to the `FdMonitor` set later.
This commit is contained in:
Mahmoud Al-Qudsi 2023-03-04 23:49:17 -06:00
parent 83a220a532
commit 455b744bca
4 changed files with 192 additions and 141 deletions

View File

@ -41,3 +41,6 @@ mod abbrs;
mod builtins;
mod env;
mod re;
// Don't use `#[cfg(test)]` here to make sure ffi tests are built and tested
mod tests;

View File

@ -0,0 +1,188 @@
use std::os::fd::AsRawFd;
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use crate::common::write_loop;
use crate::fd_monitor::{FdMonitor, FdMonitorItem, FdMonitorItemId, ItemWakeReason};
use crate::fds::{make_autoclose_pipes, AutoCloseFd};
use crate::ffi_tests::add_test;
/// Helper to make an item which counts how many times its callback was invoked.
///
/// This could be structured differently to avoid the `Mutex` on `writer`, but it's not worth it
/// since this is just used for test purposes.
struct ItemMaker {
    // Set to `true` when the callback fires with `ItemWakeReason::Timeout`.
    pub did_timeout: AtomicBool,
    // Total number of bytes read across all `ItemWakeReason::Readable` wake-ups.
    pub length_read: AtomicUsize,
    // Number of `ItemWakeReason::Poke` wake-ups received.
    pub pokes: AtomicUsize,
    // Number of times the callback was invoked, for any reason.
    pub total_calls: AtomicUsize,
    // Id assigned by the monitor when our item is added; atomic because the callback may observe
    // it from another thread after `insert_new_into2()` stores it.
    item_id: AtomicU64,
    // When `true`, the callback closes the fd after every invocation. Non-atomic, so it must be
    // configured before the `Arc<ItemMaker>` is created.
    pub always_exit: bool,
    // Write end of the pipe; the read end is owned by the `FdMonitorItem`.
    pub writer: Mutex<AutoCloseFd>,
}
impl ItemMaker {
pub fn insert_new_into(monitor: &FdMonitor, timeout: Option<Duration>) -> Arc<Self> {
Self::insert_new_into2(monitor, timeout, |_| {})
}
pub fn insert_new_into2<F: Fn(&mut Self)>(
monitor: &FdMonitor,
timeout: Option<Duration>,
config: F,
) -> Arc<Self> {
let pipes = make_autoclose_pipes().expect("fds exhausted!");
let mut item = FdMonitorItem::new(pipes.read, timeout, None);
let mut result = ItemMaker {
did_timeout: false.into(),
length_read: 0.into(),
pokes: 0.into(),
total_calls: 0.into(),
item_id: 0.into(),
always_exit: false,
writer: Mutex::new(pipes.write),
};
config(&mut result);
let result = Arc::new(result);
let callback = {
let result = Arc::clone(&result);
move |fd: &mut AutoCloseFd, reason: ItemWakeReason| {
result.callback(fd, reason);
}
};
item.set_callback(Box::new(callback));
let item_id = monitor.add(item);
result.item_id.store(u64::from(item_id), Ordering::Relaxed);
result
}
fn item_id(&self) -> FdMonitorItemId {
self.item_id.load(Ordering::Relaxed).into()
}
fn callback(&self, fd: &mut AutoCloseFd, reason: ItemWakeReason) {
let mut was_closed = false;
match reason {
ItemWakeReason::Timeout => {
self.did_timeout.store(true, Ordering::Relaxed);
}
ItemWakeReason::Poke => {
self.pokes.fetch_add(1, Ordering::Relaxed);
}
ItemWakeReason::Readable => {
let mut buf = [0u8; 1024];
let amt =
unsafe { libc::read(fd.as_raw_fd(), buf.as_mut_ptr() as *mut _, buf.len()) };
assert_ne!(amt, -1, "read error!");
self.length_read.fetch_add(amt as usize, Ordering::Relaxed);
was_closed = amt == 0;
}
_ => unreachable!(),
}
self.total_calls.fetch_add(1, Ordering::Relaxed);
if self.always_exit || was_closed {
fd.close();
}
}
/// Write 42 bytes to our write end.
fn write42(&self) {
let buf = [0u8; 42];
let mut writer = self.writer.lock().expect("Mutex poisoned!");
write_loop(&mut *writer, &buf).expect("Error writing 42 bytes to pipe!");
}
}
// End-to-end test of FdMonitor: registers items with various timeouts, feeds some of them data,
// pokes one, then verifies the per-item counters after the monitor is dropped.
add_test!("fd_monitor_items", || {
    let monitor = FdMonitor::new();

    // Items which will never receive data or be called.
    let item_never = ItemMaker::insert_new_into(&monitor, None);
    let item_huge_timeout =
        ItemMaker::insert_new_into(&monitor, Some(Duration::from_millis(100_000_000)));
    // Item which should get no data and time out.
    let item0_timeout = ItemMaker::insert_new_into(&monitor, Some(Duration::from_millis(16)));
    // Item which should get exactly 42 bytes then time out.
    let item42_timeout = ItemMaker::insert_new_into(&monitor, Some(Duration::from_millis(16)));
    // Item which should get exactly 42 bytes and not time out.
    let item42_no_timeout = ItemMaker::insert_new_into(&monitor, None);
    // Item which should get 42 bytes then get notified it is closed.
    let item42_then_close = ItemMaker::insert_new_into(&monitor, Some(Duration::from_millis(16)));
    // Item which gets one poke.
    let item_pokee = ItemMaker::insert_new_into(&monitor, None);
    // Item which should get a callback exactly once.
    let item_oneshot =
        ItemMaker::insert_new_into2(&monitor, Some(Duration::from_millis(16)), |item| {
            item.always_exit = true;
        });

    item42_timeout.write42();
    item42_no_timeout.write42();
    item42_then_close.write42();
    // Close the write end so this item observes a zero-length read (EOF).
    item42_then_close
        .writer
        .lock()
        .expect("Mutex poisoned!")
        .close();
    item_oneshot.write42();

    monitor.poke_item(item_pokee.item_id());

    // May need to loop here to ensure our fd_monitor gets scheduled. See #7699.
    for _ in 0..100 {
        std::thread::sleep(Duration::from_millis(84));
        if item0_timeout.did_timeout.load(Ordering::Relaxed) {
            break;
        }
    }

    // Tear down the monitor before inspecting the results.
    drop(monitor);

    // NOTE: `assert!`/`assert!(!…)` instead of `assert_eq!(…, true/false)` per clippy's
    // `bool_assert_comparison` lint.
    assert!(!item_never.did_timeout.load(Ordering::Relaxed));
    assert_eq!(item_never.length_read.load(Ordering::Relaxed), 0);
    assert_eq!(item_never.pokes.load(Ordering::Relaxed), 0);

    assert!(!item_huge_timeout.did_timeout.load(Ordering::Relaxed));
    assert_eq!(item_huge_timeout.length_read.load(Ordering::Relaxed), 0);
    assert_eq!(item_huge_timeout.pokes.load(Ordering::Relaxed), 0);

    assert_eq!(item0_timeout.length_read.load(Ordering::Relaxed), 0);
    assert!(item0_timeout.did_timeout.load(Ordering::Relaxed));
    assert_eq!(item0_timeout.pokes.load(Ordering::Relaxed), 0);

    assert_eq!(item42_timeout.length_read.load(Ordering::Relaxed), 42);
    assert!(item42_timeout.did_timeout.load(Ordering::Relaxed));
    assert_eq!(item42_timeout.pokes.load(Ordering::Relaxed), 0);

    assert_eq!(item42_no_timeout.length_read.load(Ordering::Relaxed), 42);
    assert!(!item42_no_timeout.did_timeout.load(Ordering::Relaxed));
    assert_eq!(item42_no_timeout.pokes.load(Ordering::Relaxed), 0);

    assert!(!item42_then_close.did_timeout.load(Ordering::Relaxed));
    assert_eq!(item42_then_close.length_read.load(Ordering::Relaxed), 42);
    // Called once for the 42-byte read and once for the EOF notification.
    assert_eq!(item42_then_close.total_calls.load(Ordering::Relaxed), 2);
    assert_eq!(item42_then_close.pokes.load(Ordering::Relaxed), 0);

    assert!(!item_oneshot.did_timeout.load(Ordering::Relaxed));
    assert_eq!(item_oneshot.length_read.load(Ordering::Relaxed), 42);
    assert_eq!(item_oneshot.total_calls.load(Ordering::Relaxed), 1);
    assert_eq!(item_oneshot.pokes.load(Ordering::Relaxed), 0);

    assert!(!item_pokee.did_timeout.load(Ordering::Relaxed));
    assert_eq!(item_pokee.length_read.load(Ordering::Relaxed), 0);
    assert_eq!(item_pokee.total_calls.load(Ordering::Relaxed), 1);
    assert_eq!(item_pokee.pokes.load(Ordering::Relaxed), 1);
});

View File

@ -0,0 +1 @@
mod fd_monitor;

View File

@ -800,146 +800,6 @@ static void test_tokenizer() {
err(L"redirection_type_for_string failed on line %ld", (long)__LINE__);
}
// Exercises the fd_monitor over its FFI bindings: registers items backed by pipes and verifies
// the timeout, readable, poke, and close notifications each item receives.
static void test_fd_monitor() {
    say(L"Testing fd_monitor");
    // Helper to make an item which counts how many times its callback is invoked.
    struct item_maker_t : public noncopyable_t {
        std::atomic<bool> did_timeout{false};  // set when woken with a Timeout reason
        std::atomic<size_t> length_read{0};    // total bytes read across Readable wake-ups
        std::atomic<size_t> pokes{0};          // number of Poke wake-ups
        std::atomic<size_t> total_calls{0};    // number of callback invocations, any reason
        uint64_t item_id{0};                   // id assigned by the monitor on add()
        bool always_exit{false};               // when set, close the fd after every callback
        std::unique_ptr<rust::Box<fd_monitor_item_t>> item;
        autoclose_fd_t writer;                 // write end of the pipe; read end is in `item`

        // Callback invoked by the monitor: tallies the wake reason and closes the fd when
        // always_exit is set or a zero-length read (EOF) was observed.
        void callback(autoclose_fd_t2 &fd, item_wake_reason_t reason) {
            bool was_closed = false;
            switch (reason) {
                case item_wake_reason_t::Timeout:
                    this->did_timeout = true;
                    break;
                case item_wake_reason_t::Poke:
                    this->pokes += 1;
                    break;
                case item_wake_reason_t::Readable:
                    char buff[4096];
                    ssize_t amt = read(fd.fd(), buff, sizeof buff);
                    this->length_read += amt;
                    was_closed = (amt == 0);  // a zero-length read means the write end closed
                    break;
            }
            total_calls += 1;
            if (always_exit || was_closed) {
                fd.close();
            }
        }

        // Trampoline handed across the FFI boundary; recovers `this` from the opaque param.
        static void trampoline(autoclose_fd_t2 &fd, item_wake_reason_t reason, uint8_t *param) {
            auto &instance = *(item_maker_t *)(param);
            instance.callback(fd, reason);
        }

        // Creates the pipe pair and the monitor item; the item is added to a monitor later.
        explicit item_maker_t(uint64_t timeout_usec) {
            auto pipes = make_autoclose_pipes().acquire();
            writer = std::move(pipes.write);
            item = std::make_unique<rust::Box<fd_monitor_item_t>>(
                make_fd_monitor_item_t(pipes.read.acquire(), timeout_usec,
                                       (uint8_t *)item_maker_t::trampoline, (uint8_t *)this));
        }

        // Write 42 bytes to our write end.
        void write42() const {
            char buff[42] = {0};
            (void)write_loop(writer.fd(), buff, sizeof buff);
        }
    };

    constexpr uint64_t usec_per_msec = 1000;

    // Items which will never receive data or be called back.
    item_maker_t item_never(kNoTimeout);
    item_maker_t item_hugetimeout(100000000LLU * usec_per_msec);
    // Item which should get no data, and time out.
    item_maker_t item0_timeout(16 * usec_per_msec);
    // Item which should get exactly 42 bytes, then time out.
    item_maker_t item42_timeout(16 * usec_per_msec);
    // Item which should get exactly 42 bytes, and not time out.
    item_maker_t item42_nottimeout(kNoTimeout);
    // Item which should get 42 bytes, then get notified it is closed.
    item_maker_t item42_thenclose(16 * usec_per_msec);
    // Item which gets one poke.
    item_maker_t item_pokee(kNoTimeout);
    // Item which should be called back once.
    item_maker_t item_oneshot(16 * usec_per_msec);
    item_oneshot.always_exit = true;

    // The monitor lives in this scope; it is destroyed before the checks below.
    {
        auto monitor = make_fd_monitor_t();
        // Move each pre-built item into the monitor and record the id it was assigned.
        for (item_maker_t *item :
             {&item_never, &item_hugetimeout, &item0_timeout, &item42_timeout, &item42_nottimeout,
              &item42_thenclose, &item_pokee, &item_oneshot}) {
            item->item_id = monitor->add(std::move(*(std::move(item->item))));
        }
        item42_timeout.write42();
        item42_nottimeout.write42();
        item42_thenclose.write42();
        // Close the write end so this item observes EOF.
        item42_thenclose.writer.close();
        item_oneshot.write42();
        monitor->poke_item(item_pokee.item_id);
        // May need to loop here to ensure our fd_monitor gets scheduled - see #7699.
        for (int i = 0; i < 100; i++) {
            std::this_thread::sleep_for(std::chrono::milliseconds(84));
            if (item0_timeout.did_timeout) {
                break;
            }
        }
    }

    do_test(!item_never.did_timeout);
    do_test(item_never.length_read == 0);
    do_test(item_never.pokes == 0);
    do_test(!item_hugetimeout.did_timeout);
    do_test(item_hugetimeout.length_read == 0);
    do_test(item_hugetimeout.pokes == 0);
    do_test(item0_timeout.length_read == 0);
    do_test(item0_timeout.did_timeout);
    do_test(item0_timeout.pokes == 0);
    do_test(item42_timeout.length_read == 42);
    do_test(item42_timeout.did_timeout);
    do_test(item42_timeout.pokes == 0);
    do_test(item42_nottimeout.length_read == 42);
    do_test(!item42_nottimeout.did_timeout);
    do_test(item42_nottimeout.pokes == 0);
    do_test(item42_thenclose.did_timeout == false);
    do_test(item42_thenclose.length_read == 42);
    // One call for the 42-byte read, one for the EOF notification.
    do_test(item42_thenclose.total_calls == 2);
    do_test(item42_thenclose.pokes == 0);
    do_test(!item_oneshot.did_timeout);
    do_test(item_oneshot.length_read == 42);
    do_test(item_oneshot.total_calls == 1);
    do_test(item_oneshot.pokes == 0);
    do_test(!item_pokee.did_timeout);
    do_test(item_pokee.length_read == 0);
    do_test(item_pokee.total_calls == 1);
    do_test(item_pokee.pokes == 1);
}
static void test_iothread() {
say(L"Testing iothreads");
std::atomic<int> shared_int{0};
@ -7054,7 +6914,6 @@ static const test_t s_tests[]{
{TEST_GROUP("perf_convert_ascii"), perf_convert_ascii, true},
{TEST_GROUP("convert_nulls"), test_convert_nulls},
{TEST_GROUP("tokenizer"), test_tokenizer},
{TEST_GROUP("fd_monitor"), test_fd_monitor},
{TEST_GROUP("iothread"), test_iothread},
{TEST_GROUP("pthread"), test_pthread},
{TEST_GROUP("debounce"), test_debounce},