// Source: fish-shell/src/iothread.cpp (C++, 380 lines, 13 KiB)

#include "config.h" // IWYU pragma: keep
2011-12-27 13:21:12 +08:00
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <pthread.h>
#include <signal.h>
2015-07-25 23:14:25 +08:00
#include <sys/select.h>
#include <sys/time.h>
#include <sys/types.h>
2011-12-27 13:21:12 +08:00
#include <unistd.h>
#include <queue>
#include "common.h"
#include "iothread.h"
#include "wutil.h"
2011-12-27 13:21:12 +08:00
// Upper bound on the number of worker threads that may be active at once. The cap is 64, lowered
// to _POSIX_THREAD_THREADS_MAX when the platform advertises a smaller value.
#if defined(_POSIX_THREAD_THREADS_MAX) && _POSIX_THREAD_THREADS_MAX < 64
#define IO_MAX_THREADS _POSIX_THREAD_THREADS_MAX
#endif

#ifndef IO_MAX_THREADS
#define IO_MAX_THREADS 64
#endif
// Wakeup bytes written to the notification pipe. The value read back by the main thread selects
// which queue gets serviced.
#define IO_SERVICE_RESULT_QUEUE 100
#define IO_SERVICE_MAIN_THREAD_REQUEST_QUEUE 99

// Forward declarations of the two queue-servicing routines defined later in this file.
static void iothread_service_result_queue();
static void iothread_service_main_thread_requests();
2011-12-27 13:21:12 +08:00
/// A unit of background work. The handler is run by a worker thread; if a completion callback is
/// set, the request is then passed to the result queue so the completion can be invoked with the
/// context and the handler's return value.
struct spawn_request_t {
    int (*handler)(void *) = NULL;           // work function, called with context
    void (*completion)(void *, int) = NULL;  // optional callback: (context, handler_result)
    void *context = NULL;                    // opaque pointer handed to both callbacks
    int handler_result = -1;                 // value returned by handler; -1 until it has run

    spawn_request_t() {}

    // Move-only.
    spawn_request_t(spawn_request_t &&) = default;
    spawn_request_t &operator=(spawn_request_t &&) = default;
    spawn_request_t(const spawn_request_t &) = delete;
    spawn_request_t &operator=(const spawn_request_t &) = delete;
};
/// A request from a background thread to run a handler on the main thread. The requesting thread
/// waits until `done` becomes true and then reads `handler_result`.
struct main_thread_request_t {
    int (*handler)(void *) = NULL;     // function to run on the main thread, called with context
    void *context = NULL;              // opaque pointer handed to handler
    volatile int handler_result = -1;  // value returned by handler; -1 until it has run
    volatile bool done = false;        // set to true once handler has returned

    main_thread_request_t() {}

    // No moving OR copying.
    // main_thread_requests are always stack allocated, and we deal in pointers to them, so a copy
    // or move would leave the queue pointing at the wrong object.
    main_thread_request_t(const main_thread_request_t &) = delete;
    main_thread_request_t(main_thread_request_t &&) = delete;
    void operator=(const main_thread_request_t &) = delete;
    void operator=(main_thread_request_t &&) = delete;
};
// Spawn support. Requests come in on s_request_queue and, when they carry a completion, go out on
// s_result_queue. s_spawn_queue_lock protects both s_request_queue and s_active_thread_count.
static pthread_mutex_t s_spawn_queue_lock = PTHREAD_MUTEX_INITIALIZER;
static std::queue<spawn_request_t> s_request_queue;
static int s_active_thread_count;

// s_result_queue_lock protects s_result_queue.
static pthread_mutex_t s_result_queue_lock = PTHREAD_MUTEX_INITIALIZER;
static std::queue<spawn_request_t> s_result_queue;

// "Do on main thread" support.
// Mutex paired with s_main_thread_performer_cond; held while waiting on it and while broadcasting.
static pthread_mutex_t s_main_thread_performer_lock = PTHREAD_MUTEX_INITIALIZER;
// Broadcast by the main thread after it has performed queued requests. Initialized at runtime by
// iothread_init().
static pthread_cond_t s_main_thread_performer_cond;
// Protects s_main_thread_request_queue.
static pthread_mutex_t s_main_thread_request_q_lock = PTHREAD_MUTEX_INITIALIZER;
static std::queue<main_thread_request_t *> s_main_thread_request_queue;

// Notifying pipes.
static int s_read_pipe, s_write_pipe;
static void iothread_init(void) {
static bool inited = false;
if (!inited) {
inited = true;
// Initialize some locks.
2016-11-03 12:54:57 +08:00
VOMIT_ON_FAILURE(pthread_cond_init(&s_main_thread_performer_cond, NULL));
// Initialize the completion pipes.
int pipes[2] = {0, 0};
VOMIT_ON_FAILURE(pipe(pipes));
s_read_pipe = pipes[0];
s_write_pipe = pipes[1];
set_cloexec(s_read_pipe);
set_cloexec(s_write_pipe);
}
2011-12-27 13:21:12 +08:00
}
static void add_to_queue(struct spawn_request_t req) {
ASSERT_IS_LOCKED(s_spawn_queue_lock);
s_request_queue.push(std::move(req));
}
/// Removes the oldest pending request, moving it into *result. The caller must already hold
/// s_spawn_queue_lock.
/// \return true if a request was dequeued; false if the queue was empty (*result is untouched).
static bool dequeue_spawn_request(spawn_request_t *result) {
    ASSERT_IS_LOCKED(s_spawn_queue_lock);
    if (s_request_queue.empty()) {
        return false;
    }
    *result = std::move(s_request_queue.front());
    s_request_queue.pop();
    return true;
}
/// Hands a finished request over to the result queue, taking s_result_queue_lock for the push.
static void enqueue_thread_result(spawn_request_t finished) {
    scoped_lock result_lock(s_result_queue_lock);
    s_result_queue.push(std::move(finished));
}
/// Returns the calling thread's pthread id converted to a pointer, for printing with %p.
static void *this_thread() {
    const intptr_t self_id = (intptr_t)pthread_self();
    return (void *)self_id;
}
2014-04-28 08:23:19 +08:00
/// The function that does thread work.
static void *iothread_worker(void *unused) {
UNUSED(unused);
scoped_lock locker(s_spawn_queue_lock);
struct spawn_request_t req;
while (dequeue_spawn_request(&req)) {
debug(5, "pthread %p dequeued\n", this_thread());
// Unlock the queue while we execute the request.
locker.unlock();
// Perform the work.
req.handler_result = req.handler(req.context);
// If there's a completion handler, we have to enqueue it on the result queue.
if (req.completion != NULL) {
// Enqueue the result, and tell the main thread about it.
enqueue_thread_result(std::move(req));
const char wakeup_byte = IO_SERVICE_RESULT_QUEUE;
VOMIT_ON_FAILURE(!write_loop(s_write_pipe, &wakeup_byte, sizeof wakeup_byte));
}
// Lock us up again.
locker.lock();
}
// We believe we have exhausted the thread request queue. We want to decrement
// s_active_thread_count and exit. But it's possible that a request just came in. Furthermore,
// it's possible that the main thread saw that s_active_thread_count is full, and decided to not
// spawn a new thread, trusting in one of the existing threads to handle it. But we've already
// committed to not handling anything else. Therefore, we have to decrement
// s_active_thread_count under the lock, which we still hold. Likewise, the main thread must
// check the value under the lock.
ASSERT_IS_LOCKED(s_spawn_queue_lock);
assert(s_active_thread_count > 0);
s_active_thread_count -= 1;
2016-10-30 10:01:19 +08:00
debug(5, "pthread %p exiting\n", this_thread());
// We're done.
return NULL;
2011-12-27 13:21:12 +08:00
}
/// Spawn another thread. No lock is held when this is called.
static void iothread_spawn() {
// The spawned thread inherits our signal mask. We don't want the thread to ever receive signals
// on the spawned thread, so temporarily block all signals, spawn the thread, and then restore
// it.
sigset_t new_set, saved_set;
sigfillset(&new_set);
VOMIT_ON_FAILURE(pthread_sigmask(SIG_BLOCK, &new_set, &saved_set));
// Spawn a thread. If this fails, it means there's already a bunch of threads; it is very
// unlikely that they are all on the verge of exiting, so one is likely to be ready to handle
// extant requests. So we can ignore failure with some confidence.
2014-04-28 08:23:19 +08:00
pthread_t thread = 0;
pthread_create(&thread, NULL, iothread_worker, NULL);
// We will never join this thread.
VOMIT_ON_FAILURE(pthread_detach(thread));
2016-10-30 10:01:19 +08:00
debug(5, "pthread %p spawned\n", (void *)(intptr_t)thread);
// Restore our sigmask.
VOMIT_ON_FAILURE(pthread_sigmask(SIG_SETMASK, &saved_set, NULL));
2011-12-27 13:21:12 +08:00
}
int iothread_perform_base(int (*handler)(void *), void (*completion)(void *, int),
void *context) {
ASSERT_IS_MAIN_THREAD();
ASSERT_IS_NOT_FORKED_CHILD();
iothread_init();
// Create and initialize a request.
struct spawn_request_t req;
req.handler = handler;
req.completion = completion;
req.context = context;
int local_thread_count = -1;
bool spawn_new_thread = false;
{
// Lock around a local region. Note that we can only access s_active_thread_count under the
// lock.
scoped_lock locker(s_spawn_queue_lock);
add_to_queue(std::move(req));
if (s_active_thread_count < IO_MAX_THREADS) {
s_active_thread_count++;
spawn_new_thread = true;
}
local_thread_count = s_active_thread_count;
}
// Kick off the thread if we decided to do so.
if (spawn_new_thread) {
iothread_spawn();
}
// We return the active thread count for informational purposes only.
return local_thread_count;
2011-12-27 13:21:12 +08:00
}
/// Returns the read end of the notification pipe, initializing the iothread machinery first if
/// that has not happened yet.
int iothread_port(void) {
    iothread_init();
    return s_read_pipe;
}
/// Reads one wakeup byte from the notification pipe and services the queue it names. Must be
/// called on the main thread.
void iothread_service_completion(void) {
    ASSERT_IS_MAIN_THREAD();
    char wakeup_byte;
    VOMIT_ON_FAILURE(1 != read_loop(iothread_port(), &wakeup_byte, sizeof wakeup_byte));
    switch (wakeup_byte) {
        case IO_SERVICE_MAIN_THREAD_REQUEST_QUEUE: {
            iothread_service_main_thread_requests();
            break;
        }
        case IO_SERVICE_RESULT_QUEUE: {
            iothread_service_result_queue();
            break;
        }
        default: {
            debug(0, "Unknown wakeup byte %02x in %s", wakeup_byte, __FUNCTION__);
            break;
        }
    }
}
/// Waits up to `timeout_usec` microseconds for the notification pipe to become readable.
/// \return true if select() reported the pipe readable; false on timeout or select() error.
static bool iothread_wait_for_pending_completions(long timeout_usec) {
    const long usec_per_sec = 1000000;
    struct timeval tv;
    tv.tv_sec = timeout_usec / usec_per_sec;
    tv.tv_usec = timeout_usec % usec_per_sec;

    const int fd = iothread_port();
    fd_set fds;
    FD_ZERO(&fds);
    FD_SET(fd, &fds);

    const int ready = select(fd + 1, &fds, NULL, NULL, &tv);
    return ready > 0;
}
/// Note that this function is quite sketchy. In particular, it drains threads, not requests,
/// meaning that it may leave requests on the queue. This is the desired behavior (it may be called
/// before fork, and we don't want to bother servicing requests before we fork), but in the test
/// suite we depend on it draining all requests. In practice, this works, because a thread in
/// practice won't exit while there is outstanding requests.
///
/// At the moment, this function is only used in the test suite and in a
/// drain-all-threads-before-fork compatibility mode that no architecture requires, so it's OK that
/// it's terrible.
void iothread_drain_all(void) {
ASSERT_IS_MAIN_THREAD();
ASSERT_IS_NOT_FORKED_CHILD();
scoped_lock locker(s_spawn_queue_lock);
#define TIME_DRAIN 0
#if TIME_DRAIN
int thread_count = s_active_thread_count;
double now = timef();
#endif
// Nasty polling via select().
while (s_active_thread_count > 0) {
locker.unlock();
if (iothread_wait_for_pending_completions(1000)) {
iothread_service_completion();
}
locker.lock();
}
#if TIME_DRAIN
double after = timef();
fwprintf(stdout, L"(Waited %.02f msec for %d thread(s) to drain)\n", 1000 * (after - now),
thread_count);
#endif
}
/// "Do on main thread" support.
static void iothread_service_main_thread_requests(void) {
ASSERT_IS_MAIN_THREAD();
// Move the queue to a local variable.
std::queue<main_thread_request_t *> request_queue;
{
2016-11-03 12:54:57 +08:00
scoped_lock queue_lock(s_main_thread_request_q_lock);
std::swap(request_queue, s_main_thread_request_queue);
}
if (!request_queue.empty()) {
// Perform each of the functions. Note we are NOT responsible for deleting these. They are
// stack allocated in their respective threads!
while (!request_queue.empty()) {
main_thread_request_t *req = request_queue.front();
request_queue.pop();
req->handler_result = req->handler(req->context);
req->done = true;
}
// Ok, we've handled everybody. Announce the good news, and allow ourselves to be unlocked.
// Note we must do this while holding the lock. Otherwise we race with the waiting threads:
//
// 1. waiting thread checks for done, sees false
// 2. main thread performs request, sets done to true, posts to condition
// 3. waiting thread unlocks lock, waits on condition (forever)
//
// Because the waiting thread performs step 1 under the lock, if we take the lock, we avoid
// posting before the waiting thread is waiting.
scoped_lock broadcast_lock(s_main_thread_performer_lock);
2016-11-03 12:54:57 +08:00
VOMIT_ON_FAILURE(pthread_cond_broadcast(&s_main_thread_performer_cond));
}
}
// Service the queue of results
static void iothread_service_result_queue() {
// Move the queue to a local variable.
std::queue<spawn_request_t> result_queue;
{
scoped_lock queue_lock(s_result_queue_lock);
std::swap(result_queue, s_result_queue);
}
// Perform each completion in order. We are responsibile for cleaning them up.
while (!result_queue.empty()) {
spawn_request_t req = std::move(result_queue.front());
result_queue.pop();
if (req.completion) {
req.completion(req.context, req.handler_result);
}
}
}
int iothread_perform_on_main_base(int (*handler)(void *), void *context) {
// If this is the main thread, just do it.
if (is_main_thread()) {
return handler(context);
}
// Make a new request. Note we are synchronous, so this can be stack allocated!
main_thread_request_t req;
req.handler = handler;
req.context = context;
// Append it. Do not delete the nested scope as it is crucial to the proper functioning of this
// code by virtue of the lock management.
{
2016-11-03 12:54:57 +08:00
scoped_lock queue_lock(s_main_thread_request_q_lock);
s_main_thread_request_queue.push(&req);
}
// Tell the pipe.
const char wakeup_byte = IO_SERVICE_MAIN_THREAD_REQUEST_QUEUE;
VOMIT_ON_FAILURE(!write_loop(s_write_pipe, &wakeup_byte, sizeof wakeup_byte));
// Wait on the condition, until we're done.
scoped_lock perform_lock(s_main_thread_performer_lock);
while (!req.done) {
// It would be nice to support checking for cancellation here, but the clients need a
// deterministic way to clean up to avoid leaks
VOMIT_ON_FAILURE(
2016-11-03 12:54:57 +08:00
pthread_cond_wait(&s_main_thread_performer_cond, &s_main_thread_performer_lock));
}
// Ok, the request must now be done.
assert(req.done);
return req.handler_result;
}