From 057087e0e84941f60f4e0d19c3000b2be91df2ad Mon Sep 17 00:00:00 2001 From: Sam Date: Fri, 12 Oct 2018 17:03:47 +1100 Subject: [PATCH] FEATURE: log long running jobs in the defer queue If a job in the defer queue takes longer than 90 seconds log an error --- lib/scheduler/defer.rb | 32 ++++++++++++++++++-- spec/components/scheduler/defer_spec.rb | 40 +++++++++++++++++++++++++ 2 files changed, 69 insertions(+), 3 deletions(-) diff --git a/lib/scheduler/defer.rb b/lib/scheduler/defer.rb index f8f37090bb2..f4a847eef87 100644 --- a/lib/scheduler/defer.rb +++ b/lib/scheduler/defer.rb @@ -1,11 +1,26 @@ +# frozen_string_literal: true +require 'weakref' + module Scheduler + module Deferrable + + DEFAULT_TIMEOUT ||= 90 + def initialize @async = !Rails.env.test? @queue = Queue.new @mutex = Mutex.new @paused = false @thread = nil + @reactor = nil + @timeout = DEFAULT_TIMEOUT + end + + def timeout=(t) + @mutex.synchronize do + @timeout = t + end end def length @@ -28,7 +43,7 @@ module Scheduler def later(desc = nil, db = RailsMultisite::ConnectionManagement.current_db, &blk) if @async - start_thread unless @thread&.alive? || @paused + start_thread if !@thread&.alive? && !@paused @queue << [db, blk, desc] else blk.call @@ -38,6 +53,8 @@ module Scheduler def stop! @thread.kill if @thread&.alive? @thread = nil + @reactor&.stop + @reactor = nil end # test only @@ -55,8 +72,12 @@ module Scheduler def start_thread @mutex.synchronize do - return if @thread&.alive? - @thread = Thread.new { do_work while true } + if !@reactor + @reactor = MessageBus::TimerThread.new + end + if !@thread&.alive? + @thread = Thread.new { do_work while true } + end end end @@ -67,9 +88,14 @@ module Scheduler RailsMultisite::ConnectionManagement.with_connection(db) do begin + warning_job = @reactor.queue(@timeout) do + Rails.logger.error "'#{desc}' is still running after #{@timeout} seconds on db #{db}, this process may need to be restarted!" + end if !non_block job.call rescue => ex Discourse.handle_job_exception(ex, message: "Running deferred code '#{desc}'") + ensure + warning_job&.cancel end end rescue => ex diff --git a/spec/components/scheduler/defer_spec.rb b/spec/components/scheduler/defer_spec.rb index e7f57315e6f..db5da4b9af0 100644 --- a/spec/components/scheduler/defer_spec.rb +++ b/spec/components/scheduler/defer_spec.rb @@ -14,6 +14,26 @@ describe Scheduler::Defer do end end + class TrackingLogger < ::Logger + attr_reader :messages + def initialize + super(nil) + @messages = [] + end + def add(*args, &block) + @messages << args + end + end + + def track_log_messages + old_logger = Rails.logger + logger = Rails.logger = TrackingLogger.new + yield logger.messages + logger.messages + ensure + Rails.logger = old_logger + end + before do @defer = DeferInstance.new @defer.async = true @@ -23,6 +43,26 @@ describe Scheduler::Defer do @defer.stop! end + it "supports timeout reporting" do + @defer.timeout = 0.05 + + m = track_log_messages do |messages| + 10.times do + @defer.later("fast job") {} + end + @defer.later "weird slow job" do + sleep + end + + wait_for(100) do + messages.length == 1 + end + end + + expect(m.length).to eq(1) + expect(m[0][2]).to include("weird slow job") + end + it "can pause and resume" do x = 1 @defer.pause