Remove the completion form of iothread_perform

Previously iothread_perform could do something on a background thread, and
then do something on the main thread. But we no longer use that second
part: instead everything goes through debounce. Remove the completion
parameter from iothread_perform.
This commit is contained in:
ridiculousfish 2021-02-26 15:27:48 -08:00
parent a68791fa89
commit 05d8907071
2 changed files with 35 additions and 79 deletions

View File

@ -46,10 +46,8 @@ using void_function_t = std::function<void()>;
struct work_request_t {
void_function_t handler;
void_function_t completion;
work_request_t(void_function_t &&f, void_function_t &&comp)
: handler(std::move(f)), completion(std::move(comp)) {}
explicit work_request_t(void_function_t &&f) : handler(std::move(f)) {}
// Move-only
work_request_t &operator=(const work_request_t &) = delete;
@ -109,10 +107,9 @@ struct thread_pool_t {
/// Enqueue a new work item onto the thread pool.
/// The function \p func will execute in one of the pool's threads.
/// \p completion will run on the main thread, if it is not missing.
/// If \p cant_wait is set, disrespect the thread limit, because extant threads may
/// want to wait for new threads.
int perform(void_function_t &&func, void_function_t &&completion, bool cant_wait);
int perform(void_function_t &&func, bool cant_wait);
private:
/// The worker loop for this thread.
@ -143,7 +140,7 @@ static thread_pool_t &s_io_thread_pool = *(new thread_pool_t(1, IO_MAX_THREADS))
/// A queue of "things to do on the main thread."
struct main_thread_queue_t {
// Functions to invoke as the completion callback from iothread_perform.
// Functions to invoke as the completion callback from debounce.
std::vector<void_function_t> completions;
// iothread_perform_on_main requests.
@ -205,26 +202,13 @@ maybe_t<work_request_t> thread_pool_t::dequeue_work_or_commit_to_exit() {
return result;
}
static void enqueue_thread_result(void_function_t req) {
s_main_thread_queue.acquire()->completions.push_back(std::move(req));
get_notify_signaller().post();
}
static void *this_thread() { return (void *)(intptr_t)pthread_self(); }
void *thread_pool_t::run() {
while (auto req = dequeue_work_or_commit_to_exit()) {
FLOGF(iothread, L"pthread %p got work", this_thread());
// Perform the work
req->handler();
// If there's a completion handler, we have to enqueue it on the result queue.
// Note we're using std::function's weirdo operator== here
if (req->completion != nullptr) {
// Enqueue the result, and tell the main thread about it.
enqueue_thread_result(std::move(req->completion));
}
}
FLOGF(iothread, L"pthread %p exiting", this_thread());
return nullptr;
@ -240,10 +224,10 @@ bool thread_pool_t::spawn() const {
return make_detached_pthread(&run_trampoline, const_cast<thread_pool_t *>(this));
}
int thread_pool_t::perform(void_function_t &&func, void_function_t &&completion, bool cant_wait) {
int thread_pool_t::perform(void_function_t &&func, bool cant_wait) {
assert(func && "Missing function");
// Note we permit an empty completion.
struct work_request_t req(std::move(func), std::move(completion));
struct work_request_t req(std::move(func));
int local_thread_count = -1;
auto &pool = s_io_thread_pool;
bool spawn_new_thread = false;
@ -285,10 +269,10 @@ int thread_pool_t::perform(void_function_t &&func, void_function_t &&completion,
return local_thread_count;
}
void iothread_perform_impl(void_function_t &&func, void_function_t &&completion, bool cant_wait) {
void iothread_perform_impl(void_function_t &&func, bool cant_wait) {
ASSERT_IS_MAIN_THREAD();
ASSERT_IS_NOT_FORKED_CHILD();
s_io_thread_pool.perform(std::move(func), std::move(completion), cant_wait);
s_io_thread_pool.perform(std::move(func), cant_wait);
}
int iothread_port() { return get_notify_signaller().read_fd(); }
@ -511,19 +495,16 @@ bool debounce_t::impl_t::run_next(uint64_t token) {
assert(req && req->handler && "Request should have value");
req->handler();
if (req->completion) {
enqueue_thread_result(std::move(req->completion));
}
return true;
}
uint64_t debounce_t::perform_impl(std::function<void()> handler, std::function<void()> completion) {
uint64_t debounce_t::perform(std::function<void()> handler) {
uint64_t active_token{0};
bool spawn{false};
// Local lock.
{
auto d = impl_->data.acquire();
d->next_req = work_request_t{std::move(handler), std::move(completion)};
d->next_req = work_request_t{std::move(handler)};
// If we have a timeout, and our running thread has exceeded it, abandon that thread.
if (d->active_token && timeout_msec_ > 0 &&
std::chrono::steady_clock::now() - d->start_time >
@ -552,6 +533,12 @@ uint64_t debounce_t::perform_impl(std::function<void()> handler, std::function<v
return active_token;
}
// static
void debounce_t::enqueue_main_thread_result(std::function<void()> func) {
s_main_thread_queue.acquire()->completions.push_back(std::move(func));
get_notify_signaller().post();
}
debounce_t::debounce_t(long timeout_msec)
: timeout_msec_(timeout_msec), impl_(std::make_shared<impl_t>()) {}
debounce_t::~debounce_t() = default;

View File

@ -26,58 +26,18 @@ void iothread_service_main_with_timeout(long timeout_usec);
int iothread_drain_all();
// Internal implementation
void iothread_perform_impl(std::function<void()> &&func, std::function<void()> &&completion,
bool cant_wait = false);
void iothread_perform_impl(std::function<void()> &&, bool cant_wait = false);
// This is the glue part of the handler-completion handoff.
// Given a Handler and Completion, where the return value of Handler should be passed to Completion,
// this generates new void->void functions that wraps that behavior. The type T is the return type
// of Handler and the argument to Completion
template <typename Handler, typename Completion,
typename Result = typename std::result_of<Handler()>::type>
struct iothread_trampoline_t {
iothread_trampoline_t(const Handler &hand, const Completion &comp) {
auto result = std::make_shared<maybe_t<Result>>();
this->handler = [=] { *result = hand(); };
this->completion = [=] { comp(result->acquire()); };
}
// The generated handler and completion functions.
std::function<void()> handler;
std::function<void()> completion;
};
// Void specialization.
template <typename Handler, typename Completion>
struct iothread_trampoline_t<Handler, Completion, void> {
iothread_trampoline_t(std::function<void()> hand, std::function<void()> comp)
: handler(std::move(hand)), completion(std::move(comp)) {}
// The handler and completion functions.
std::function<void()> handler;
std::function<void()> completion;
};
// iothread_perform invokes a handler on a background thread, and then a completion function
// on the main thread. The value returned from the handler is passed to the completion.
// In other words, this is like Completion(Handler()) except the handler part is invoked
// on a background thread.
template <typename Handler, typename Completion>
void iothread_perform(const Handler &handler, const Completion &completion) {
iothread_trampoline_t<Handler, Completion> tramp(handler, completion);
iothread_perform_impl(std::move(tramp.handler), std::move(tramp.completion));
}
// variant of iothread_perform without a completion handler
// iothread_perform invokes a handler on a background thread.
inline void iothread_perform(std::function<void()> &&func) {
iothread_perform_impl(std::move(func), {});
iothread_perform_impl(std::move(func));
}
/// Variant of iothread_perform that disrespects the thread limit.
/// It does its best to spawn a new thread if all other threads are occupied.
/// This is for cases where deferring a new thread might lead to deadlock.
inline void iothread_perform_cantwait(std::function<void()> &&func) {
iothread_perform_impl(std::move(func), {}, true);
iothread_perform_impl(std::move(func), true);
}
/// Performs a function on the main thread, blocking until it completes.
@ -103,22 +63,31 @@ class debounce_t {
/// Enqueue \p handler to be performed on a background thread, and \p completion (if any) to be
/// performed on the main thread. If a function is already enqueued, this overwrites it; that
/// function will not execute.
/// This returns the active thread token, which is only of interest to tests.
/// If the function executes, then \p completion will be invoked on the main thread, with the
/// result of the handler.
/// The result is a token which is only of interest to the tests.
template <typename Handler, typename Completion>
void perform(Handler handler, Completion completion) {
iothread_trampoline_t<Handler, Completion> tramp(handler, completion);
perform_impl(std::move(tramp.handler), std::move(tramp.completion));
uint64_t perform(const Handler &handler, const Completion &completion) {
// Make a trampoline function which calls the handler, puts the result into a shared
// pointer, and then enqueues a completion.
auto trampoline = [=] {
using result_type_t = decltype(handler());
auto result = std::make_shared<result_type_t>(handler());
enqueue_main_thread_result([=] { completion(std::move(*result)); });
};
return perform(std::move(trampoline));
}
/// One-argument form with no completion.
uint64_t perform(std::function<void()> func) { return perform_impl(std::move(func), {}); }
/// The result is a token which is only of interest to the tests.
uint64_t perform(std::function<void()> func);
explicit debounce_t(long timeout_msec = 0);
~debounce_t();
private:
/// Implementation of perform().
uint64_t perform_impl(std::function<void()> handler, std::function<void()> completion);
/// Helper to enqueue a function to run on the main thread.
static void enqueue_main_thread_result(std::function<void()> func);
const long timeout_msec_;
struct impl_t;