From 9f986d8a8660193d5a716a65307adb3708f14a24 Mon Sep 17 00:00:00 2001 From: ridiculousfish Date: Wed, 27 Nov 2013 16:04:12 -0800 Subject: [PATCH] Implemented iothread_perform_on_main() to support background threads scheduling work on main thread --- fish_tests.cpp | 45 +++++++++++++ iothread.cpp | 171 ++++++++++++++++++++++++++++++++++++++----------- iothread.h | 13 +++- 3 files changed, 189 insertions(+), 40 deletions(-) diff --git a/fish_tests.cpp b/fish_tests.cpp index b226d3079..d00c7da29 100644 --- a/fish_tests.cpp +++ b/fish_tests.cpp @@ -489,6 +489,50 @@ static void test_fork(void) #undef FORK_COUNT } +// Little function that runs in the main thread +static int test_iothread_main_call(int *addr) +{ + *addr += 1; + return *addr; +} + +// Little function that runs in a background thread, bouncing to the main +static int test_iothread_thread_call(int *addr) +{ + int before = *addr; + iothread_perform_on_main(test_iothread_main_call, addr); + int after = *addr; + + // Must have incremented it at least once + if (before >= after) + { + err(L"Failed to increment from background thread"); + } + return after; +} + +static void test_iothread(void) +{ + say(L"Testing iothreads"); + int *int_ptr = new int(0); + int iterations = 1000; + for (int i=0; i < iterations; i++) + { + iothread_perform(test_iothread_thread_call, (void (*)(int *, int))NULL, int_ptr); + } + + // Now wait until we're done + iothread_drain_all(); + + // Should have incremented it once per thread + if (*int_ptr != iterations) + { + say(L"Expected int to be %d, but instead it was %d", iterations, *int_ptr); + } + + delete int_ptr; +} + /** Test the parser */ @@ -1878,6 +1922,7 @@ int main(int argc, char **argv) test_convert_nulls(); test_tok(); test_fork(); + test_iothread(); test_parser(); test_utils(); test_escape_sequences(); diff --git a/iothread.cpp b/iothread.cpp index 54117dc04..6f07aec0b 100644 --- a/iothread.cpp +++ b/iothread.cpp @@ -22,6 +22,11 @@ #define IO_MAX_THREADS 64 #endif +/* A special "thread index" that means service main thread requests */ +#define IO_SERVICE_MAIN_THREAD_REQUEST_QUEUE 99 + +static void iothread_service_main_thread_requests(void); + static int s_active_thread_count; typedef unsigned char ThreadIndex_t; @@ -32,16 +37,22 @@ static struct WorkerThread_t pthread_t thread; } threads[IO_MAX_THREADS]; -struct ThreadedRequest_t +struct SpawnRequest_t { - int sequenceNumber; - int (*handler)(void *); void (*completionCallback)(void *, int); void *context; int handlerResult; }; +struct MainThreadRequest_t +{ + int (*handler)(void *); + void *context; + volatile int handlerResult; + volatile bool done; +}; + static struct WorkerThread_t *next_vacant_thread_slot(void) { for (ThreadIndex_t i=0; i < IO_MAX_THREADS; i++) @@ -51,9 +62,17 @@ static struct WorkerThread_t *next_vacant_thread_slot(void) return NULL; } -static pthread_mutex_t s_request_queue_lock; -static std::queue s_request_queue; -static int s_last_sequence_number; +/* Spawn support */ +static pthread_mutex_t s_spawn_queue_lock; +static std::queue s_request_queue; + +/* "Do on main thread" support */ +static pthread_mutex_t s_main_thread_performer_lock; // protects the main thread requests +static pthread_cond_t s_main_thread_performer_condition; //protects the main thread requests +static pthread_mutex_t s_main_thread_request_queue_lock; // protects the queue +static std::queue s_main_thread_request_queue; + +/* Notifying pipes */ static int s_read_pipe, s_write_pipe; static void iothread_init(void) @@ -63,8 +82,11 @@ static void iothread_init(void) { inited = true; - /* Initialize the queue lock */ - VOMIT_ON_FAILURE(pthread_mutex_init(&s_request_queue_lock, NULL)); + /* Initialize some locks */ + VOMIT_ON_FAILURE(pthread_mutex_init(&s_spawn_queue_lock, NULL)); + VOMIT_ON_FAILURE(pthread_mutex_init(&s_main_thread_request_queue_lock, NULL)); + VOMIT_ON_FAILURE(pthread_mutex_init(&s_main_thread_performer_lock, NULL)); + VOMIT_ON_FAILURE(pthread_cond_init(&s_main_thread_performer_condition, NULL)); /* Initialize the completion pipes */ int pipes[2] = {0, 0}; @@ -84,16 +106,16 @@ static void iothread_init(void) } } -static void add_to_queue(struct ThreadedRequest_t *req) +static void add_to_queue(struct SpawnRequest_t *req) { - ASSERT_IS_LOCKED(s_request_queue_lock); + ASSERT_IS_LOCKED(s_spawn_queue_lock); s_request_queue.push(req); } -static ThreadedRequest_t *dequeue_request(void) +static SpawnRequest_t *dequeue_spawn_request(void) { - ThreadedRequest_t *result = NULL; - scoped_lock lock(s_request_queue_lock); + SpawnRequest_t *result = NULL; + scoped_lock lock(s_spawn_queue_lock); if (! s_request_queue.empty()) { result = s_request_queue.front(); @@ -109,7 +131,7 @@ static void *iothread_worker(void *threadPtr) struct WorkerThread_t *thread = (struct WorkerThread_t *)threadPtr; /* Grab a request off of the queue */ - struct ThreadedRequest_t *req = dequeue_request(); + struct SpawnRequest_t *req = dequeue_spawn_request(); /* Run the handler and store the result */ if (req) @@ -127,7 +149,7 @@ static void *iothread_worker(void *threadPtr) /* Spawn another thread if there's work to be done. */ static void iothread_spawn_if_needed(void) { - ASSERT_IS_LOCKED(s_request_queue_lock); + ASSERT_IS_LOCKED(s_spawn_queue_lock); if (! s_request_queue.empty() && s_active_thread_count < IO_MAX_THREADS) { struct WorkerThread_t *thread = next_vacant_thread_slot(); @@ -168,14 +190,13 @@ int iothread_perform_base(int (*handler)(void *), void (*completionCallback)(voi iothread_init(); /* Create and initialize a request. */ - struct ThreadedRequest_t *req = new ThreadedRequest_t(); + struct SpawnRequest_t *req = new SpawnRequest_t(); req->handler = handler; req->completionCallback = completionCallback; req->context = context; - req->sequenceNumber = ++s_last_sequence_number; /* Take our lock */ - scoped_lock lock(s_request_queue_lock); + scoped_lock lock(s_spawn_queue_lock); /* Add to the queue */ add_to_queue(req); @@ -196,31 +217,38 @@ void iothread_service_completion(void) ASSERT_IS_MAIN_THREAD(); ThreadIndex_t threadIdx = (ThreadIndex_t)-1; VOMIT_ON_FAILURE(1 != read_loop(iothread_port(), &threadIdx, sizeof threadIdx)); - assert(threadIdx < IO_MAX_THREADS); - struct WorkerThread_t *thread = &threads[threadIdx]; - assert(thread->thread != 0); - - struct ThreadedRequest_t *req = NULL; - VOMIT_ON_FAILURE(pthread_join(thread->thread, (void **)&req)); - - /* Free up this thread */ - thread->thread = 0; - assert(s_active_thread_count > 0); - s_active_thread_count -= 1; - - /* Handle the request */ - if (req) + if (threadIdx == IO_SERVICE_MAIN_THREAD_REQUEST_QUEUE) { - if (req->completionCallback) - req->completionCallback(req->context, req->handlerResult); - delete req; + iothread_service_main_thread_requests(); } + else + { + assert(threadIdx < IO_MAX_THREADS); - /* Maybe spawn another thread, if there's more work to be done. */ - VOMIT_ON_FAILURE(pthread_mutex_lock(&s_request_queue_lock)); - iothread_spawn_if_needed(); - VOMIT_ON_FAILURE(pthread_mutex_unlock(&s_request_queue_lock)); + struct WorkerThread_t *thread = &threads[threadIdx]; + assert(thread->thread != 0); + + struct SpawnRequest_t *req = NULL; + VOMIT_ON_FAILURE(pthread_join(thread->thread, (void **)&req)); + + /* Free up this thread */ + thread->thread = 0; + assert(s_active_thread_count > 0); + s_active_thread_count -= 1; + + /* Handle the request */ + if (req) + { + if (req->completionCallback) + req->completionCallback(req->context, req->handlerResult); + delete req; + } + + /* Maybe spawn another thread, if there's more work to be done. */ + scoped_lock locker(s_spawn_queue_lock); + iothread_spawn_if_needed(); + } } void iothread_drain_all(void) @@ -243,3 +271,68 @@ void iothread_drain_all(void) printf("(Waited %.02f msec for %d thread(s) to drain)\n", 1000 * (after - now), thread_count); #endif } + +/* "Do on main thread" support */ + +static void iothread_service_main_thread_requests(void) +{ + ASSERT_IS_MAIN_THREAD(); + + // Move the queue to a local variable + std::queue request_queue; + { + scoped_lock queue_lock(s_main_thread_request_queue_lock); + std::swap(request_queue, s_main_thread_request_queue); + } + + if (! request_queue.empty()) + { + // Perform each of the functions + // Note we are NOT responsible for deleting these. They are stack allocated in their respective threads! + scoped_lock cond_lock(s_main_thread_performer_lock); + while (! request_queue.empty()) + { + MainThreadRequest_t *req = request_queue.front(); + request_queue.pop(); + req->handlerResult = req->handler(req->context); + req->done = true; + } + + // Ok, we've handled everybody. Announce the good news, and allow ourselves to be unlocked + VOMIT_ON_FAILURE(pthread_cond_broadcast(&s_main_thread_performer_condition)); + } +} + +int iothread_perform_on_main_base(int (*handler)(void *), void *context) +{ + ASSERT_IS_BACKGROUND_THREAD(); + + // Make a new request. Note we are synchronous, so this can be stack allocated! + MainThreadRequest_t req; + req.handler = handler; + req.context = context; + req.handlerResult = 0; + req.done = false; + + // Append it + { + scoped_lock queue_lock(s_main_thread_request_queue_lock); + s_main_thread_request_queue.push(&req); + } + + // Tell the pipe + const ThreadIndex_t idx = IO_SERVICE_MAIN_THREAD_REQUEST_QUEUE; + VOMIT_ON_FAILURE(! write_loop(s_write_pipe, (const char *)&idx, sizeof idx)); + + // Wait on the condition, until we're done + scoped_lock perform_lock(s_main_thread_performer_lock); + while (! req.done) + { + // It would be nice to support checking for cancellation here, but the clients need a deterministic way to clean up to avoid leaks + VOMIT_ON_FAILURE(pthread_cond_wait(&s_main_thread_performer_condition, &s_main_thread_performer_lock)); + } + + // Ok, the request must now be done + assert(req.done); + return req.handlerResult; +} diff --git a/iothread.h b/iothread.h index 88c4a430c..ac328ace0 100644 --- a/iothread.h +++ b/iothread.h @@ -28,11 +28,22 @@ void iothread_service_completion(void); /** Waits for all iothreads to terminate. */ void iothread_drain_all(void); -/** Helper template */ +/** Performs a function on the main thread, blocking until it completes */ +int iothread_perform_on_main_base(int (*handler)(void *), void *context); + +/** Helper templates */ template int iothread_perform(int (*handler)(T *), void (*completionCallback)(T *, int), T *context) { return iothread_perform_base((int (*)(void *))handler, (void (*)(void *, int))completionCallback, static_cast(context)); } +/** Helper templates */ +template +int iothread_perform_on_main(int (*handler)(T *), T *context) +{ + return iothread_perform_on_main_base((int (*)(void *))handler, static_cast(context)); +} + + #endif