Rename spawn_request_t to work_request_t and clean up the API a bit

This commit is contained in:
ridiculousfish 2019-11-23 12:13:18 -08:00
parent 03a289c9ef
commit 3fb9159b09

View File

@ -31,6 +31,9 @@
#define IO_MAX_THREADS 64
#endif
// The minimum number of threads kept waiting in the pool.
#define IO_MIN_THREADS 1
// Values for the wakeup bytes sent to the ioport.
#define IO_SERVICE_MAIN_THREAD_REQUEST_QUEUE 99
#define IO_SERVICE_RESULT_QUEUE 100
@ -40,19 +43,17 @@ static void iothread_service_result_queue();
typedef std::function<void(void)> void_function_t;
struct spawn_request_t {
struct work_request_t {
void_function_t handler;
void_function_t completion;
spawn_request_t() {}
spawn_request_t(void_function_t &&f, void_function_t &&comp) : handler(f), completion(comp) {}
work_request_t(void_function_t &&f, void_function_t &&comp) : handler(f), completion(comp) {}
// Move-only
spawn_request_t &operator=(const spawn_request_t &) = delete;
spawn_request_t &operator=(spawn_request_t &&) = default;
spawn_request_t(const spawn_request_t &) = delete;
spawn_request_t(spawn_request_t &&) = default;
work_request_t &operator=(const work_request_t &) = delete;
work_request_t &operator=(work_request_t &&) = default;
work_request_t(const work_request_t &) = delete;
work_request_t(work_request_t &&) = default;
};
struct main_thread_request_t {
@ -68,13 +69,17 @@ struct main_thread_request_t {
main_thread_request_t(main_thread_request_t &&) = delete;
};
// Spawn support. Requests are allocated and come in on request_queue and go out on result_queue
struct thread_data_t {
std::queue<spawn_request_t> request_queue;
/// Data about the current set of IO threads and requests.
struct thread_pool_data_t {
/// The queue of outstanding, unclaimed requests.
std::queue<work_request_t> request_queue;
/// The number of extant threads which are able to run requests.
int thread_count = 0;
};
static owning_lock<thread_data_t> s_spawn_requests;
static owning_lock<std::queue<spawn_request_t>> s_result_queue;
static owning_lock<thread_pool_data_t> s_thread_pool;
static owning_lock<std::queue<work_request_t>> s_result_queue;
// "Do on main thread" support.
static std::mutex s_main_thread_performer_lock; // protects the main thread requests
@ -108,17 +113,17 @@ static const notify_pipes_t &get_notify_pipes() {
return s_notify_pipes;
}
static bool dequeue_spawn_request(spawn_request_t *result) {
auto requests = s_spawn_requests.acquire();
static maybe_t<work_request_t> dequeue_spawn_request() {
maybe_t<work_request_t> result{};
auto requests = s_thread_pool.acquire();
if (!requests->request_queue.empty()) {
*result = std::move(requests->request_queue.front());
result = std::move(requests->request_queue.front());
requests->request_queue.pop();
return true;
}
return false;
return result;
}
static void enqueue_thread_result(spawn_request_t req) {
static void enqueue_thread_result(work_request_t req) {
s_result_queue.acquire()->push(std::move(req));
}
@ -127,18 +132,17 @@ static void *this_thread() { return (void *)(intptr_t)pthread_self(); }
/// The function that does thread work.
static void *iothread_worker(void *unused) {
UNUSED(unused);
struct spawn_request_t req;
while (dequeue_spawn_request(&req)) {
while (auto req = dequeue_spawn_request()) {
debug(5, "pthread %p dequeued", this_thread());
// Perform the work
req.handler();
req->handler();
// If there's a completion handler, we have to enqueue it on the result queue.
// Note we're using std::function's weirdo operator== here
if (req.completion != nullptr) {
if (req->completion != nullptr) {
// Enqueue the result, and tell the main thread about it.
enqueue_thread_result(std::move(req));
enqueue_thread_result(req.acquire());
const char wakeup_byte = IO_SERVICE_RESULT_QUEUE;
int notify_fd = get_notify_pipes().write;
assert_with_errno(write_loop(notify_fd, &wakeup_byte, sizeof wakeup_byte) != -1);
@ -150,9 +154,9 @@ static void *iothread_worker(void *unused) {
// it's possible that the main thread saw that thread_count is full, and decided to not
// spawn a new thread, trusting in one of the existing threads to handle it. But we've already
// committed to not handling anything else. Therefore, we have to decrement
// the thread count under the lock, which we still hold. Likewise, the main thread must
// check the value under the lock.
int new_thread_count = --s_spawn_requests.acquire()->thread_count;
// the thread count under the lock. Likewise, the main thread must check the value under the
// lock.
int new_thread_count = --s_thread_pool.acquire()->thread_count;
assert(new_thread_count >= 0);
debug(5, "pthread %p exiting", this_thread());
@ -176,12 +180,12 @@ int iothread_perform_impl(void_function_t &&func, void_function_t &&completion)
ASSERT_IS_MAIN_THREAD();
ASSERT_IS_NOT_FORKED_CHILD();
struct spawn_request_t req(std::move(func), std::move(completion));
struct work_request_t req(std::move(func), std::move(completion));
int local_thread_count = -1;
bool spawn_new_thread = false;
{
// Lock around a local region.
auto spawn_reqs = s_spawn_requests.acquire();
auto spawn_reqs = s_thread_pool.acquire();
spawn_reqs->request_queue.push(std::move(req));
if (spawn_reqs->thread_count < IO_MAX_THREADS) {
spawn_reqs->thread_count++;
@ -246,7 +250,7 @@ void iothread_drain_all() {
#endif
// Nasty polling via select().
while (s_spawn_requests.acquire()->thread_count > 0) {
while (s_thread_pool.acquire()->thread_count > 0) {
if (iothread_wait_for_pending_completions(1000)) {
iothread_service_completion();
}
@ -294,12 +298,12 @@ static void iothread_service_main_thread_requests() {
// Service the queue of results
static void iothread_service_result_queue() {
// Move the queue to a local variable.
std::queue<spawn_request_t> result_queue;
std::queue<work_request_t> result_queue;
s_result_queue.acquire()->swap(result_queue);
// Perform each completion in order
while (!result_queue.empty()) {
spawn_request_t req(std::move(result_queue.front()));
work_request_t req(std::move(result_queue.front()));
result_queue.pop();
// ensure we don't invoke empty functions, that raises an exception
if (req.completion != nullptr) {