discourse/lib/sidekiq_long_running_job_logger.rb
Alan Guo Xiang Tan ae1d8c50da
DEV: Add ability to log a warning when Sidekiq job runs for too long (#27446)
This commit introduces the `sidekiq_report_long_running_jobs_minutes`
global setting which allows a site administrator to log a warning in the
Rails log when a Sidekiq job has been running for too long.

The warning is logged with the backtrace of the thread that is
processing the Sidekiq job to make it easier to figure out what a
Sidekiq job is stuck on.
2024-06-13 14:24:44 +08:00

82 lines
2.3 KiB
Ruby

# frozen_string_literal: true
# Background watchdog that warns, via `Rails.logger`, about Sidekiq jobs on this host that have been running for
# longer than a configured number of minutes. Each warning includes the backtrace of the thread processing the job.
#
# A job is reported only once for as long as it keeps showing up in consecutive scans.
class SidekiqLongRunningJobLogger
  # The background Thread created by #start, or nil when not started / after #stop.
  attr_reader :thread

  # stuck_sidekiq_job_minutes - Numeric number of minutes after which a running job is considered long-running.
  #                             Fractional values are rounded up.
  def initialize(stuck_sidekiq_job_minutes:)
    @mutex = Mutex.new
    # Signalled by #stop so that the wait between two scans can be cut short.
    @stop_signal = ConditionVariable.new
    @stop_requested = false

    # Assume that setting the value of `stuck_sidekiq_job_minutes` to 0 or lower is a mistake and set it to 1. This
    # makes the code in this class easier to reason about.
    @stuck_sidekiq_job_minutes = stuck_sidekiq_job_minutes <= 0 ? 1 : stuck_sidekiq_job_minutes.ceil
  end

  # Starts the background thread (a no-op when it is already running) and returns it.
  #
  # A scan runs immediately and then again every half of the configured threshold. When a block is given it is
  # called after every scan that completed without raising.
  def start
    @thread ||=
      begin
        hostname = Discourse.os_hostname
        seconds_to_sleep_between_checks = (@stuck_sidekiq_job_minutes * 60) / 2

        # Drop any stop request left over from an earlier #stop, otherwise the new thread would exit right away.
        @mutex.synchronize { @stop_requested = false }

        Thread.new do
          until stop_requested?
            begin
              log_long_running_jobs(hostname)
              yield if block_given?
            rescue => error
              # NOTE(review): the message says "Checker" while the class is named "Logger". Left untouched because
              # log matchers may depend on the exact text — confirm before renaming.
              Discourse.warn_exception(
                error,
                message: "Unexpected error in SidekiqLongRunningJobChecker thread",
              )
            end

            wait_for_next_check(seconds_to_sleep_between_checks)
          end
        end
      end
  end

  # Used for testing to stop the thread. In production, the thread is expected to live for the lifetime of the process.
  def stop
    # Setting the flag and signalling under the same mutex the thread waits on guarantees the request is noticed
    # whether the thread is scanning, about to wait, or already waiting.
    @mutex.synchronize do
      @stop_requested = true
      @stop_signal.broadcast
    end

    if @thread
      @thread.join
      @thread = nil
    end
  end

  private

  def stop_requested?
    @mutex.synchronize { @stop_requested }
  end

  # Blocks for up to `seconds`, returning early as soon as #stop is called.
  def wait_for_next_check(seconds)
    @mutex.synchronize do
      # Re-check the flag while holding the mutex: a stop requested during the scan must not be slept through.
      @stop_signal.wait(@mutex, seconds) unless @stop_requested
    end
  end

  # Runs one scan over `Sidekiq::Workers` and logs every long-running job of this host that was not already part
  # of the previous scan.
  def log_long_running_jobs(hostname)
    oldest_allowed_run_at = (Time.now - (60 * @stuck_sidekiq_job_minutes)).to_i
    current_long_running_jobs = Set.new

    Sidekiq::Workers.new.each do |process_id, thread_id, work|
      # Only jobs whose process identifier starts with this host's name are considered.
      next unless process_id.start_with?(hostname)
      next if Time.at(work["run_at"]).to_i >= oldest_allowed_run_at

      jid = work.dig("payload", "jid")
      current_long_running_jobs << jid

      next if @seen_long_running_jobs&.include?(jid)

      worker_thread = Thread.list.find { |t| t["sidekiq_tid"] == thread_id }
      next unless worker_thread

      # `Thread#backtrace` returns nil once the thread has died; `&.` keeps that from aborting the whole scan.
      Rails.logger.warn(<<~MSG)
        Sidekiq job `#{work.dig("payload", "class")}` has been running for more than #{@stuck_sidekiq_job_minutes} minutes
        #{worker_thread.backtrace&.join("\n")}
      MSG
    end

    @seen_long_running_jobs = current_long_running_jobs
  end
end