From 2ca102193c88578706d87474f927a12b231fb956 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Mary=C5=84czak?= Date: Sun, 28 Jan 2024 19:21:15 +0100 Subject: [PATCH] Statically type `binary_semaphore_t` mode of operation (#10272) * Cleanup binary_semaphore_t by removing `sem_ok_` checks * Fix unused import on non-Linux platforms --------- Co-authored-by: Mahmoud Al-Qudsi --- src/topic_monitor.rs | 170 ++++++++++++++++++++----------------------- 1 file changed, 80 insertions(+), 90 deletions(-) diff --git a/src/topic_monitor.rs b/src/topic_monitor.rs index 07637f398..e0a8aee3d 100644 --- a/src/topic_monitor.rs +++ b/src/topic_monitor.rs @@ -28,7 +28,6 @@ use crate::wutil::perror; use nix::errno::Errno; use nix::unistd; use std::cell::{Cell, UnsafeCell}; -use std::mem; use std::pin::Pin; use std::sync::atomic::{AtomicU8, Ordering}; use std::sync::{Condvar, Mutex, MutexGuard}; @@ -156,78 +155,69 @@ impl GenerationsList { /// A simple binary semaphore. /// On systems that do not support unnamed semaphores (macOS in particular) this is built on top of /// a self-pipe. Note that post() must be async-signal safe. -pub struct binary_semaphore_t { - // Whether our semaphore was successfully initialized. - sem_ok_: bool, - - // The semaphore, if initalized. - // This is Box'd so it has a stable address. - sem_: Pin>>, - - // Pipes used to emulate a semaphore, if not initialized. - pipes_: AutoClosePipes, +pub enum binary_semaphore_t { + /// Initialized semaphore. + /// This is Box'd so it has a stable address. + Semaphore(Pin>>), + /// Pipes used to emulate a semaphore, if not initialized. + Pipes(AutoClosePipes), } impl binary_semaphore_t { pub fn new() -> binary_semaphore_t { - #[allow(unused_mut, unused_assignments)] - let mut sem_ok_ = false; - // sem_t does not have an initializer in Rust so we use zeroed(). - #[allow(unused_mut)] - let mut sem_ = Pin::from(Box::new(UnsafeCell::new(unsafe { mem::zeroed() }))); - let mut pipes_ = AutoClosePipes::default(); // sem_init always fails with ENOSYS on Mac and has an annoying deprecation warning. // On BSD sem_init uses a file descriptor under the hood which doesn't get CLOEXEC (see #7304). // So use fast semaphores on Linux only. #[cfg(target_os = "linux")] { - let res = unsafe { libc::sem_init(sem_.get(), 0, 0) }; - sem_ok_ = res == 0; - } - if !sem_ok_ { - let pipes = fds::make_autoclose_pipes(); - assert!(pipes.is_some(), "Failed to make pubsub pipes"); - pipes_ = pipes.unwrap(); + // sem_t does not have an initializer in Rust so we use zeroed(). + let sem = Box::pin(UnsafeCell::new(unsafe { std::mem::zeroed() })); - // Whoof. Thread Sanitizer swallows signals and replays them at its leisure, at the - // point where instrumented code makes certain blocking calls. But tsan cannot interrupt - // a signal call, so if we're blocked in read() (like the topic monitor wants to be!), - // we'll never receive SIGCHLD and so deadlock. So if tsan is enabled, we mark our fd as - // non-blocking (so reads will never block) and use select() to poll it. - if cfg!(feature = "FISH_TSAN_WORKAROUNDS") { - let _ = make_fd_nonblocking(pipes_.read.fd()); + let res = unsafe { libc::sem_init(sem.get(), 0, 0) }; + if res == 0 { + return Self::Semaphore(sem); } } - binary_semaphore_t { - sem_ok_, - sem_, - pipes_, + + let pipes = fds::make_autoclose_pipes().expect("Failed to make pubsub pipes"); + + // Whoof. Thread Sanitizer swallows signals and replays them at its leisure, at the + // point where instrumented code makes certain blocking calls. But tsan cannot interrupt + // a signal call, so if we're blocked in read() (like the topic monitor wants to be!), + // we'll never receive SIGCHLD and so deadlock. So if tsan is enabled, we mark our fd as + // non-blocking (so reads will never block) and use select() to poll it. + if cfg!(feature = "FISH_TSAN_WORKAROUNDS") { + let _ = make_fd_nonblocking(pipes.read.fd()); } + + Self::Pipes(pipes) } /// Release a waiting thread. pub fn post(&self) { // Beware, we are in a signal handler. - if self.sem_ok_ { - let res = unsafe { libc::sem_post(self.sem_.get()) }; - // sem_post is non-interruptible. - if res < 0 { - self.die("sem_post"); - } - } else { - // Write exactly one byte. - let success; - loop { - let v: u8 = 0; - let ret = unistd::write(self.pipes_.write.fd(), std::slice::from_ref(&v)); - if ret.err() == Some(Errno::EINTR) { - continue; + match self { + Self::Semaphore(sem) => { + let res = unsafe { libc::sem_post(sem.get()) }; + // sem_post is non-interruptible. + if res < 0 { + self.die("sem_post"); } - success = ret.is_ok(); - break; } - if !success { - self.die("write"); + Self::Pipes(pipes) => { + // Write exactly one byte. + let success; + loop { + let ret = unistd::write(pipes.write.fd(), &[0]); + if ret.err() == Some(Errno::EINTR) { + continue; + } + success = ret.is_ok(); + break; + } + if !success { + self.die("write"); + } } } } @@ -235,39 +225,43 @@ impl binary_semaphore_t { /// Wait for a post. /// This loops on EINTR. pub fn wait(&self) { - if self.sem_ok_ { - let mut res; - loop { - res = unsafe { libc::sem_wait(self.sem_.get()) }; - if res < 0 && Errno::last() == Errno::EINTR { - continue; - } - break; - } - // Other errors here are very unexpected. - if res < 0 { - self.die("sem_wait"); - } - } else { - let fd = self.pipes_.read.fd(); - // We must read exactly one byte. - loop { - // Under tsan our notifying pipe is non-blocking, so we would busy-loop on the read() - // call until data is available (that is, fish would use 100% cpu while waiting for - // processes). This call prevents that. - if cfg!(feature = "FISH_TSAN_WORKAROUNDS") { - let _ = fd_readable_set_t::is_fd_readable(fd, fd_readable_set_t::kNoTimeout); - } - let mut ignored: u8 = 0; - let amt = unistd::read(fd, std::slice::from_mut(&mut ignored)); - if amt.ok() == Some(1) { + match self { + Self::Semaphore(sem) => { + let mut res; + loop { + res = unsafe { libc::sem_wait(sem.get()) }; + if res < 0 && Errno::last() == Errno::EINTR { + continue; + } break; } - // EAGAIN should only be returned in TSan case. - if amt.is_err() - && (amt.err() != Some(Errno::EINTR) && amt.err() != Some(Errno::EAGAIN)) - { - self.die("read"); + // Other errors here are very unexpected. + if res < 0 { + self.die("sem_wait"); + } + } + Self::Pipes(pipes) => { + let fd = pipes.read.fd(); + // We must read exactly one byte. + loop { + // Under tsan our notifying pipe is non-blocking, so we would busy-loop on the read() + // call until data is available (that is, fish would use 100% cpu while waiting for + // processes). This call prevents that. + if cfg!(feature = "FISH_TSAN_WORKAROUNDS") { + let _ = + fd_readable_set_t::is_fd_readable(fd, fd_readable_set_t::kNoTimeout); + } + let mut ignored: u8 = 0; + let amt = unistd::read(fd, std::slice::from_mut(&mut ignored)); + if amt.ok() == Some(1) { + break; + } + // EAGAIN should only be returned in TSan case. + if amt.is_err() + && (amt.err() != Some(Errno::EINTR) && amt.err() != Some(Errno::EAGAIN)) + { + self.die("read"); + } } } } @@ -281,12 +275,8 @@ impl binary_semaphore_t { impl Drop for binary_semaphore_t { fn drop(&mut self) { - // We never use sem_t on Mac. The #ifdef avoids deprecation warnings. - #[cfg(target_os = "linux")] - { - if self.sem_ok_ { - _ = unsafe { libc::sem_destroy(self.sem_.get()) }; - } + if let Self::Semaphore(sem) = self { + _ = unsafe { libc::sem_destroy(sem.get()) }; } } }