Remove iothread drain flag

This was intended to support a mode where we "drain threads before fork"
but that ship has long sailed and it proved unnecessary.
This commit is contained in:
ridiculousfish 2022-06-19 15:06:49 -07:00
parent e2782ac322
commit 96b3a86b87
3 changed files with 6 additions and 60 deletions

View File

@ -4424,21 +4424,6 @@ void history_tests_t::test_history_races() {
say(L"Testing history race conditions");
// It appears TSAN and ASAN's allocators do not release their locks properly in atfork, so
// allocating with multiple threads risks deadlock. Drain threads before running under ASAN.
// TODO: stop forking with these tests.
bool needs_thread_drain = false;
#if __SANITIZE_ADDRESS__
needs_thread_drain |= true;
#endif
#if defined(__has_feature)
needs_thread_drain |= __has_feature(thread_sanitizer) || __has_feature(address_sanitizer);
#endif
if (needs_thread_drain) {
iothread_drain_all();
}
// Test concurrent history writing.
// How many concurrent writers we have
constexpr size_t RACE_COUNT = 4;

View File

@ -57,9 +57,6 @@ struct thread_pool_t : noncopyable_t, nonmovable_t {
/// The number of threads which are waiting for more work.
size_t waiting_threads{0};
/// A flag indicating we should not process new requests.
bool drain{false};
};
/// Data which needs to be atomically accessed.
@ -182,9 +179,7 @@ int thread_pool_t::perform(void_function_t &&func, bool cant_wait) {
auto data = pool.req_data.acquire();
data->request_queue.push(std::move(req));
FLOGF(iothread, L"enqueuing work item (count is %lu)", data->request_queue.size());
if (data->drain) {
// Do nothing here.
} else if (data->waiting_threads >= data->request_queue.size()) {
if (data->waiting_threads >= data->request_queue.size()) {
// There's enough waiting threads, wake one up.
wakeup_thread = true;
} else if (cant_wait || data->total_threads < pool.max_threads) {
@ -228,46 +223,12 @@ void iothread_service_main_with_timeout(uint64_t timeout_usec) {
}
}
/// At the moment, this function is only used in the test suite and in a
/// drain-all-threads-before-fork compatibility mode that no architecture requires, so it's OK that
/// it's terrible.
int iothread_drain_all() {
ASSERT_IS_MAIN_THREAD();
ASSERT_IS_NOT_FORKED_CHILD();
int thread_count;
auto &pool = s_io_thread_pool;
// Set the drain flag.
{
auto data = pool.req_data.acquire();
assert(!data->drain && "Should not be draining already");
data->drain = true;
thread_count = data->total_threads;
}
// Wake everyone up.
pool.queue_cond.notify_all();
double now = timef();
/// At the moment, this function is only used in the test suite.
void iothread_drain_all() {
// Nasty polling via select().
while (pool.req_data.acquire()->total_threads > 0) {
while (s_io_thread_pool.req_data.acquire()->total_threads > 0) {
iothread_service_main_with_timeout(1000);
}
// Clear the drain flag.
// Even though we released the lock, nobody should have added a new thread while the drain flag
// is set.
{
auto data = pool.req_data.acquire();
assert(data->total_threads == 0 && "Should be no threads");
assert(data->drain && "Should be draining");
data->drain = false;
}
double after = timef();
FLOGF(iothread, "Drained %d thread(s) in %.02f msec", thread_count, 1000 * (after - now));
return thread_count;
}
// Service the main thread queue, by invoking any functions enqueued for the main thread.

View File

@ -22,8 +22,8 @@ void iothread_service_main();
void iothread_service_main_with_timeout(uint64_t timeout_usec);
/// Waits for all iothreads to terminate.
/// \return the number of threads that were running.
int iothread_drain_all();
/// This is a hacky function only used in the test suite.
void iothread_drain_all();
// Internal implementation
void iothread_perform_impl(std::function<void()> &&, bool cant_wait = false);