From 455b744bca1292a0b2ddce51413a5a234ca32f04 Mon Sep 17 00:00:00 2001 From: Mahmoud Al-Qudsi Date: Sat, 4 Mar 2023 23:49:17 -0600 Subject: [PATCH] Port fd_monitor tests to rust MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This shows some of the ugliness of the rust borrow checker when it comes to safely implementing any sort of recursive access and the need to be overly explicit about which types are actually used across threads and which aren't. We're forced to use an `Arc` for `ItemMaker` (née `item_maker_t`) because there's no other way to make it clear that its lifetime will last longer than the FdMonitor's. But once we've created an `Arc` we can't call `Arc::get_mut()` to get an `&mut T` once we've created even a single weak reference to the Arc (because that weak ref could be upgraded to a strong ref at any time). This means we need to finish configuring any non-atomic properties (such as `ItemMaker::always_exit`) before we initialize the callback (which needs an `Arc` to do its thing). Because rust doesn't like self-referential types and because of the fact that we now need to create both the `ItemMaker` and the `FdMonitorItem` separately before we set the callback (at which point it becomes impossible to get a mutable reference to the `ItemMaker`), `ItemMaker::item` is dropped from the struct and we instead have the "constructor" for `ItemMaker` take a reference to an `FdMonitor` instance and directly add itself to the monitor's set, meaning we don't need to move the item out of the `ItemMaker` in order to add it to the `FdMonitor` set later. --- fish-rust/src/lib.rs | 3 + fish-rust/src/tests/fd_monitor.rs | 188 ++++++++++++++++++++++++++++++ fish-rust/src/tests/mod.rs | 1 + src/fish_tests.cpp | 141 ---------------------- 4 files changed, 192 insertions(+), 141 deletions(-) create mode 100644 fish-rust/src/tests/fd_monitor.rs create mode 100644 fish-rust/src/tests/mod.rs diff --git a/fish-rust/src/lib.rs b/fish-rust/src/lib.rs index 25a1752fc..6dd4b8e17 100644 --- a/fish-rust/src/lib.rs +++ b/fish-rust/src/lib.rs @@ -41,3 +41,6 @@ mod abbrs; mod builtins; mod env; mod re; + +// Don't use `#[cfg(test)]` here to make sure ffi tests are built and tested +mod tests; diff --git a/fish-rust/src/tests/fd_monitor.rs b/fish-rust/src/tests/fd_monitor.rs new file mode 100644 index 000000000..6593e2f9b --- /dev/null +++ b/fish-rust/src/tests/fd_monitor.rs @@ -0,0 +1,188 @@ +use std::os::fd::AsRawFd; +use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +use crate::common::write_loop; +use crate::fd_monitor::{FdMonitor, FdMonitorItem, FdMonitorItemId, ItemWakeReason}; +use crate::fds::{make_autoclose_pipes, AutoCloseFd}; +use crate::ffi_tests::add_test; + +/// Helper to make an item which counts how many times its callback was invoked. +/// +/// This could be structured differently to avoid the `Mutex` on `writer`, but it's not worth it +/// since this is just used for test purposes. +struct ItemMaker { + pub did_timeout: AtomicBool, + pub length_read: AtomicUsize, + pub pokes: AtomicUsize, + pub total_calls: AtomicUsize, + item_id: AtomicU64, + pub always_exit: bool, + pub writer: Mutex, +} + +impl ItemMaker { + pub fn insert_new_into(monitor: &FdMonitor, timeout: Option) -> Arc { + Self::insert_new_into2(monitor, timeout, |_| {}) + } + + pub fn insert_new_into2( + monitor: &FdMonitor, + timeout: Option, + config: F, + ) -> Arc { + let pipes = make_autoclose_pipes().expect("fds exhausted!"); + let mut item = FdMonitorItem::new(pipes.read, timeout, None); + + let mut result = ItemMaker { + did_timeout: false.into(), + length_read: 0.into(), + pokes: 0.into(), + total_calls: 0.into(), + item_id: 0.into(), + always_exit: false, + writer: Mutex::new(pipes.write), + }; + + config(&mut result); + + let result = Arc::new(result); + let callback = { + let result = Arc::clone(&result); + move |fd: &mut AutoCloseFd, reason: ItemWakeReason| { + result.callback(fd, reason); + } + }; + item.set_callback(Box::new(callback)); + let item_id = monitor.add(item); + result.item_id.store(u64::from(item_id), Ordering::Relaxed); + + result + } + + fn item_id(&self) -> FdMonitorItemId { + self.item_id.load(Ordering::Relaxed).into() + } + + fn callback(&self, fd: &mut AutoCloseFd, reason: ItemWakeReason) { + let mut was_closed = false; + + match reason { + ItemWakeReason::Timeout => { + self.did_timeout.store(true, Ordering::Relaxed); + } + ItemWakeReason::Poke => { + self.pokes.fetch_add(1, Ordering::Relaxed); + } + ItemWakeReason::Readable => { + let mut buf = [0u8; 1024]; + let amt = + unsafe { libc::read(fd.as_raw_fd(), buf.as_mut_ptr() as *mut _, buf.len()) }; + assert_ne!(amt, -1, "read error!"); + self.length_read.fetch_add(amt as usize, Ordering::Relaxed); + was_closed = amt == 0; + } + _ => unreachable!(), + } + + self.total_calls.fetch_add(1, Ordering::Relaxed); + if self.always_exit || was_closed { + fd.close(); + } + } + + /// Write 42 bytes to our write end. + fn write42(&self) { + let buf = [0u8; 42]; + let mut writer = self.writer.lock().expect("Mutex poisoned!"); + write_loop(&mut *writer, &buf).expect("Error writing 42 bytes to pipe!"); + } +} + +add_test!("fd_monitor_items", || { + let monitor = FdMonitor::new(); + + // Items which will never receive data or be called. + let item_never = ItemMaker::insert_new_into(&monitor, None); + let item_huge_timeout = + ItemMaker::insert_new_into(&monitor, Some(Duration::from_millis(100_000_000))); + + // Item which should get no data and time out. + let item0_timeout = ItemMaker::insert_new_into(&monitor, Some(Duration::from_millis(16))); + + // Item which should get exactly 42 bytes then time out. + let item42_timeout = ItemMaker::insert_new_into(&monitor, Some(Duration::from_millis(16))); + + // Item which should get exactly 42 bytes and not time out. + let item42_no_timeout = ItemMaker::insert_new_into(&monitor, None); + + // Item which should get 42 bytes then get notified it is closed. + let item42_then_close = ItemMaker::insert_new_into(&monitor, Some(Duration::from_millis(16))); + + // Item which gets one poke. + let item_pokee = ItemMaker::insert_new_into(&monitor, None); + + // Item which should get a callback exactly once. + let item_oneshot = + ItemMaker::insert_new_into2(&monitor, Some(Duration::from_millis(16)), |item| { + item.always_exit = true; + }); + + item42_timeout.write42(); + item42_no_timeout.write42(); + item42_then_close.write42(); + item42_then_close + .writer + .lock() + .expect("Mutex poisoned!") + .close(); + item_oneshot.write42(); + + monitor.poke_item(item_pokee.item_id()); + + // May need to loop here to ensure our fd_monitor gets scheduled. See #7699. + for _ in 0..100 { + std::thread::sleep(Duration::from_millis(84)); + if item0_timeout.did_timeout.load(Ordering::Relaxed) { + break; + } + } + + drop(monitor); + + assert_eq!(item_never.did_timeout.load(Ordering::Relaxed), false); + assert_eq!(item_never.length_read.load(Ordering::Relaxed), 0); + assert_eq!(item_never.pokes.load(Ordering::Relaxed), 0); + + assert_eq!(item_huge_timeout.did_timeout.load(Ordering::Relaxed), false); + assert_eq!(item_huge_timeout.length_read.load(Ordering::Relaxed), 0); + assert_eq!(item_huge_timeout.pokes.load(Ordering::Relaxed), 0); + + assert_eq!(item0_timeout.length_read.load(Ordering::Relaxed), 0); + assert_eq!(item0_timeout.did_timeout.load(Ordering::Relaxed), true); + assert_eq!(item0_timeout.pokes.load(Ordering::Relaxed), 0); + + assert_eq!(item42_timeout.length_read.load(Ordering::Relaxed), 42); + assert_eq!(item42_timeout.did_timeout.load(Ordering::Relaxed), true); + assert_eq!(item42_timeout.pokes.load(Ordering::Relaxed), 0); + + assert_eq!(item42_no_timeout.length_read.load(Ordering::Relaxed), 42); + assert_eq!(item42_no_timeout.did_timeout.load(Ordering::Relaxed), false); + assert_eq!(item42_no_timeout.pokes.load(Ordering::Relaxed), 0); + + assert_eq!(item42_then_close.did_timeout.load(Ordering::Relaxed), false); + assert_eq!(item42_then_close.length_read.load(Ordering::Relaxed), 42); + assert_eq!(item42_then_close.total_calls.load(Ordering::Relaxed), 2); + assert_eq!(item42_then_close.pokes.load(Ordering::Relaxed), 0); + + assert_eq!(item_oneshot.did_timeout.load(Ordering::Relaxed), false); + assert_eq!(item_oneshot.length_read.load(Ordering::Relaxed), 42); + assert_eq!(item_oneshot.total_calls.load(Ordering::Relaxed), 1); + assert_eq!(item_oneshot.pokes.load(Ordering::Relaxed), 0); + + assert_eq!(item_pokee.did_timeout.load(Ordering::Relaxed), false); + assert_eq!(item_pokee.length_read.load(Ordering::Relaxed), 0); + assert_eq!(item_pokee.total_calls.load(Ordering::Relaxed), 1); + assert_eq!(item_pokee.pokes.load(Ordering::Relaxed), 1); +}); diff --git a/fish-rust/src/tests/mod.rs b/fish-rust/src/tests/mod.rs new file mode 100644 index 000000000..46bb9838d --- /dev/null +++ b/fish-rust/src/tests/mod.rs @@ -0,0 +1 @@ +mod fd_monitor; diff --git a/src/fish_tests.cpp b/src/fish_tests.cpp index 81e6f2ebf..1fbf75e2d 100644 --- a/src/fish_tests.cpp +++ b/src/fish_tests.cpp @@ -800,146 +800,6 @@ static void test_tokenizer() { err(L"redirection_type_for_string failed on line %ld", (long)__LINE__); } -static void test_fd_monitor() { - say(L"Testing fd_monitor"); - - // Helper to make an item which counts how many times its callback is invoked. - struct item_maker_t : public noncopyable_t { - std::atomic did_timeout{false}; - std::atomic length_read{0}; - std::atomic pokes{0}; - std::atomic total_calls{0}; - uint64_t item_id{0}; - bool always_exit{false}; - std::unique_ptr> item; - autoclose_fd_t writer; - - void callback(autoclose_fd_t2 &fd, item_wake_reason_t reason) { - bool was_closed = false; - switch (reason) { - case item_wake_reason_t::Timeout: - this->did_timeout = true; - break; - case item_wake_reason_t::Poke: - this->pokes += 1; - break; - case item_wake_reason_t::Readable: - char buff[4096]; - ssize_t amt = read(fd.fd(), buff, sizeof buff); - this->length_read += amt; - was_closed = (amt == 0); - break; - } - total_calls += 1; - if (always_exit || was_closed) { - fd.close(); - } - } - - static void trampoline(autoclose_fd_t2 &fd, item_wake_reason_t reason, uint8_t *param) { - auto &instance = *(item_maker_t *)(param); - instance.callback(fd, reason); - } - - explicit item_maker_t(uint64_t timeout_usec) { - auto pipes = make_autoclose_pipes().acquire(); - writer = std::move(pipes.write); - item = std::make_unique>( - make_fd_monitor_item_t(pipes.read.acquire(), timeout_usec, - (uint8_t *)item_maker_t::trampoline, (uint8_t *)this)); - } - - // Write 42 bytes to our write end. - void write42() const { - char buff[42] = {0}; - (void)write_loop(writer.fd(), buff, sizeof buff); - } - }; - - constexpr uint64_t usec_per_msec = 1000; - - // Items which will never receive data or be called back. - item_maker_t item_never(kNoTimeout); - item_maker_t item_hugetimeout(100000000LLU * usec_per_msec); - - // Item which should get no data, and time out. - item_maker_t item0_timeout(16 * usec_per_msec); - - // Item which should get exactly 42 bytes, then time out. - item_maker_t item42_timeout(16 * usec_per_msec); - - // Item which should get exactly 42 bytes, and not time out. - item_maker_t item42_nottimeout(kNoTimeout); - - // Item which should get 42 bytes, then get notified it is closed. - item_maker_t item42_thenclose(16 * usec_per_msec); - - // Item which gets one poke. - item_maker_t item_pokee(kNoTimeout); - - // Item which should be called back once. - item_maker_t item_oneshot(16 * usec_per_msec); - item_oneshot.always_exit = true; - - { - auto monitor = make_fd_monitor_t(); - for (item_maker_t *item : - {&item_never, &item_hugetimeout, &item0_timeout, &item42_timeout, &item42_nottimeout, - &item42_thenclose, &item_pokee, &item_oneshot}) { - item->item_id = monitor->add(std::move(*(std::move(item->item)))); - } - item42_timeout.write42(); - item42_nottimeout.write42(); - item42_thenclose.write42(); - item42_thenclose.writer.close(); - item_oneshot.write42(); - monitor->poke_item(item_pokee.item_id); - - // May need to loop here to ensure our fd_monitor gets scheduled - see #7699. - for (int i = 0; i < 100; i++) { - std::this_thread::sleep_for(std::chrono::milliseconds(84)); - if (item0_timeout.did_timeout) { - break; - } - } - } - - do_test(!item_never.did_timeout); - do_test(item_never.length_read == 0); - do_test(item_never.pokes == 0); - - do_test(!item_hugetimeout.did_timeout); - do_test(item_hugetimeout.length_read == 0); - do_test(item_hugetimeout.pokes == 0); - - do_test(item0_timeout.length_read == 0); - do_test(item0_timeout.did_timeout); - do_test(item0_timeout.pokes == 0); - - do_test(item42_timeout.length_read == 42); - do_test(item42_timeout.did_timeout); - do_test(item42_timeout.pokes == 0); - - do_test(item42_nottimeout.length_read == 42); - do_test(!item42_nottimeout.did_timeout); - do_test(item42_nottimeout.pokes == 0); - - do_test(item42_thenclose.did_timeout == false); - do_test(item42_thenclose.length_read == 42); - do_test(item42_thenclose.total_calls == 2); - do_test(item42_thenclose.pokes == 0); - - do_test(!item_oneshot.did_timeout); - do_test(item_oneshot.length_read == 42); - do_test(item_oneshot.total_calls == 1); - do_test(item_oneshot.pokes == 0); - - do_test(!item_pokee.did_timeout); - do_test(item_pokee.length_read == 0); - do_test(item_pokee.total_calls == 1); - do_test(item_pokee.pokes == 1); -} - static void test_iothread() { say(L"Testing iothreads"); std::atomic shared_int{0}; @@ -7054,7 +6914,6 @@ static const test_t s_tests[]{ {TEST_GROUP("perf_convert_ascii"), perf_convert_ascii, true}, {TEST_GROUP("convert_nulls"), test_convert_nulls}, {TEST_GROUP("tokenizer"), test_tokenizer}, - {TEST_GROUP("fd_monitor"), test_fd_monitor}, {TEST_GROUP("iothread"), test_iothread}, {TEST_GROUP("pthread"), test_pthread}, {TEST_GROUP("debounce"), test_debounce},