diff --git a/fish-rust/src/lib.rs b/fish-rust/src/lib.rs index 25a1752fc..6dd4b8e17 100644 --- a/fish-rust/src/lib.rs +++ b/fish-rust/src/lib.rs @@ -41,3 +41,6 @@ mod abbrs; mod builtins; mod env; mod re; + +// Don't use `#[cfg(test)]` here to make sure ffi tests are built and tested +mod tests; diff --git a/fish-rust/src/tests/fd_monitor.rs b/fish-rust/src/tests/fd_monitor.rs new file mode 100644 index 000000000..6593e2f9b --- /dev/null +++ b/fish-rust/src/tests/fd_monitor.rs @@ -0,0 +1,188 @@ +use std::os::fd::AsRawFd; +use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +use crate::common::write_loop; +use crate::fd_monitor::{FdMonitor, FdMonitorItem, FdMonitorItemId, ItemWakeReason}; +use crate::fds::{make_autoclose_pipes, AutoCloseFd}; +use crate::ffi_tests::add_test; + +/// Helper to make an item which counts how many times its callback was invoked. +/// +/// This could be structured differently to avoid the `Mutex` on `writer`, but it's not worth it +/// since this is just used for test purposes. +struct ItemMaker { + pub did_timeout: AtomicBool, + pub length_read: AtomicUsize, + pub pokes: AtomicUsize, + pub total_calls: AtomicUsize, + item_id: AtomicU64, + pub always_exit: bool, + pub writer: Mutex, +} + +impl ItemMaker { + pub fn insert_new_into(monitor: &FdMonitor, timeout: Option) -> Arc { + Self::insert_new_into2(monitor, timeout, |_| {}) + } + + pub fn insert_new_into2( + monitor: &FdMonitor, + timeout: Option, + config: F, + ) -> Arc { + let pipes = make_autoclose_pipes().expect("fds exhausted!"); + let mut item = FdMonitorItem::new(pipes.read, timeout, None); + + let mut result = ItemMaker { + did_timeout: false.into(), + length_read: 0.into(), + pokes: 0.into(), + total_calls: 0.into(), + item_id: 0.into(), + always_exit: false, + writer: Mutex::new(pipes.write), + }; + + config(&mut result); + + let result = Arc::new(result); + let callback = { + let result = Arc::clone(&result); + move |fd: &mut AutoCloseFd, reason: ItemWakeReason| { + result.callback(fd, reason); + } + }; + item.set_callback(Box::new(callback)); + let item_id = monitor.add(item); + result.item_id.store(u64::from(item_id), Ordering::Relaxed); + + result + } + + fn item_id(&self) -> FdMonitorItemId { + self.item_id.load(Ordering::Relaxed).into() + } + + fn callback(&self, fd: &mut AutoCloseFd, reason: ItemWakeReason) { + let mut was_closed = false; + + match reason { + ItemWakeReason::Timeout => { + self.did_timeout.store(true, Ordering::Relaxed); + } + ItemWakeReason::Poke => { + self.pokes.fetch_add(1, Ordering::Relaxed); + } + ItemWakeReason::Readable => { + let mut buf = [0u8; 1024]; + let amt = + unsafe { libc::read(fd.as_raw_fd(), buf.as_mut_ptr() as *mut _, buf.len()) }; + assert_ne!(amt, -1, "read error!"); + self.length_read.fetch_add(amt as usize, Ordering::Relaxed); + was_closed = amt == 0; + } + _ => unreachable!(), + } + + self.total_calls.fetch_add(1, Ordering::Relaxed); + if self.always_exit || was_closed { + fd.close(); + } + } + + /// Write 42 bytes to our write end. + fn write42(&self) { + let buf = [0u8; 42]; + let mut writer = self.writer.lock().expect("Mutex poisoned!"); + write_loop(&mut *writer, &buf).expect("Error writing 42 bytes to pipe!"); + } +} + +add_test!("fd_monitor_items", || { + let monitor = FdMonitor::new(); + + // Items which will never receive data or be called. + let item_never = ItemMaker::insert_new_into(&monitor, None); + let item_huge_timeout = + ItemMaker::insert_new_into(&monitor, Some(Duration::from_millis(100_000_000))); + + // Item which should get no data and time out. + let item0_timeout = ItemMaker::insert_new_into(&monitor, Some(Duration::from_millis(16))); + + // Item which should get exactly 42 bytes then time out. + let item42_timeout = ItemMaker::insert_new_into(&monitor, Some(Duration::from_millis(16))); + + // Item which should get exactly 42 bytes and not time out. + let item42_no_timeout = ItemMaker::insert_new_into(&monitor, None); + + // Item which should get 42 bytes then get notified it is closed. + let item42_then_close = ItemMaker::insert_new_into(&monitor, Some(Duration::from_millis(16))); + + // Item which gets one poke. + let item_pokee = ItemMaker::insert_new_into(&monitor, None); + + // Item which should get a callback exactly once. + let item_oneshot = + ItemMaker::insert_new_into2(&monitor, Some(Duration::from_millis(16)), |item| { + item.always_exit = true; + }); + + item42_timeout.write42(); + item42_no_timeout.write42(); + item42_then_close.write42(); + item42_then_close + .writer + .lock() + .expect("Mutex poisoned!") + .close(); + item_oneshot.write42(); + + monitor.poke_item(item_pokee.item_id()); + + // May need to loop here to ensure our fd_monitor gets scheduled. See #7699. + for _ in 0..100 { + std::thread::sleep(Duration::from_millis(84)); + if item0_timeout.did_timeout.load(Ordering::Relaxed) { + break; + } + } + + drop(monitor); + + assert_eq!(item_never.did_timeout.load(Ordering::Relaxed), false); + assert_eq!(item_never.length_read.load(Ordering::Relaxed), 0); + assert_eq!(item_never.pokes.load(Ordering::Relaxed), 0); + + assert_eq!(item_huge_timeout.did_timeout.load(Ordering::Relaxed), false); + assert_eq!(item_huge_timeout.length_read.load(Ordering::Relaxed), 0); + assert_eq!(item_huge_timeout.pokes.load(Ordering::Relaxed), 0); + + assert_eq!(item0_timeout.length_read.load(Ordering::Relaxed), 0); + assert_eq!(item0_timeout.did_timeout.load(Ordering::Relaxed), true); + assert_eq!(item0_timeout.pokes.load(Ordering::Relaxed), 0); + + assert_eq!(item42_timeout.length_read.load(Ordering::Relaxed), 42); + assert_eq!(item42_timeout.did_timeout.load(Ordering::Relaxed), true); + assert_eq!(item42_timeout.pokes.load(Ordering::Relaxed), 0); + + assert_eq!(item42_no_timeout.length_read.load(Ordering::Relaxed), 42); + assert_eq!(item42_no_timeout.did_timeout.load(Ordering::Relaxed), false); + assert_eq!(item42_no_timeout.pokes.load(Ordering::Relaxed), 0); + + assert_eq!(item42_then_close.did_timeout.load(Ordering::Relaxed), false); + assert_eq!(item42_then_close.length_read.load(Ordering::Relaxed), 42); + assert_eq!(item42_then_close.total_calls.load(Ordering::Relaxed), 2); + assert_eq!(item42_then_close.pokes.load(Ordering::Relaxed), 0); + + assert_eq!(item_oneshot.did_timeout.load(Ordering::Relaxed), false); + assert_eq!(item_oneshot.length_read.load(Ordering::Relaxed), 42); + assert_eq!(item_oneshot.total_calls.load(Ordering::Relaxed), 1); + assert_eq!(item_oneshot.pokes.load(Ordering::Relaxed), 0); + + assert_eq!(item_pokee.did_timeout.load(Ordering::Relaxed), false); + assert_eq!(item_pokee.length_read.load(Ordering::Relaxed), 0); + assert_eq!(item_pokee.total_calls.load(Ordering::Relaxed), 1); + assert_eq!(item_pokee.pokes.load(Ordering::Relaxed), 1); +}); diff --git a/fish-rust/src/tests/mod.rs b/fish-rust/src/tests/mod.rs new file mode 100644 index 000000000..46bb9838d --- /dev/null +++ b/fish-rust/src/tests/mod.rs @@ -0,0 +1 @@ +mod fd_monitor; diff --git a/src/fish_tests.cpp b/src/fish_tests.cpp index 81e6f2ebf..1fbf75e2d 100644 --- a/src/fish_tests.cpp +++ b/src/fish_tests.cpp @@ -800,146 +800,6 @@ static void test_tokenizer() { err(L"redirection_type_for_string failed on line %ld", (long)__LINE__); } -static void test_fd_monitor() { - say(L"Testing fd_monitor"); - - // Helper to make an item which counts how many times its callback is invoked. - struct item_maker_t : public noncopyable_t { - std::atomic did_timeout{false}; - std::atomic length_read{0}; - std::atomic pokes{0}; - std::atomic total_calls{0}; - uint64_t item_id{0}; - bool always_exit{false}; - std::unique_ptr> item; - autoclose_fd_t writer; - - void callback(autoclose_fd_t2 &fd, item_wake_reason_t reason) { - bool was_closed = false; - switch (reason) { - case item_wake_reason_t::Timeout: - this->did_timeout = true; - break; - case item_wake_reason_t::Poke: - this->pokes += 1; - break; - case item_wake_reason_t::Readable: - char buff[4096]; - ssize_t amt = read(fd.fd(), buff, sizeof buff); - this->length_read += amt; - was_closed = (amt == 0); - break; - } - total_calls += 1; - if (always_exit || was_closed) { - fd.close(); - } - } - - static void trampoline(autoclose_fd_t2 &fd, item_wake_reason_t reason, uint8_t *param) { - auto &instance = *(item_maker_t *)(param); - instance.callback(fd, reason); - } - - explicit item_maker_t(uint64_t timeout_usec) { - auto pipes = make_autoclose_pipes().acquire(); - writer = std::move(pipes.write); - item = std::make_unique>( - make_fd_monitor_item_t(pipes.read.acquire(), timeout_usec, - (uint8_t *)item_maker_t::trampoline, (uint8_t *)this)); - } - - // Write 42 bytes to our write end. - void write42() const { - char buff[42] = {0}; - (void)write_loop(writer.fd(), buff, sizeof buff); - } - }; - - constexpr uint64_t usec_per_msec = 1000; - - // Items which will never receive data or be called back. - item_maker_t item_never(kNoTimeout); - item_maker_t item_hugetimeout(100000000LLU * usec_per_msec); - - // Item which should get no data, and time out. - item_maker_t item0_timeout(16 * usec_per_msec); - - // Item which should get exactly 42 bytes, then time out. - item_maker_t item42_timeout(16 * usec_per_msec); - - // Item which should get exactly 42 bytes, and not time out. - item_maker_t item42_nottimeout(kNoTimeout); - - // Item which should get 42 bytes, then get notified it is closed. - item_maker_t item42_thenclose(16 * usec_per_msec); - - // Item which gets one poke. - item_maker_t item_pokee(kNoTimeout); - - // Item which should be called back once. - item_maker_t item_oneshot(16 * usec_per_msec); - item_oneshot.always_exit = true; - - { - auto monitor = make_fd_monitor_t(); - for (item_maker_t *item : - {&item_never, &item_hugetimeout, &item0_timeout, &item42_timeout, &item42_nottimeout, - &item42_thenclose, &item_pokee, &item_oneshot}) { - item->item_id = monitor->add(std::move(*(std::move(item->item)))); - } - item42_timeout.write42(); - item42_nottimeout.write42(); - item42_thenclose.write42(); - item42_thenclose.writer.close(); - item_oneshot.write42(); - monitor->poke_item(item_pokee.item_id); - - // May need to loop here to ensure our fd_monitor gets scheduled - see #7699. - for (int i = 0; i < 100; i++) { - std::this_thread::sleep_for(std::chrono::milliseconds(84)); - if (item0_timeout.did_timeout) { - break; - } - } - } - - do_test(!item_never.did_timeout); - do_test(item_never.length_read == 0); - do_test(item_never.pokes == 0); - - do_test(!item_hugetimeout.did_timeout); - do_test(item_hugetimeout.length_read == 0); - do_test(item_hugetimeout.pokes == 0); - - do_test(item0_timeout.length_read == 0); - do_test(item0_timeout.did_timeout); - do_test(item0_timeout.pokes == 0); - - do_test(item42_timeout.length_read == 42); - do_test(item42_timeout.did_timeout); - do_test(item42_timeout.pokes == 0); - - do_test(item42_nottimeout.length_read == 42); - do_test(!item42_nottimeout.did_timeout); - do_test(item42_nottimeout.pokes == 0); - - do_test(item42_thenclose.did_timeout == false); - do_test(item42_thenclose.length_read == 42); - do_test(item42_thenclose.total_calls == 2); - do_test(item42_thenclose.pokes == 0); - - do_test(!item_oneshot.did_timeout); - do_test(item_oneshot.length_read == 42); - do_test(item_oneshot.total_calls == 1); - do_test(item_oneshot.pokes == 0); - - do_test(!item_pokee.did_timeout); - do_test(item_pokee.length_read == 0); - do_test(item_pokee.total_calls == 1); - do_test(item_pokee.pokes == 1); -} - static void test_iothread() { say(L"Testing iothreads"); std::atomic shared_int{0}; @@ -7054,7 +6914,6 @@ static const test_t s_tests[]{ {TEST_GROUP("perf_convert_ascii"), perf_convert_ascii, true}, {TEST_GROUP("convert_nulls"), test_convert_nulls}, {TEST_GROUP("tokenizer"), test_tokenizer}, - {TEST_GROUP("fd_monitor"), test_fd_monitor}, {TEST_GROUP("iothread"), test_iothread}, {TEST_GROUP("pthread"), test_pthread}, {TEST_GROUP("debounce"), test_debounce},