mirror of https://github.com/fish-shell/fish-shell.git
synced 2024-11-29 05:03:46 +08:00
a7c37e4af4
Previously fish attempted to block all signals on background threads, so that they would be delivered to the main thread. But on Mac, SIGSEGV and probably some others just get silently dropped, leading to potential infinite loops instead of crashing. So stop blocking these signals. With this change the null-deref in #7837 will properly crash instead of spinning.
558 lines
20 KiB
C++
#include "config.h" // IWYU pragma: keep
|
|
|
|
#include "iothread.h"
|
|
|
|
#include <limits.h>
|
|
#include <pthread.h>
|
|
#include <signal.h>
|
|
#include <stdio.h>
|
|
#include <string.h>
|
|
#include <sys/select.h>
|
|
#include <sys/time.h>
|
|
#include <sys/types.h>
|
|
#include <unistd.h>
|
|
|
|
#include <atomic>
|
|
#include <condition_variable>
|
|
#include <functional>
|
|
#include <future>
|
|
#include <queue>
|
|
#include <thread>
|
|
|
|
#include "common.h"
|
|
#include "fds.h"
|
|
#include "flog.h"
|
|
#include "global_safety.h"
|
|
#include "wutil.h"
|
|
|
|
// We just define a thread limit of 1024.
// On all systems I've seen the limit is higher,
// but on some (like Linux with glibc) the setting for _POSIX_THREAD_THREADS_MAX is 64,
// which is too low, even though the system can handle more than 64 threads.
#define IO_MAX_THREADS 1024

// iothread has a thread pool. Sometimes there's no work to do, but extant threads wait around for a
// while (on a condition variable) in case new work comes soon. However, condition variables are not
// properly instrumented with Thread Sanitizer, so it fails to recognize when our mutex is locked.
// See https://github.com/google/sanitizers/issues/1259
// When using TSan, disable the wait-around feature.
#ifdef FISH_TSAN_WORKAROUNDS
#define IO_WAIT_FOR_WORK_DURATION_MS 0
#else
#define IO_WAIT_FOR_WORK_DURATION_MS 500
#endif

using void_function_t = std::function<void()>;

struct work_request_t {
    void_function_t handler;
    void_function_t completion;

    work_request_t(void_function_t &&f, void_function_t &&comp)
        : handler(std::move(f)), completion(std::move(comp)) {}

    // Move-only.
    work_request_t &operator=(const work_request_t &) = delete;
    work_request_t &operator=(work_request_t &&) = default;
    work_request_t(const work_request_t &) = delete;
    work_request_t(work_request_t &&) = default;
};

struct main_thread_request_t {
    // The function to execute.
    void_function_t func;

    // Set by the main thread when the work is done.
    std::promise<void> done{};

    explicit main_thread_request_t(void_function_t &&f) : func(std::move(f)) {}

    // No moving OR copying.
    // main_thread_requests are always stack allocated, and we deal in pointers to them.
    void operator=(const main_thread_request_t &) = delete;
    void operator=(main_thread_request_t &&) = delete;
    main_thread_request_t(const main_thread_request_t &) = delete;
    main_thread_request_t(main_thread_request_t &&) = delete;
};

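// An illustrative sketch (not part of this file) of the lifecycle of one of these requests,
// using only the machinery defined later in this file (see iothread_perform_on_main):
//
//     // On a background thread: stack-allocate the request and publish a pointer to it.
//     main_thread_request_t req([] { /* runs on the main thread */ });
//     s_main_thread_queue.acquire()->requests.push_back(&req);
//     get_notify_signaller().post();   // wake the main thread
//     req.done.get_future().wait();    // block until the main thread fulfills the promise
//
// Because the requesting thread blocks until done is set, the stack allocation remains valid
// for as long as the main thread holds the pointer.
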
struct thread_pool_t {
    struct data_t {
        /// The queue of outstanding, unclaimed requests.
        std::queue<work_request_t> request_queue{};

        /// The number of threads that exist in the pool.
        size_t total_threads{0};

        /// The number of threads which are waiting for more work.
        size_t waiting_threads{0};

        /// A flag indicating we should not process new requests.
        bool drain{false};
    };

    /// Data which needs to be atomically accessed.
    owning_lock<data_t> req_data{};

    /// The condition variable used to wake up waiting threads.
    /// Note this is tied to data's lock.
    std::condition_variable queue_cond{};

    /// The minimum and maximum number of threads.
    /// Here "minimum" means threads that are kept waiting in the pool.
    /// Note that the pool is initially empty and threads may decide to exit based on a time wait.
    const size_t soft_min_threads;
    const size_t max_threads;

    /// Construct with a soft minimum and maximum thread count.
    thread_pool_t(size_t soft_min_threads, size_t max_threads)
        : soft_min_threads(soft_min_threads), max_threads(max_threads) {}

    /// Enqueue a new work item onto the thread pool.
    /// The function \p func will execute in one of the pool's threads.
    /// \p completion will run on the main thread, if it is not missing.
    /// If \p cant_wait is set, disrespect the thread limit, because extant threads may
    /// want to wait for new threads.
    int perform(void_function_t &&func, void_function_t &&completion, bool cant_wait);

   private:
    /// The worker loop for this thread.
    void *run();

    /// Dequeue a work item (perhaps waiting on the condition variable), or commit to exiting by
    /// reducing the active thread count.
    /// This runs in the background thread.
    maybe_t<work_request_t> dequeue_work_or_commit_to_exit();

    /// Trampoline function with a pthread_create-compatible signature.
    static void *run_trampoline(void *vpool);

    /// Attempt to spawn a new pthread.
    bool spawn() const;

    /// No copying or moving.
    thread_pool_t(const thread_pool_t &) = delete;
    thread_pool_t(thread_pool_t &&) = delete;
    void operator=(const thread_pool_t &) = delete;
    void operator=(thread_pool_t &&) = delete;
};

/// The thread pool for "iothreads" which are used to lift I/O off of the main thread.
/// These are used for completions, etc.
/// Leaked to avoid shutdown dtor registration (including tsan).
static thread_pool_t &s_io_thread_pool = *(new thread_pool_t(1, IO_MAX_THREADS));

/// A queue of "things to do on the main thread."
struct main_thread_queue_t {
    // Functions to invoke as the completion callback from iothread_perform.
    std::vector<void_function_t> completions;

    // iothread_perform_on_main requests.
    // Note this contains pointers to structs that are stack-allocated on the requesting thread.
    std::vector<main_thread_request_t *> requests;

    /// Transfer ownership of ourselves to a new queue and return it.
    /// 'this' is left empty.
    main_thread_queue_t take() {
        main_thread_queue_t result;
        std::swap(result.completions, this->completions);
        std::swap(result.requests, this->requests);
        return result;
    }

    // Moving is allowed, but not copying.
    main_thread_queue_t() = default;
    main_thread_queue_t(main_thread_queue_t &&) = default;
    main_thread_queue_t &operator=(main_thread_queue_t &&) = default;
    main_thread_queue_t(const main_thread_queue_t &) = delete;
    void operator=(const main_thread_queue_t &) = delete;
};
static owning_lock<main_thread_queue_t> s_main_thread_queue;

/// \return the signaller for completions and main thread requests.
static fd_event_signaller_t &get_notify_signaller() {
    // Leaked to avoid shutdown dtors.
    static fd_event_signaller_t *s_signaller = new fd_event_signaller_t();
    return *s_signaller;
}

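// An illustrative sketch of the signalling protocol, assuming only the fd_event_signaller_t
// members this file already uses (post(), try_consume(), read_fd()):
//
//     auto &sig = get_notify_signaller();
//     sig.post();               // makes read_fd() readable, waking any select() on it
//     (void)sig.try_consume();  // drains the wakeup so the next select() blocks again
//
// iothread_port() below exposes read_fd() so the main thread's event loop can multiplex
// these wakeups with its other file descriptors.
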
/// Dequeue a work item (perhaps waiting on the condition variable), or commit to exiting by
/// reducing the active thread count.
maybe_t<work_request_t> thread_pool_t::dequeue_work_or_commit_to_exit() {
    auto data = this->req_data.acquire();
    // If the queue is empty, check to see if we should wait.
    // We should wait if our exiting would drop us below the soft min.
    if (data->request_queue.empty() && data->total_threads == this->soft_min_threads &&
        IO_WAIT_FOR_WORK_DURATION_MS > 0) {
        data->waiting_threads += 1;
        this->queue_cond.wait_for(data.get_lock(),
                                  std::chrono::milliseconds(IO_WAIT_FOR_WORK_DURATION_MS));
        data->waiting_threads -= 1;
    }

    // Now that we've perhaps waited, see if there's something on the queue.
    maybe_t<work_request_t> result{};
    if (!data->request_queue.empty()) {
        result = std::move(data->request_queue.front());
        data->request_queue.pop();
    }
    // If we are returning none, then ensure we balance the thread count increment from when we were
    // created. This has to be done here in this awkward place because we've already committed to
    // exiting - we will never pick up more work. So we need to ensure we decrement the thread count
    // while holding the lock as we are effectively exited.
    if (!result) {
        data->total_threads -= 1;
    }
    return result;
}

static void enqueue_thread_result(void_function_t req) {
    s_main_thread_queue.acquire()->completions.push_back(std::move(req));
    get_notify_signaller().post();
}

static void *this_thread() { return (void *)(intptr_t)pthread_self(); }

void *thread_pool_t::run() {
    while (auto req = dequeue_work_or_commit_to_exit()) {
        FLOGF(iothread, L"pthread %p got work", this_thread());

        // Perform the work.
        req->handler();

        // If there's a completion handler, we have to enqueue it on the result queue.
        // Note we're using std::function's weirdo operator== here.
        if (req->completion != nullptr) {
            // Enqueue the result, and tell the main thread about it.
            enqueue_thread_result(std::move(req->completion));
        }
    }
    FLOGF(iothread, L"pthread %p exiting", this_thread());
    return nullptr;
}

void *thread_pool_t::run_trampoline(void *pool) {
    assert(pool && "No thread pool given");
    return static_cast<thread_pool_t *>(pool)->run();
}

/// Spawn another thread. No lock is held when this is called.
bool thread_pool_t::spawn() const {
    return make_detached_pthread(&run_trampoline, const_cast<thread_pool_t *>(this));
}

int thread_pool_t::perform(void_function_t &&func, void_function_t &&completion, bool cant_wait) {
    assert(func && "Missing function");
    // Note we permit an empty completion.
    struct work_request_t req(std::move(func), std::move(completion));
    int local_thread_count = -1;
    auto &pool = *this;
    bool spawn_new_thread = false;
    bool wakeup_thread = false;
    {
        // Lock around a local region.
        auto data = pool.req_data.acquire();
        data->request_queue.push(std::move(req));
        FLOGF(iothread, L"enqueuing work item (count is %lu)", data->request_queue.size());
        if (data->drain) {
            // Do nothing here.
        } else if (data->waiting_threads >= data->request_queue.size()) {
            // There are enough waiting threads; wake one up.
            wakeup_thread = true;
        } else if (cant_wait || data->total_threads < pool.max_threads) {
            // No threads are waiting but we can or must spawn a new thread.
            data->total_threads += 1;
            spawn_new_thread = true;
        }
        local_thread_count = data->total_threads;
    }

    // Kick off the thread if we decided to do so.
    if (wakeup_thread) {
        FLOGF(iothread, L"notifying a thread");
        pool.queue_cond.notify_one();
    }
    if (spawn_new_thread) {
        // Spawn a thread. If this fails, it means there's already a bunch of threads; it is very
        // unlikely that they are all on the verge of exiting, so one is likely to be ready to
        // handle extant requests. So we can ignore failure with some confidence.
        if (this->spawn()) {
            FLOGF(iothread, L"pthread spawned");
        } else {
            // We failed to spawn a thread; decrement the thread count.
            pool.req_data.acquire()->total_threads -= 1;
        }
    }
    return local_thread_count;
}

void iothread_perform_impl(void_function_t &&func, void_function_t &&completion, bool cant_wait) {
    ASSERT_IS_MAIN_THREAD();
    ASSERT_IS_NOT_FORKED_CHILD();
    s_io_thread_pool.perform(std::move(func), std::move(completion), cant_wait);
}

int iothread_port() { return get_notify_signaller().read_fd(); }

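// An illustrative sketch of enqueuing background work, calling iothread_perform_impl directly;
// normal callers go through the iothread_perform wrappers declared in iothread.h:
//
//     iothread_perform_impl(
//         [] { /* expensive work; runs on a pool thread */ },
//         [] { /* completion; runs on the main thread via iothread_service_main() */ },
//         false /* cant_wait: respect the thread limit */);
//
// The completion may be empty, in which case no round trip to the main thread is made.
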
static bool iothread_wait_for_main_requests(long timeout_usec) {
    const long usec_per_sec = 1000000;
    struct timeval tv;
    tv.tv_sec = timeout_usec / usec_per_sec;
    tv.tv_usec = timeout_usec % usec_per_sec;
    const int fd = iothread_port();
    fd_set fds;
    FD_ZERO(&fds);
    FD_SET(fd, &fds);
    int ret = select(fd + 1, &fds, nullptr, nullptr, &tv);
    return ret > 0;
}

void iothread_service_main_with_timeout(long timeout_usec) {
    if (iothread_wait_for_main_requests(timeout_usec)) {
        iothread_service_main();
    }
}

/// At the moment, this function is only used in the test suite and in a
/// drain-all-threads-before-fork compatibility mode that no architecture requires, so it's OK that
/// it's terrible.
int iothread_drain_all() {
    ASSERT_IS_MAIN_THREAD();
    ASSERT_IS_NOT_FORKED_CHILD();

    int thread_count;
    auto &pool = s_io_thread_pool;
    // Set the drain flag.
    {
        auto data = pool.req_data.acquire();
        assert(!data->drain && "Should not be draining already");
        data->drain = true;
        thread_count = data->total_threads;
    }

    // Wake everyone up.
    pool.queue_cond.notify_all();

    double now = timef();

    // Nasty polling via select().
    while (pool.req_data.acquire()->total_threads > 0) {
        iothread_service_main_with_timeout(1000);
    }

    // Clear the drain flag.
    // Even though we released the lock, nobody should have added a new thread while the drain flag
    // is set.
    {
        auto data = pool.req_data.acquire();
        assert(data->total_threads == 0 && "Should be no threads");
        assert(data->drain && "Should be draining");
        data->drain = false;
    }

    double after = timef();
    FLOGF(iothread, "Drained %d thread(s) in %.02f msec", thread_count, 1000 * (after - now));
    return thread_count;
}

// Service the main thread queue, by invoking any functions enqueued for the main thread.
void iothread_service_main() {
    ASSERT_IS_MAIN_THREAD();
    // Note the order here is important: we must consume events before handling requests, as posting
    // uses the opposite order (enqueue first, then post). This way a post can never be observed
    // without its request also being visible in the queue.
    (void)get_notify_signaller().try_consume();

    // Move the queue to a local variable.
    // Note the s_main_thread_queue lock is not held after this.
    main_thread_queue_t queue = s_main_thread_queue.acquire()->take();

    // Perform each completion in order.
    for (const void_function_t &func : queue.completions) {
        // Ensure we don't invoke empty functions; that would throw std::bad_function_call.
        if (func) func();
    }

    // Perform each main thread request. Note we are NOT responsible for deleting these. They are
    // stack allocated in their respective threads!
    for (main_thread_request_t *req : queue.requests) {
        req->func();
        req->done.set_value();
    }
}

void iothread_perform_on_main(void_function_t &&func) {
    if (is_main_thread()) {
        func();
        return;
    }

    // Make a new request. Note we are synchronous, so this can be stack allocated!
    main_thread_request_t req(std::move(func));

    // Append it. Ensure we don't hold the lock after.
    s_main_thread_queue.acquire()->requests.push_back(&req);

    // Tell the signaller and then wait until our future is set.
    get_notify_signaller().post();
    req.done.get_future().wait();
}

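// An illustrative sketch, as called from a background thread; the helper name is hypothetical:
//
//     int result = 0;
//     iothread_perform_on_main([&result] {
//         result = query_something_main_thread_only();  // hypothetical main-thread-only call
//     });
//     // Safe to read result here: the future above blocks until the main thread has run us.
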
bool make_detached_pthread(void *(*func)(void *), void *param) {
    // The spawned thread inherits our signal mask. Temporarily block signals, spawn the thread, and
    // then restore it. But we must not block SIGBUS, SIGFPE, SIGILL, or SIGSEGV; that's undefined
    // (#7837). Conservatively don't try to mask SIGKILL or SIGSTOP either; that's ignored on Linux
    // but maybe has an effect elsewhere.
    sigset_t new_set, saved_set;
    sigfillset(&new_set);
    sigdelset(&new_set, SIGILL);   // bad jump
    sigdelset(&new_set, SIGFPE);   // divide by zero
    sigdelset(&new_set, SIGBUS);   // unaligned memory access
    sigdelset(&new_set, SIGSEGV);  // bad memory access
    sigdelset(&new_set, SIGSTOP);  // unblockable
    sigdelset(&new_set, SIGKILL);  // unblockable
    DIE_ON_FAILURE(pthread_sigmask(SIG_BLOCK, &new_set, &saved_set));

    // Spawn a thread. If this fails, it means there's already a bunch of threads; it is very
    // unlikely that they are all on the verge of exiting, so one is likely to be ready to handle
    // extant requests. So we can ignore failure with some confidence.
    pthread_t thread = 0;
    int err = pthread_create(&thread, nullptr, func, param);
    if (err == 0) {
        // Success; detach the thread so its resources are released when it exits.
        FLOGF(iothread, "pthread %p spawned", (void *)(intptr_t)thread);
        DIE_ON_FAILURE(pthread_detach(thread));
    } else {
        perror("pthread_create");
    }
    // Restore our sigmask.
    DIE_ON_FAILURE(pthread_sigmask(SIG_SETMASK, &saved_set, nullptr));
    return err == 0;
}

using void_func_t = std::function<void(void)>;

static void *func_invoker(void *param) {
    // Acquire a thread id for this thread.
    (void)thread_id();
    auto vf = static_cast<void_func_t *>(param);
    (*vf)();
    delete vf;
    return nullptr;
}

bool make_detached_pthread(void_func_t &&func) {
    // Move the function into a heap allocation.
    auto vf = new void_func_t(std::move(func));
    if (make_detached_pthread(func_invoker, vf)) {
        return true;
    }
    // Thread spawning failed, clean up our heap allocation.
    delete vf;
    return false;
}

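// An illustrative sketch: spawning a fire-and-forget thread from a closure. The lambda is
// moved to the heap, invoked exactly once by func_invoker above, and deleted by it:
//
//     bool ok = make_detached_pthread([] {
//         // ... background work; thread_id() is valid here ...
//     });
//     if (!ok) {
//         // pthread_create failed; the heap-allocated closure has already been freed.
//     }
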
static uint64_t next_thread_id() {
    // Note 0 is an invalid thread id.
    // Note fetch_add is an atomic read-modify-write which returns the value *before* the
    // modification, so we add 1 to get this thread's id.
    static std::atomic<uint64_t> s_last_thread_id{};
    uint64_t res = 1 + s_last_thread_id.fetch_add(1, std::memory_order_relaxed);
    return res;
}

uint64_t thread_id() {
    static FISH_THREAD_LOCAL uint64_t tl_tid = next_thread_id();
    return tl_tid;
}

// Debounce implementation note: we would like to enqueue at most one request, except if a thread
// hangs (e.g. on fs access) then we do not want to block indefinitely; such threads are called
// "abandoned". This is implemented via a monotonically increasing uint64 counter, called a token.
// Every time we spawn a thread, we increment the token. When the thread completes, it compares its
// token to the active token; if they differ then this thread was abandoned.
struct debounce_t::impl_t {
    // Synchronized data from debounce_t.
    struct data_t {
        // The (at most 1) next enqueued request, or none if none.
        maybe_t<work_request_t> next_req{};

        // The token of the current non-abandoned thread, or 0 if no thread is running.
        uint64_t active_token{0};

        // The next token to use when spawning a thread.
        uint64_t next_token{1};

        // The start time of the most recently spawned thread or run request (if any).
        std::chrono::time_point<std::chrono::steady_clock> start_time{};
    };
    owning_lock<data_t> data{};

    /// Run an iteration in the background, with the given thread token.
    /// \return true if we handled a request, false if there were none.
    bool run_next(uint64_t token);
};

bool debounce_t::impl_t::run_next(uint64_t token) {
    assert(token > 0 && "Invalid token");
    // Note we are on a background thread.
    maybe_t<work_request_t> req;
    {
        auto d = data.acquire();
        if (d->next_req) {
            // The value was dequeued, we are going to execute it.
            req = d->next_req.acquire();
            d->start_time = std::chrono::steady_clock::now();
        } else {
            // There is no request. If we are active, mark ourselves as no longer running.
            if (token == d->active_token) {
                d->active_token = 0;
            }
            return false;
        }
    }

    assert(req && req->handler && "Request should have value");
    req->handler();
    if (req->completion) {
        enqueue_thread_result(std::move(req->completion));
    }
    return true;
}

uint64_t debounce_t::perform_impl(std::function<void()> handler, std::function<void()> completion) {
    uint64_t active_token{0};
    bool spawn{false};
    // Local lock.
    {
        auto d = impl_->data.acquire();
        d->next_req = work_request_t{std::move(handler), std::move(completion)};
        // If we have a timeout, and our running thread has exceeded it, abandon that thread.
        if (d->active_token && timeout_msec_ > 0 &&
            std::chrono::steady_clock::now() - d->start_time >
                std::chrono::milliseconds(timeout_msec_)) {
            // Abandon this thread by marking nothing as active.
            d->active_token = 0;
        }
        if (!d->active_token) {
            // We need to spawn a new thread.
            // Mark the current time so that a new request won't immediately abandon us.
            spawn = true;
            d->active_token = d->next_token++;
            d->start_time = std::chrono::steady_clock::now();
        }
        active_token = d->active_token;
        assert(active_token && "Something should be active");
    }
    if (spawn) {
        // Equip our background thread with a reference to impl, to keep it alive.
        auto impl = impl_;
        iothread_perform([=] {
            while (impl->run_next(active_token))
                ;  // pass
        });
    }
    return active_token;
}

debounce_t::debounce_t(long timeout_msec)
    : timeout_msec_(timeout_msec), impl_(std::make_shared<impl_t>()) {}

debounce_t::~debounce_t() = default;

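// An illustrative sketch of debouncing, assuming the debounce_t::perform wrapper declared in
// iothread.h (fish uses this pattern for work like autosuggestions and syntax highlighting):
//
//     static debounce_t debouncer{500};  // abandon a thread stuck for over 500 msec
//
//     // Called on every keystroke. At most one request is pending: a rapid second call
//     // simply replaces next_req, so stale work is dropped rather than queued.
//     debouncer.perform([=] { recompute_autosuggestion(text); });  // hypothetical callback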