Statically type binary_semaphore_t mode of operation (#10272)

* Cleanup binary_semaphore_t by removing `sem_ok_` checks

* Fix unused import on non-Linux platforms

---------

Co-authored-by: Mahmoud Al-Qudsi <mqudsi@neosmart.net>
This commit is contained in:
Bartłomiej Maryńczak 2024-01-28 19:21:15 +01:00 committed by GitHub
parent a03162bd5b
commit 2ca102193c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -28,7 +28,6 @@ use crate::wutil::perror;
use nix::errno::Errno;
use nix::unistd;
use std::cell::{Cell, UnsafeCell};
use std::mem;
use std::pin::Pin;
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::{Condvar, Mutex, MutexGuard};
@ -156,78 +155,69 @@ impl GenerationsList {
/// A simple binary semaphore.
/// On systems that do not support unnamed semaphores (macOS in particular) this is built on top of
/// a self-pipe. Note that post() must be async-signal safe.
pub struct binary_semaphore_t {
// Whether our semaphore was successfully initialized.
sem_ok_: bool,
// The semaphore, if initalized.
// This is Box'd so it has a stable address.
sem_: Pin<Box<UnsafeCell<libc::sem_t>>>,
// Pipes used to emulate a semaphore, if not initialized.
pipes_: AutoClosePipes,
pub enum binary_semaphore_t {
/// Initialized semaphore.
/// This is Box'd so it has a stable address.
Semaphore(Pin<Box<UnsafeCell<libc::sem_t>>>),
/// Pipes used to emulate a semaphore, if not initialized.
Pipes(AutoClosePipes),
}
impl binary_semaphore_t {
pub fn new() -> binary_semaphore_t {
#[allow(unused_mut, unused_assignments)]
let mut sem_ok_ = false;
// sem_t does not have an initializer in Rust so we use zeroed().
#[allow(unused_mut)]
let mut sem_ = Pin::from(Box::new(UnsafeCell::new(unsafe { mem::zeroed() })));
let mut pipes_ = AutoClosePipes::default();
// sem_init always fails with ENOSYS on Mac and has an annoying deprecation warning.
// On BSD sem_init uses a file descriptor under the hood which doesn't get CLOEXEC (see #7304).
// So use fast semaphores on Linux only.
#[cfg(target_os = "linux")]
{
let res = unsafe { libc::sem_init(sem_.get(), 0, 0) };
sem_ok_ = res == 0;
}
if !sem_ok_ {
let pipes = fds::make_autoclose_pipes();
assert!(pipes.is_some(), "Failed to make pubsub pipes");
pipes_ = pipes.unwrap();
// sem_t does not have an initializer in Rust so we use zeroed().
let sem = Box::pin(UnsafeCell::new(unsafe { std::mem::zeroed() }));
// Whoof. Thread Sanitizer swallows signals and replays them at its leisure, at the
// point where instrumented code makes certain blocking calls. But tsan cannot interrupt
// a signal call, so if we're blocked in read() (like the topic monitor wants to be!),
// we'll never receive SIGCHLD and so deadlock. So if tsan is enabled, we mark our fd as
// non-blocking (so reads will never block) and use select() to poll it.
if cfg!(feature = "FISH_TSAN_WORKAROUNDS") {
let _ = make_fd_nonblocking(pipes_.read.fd());
let res = unsafe { libc::sem_init(sem.get(), 0, 0) };
if res == 0 {
return Self::Semaphore(sem);
}
}
binary_semaphore_t {
sem_ok_,
sem_,
pipes_,
let pipes = fds::make_autoclose_pipes().expect("Failed to make pubsub pipes");
// Whoof. Thread Sanitizer swallows signals and replays them at its leisure, at the
// point where instrumented code makes certain blocking calls. But tsan cannot interrupt
// a signal call, so if we're blocked in read() (like the topic monitor wants to be!),
// we'll never receive SIGCHLD and so deadlock. So if tsan is enabled, we mark our fd as
// non-blocking (so reads will never block) and use select() to poll it.
if cfg!(feature = "FISH_TSAN_WORKAROUNDS") {
let _ = make_fd_nonblocking(pipes.read.fd());
}
Self::Pipes(pipes)
}
/// Release a waiting thread.
pub fn post(&self) {
// Beware, we are in a signal handler.
if self.sem_ok_ {
let res = unsafe { libc::sem_post(self.sem_.get()) };
// sem_post is non-interruptible.
if res < 0 {
self.die("sem_post");
}
} else {
// Write exactly one byte.
let success;
loop {
let v: u8 = 0;
let ret = unistd::write(self.pipes_.write.fd(), std::slice::from_ref(&v));
if ret.err() == Some(Errno::EINTR) {
continue;
match self {
Self::Semaphore(sem) => {
let res = unsafe { libc::sem_post(sem.get()) };
// sem_post is non-interruptible.
if res < 0 {
self.die("sem_post");
}
success = ret.is_ok();
break;
}
if !success {
self.die("write");
Self::Pipes(pipes) => {
// Write exactly one byte.
let success;
loop {
let ret = unistd::write(pipes.write.fd(), &[0]);
if ret.err() == Some(Errno::EINTR) {
continue;
}
success = ret.is_ok();
break;
}
if !success {
self.die("write");
}
}
}
}
@ -235,39 +225,43 @@ impl binary_semaphore_t {
/// Wait for a post.
/// This loops on EINTR.
pub fn wait(&self) {
if self.sem_ok_ {
let mut res;
loop {
res = unsafe { libc::sem_wait(self.sem_.get()) };
if res < 0 && Errno::last() == Errno::EINTR {
continue;
}
break;
}
// Other errors here are very unexpected.
if res < 0 {
self.die("sem_wait");
}
} else {
let fd = self.pipes_.read.fd();
// We must read exactly one byte.
loop {
// Under tsan our notifying pipe is non-blocking, so we would busy-loop on the read()
// call until data is available (that is, fish would use 100% cpu while waiting for
// processes). This call prevents that.
if cfg!(feature = "FISH_TSAN_WORKAROUNDS") {
let _ = fd_readable_set_t::is_fd_readable(fd, fd_readable_set_t::kNoTimeout);
}
let mut ignored: u8 = 0;
let amt = unistd::read(fd, std::slice::from_mut(&mut ignored));
if amt.ok() == Some(1) {
match self {
Self::Semaphore(sem) => {
let mut res;
loop {
res = unsafe { libc::sem_wait(sem.get()) };
if res < 0 && Errno::last() == Errno::EINTR {
continue;
}
break;
}
// EAGAIN should only be returned in TSan case.
if amt.is_err()
&& (amt.err() != Some(Errno::EINTR) && amt.err() != Some(Errno::EAGAIN))
{
self.die("read");
// Other errors here are very unexpected.
if res < 0 {
self.die("sem_wait");
}
}
Self::Pipes(pipes) => {
let fd = pipes.read.fd();
// We must read exactly one byte.
loop {
// Under tsan our notifying pipe is non-blocking, so we would busy-loop on the read()
// call until data is available (that is, fish would use 100% cpu while waiting for
// processes). This call prevents that.
if cfg!(feature = "FISH_TSAN_WORKAROUNDS") {
let _ =
fd_readable_set_t::is_fd_readable(fd, fd_readable_set_t::kNoTimeout);
}
let mut ignored: u8 = 0;
let amt = unistd::read(fd, std::slice::from_mut(&mut ignored));
if amt.ok() == Some(1) {
break;
}
// EAGAIN should only be returned in TSan case.
if amt.is_err()
&& (amt.err() != Some(Errno::EINTR) && amt.err() != Some(Errno::EAGAIN))
{
self.die("read");
}
}
}
}
@ -281,12 +275,8 @@ impl binary_semaphore_t {
impl Drop for binary_semaphore_t {
fn drop(&mut self) {
// We never use sem_t on Mac. The #ifdef avoids deprecation warnings.
#[cfg(target_os = "linux")]
{
if self.sem_ok_ {
_ = unsafe { libc::sem_destroy(self.sem_.get()) };
}
if let Self::Semaphore(sem) = self {
_ = unsafe { libc::sem_destroy(sem.get()) };
}
}
}