mirror of
https://github.com/discourse/discourse.git
synced 2024-11-22 14:03:22 +08:00
FEATURE: log long running jobs in the defer queue
If a job in the defer queue takes longer than 90 seconds log an error
This commit is contained in:
parent
f07bece17f
commit
057087e0e8
|
@ -1,11 +1,26 @@
|
|||
# frozen_string_literal: true
|
||||
require 'weakref'
|
||||
|
||||
module Scheduler
|
||||
|
||||
module Deferrable
|
||||
|
||||
DEFAULT_TIMEOUT ||= 90
|
||||
|
||||
def initialize
|
||||
@async = !Rails.env.test?
|
||||
@queue = Queue.new
|
||||
@mutex = Mutex.new
|
||||
@paused = false
|
||||
@thread = nil
|
||||
@reactor = nil
|
||||
@timeout = DEFAULT_TIMEOUT
|
||||
end
|
||||
|
||||
def timeout=(t)
|
||||
@mutex.synchronize do
|
||||
@timeout = t
|
||||
end
|
||||
end
|
||||
|
||||
def length
|
||||
|
@ -28,7 +43,7 @@ module Scheduler
|
|||
|
||||
def later(desc = nil, db = RailsMultisite::ConnectionManagement.current_db, &blk)
|
||||
if @async
|
||||
start_thread unless @thread&.alive? || @paused
|
||||
start_thread if !@thread&.alive? && !@paused
|
||||
@queue << [db, blk, desc]
|
||||
else
|
||||
blk.call
|
||||
|
@ -38,6 +53,8 @@ module Scheduler
|
|||
def stop!
|
||||
@thread.kill if @thread&.alive?
|
||||
@thread = nil
|
||||
@reactor&.stop
|
||||
@reactor = nil
|
||||
end
|
||||
|
||||
# test only
|
||||
|
@ -55,8 +72,12 @@ module Scheduler
|
|||
|
||||
def start_thread
|
||||
@mutex.synchronize do
|
||||
return if @thread&.alive?
|
||||
@thread = Thread.new { do_work while true }
|
||||
if !@reactor
|
||||
@reactor = MessageBus::TimerThread.new
|
||||
end
|
||||
if !@thread&.alive?
|
||||
@thread = Thread.new { do_work while true }
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -67,9 +88,14 @@ module Scheduler
|
|||
|
||||
RailsMultisite::ConnectionManagement.with_connection(db) do
|
||||
begin
|
||||
warning_job = @reactor.queue(@timeout) do
|
||||
Rails.logger.error "'#{desc}' is still running after #{@timeout} seconds on db #{db}, this process may need to be restarted!"
|
||||
end if !non_block
|
||||
job.call
|
||||
rescue => ex
|
||||
Discourse.handle_job_exception(ex, message: "Running deferred code '#{desc}'")
|
||||
ensure
|
||||
warning_job&.cancel
|
||||
end
|
||||
end
|
||||
rescue => ex
|
||||
|
|
|
@ -14,6 +14,26 @@ describe Scheduler::Defer do
|
|||
end
|
||||
end
|
||||
|
||||
class TrackingLogger < ::Logger
|
||||
attr_reader :messages
|
||||
def initialize
|
||||
super(nil)
|
||||
@messages = []
|
||||
end
|
||||
def add(*args, &block)
|
||||
@messages << args
|
||||
end
|
||||
end
|
||||
|
||||
def track_log_messages
|
||||
old_logger = Rails.logger
|
||||
logger = Rails.logger = TrackingLogger.new
|
||||
yield logger.messages
|
||||
logger.messages
|
||||
ensure
|
||||
Rails.logger = old_logger
|
||||
end
|
||||
|
||||
before do
|
||||
@defer = DeferInstance.new
|
||||
@defer.async = true
|
||||
|
@ -23,6 +43,26 @@ describe Scheduler::Defer do
|
|||
@defer.stop!
|
||||
end
|
||||
|
||||
it "supports timeout reporting" do
|
||||
@defer.timeout = 0.05
|
||||
|
||||
m = track_log_messages do |messages|
|
||||
10.times do
|
||||
@defer.later("fast job") {}
|
||||
end
|
||||
@defer.later "weird slow job" do
|
||||
sleep
|
||||
end
|
||||
|
||||
wait_for(100) do
|
||||
messages.length == 1
|
||||
end
|
||||
end
|
||||
|
||||
expect(m.length).to eq(1)
|
||||
expect(m[0][2]).to include("weird slow job")
|
||||
end
|
||||
|
||||
it "can pause and resume" do
|
||||
x = 1
|
||||
@defer.pause
|
||||
|
|
Loading…
Reference in New Issue
Block a user