diff --git a/lib/scheduler/thread_pool.rb b/lib/scheduler/thread_pool.rb index a6872704355..7f9308c01de 100644 --- a/lib/scheduler/thread_pool.rb +++ b/lib/scheduler/thread_pool.rb @@ -8,15 +8,21 @@ module Scheduler # Usage: # pool = ThreadPool.new(min_threads: 0, max_threads: 4, idle_time: 0.1) # pool.post { do_something } + # pool.stats (returns thread count, busy thread count, etc.) # - # (optional) - # pool.shutdown + # pool.shutdown (do not accept new tasks) + # pool.wait_for_termination(timeout: 1) (optional timeout) class ThreadPool class ShutdownError < StandardError end - def initialize(min_threads:, max_threads:, idle_time:) + def initialize(min_threads:, max_threads:, idle_time: nil) + # 30 seconds is a reasonable default for idle time + # it is particularly useful for the use case of: + # ThreadPool.new(min_threads: 4, max_threads: 4) + # operators would get confused about idle time cause why does it matter + idle_time ||= 30 raise ArgumentError, "min_threads must be 0 or larger" if min_threads < 0 raise ArgumentError, "max_threads must be 1 or larger" if max_threads < 1 raise ArgumentError, "max_threads must be >= min_threads" if max_threads < min_threads @@ -27,6 +33,7 @@ module Scheduler @idle_time = idle_time @threads = Set.new + @busy_threads = Set.new @queue = Queue.new @mutex = Mutex.new @@ -45,37 +52,45 @@ module Scheduler @mutex.synchronize do @queue << wrapped_block - - spawn_thread if @queue.length > 1 && @threads.length < @max_threads + spawn_thread if @threads.length == 0 @new_work.signal end end - def shutdown(timeout: 30) + def wait_for_termination(timeout: nil) + threads_to_join = nil + @mutex.synchronize { threads_to_join = @threads.to_a } + + if timeout.nil? + threads_to_join.each(&:join) + else + failed_to_shutdown = false + + deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout + threads_to_join.each do |thread| + remaining_time = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC) + break if remaining_time <= 0 + if !thread.join(remaining_time) + Rails.logger.error "ThreadPool: Failed to join thread within timeout\n#{thread.backtrace.join("\n")}" + failed_to_shutdown = true + end + end + + if failed_to_shutdown + @mutex.synchronize { @threads.each(&:kill) } + raise ShutdownError, "Failed to shutdown ThreadPool within timeout" + end + end + end + + def shutdown @mutex.synchronize do return if @shutdown @shutdown = true @threads.length.times { @queue << :shutdown } @new_work.broadcast end - - threads_to_join = nil - @mutex.synchronize { threads_to_join = @threads.to_a } - - failed_to_shutdown = false - - deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout - threads_to_join.each do |thread| - remaining_time = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC) - break if remaining_time <= 0 - if !thread.join(remaining_time) - Rails.logger.error "ThreadPool: Failed to join thread within timeout\n#{thread.backtrace.join("\n")}" - failed_to_shutdown = true - end - end - - raise ShutdownError, "Failed to shutdown ThreadPool within timeout" if failed_to_shutdown end def shutdown? @@ -90,6 +105,7 @@ module Scheduler shutdown: @shutdown, min_threads: @min_threads, max_threads: @max_threads, + busy_thread_count: @busy_threads.size, } end end @@ -115,12 +131,16 @@ module Scheduler work = nil @mutex.synchronize do - @new_work.wait(@mutex, @idle_time) + # we may have already have work so no need + # to wait for signals, this also handles the race + # condition between spinning up threads and posting work + work = @queue.pop(timeout: 0) + @new_work.wait(@mutex, @idle_time) if !work - if @queue.empty? + if !work && @queue.empty? done = @threads.count > @min_threads else - work = @queue.pop + work ||= @queue.pop if work == :shutdown work = nil @@ -128,11 +148,23 @@ module Scheduler end end + @busy_threads << Thread.current if work + + if !done && work && @queue.length > 0 && @threads.length < @max_threads && + @busy_threads.length == @threads.length + spawn_thread + end + @threads.delete(Thread.current) if done end - # could be nil if the thread just needs to idle - work&.call if !done + if work + begin + work.call + ensure + @mutex.synchronize { @busy_threads.delete(Thread.current) } + end + end end end diff --git a/spec/lib/scheduler/thread_pool_spec.rb b/spec/lib/scheduler/thread_pool_spec.rb index 76bd662932c..c62d01724d1 100644 --- a/spec/lib/scheduler/thread_pool_spec.rb +++ b/spec/lib/scheduler/thread_pool_spec.rb @@ -9,7 +9,10 @@ RSpec.describe Scheduler::ThreadPool, type: :multisite do described_class.new(min_threads: min_threads, max_threads: max_threads, idle_time: idle_time) end - after { pool.shutdown(timeout: 1) } + after do + pool.shutdown + pool.wait_for_termination(timeout: 1) + end describe "initialization" do it "creates the minimum number of threads and validates parameters" do @@ -70,7 +73,12 @@ RSpec.describe Scheduler::ThreadPool, type: :multisite do end end + # we spin up threads in the thread loop, so it can take + # a bit of time to react to work pressure + wait_for { pool.stats[:thread_count] == max_threads } + expect(pool.stats[:thread_count]).to eq(max_threads) + (max_threads + 1).times { blocker_queue << :continue } results = Array.new(max_threads + 1) { completion_queue.pop } @@ -119,7 +127,8 @@ RSpec.describe Scheduler::ThreadPool, type: :multisite do results2 = Array.new(3) { completion_queue2.pop } end - pool.shutdown(timeout: 1) + pool.shutdown + pool.wait_for_termination(timeout: 1) expect(results1.size).to eq(3) expect(results1.sort).to eq([0, 1, 2]) @@ -165,6 +174,24 @@ RSpec.describe Scheduler::ThreadPool, type: :multisite do end end + describe "when thread pool has zero min threads" do + it "can quickly process and can be cleanly terminated" do + # setting idle time to 1000 to ensure that there are maximal delays waiting + # for jobs + pool = Scheduler::ThreadPool.new(min_threads: 0, max_threads: 5, idle_time: 1000) + + done = Queue.new + pool.post { done << :done } + + # should happen in less than 1 second + Timeout.timeout(1) { expect(done.pop).to eq(:done) } + + pool.shutdown + pool.wait_for_termination + expect(pool.stats[:thread_count]).to eq(0) + end + end + describe "stress test" do it "handles multiple task submissions correctly" do completion_queue = Queue.new