From 7b63c42304abbe42bbea50472e55fa7d6af916e5 Mon Sep 17 00:00:00 2001 From: Sam Date: Thu, 12 Jan 2023 12:29:50 +1100 Subject: [PATCH] FEATURE: add basic instrumentation to defer queue (#19824) This will give us some aggregate stats on the defer queue performance. It is limited to 100 entries (for safety) which is stored in an LRU cache. Scheduler::Defer.stats can then be used to get an array that denotes: - number of runs and completions (queued, finished) - error count (errors) - total duration (duration) We can look later at exposing these metrics to gain visibility on the reason the defer queue is clogged. --- lib/scheduler/defer.rb | 24 ++++++++++++++++++++++++ spec/lib/scheduler/defer_spec.rb | 26 ++++++++++++++++++++++++++ 2 files changed, 50 insertions(+) diff --git a/lib/scheduler/defer.rb b/lib/scheduler/defer.rb index 0afdcfbaa0b..6b059280060 100644 --- a/lib/scheduler/defer.rb +++ b/lib/scheduler/defer.rb @@ -4,15 +4,18 @@ require "weakref" module Scheduler module Deferrable DEFAULT_TIMEOUT ||= 90 + STATS_CACHE_SIZE ||= 100 def initialize @async = !Rails.env.test? @queue = Queue.new @mutex = Mutex.new + @stats_mutex = Mutex.new @paused = false @thread = nil @reactor = nil @timeout = DEFAULT_TIMEOUT + @stats = LruRedux::ThreadSafeCache.new(STATS_CACHE_SIZE) end def timeout=(t) @@ -23,6 +26,10 @@ module Scheduler @queue.length end + def stats + @stats_mutex.synchronize { @stats.to_a } + end + def pause stop! @paused = true @@ -38,6 +45,11 @@ module Scheduler end def later(desc = nil, db = RailsMultisite::ConnectionManagement.current_db, &blk) + @stats_mutex.synchronize do + stats = (@stats[desc] ||= { queued: 0, finished: 0, duration: 0, errors: 0 }) + stats[:queued] += 1 + end + if @async start_thread if !@thread&.alive? && !@paused @queue << [db, blk, desc] @@ -74,6 +86,7 @@ module Scheduler # using non_block to match Ruby #deq def do_work(non_block = false) db, job, desc = @queue.deq(non_block) + start = Process.clock_gettime(Process::CLOCK_MONOTONIC) db ||= RailsMultisite::ConnectionManagement::DEFAULT RailsMultisite::ConnectionManagement.with_connection(db) do @@ -84,6 +97,10 @@ module Scheduler end if !non_block job.call rescue => ex + @stats_mutex.synchronize do + stats = @stats[desc] + stats[:errors] += 1 if stats + end Discourse.handle_job_exception(ex, message: "Running deferred code '#{desc}'") ensure warning_job&.cancel @@ -93,6 +110,13 @@ module Scheduler Discourse.handle_job_exception(ex, message: "Processing deferred code queue") ensure ActiveRecord::Base.connection_handler.clear_active_connections! + @stats_mutex.synchronize do + stats = @stats[desc] + if stats + stats[:finished] += 1 + stats[:duration] += Process.clock_gettime(Process::CLOCK_MONOTONIC) - start + end + end end end diff --git a/spec/lib/scheduler/defer_spec.rb b/spec/lib/scheduler/defer_spec.rb index 67882c2a7ef..266af82bc39 100644 --- a/spec/lib/scheduler/defer_spec.rb +++ b/spec/lib/scheduler/defer_spec.rb @@ -18,6 +18,32 @@ RSpec.describe Scheduler::Defer do after { @defer.stop! } + it "supports basic instrumentation" do + @defer.later("first") {} + @defer.later("first") {} + @defer.later("second") {} + @defer.later("bad") { raise "boom" } + + wait_for(200) { @defer.length == 0 } + + stats = Hash[@defer.stats] + + expect(stats["first"][:queued]).to eq(2) + expect(stats["first"][:finished]).to eq(2) + expect(stats["first"][:errors]).to eq(0) + expect(stats["first"][:duration]).to be > 0 + + expect(stats["second"][:queued]).to eq(1) + expect(stats["second"][:finished]).to eq(1) + expect(stats["second"][:errors]).to eq(0) + expect(stats["second"][:duration]).to be > 0 + + expect(stats["bad"][:queued]).to eq(1) + expect(stats["bad"][:finished]).to eq(1) + expect(stats["bad"][:duration]).to be > 0 + expect(stats["bad"][:errors]).to eq(1) + end + it "supports timeout reporting" do @defer.timeout = 0.05