mirror of
https://github.com/discourse/discourse.git
synced 2024-12-13 21:58:43 +08:00
4c0af24173
This commit adds a `MiniSchedulerLongRunningJobLogger` class which polls every 60 seconds for mini_scheduler jobs that are stuck. When it detects that a job is stuck, it logs a warning message with the current backtrace of the thread that is executing the job. The warning threshold depends on how often the job is scheduled to run: for scheduled jobs that run at an interval of less than 30 minutes, we log once the job has been executing for more than 30 minutes; for scheduled jobs that run at an interval of at least 30 minutes but less than 2 hours, we log once the job has been executing for longer than its scheduled interval; for scheduled jobs that run at an interval of 2 hours or more (including daily jobs), we log once the job has been executing for more than 2 hours.
109 lines
2.8 KiB
Ruby
109 lines
2.8 KiB
Ruby
# frozen_string_literal: true
|
|
|
|
# Background watchdog for mini_scheduler scheduled jobs.
#
# Once started, a dedicated thread repeatedly asks
# `MiniScheduler::Manager.discover_running_scheduled_jobs` for the jobs that are
# currently running and logs a warning (including the backtrace of the worker
# thread executing the job) for every job that has been running for longer than
# its warning threshold:
#
# * job interval shorter than 30 minutes          -> warn after 30 minutes
# * job interval of 30 minutes up to (not incl.) 2 hours -> warn after one interval
# * job interval of 2 hours or more, or daily jobs -> warn after 2 hours
#
# A job class is only logged again after it has been absent from at least one
# scan, so a single stuck run produces a single warning.
class MiniSchedulerLongRunningJobLogger
  # Seconds to sleep between scans when no (valid) interval is supplied.
  # NOTE(review): the commit description for this class mentions polling every
  # 60 seconds while this constant is 6 - confirm which value is intended.
  DEFAULT_POLL_INTERVAL_SECONDS = 6

  # The polling thread, or nil when the logger has not been started / was stopped.
  attr_reader :thread

  # @param poll_interval_seconds [#to_int, String, nil] seconds to sleep between
  #   scans. Anything that `Integer()` rejects with ArgumentError or TypeError,
  #   as well as nil/false, falls back to DEFAULT_POLL_INTERVAL_SECONDS.
  def initialize(poll_interval_seconds: nil)
    @mutex = Mutex.new
    @stop_requested = false
    @seen_long_running_jobs = Set.new
    @poll_interval_seconds = coerce_poll_interval(poll_interval_seconds)
  end

  # Starts the polling thread. Calling it again while a thread is already
  # assigned returns that same thread.
  #
  # @yield invoked (with no arguments) on the polling thread after every
  #   successful scan.
  # @return [Thread] the polling thread
  def start
    @thread ||=
      Thread.new do
        loop do
          break if stop_requested?

          begin
            warn_about_long_running_jobs

            yield if block_given?
          rescue => error
            # Never let a failed scan (or a failing caller block) kill the watchdog.
            Discourse.warn_exception(
              error,
              message: "Unexpected error in MiniSchedulerLongRunningJobLogger thread",
            )
          end

          sleep @poll_interval_seconds
        end
      end
  end

  # Used for testing to stop the thread. In production, the thread is expected to live for the lifetime of the process.
  def stop
    @mutex.synchronize { @stop_requested = true }

    return unless @thread

    begin
      # Cut the current sleep short so the loop notices the stop request.
      @thread.wakeup
    rescue ThreadError
      # The thread has already terminated (Thread#wakeup raises on a dead
      # thread); there is nothing to wake up, just reap it below.
    end

    @thread.join
    @thread = nil
  end

  private

  # Thread-safe read of the stop flag set by #stop.
  def stop_requested?
    @mutex.synchronize { @stop_requested }
  end

  # Converts the user-supplied interval to an Integer, falling back to the
  # default when it is missing or cannot be converted.
  def coerce_poll_interval(value)
    return DEFAULT_POLL_INTERVAL_SECONDS unless value

    Integer(value)
  rescue ArgumentError, TypeError
    DEFAULT_POLL_INTERVAL_SECONDS
  end

  # Performs one scan: logs every newly detected long running job and remembers
  # the set of long running job classes for the next scan.
  def warn_about_long_running_jobs
    current_long_running_jobs = Set.new

    MiniScheduler::Manager.discover_running_scheduled_jobs.each do |job|
      job_class = job[:class]
      warning_duration = warning_duration_for(job_class)

      # Still within its allowance.
      next if job[:started_at] >= (Time.zone.now - warning_duration)

      running_thread = find_worker_thread(job[:thread_id])

      # Without a matching thread in this process there is no backtrace to report.
      next if running_thread.nil?

      current_long_running_jobs << job_class

      # Already reported during the previous scan.
      next if @seen_long_running_jobs.include?(job_class)

      # Thread#backtrace returns nil once the thread is dead; Array() keeps the
      # log call from raising in that case.
      Rails.logger.warn(<<~MSG)
        Sidekiq scheduled job `#{job_class}` has been running for more than #{warning_duration.in_minutes.to_i} minutes
        #{Array(running_thread.backtrace).join("\n")}
      MSG
    end

    # Only reached when the scan completed without raising.
    @seen_long_running_jobs = current_long_running_jobs
  end

  # Returns how long a job of the given class may run before it is reported.
  def warning_duration_for(job_class)
    job_frequency =
      if job_class.daily
        1.day.in_minutes.minutes
      else
        job_class.every.in_minutes.minutes
      end

    if job_frequency < 30.minutes
      30.minutes
    elsif job_frequency < 2.hours
      job_frequency
    else
      2.hours
    end
  end

  # Finds the thread whose `:mini_scheduler_worker_thread_id` thread-local
  # matches the id reported for the job, or nil when there is none.
  def find_worker_thread(worker_thread_id)
    Thread.list.find { |thread| thread[:mini_scheduler_worker_thread_id] == worker_thread_id }
  end
end
|