Introduce debounce_t

debounce_t will be used to limit thread creation from background highlighting
and autosuggestion scenarios. This is a one-element queue backed by a
single thread. New requests displace any existing queued request; this
reflects the fact that autosuggestions and highlighting only care about
the most recent result.

A timeout allows for abandoning hung threads, which may happen if you
attempt to e.g. access a dead hard-mounted NFS server. We don't want
this to defeat autosuggestions and highlighting permanently, so allow
spawning a new thread after the timeout (here 500 ms).
This commit is contained in:
ridiculousfish 2020-03-02 22:57:41 -08:00
parent a6b565d502
commit bde2f2111d
3 changed files with 263 additions and 37 deletions

View File

@ -881,6 +881,100 @@ static void test_pthread() {
do_test(val == 5);
}
static void test_debounce() {
say(L"Testing debounce");
// Run 8 functions using a condition variable.
// Only the first and last should run.
debounce_t db;
constexpr size_t count = 8;
std::array<bool, count> handler_ran = {};
std::array<bool, count> completion_ran = {};
bool ready_to_go = false;
std::mutex m;
std::condition_variable cv;
// "Enqueue" all functions. Each one waits until ready_to_go.
for (size_t idx = 0; idx < count; idx++) {
do_test(handler_ran[idx] == false);
db.perform(
[&, idx] {
std::unique_lock<std::mutex> lock(m);
cv.wait(lock, [&] { return ready_to_go; });
handler_ran[idx] = true;
return idx;
},
[&](size_t idx) { completion_ran[idx] = true; });
}
// We're ready to go.
{
std::unique_lock<std::mutex> lock(m);
ready_to_go = true;
}
cv.notify_all();
// Wait until the last completion is done.
while (!completion_ran.back()) {
iothread_service_completion();
}
iothread_drain_all();
// Each perform() call may displace an existing queued operation.
// Each operation waits until all are queued.
// Therefore we expect the last perform() to have run, and at most one more.
do_test(handler_ran.back());
do_test(completion_ran.back());
size_t total_ran = 0;
for (size_t idx = 0; idx < count; idx++) {
total_ran += (handler_ran[idx] ? 1 : 0);
do_test(handler_ran[idx] == completion_ran[idx]);
}
do_test(total_ran <= 2);
}
static void test_debounce_timeout() {
using namespace std::chrono;
say(L"Testing debounce timeout");
// Verify that debounce doesn't wait forever.
// Use a shared_ptr so we don't have to join our threads.
const long timeout_ms = 50;
struct data_t {
debounce_t db{timeout_ms};
bool exit_ok = false;
std::mutex m;
std::condition_variable cv;
relaxed_atomic_t<uint32_t> running{0};
};
auto data = std::make_shared<data_t>();
// Our background handler. Note this just blocks until exit_ok is set.
std::function<void()> handler = [data] {
data->running++;
std::unique_lock<std::mutex> lock(data->m);
data->cv.wait(lock, [&] { return data->exit_ok; });
};
// Spawn the handler twice. This should not modify the thread token.
uint64_t token1 = data->db.perform(handler);
uint64_t token2 = data->db.perform(handler);
do_test(token1 == token2);
// Wait 75 msec, then enqueue something else; this should spawn a new thread.
std::this_thread::sleep_for(std::chrono::milliseconds(timeout_ms + timeout_ms / 2));
do_test(data->running == 1);
uint64_t token3 = data->db.perform(handler);
do_test(token3 > token2);
// Release all the threads.
std::unique_lock<std::mutex> lock(data->m);
data->exit_ok = true;
data->cv.notify_all();
}
static parser_test_error_bits_t detect_argument_errors(const wcstring &src) {
parse_node_tree_t tree;
if (!parse_tree_from_string(src, parse_flag_none, &tree, NULL, symbol_argument_list)) {
@ -5619,6 +5713,8 @@ int main(int argc, char **argv) {
if (should_test_function("fd_monitor")) test_fd_monitor();
if (should_test_function("iothread")) test_iothread();
if (should_test_function("pthread")) test_pthread();
if (should_test_function("debounce")) test_debounce();
if (should_test_function("debounce")) test_debounce_timeout();
if (should_test_function("parser")) test_parser();
if (should_test_function("cancellation")) test_cancellation();
if (should_test_function("indents")) test_indents();

View File

@ -16,6 +16,7 @@
#include <condition_variable>
#include <functional>
#include <queue>
#include <thread>
#include "common.h"
#include "flog.h"
@ -132,7 +133,7 @@ struct thread_pool_t {
/// These are used for completions, etc.
static thread_pool_t s_io_thread_pool(1, IO_MAX_THREADS);
static owning_lock<std::queue<work_request_t>> s_result_queue;
static owning_lock<std::queue<void_function_t>> s_result_queue;
// "Do on main thread" support.
static std::mutex s_main_thread_performer_lock; // protects the main thread requests
@ -195,8 +196,11 @@ maybe_t<work_request_t> thread_pool_t::dequeue_work_or_commit_to_exit() {
return result;
}
static void enqueue_thread_result(work_request_t req) {
static void enqueue_thread_result(void_function_t req) {
s_result_queue.acquire()->push(std::move(req));
const char wakeup_byte = IO_SERVICE_RESULT_QUEUE;
int notify_fd = get_notify_pipes().write;
assert_with_errno(write_loop(notify_fd, &wakeup_byte, sizeof wakeup_byte) != -1);
}
static void *this_thread() { return (void *)(intptr_t)pthread_self(); }
@ -212,10 +216,7 @@ void *thread_pool_t::run() {
// Note we're using std::function's weirdo operator== here
if (req->completion != nullptr) {
// Enqueue the result, and tell the main thread about it.
enqueue_thread_result(req.acquire());
const char wakeup_byte = IO_SERVICE_RESULT_QUEUE;
int notify_fd = get_notify_pipes().write;
assert_with_errno(write_loop(notify_fd, &wakeup_byte, sizeof wakeup_byte) != -1);
enqueue_thread_result(std::move(req->completion));
}
}
FLOGF(iothread, L"pthread %p exiting", this_thread());
@ -392,16 +393,16 @@ static void iothread_service_main_thread_requests() {
// Service the queue of results
static void iothread_service_result_queue() {
// Move the queue to a local variable.
std::queue<work_request_t> result_queue;
std::queue<void_function_t> result_queue;
s_result_queue.acquire()->swap(result_queue);
// Perform each completion in order
while (!result_queue.empty()) {
work_request_t req(std::move(result_queue.front()));
void_function_t req(std::move(result_queue.front()));
result_queue.pop();
// ensure we don't invoke empty functions, that raises an exception
if (req.completion != nullptr) {
req.completion();
if (req != nullptr) {
req();
}
}
}
@ -493,3 +494,96 @@ uint64_t thread_id() {
static thread_local uint64_t tl_tid = next_thread_id();
return tl_tid;
}
// Debounce implementation note: we would like to enqueue at most one request, except if a thread
// hangs (e.g. on fs access) then we do not want to block indefinitely; such threads are called
// "abandoned". This is implemented via a monotone uint64 counter, called a token.
// Every time we spawn a thread, increment the token. When the thread is completed, it compares its
// token to the active token; if they differ then this thread was abandoned.
struct debounce_t::impl_t {
// Synchronized data from debounce_t.
struct data_t {
// The (at most 1) next enqueued request, or none if none.
maybe_t<work_request_t> next_req{};
// The token of the current non-abandoned thread, or 0 if no thread is running.
uint64_t active_token{0};
// The next token to use when spawning a thread.
uint64_t next_token{1};
// The start time of the most recently run thread spawn, or request (if any).
std::chrono::time_point<std::chrono::steady_clock> start_time{};
};
owning_lock<data_t> data{};
/// Run an iteration in the background, with the given thread token.
/// \return true if we handled a request, false if there were none.
bool run_next(uint64_t token);
};
bool debounce_t::impl_t::run_next(uint64_t token) {
assert(token > 0 && "Invalid token");
// Note we are on a background thread.
maybe_t<work_request_t> req;
{
auto d = data.acquire();
if (d->next_req) {
// The value was dequeued, we are going to execute it.
req = d->next_req.acquire();
d->start_time = std::chrono::steady_clock::now();
} else {
// There is no request. If we are active, mark ourselves as no longer running.
if (token == d->active_token) {
d->active_token = 0;
}
return false;
}
}
assert(req && req->handler && "Request should have value");
req->handler();
if (req->completion) {
enqueue_thread_result(std::move(req->completion));
}
return true;
}
uint64_t debounce_t::perform_impl(std::function<void()> handler, std::function<void()> completion) {
uint64_t active_token{0};
bool spawn{false};
// Local lock.
{
auto d = impl_->data.acquire();
d->next_req = work_request_t{std::move(handler), std::move(completion)};
// If we have a timeout, and our running thread has exceeded it, abandon that thread.
if (d->active_token && timeout_msec_ > 0 &&
std::chrono::steady_clock::now() - d->start_time >
std::chrono::milliseconds(timeout_msec_)) {
// Abandon this thread by marking nothing as active.
d->active_token = 0;
}
if (!d->active_token) {
// We need to spawn a new thread.
// Mark the current time so that a new request won't immediately abandon us.
spawn = true;
d->active_token = d->next_token++;
d->start_time = std::chrono::steady_clock::now();
}
active_token = d->active_token;
assert(active_token && "Something should be active");
}
if (spawn) {
// Equip our background thread with a reference to impl, to keep it alive.
auto impl = impl_;
iothread_perform([=] {
while (impl->run_next(active_token))
; // pass
});
}
return active_token;
}
debounce_t::debounce_t(long timeout_msec)
: timeout_msec_(timeout_msec), impl_(std::make_shared<impl_t>()) {}
debounce_t::~debounce_t() = default;

View File

@ -6,8 +6,11 @@
#include <cstdint> // for uint64_t
#include <functional>
#include <memory>
#include <type_traits>
#include "maybe.h"
/// Runs a command on a thread.
///
/// \param handler The function to execute on a background thread. Accepts an arbitrary context
@ -35,41 +38,43 @@ int iothread_drain_all(void);
int iothread_perform_impl(std::function<void(void)> &&func, std::function<void(void)> &&completion,
bool cant_wait = false);
// Template helpers
// This is the glue part of the handler-completion handoff
// In general we can just allocate an object, move the result of the handler into it,
// and then call the completion with that object. However if our type is void,
// this won't work (new void() fails!). So we have to use this template.
// The type T is the return type of HANDLER and the argument to COMPLETION
template <typename T>
struct _iothread_trampoline {
template <typename HANDLER, typename COMPLETION>
static int perform(const HANDLER &handler, const COMPLETION &completion) {
T *result = new T(); // TODO: placement new?
return iothread_perform_impl([=]() { *result = handler(); },
[=]() {
completion(std::move(*result));
delete result;
});
// This is the glue part of the handler-completion handoff.
// Given a Handler and Completion, where the return value of Handler should be passed to Completion,
// this generates new void->void functions that wraps that behavior. The type T is the return type
// of Handler and the argument to Completion
template <typename Handler, typename Completion,
typename Result = typename std::result_of<Handler()>::type>
struct iothread_trampoline_t {
iothread_trampoline_t(const Handler &hand, const Completion &comp) {
auto result = std::make_shared<maybe_t<Result>>();
this->handler = [=] { *result = hand(); };
this->completion = [=] { comp(result->acquire()); };
}
// The generated handler and completion functions.
std::function<void()> handler;
std::function<void()> completion;
};
// Void specialization
template <>
struct _iothread_trampoline<void> {
template <typename HANDLER, typename COMPLETION>
static int perform(const HANDLER &handler, const COMPLETION &completion) {
return iothread_perform_impl(handler, completion);
}
// Void specialization.
template <typename Handler, typename Completion>
struct iothread_trampoline_t<Handler, Completion, void> {
iothread_trampoline_t(std::function<void()> hand, std::function<void()> comp)
: handler(std::move(hand)), completion(std::move(comp)) {}
// The generated handler and completion functions.
std::function<void()> handler;
std::function<void()> completion;
};
// iothread_perform invokes a handler on a background thread, and then a completion function
// on the main thread. The value returned from the handler is passed to the completion.
// In other words, this is like COMPLETION(HANDLER()) except the handler part is invoked
// In other words, this is like Completion(Handler()) except the handler part is invoked
// on a background thread.
template <typename HANDLER, typename COMPLETION>
int iothread_perform(const HANDLER &handler, const COMPLETION &completion) {
return _iothread_trampoline<decltype(handler())>::perform(handler, completion);
template <typename Handler, typename Completion>
int iothread_perform(const Handler &handler, const Completion &completion) {
iothread_trampoline_t<Handler, Completion> tramp(handler, completion);
return iothread_perform_impl(std::move(tramp.handler), std::move(tramp.completion));
}
// variant of iothread_perform without a completion handler
@ -98,4 +103,35 @@ bool make_detached_pthread(std::function<void(void)> &&func);
/// Thread IDs are never repeated.
uint64_t thread_id();
/// A Debounce is a simple class which executes one function in a background thread,
/// while enqueuing at most one more. New execution requests overwrite the enqueued one.
/// It has an optional timeout; if a handler does not finish within the timeout, then
/// a new thread is spawned.
class debounce_t {
public:
/// Enqueue \p handler to be performed on a background thread, and \p completion (if any) to be
/// performed on the main thread. If a function is already enqueued, this overwrites it; that
/// function will not execute.
/// This returns the active thread token, which is only of interest to tests.
template <typename Handler, typename Completion>
uint64_t perform(Handler handler, Completion completion) {
iothread_trampoline_t<Handler, Completion> tramp(handler, completion);
return perform_impl(std::move(tramp.handler), std::move(tramp.completion));
}
/// One-argument form with no completion.
uint64_t perform(std::function<void()> func) { return perform_impl(std::move(func), {}); }
explicit debounce_t(long timeout_msec = 0);
~debounce_t();
private:
/// Implementation of perform().
uint64_t perform_impl(std::function<void()> handler, std::function<void()> completion);
const long timeout_msec_;
struct impl_t;
const std::shared_ptr<impl_t> impl_;
};
#endif