From 4c0af24173e25ece0b2d60d656588895e9e9836c Mon Sep 17 00:00:00 2001 From: Alan Guo Xiang Tan Date: Thu, 8 Aug 2024 12:20:16 +0800 Subject: [PATCH] DEV: Log a warning message when a MiniScheduler scheduled job is stuck (#28258) This commit adds a `MiniSchedulerLongRunningJobLogger` class which polls for stuck mini_scheduler jobs, every 6 seconds by default (configurable via the `DISCOURSE_MINI_SCHEDULER_LONG_RUNNING_JOB_LOGGER_POLL_INTERVAL_SECONDS` environment variable). When it detects that a job is stuck, it will log a warning message with the current backtrace of the thread that is executing the job. Note that for scheduled jobs which run at an interval of less than 30 minutes, we will log when the job has been executing for more than 30 minutes. For scheduled jobs which run at an interval of at least 30 minutes but less than 2 hours, we will log when the job has been executing for longer than its specified interval. For scheduled jobs which run at an interval of 2 hours or more (including daily jobs), we will log once the job has been executing for more than 2 hours. --- Gemfile.lock | 2 +- config/initializers/100-sidekiq.rb | 6 + lib/mini_scheduler_long_running_job_logger.rb | 108 ++++++++++++++++ ..._scheduler_long_running_job_logger_spec.rb | 116 ++++++++++++++++++ 4 files changed, 231 insertions(+), 1 deletion(-) create mode 100644 lib/mini_scheduler_long_running_job_logger.rb create mode 100644 spec/lib/mini_scheduler_long_running_job_logger_spec.rb diff --git a/Gemfile.lock b/Gemfile.lock index 5652f04c13b..520a5a681d9 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -242,7 +242,7 @@ GEM mini_mime (1.1.5) mini_racer (0.9.0) libv8-node (~> 18.19.0.0) - mini_scheduler (0.16.0) + mini_scheduler (0.17.0) sidekiq (>= 4.2.3, < 7.0) mini_sql (1.5.0) mini_suffix (0.3.3) diff --git a/config/initializers/100-sidekiq.rb b/config/initializers/100-sidekiq.rb index 6048203e6c0..ef94c6b15c2 100644 --- a/config/initializers/100-sidekiq.rb +++ b/config/initializers/100-sidekiq.rb @@ -3,6 +3,7 @@ require "sidekiq/pausable" require "sidekiq_logster_reporter" require "sidekiq_long_running_job_logger" +require
"mini_scheduler_long_running_job_logger" Sidekiq.configure_client { |config| config.redis = Discourse.sidekiq_redis_config } @@ -47,6 +48,11 @@ if Sidekiq.server? if !scheduler_hostname || scheduler_hostname.split(",").include?(Discourse.os_hostname) begin MiniScheduler.start(workers: GlobalSetting.mini_scheduler_workers) + + MiniSchedulerLongRunningJobLogger.new( + poll_interval_seconds: + ENV["DISCOURSE_MINI_SCHEDULER_LONG_RUNNING_JOB_LOGGER_POLL_INTERVAL_SECONDS"], + ).start rescue MiniScheduler::DistributedMutex::Timeout sleep 5 retry diff --git a/lib/mini_scheduler_long_running_job_logger.rb b/lib/mini_scheduler_long_running_job_logger.rb new file mode 100644 index 00000000000..f64d70800f8 --- /dev/null +++ b/lib/mini_scheduler_long_running_job_logger.rb @@ -0,0 +1,108 @@ +# frozen_string_literal: true + +class MiniSchedulerLongRunningJobLogger + DEFAULT_POLL_INTERVAL_SECONDS = 6 + + attr_reader :thread + + def initialize(poll_interval_seconds: nil) + @mutex = Mutex.new + @stop_requested = false + + @poll_interval_seconds = + if poll_interval_seconds + begin + Integer(poll_interval_seconds) + rescue ArgumentError + DEFAULT_POLL_INTERVAL_SECONDS + end + else + DEFAULT_POLL_INTERVAL_SECONDS + end + end + + def start + @thread ||= + Thread.new do + hostname = Discourse.os_hostname + + loop do + break if self.stop_requested? 
+ + current_long_running_jobs = Set.new + + begin + MiniScheduler::Manager.discover_running_scheduled_jobs.each do |job| + job_class = job[:class] + job_started_at = job[:started_at] + mini_scheduler_worker_thread_id = job[:thread_id] + + job_frequency_minutes = + if job_class.daily + 1.day.in_minutes.minutes + else + job_class.every.in_minutes.minutes + end + + warning_duration = + begin + if job_frequency_minutes < 30.minutes + 30.minutes + elsif job_frequency_minutes < 2.hours + job_frequency_minutes + else + 2.hours + end + end + + next if job_started_at >= (Time.zone.now - warning_duration) + + running_thread = + Thread.list.find do |thread| + thread[:mini_scheduler_worker_thread_id] == mini_scheduler_worker_thread_id + end + + next if running_thread.nil? + + current_long_running_jobs << job_class + + next if @seen_long_running_jobs&.include?(job_class) + + Rails.logger.warn(<<~MSG) + Sidekiq scheduled job `#{job_class}` has been running for more than #{warning_duration.in_minutes.to_i} minutes + #{running_thread.backtrace.join("\n")} + MSG + end + + @seen_long_running_jobs = current_long_running_jobs + + yield if block_given? + rescue => error + Discourse.warn_exception( + error, + message: "Unexpected error in MiniSchedulerLongRunningJobLogger thread", + ) + end + + sleep @poll_interval_seconds + end + end + end + + # Used for testing to stop the thread. In production, the thread is expected to live for the lifetime of the process. + def stop + @mutex.synchronize { @stop_requested = true } + + if @thread + @thread.wakeup + @thread.join + @thread = nil + end + end + + private + + def stop_requested? 
+ @mutex.synchronize { @stop_requested } + end +end diff --git a/spec/lib/mini_scheduler_long_running_job_logger_spec.rb b/spec/lib/mini_scheduler_long_running_job_logger_spec.rb new file mode 100644 index 00000000000..73762bf9cb3 --- /dev/null +++ b/spec/lib/mini_scheduler_long_running_job_logger_spec.rb @@ -0,0 +1,116 @@ +# frozen_string_literal: true + +require "mini_scheduler_long_running_job_logger" + +RSpec.describe MiniSchedulerLongRunningJobLogger do + use_redis_snapshotting + + class Every10MinutesJob + extend ::MiniScheduler::Schedule + + every 10.minutes + + def perform + sleep 10_000 + end + end + + class DailyJob + extend ::MiniScheduler::Schedule + + daily at: 4.hours + + def perform + sleep 10_000 + end + end + + def with_running_scheduled_job(job_class) + manager = MiniScheduler::Manager.new(enable_stats: false) + + info = manager.schedule_info(job_class) + info.next_run = Time.now.to_i - 1 + info.write! + manager.tick + + wait_for { manager.schedule_info(job_class).prev_result == "RUNNING" } + + yield + ensure + manager.stop! 
+ end + + before do + @orig_logger = Rails.logger + Rails.logger = @fake_logger = FakeLogger.new + end + + after { Rails.logger = @orig_logger } + + it "logs long running jobs" do + with_running_scheduled_job(Every10MinutesJob) do + freeze_time(31.minutes.from_now) + + begin + checker = described_class.new + + loops = 0 + + checker.start { loops += 1 } + + wait_for { loops == 1 } + + expect(@fake_logger.warnings.size).to eq(1) + + expect(@fake_logger.warnings.first).to match( + "Sidekiq scheduled job `Every10MinutesJob` has been running for more than 30 minutes", + ) + + # Matches the backtrace + expect(@fake_logger.warnings.first).to match("sleep") + + # Check that the logger doesn't log repeated warnings after 2 loops + expect do + checker.thread.wakeup # Force the thread to run the next loop + + wait_for { loops == 2 } + end.not_to change { @fake_logger.warnings.size } + + # Check that the logger doesn't log repeated warnings after 3 loops + expect do + checker.thread.wakeup # Force the thread to run the next loop + + wait_for { loops == 3 } + end.not_to change { @fake_logger.warnings.size } + ensure + checker.stop + expect(checker.thread).to eq(nil) + end + end + end + + it "logs long running jobs with daily schedule" do + with_running_scheduled_job(DailyJob) do + freeze_time(3.hours.from_now) + + begin + checker = described_class.new + + loops = 0 + + checker.start { loops += 1 } + + wait_for { loops == 1 } + + expect(@fake_logger.warnings.size).to eq(1) + + expect(@fake_logger.warnings.first).to match( + "Sidekiq scheduled job `DailyJob` has been running for more than 120 minutes", + ) + ensure + checker.stop + expect(checker.thread).to eq(nil) + end + end + end +end