From 9812407f76e6925d28983084f373e9869f5bdc7f Mon Sep 17 00:00:00 2001 From: Alan Guo Xiang Tan Date: Wed, 18 Dec 2024 12:48:50 +0800 Subject: [PATCH] FIX: Redo Sidekiq monitoring to restart stuck sidekiq processes (#30198) This commit reimplements how we monitor Sidekiq processes that are forked from the Unicorn master process. Prior to this change, we rely on `Jobs::Heartbeat` to enqueue a `Jobs::RunHeartbeat` job every 3 minutes. The `Jobs::RunHeartbeat` job then sets a Redis key with a timestamp. In the Unicorn master process, we then fetch the timestamp that has been set by the job from Redis every 30 minutes. If the timestamp has not been updated for more than 30 minutes, we restart the Sidekiq process. The fundamental flaw with this approach is that it fails to consider deployments with multiple hosts and multiple Sidekiq processes. A sidekiq process on a host may be in a bad state but the heartbeat check will not restart the process because the `Jobs::RunHeartbeat` job is still being executed by the working Sidekiq processes on other hosts. In order to properly ensure that stuck Sidekiq processs are restarted, we now rely on the [Sidekiq::ProcessSet](https://github.com/sidekiq/sidekiq/wiki/API#processes) API that is supported by Sidekiq. The API provides us with "near real-time (updated every 5 sec) info about the current set of Sidekiq processes running". The API provides useful information like the hostname, pid and also when Sidekiq last did its own heartbeat check. With that information, we can easily determine if a Sidekiq process needs to be restarted from the Unicorn master process. --- app/jobs/base.rb | 2 +- app/jobs/regular/run_heartbeat.rb | 9 +-- app/jobs/scheduled/heartbeat.rb | 3 +- config/unicorn.conf.rb | 116 ++++++++---------------------- lib/demon/base.rb | 28 ++++++-- lib/demon/sidekiq.rb | 61 ++++++++++++++++ lib/sidekiq/pausable.rb | 2 +- spec/jobs/heartbeat_spec.rb | 16 ----- spec/jobs/jobs_spec.rb | 17 +++-- spec/lib/demon/sidekiq_spec.rb | 67 +++++++++++++++++ spec/multisite/pausable_spec.rb | 20 ------ 11 files changed, 198 insertions(+), 143 deletions(-) delete mode 100644 spec/jobs/heartbeat_spec.rb create mode 100644 spec/lib/demon/sidekiq_spec.rb delete mode 100644 spec/multisite/pausable_spec.rb diff --git a/app/jobs/base.rb b/app/jobs/base.rb index 102d1e8e081..29189bc98dc 100644 --- a/app/jobs/base.rb +++ b/app/jobs/base.rb @@ -368,7 +368,7 @@ module Jobs extend MiniScheduler::Schedule def perform(*args) - super if (::Jobs::Heartbeat === self) || !Discourse.readonly_mode? + super if !Discourse.readonly_mode? end end diff --git a/app/jobs/regular/run_heartbeat.rb b/app/jobs/regular/run_heartbeat.rb index 549c57ac965..888c8773220 100644 --- a/app/jobs/regular/run_heartbeat.rb +++ b/app/jobs/regular/run_heartbeat.rb @@ -1,19 +1,14 @@ # frozen_string_literal: true +# This job is deprecated and will be removed in the future. The only reason it exists is for clean up purposes. module Jobs class RunHeartbeat < ::Jobs::Base - sidekiq_options queue: "critical" - def self.heartbeat_key "heartbeat_last_run" end def execute(args) - Discourse.redis.set(self.class.heartbeat_key, Time.now.to_i.to_s) - end - - def self.last_heartbeat - Discourse.redis.get(heartbeat_key).to_i + Discourse.redis.del(self.class.heartbeat_key) end end end diff --git a/app/jobs/scheduled/heartbeat.rb b/app/jobs/scheduled/heartbeat.rb index 04b807ea8e2..c22dbffe889 100644 --- a/app/jobs/scheduled/heartbeat.rb +++ b/app/jobs/scheduled/heartbeat.rb @@ -1,9 +1,10 @@ # frozen_string_literal: true +# This job is deprecated and will be removed in the future. The only reason it exists is for clean up purposes. module Jobs # used to ensure at least 1 sidekiq is running correctly class Heartbeat < ::Jobs::Scheduled - every 3.minute + every 24.hours def execute(args) ::Jobs.enqueue(:run_heartbeat, {}) diff --git a/config/unicorn.conf.rb b/config/unicorn.conf.rb index 62f06167e84..5260334dca7 100644 --- a/config/unicorn.conf.rb +++ b/config/unicorn.conf.rb @@ -69,6 +69,7 @@ before_fork do |server, worker| initialized = true supervisor = ENV["UNICORN_SUPERVISOR_PID"].to_i + if supervisor > 0 Thread.new do while true @@ -82,6 +83,7 @@ before_fork do |server, worker| end sidekiqs = ENV["UNICORN_SIDEKIQS"].to_i + if sidekiqs > 0 server.logger.info "starting #{sidekiqs} supervised sidekiqs" @@ -107,7 +109,9 @@ before_fork do |server, worker| end end - if ENV["DISCOURSE_ENABLE_EMAIL_SYNC_DEMON"] == "true" + enable_email_sync_demon = ENV["DISCOURSE_ENABLE_EMAIL_SYNC_DEMON"] == "true" + + if enable_email_sync_demon server.logger.info "starting up EmailSync demon" Demon::EmailSync.start(1, logger: server.logger) end @@ -117,9 +121,32 @@ before_fork do |server, worker| demon_class.start(1, logger: server.logger) end - class ::Unicorn::HttpServer - alias master_sleep_orig master_sleep + Thread.new do + while true + begin + sleep 60 + if sidekiqs > 0 + Demon::Sidekiq.ensure_running + Demon::Sidekiq.heartbeat_check + Demon::Sidekiq.rss_memory_check + end + + if enable_email_sync_demon + Demon::EmailSync.ensure_running + check_email_sync_heartbeat + end + + DiscoursePluginRegistry.demon_processes.each { |demon_class| demon_class.ensure_running } + rescue => e + Rails.logger.warn( + "Error in demon processes heartbeat check: #{e}\n#{e.backtrace.join("\n")}", + ) + end + end + end + + class ::Unicorn::HttpServer # Original source: https://github.com/defunkt/unicorn/blob/6c9c442fb6aa12fd871237bc2bb5aec56c5b3538/lib/unicorn/http_server.rb#L477-L496 def murder_lazy_workers next_sleep = @timeout - 1 @@ -154,72 +181,6 @@ before_fork do |server, worker| next_sleep <= 0 ? 1 : next_sleep end - def max_sidekiq_rss - rss = - `ps -eo rss,args | grep sidekiq | grep -v grep | awk '{print $1}'`.split("\n") - .map(&:to_i) - .max - - rss ||= 0 - - rss * 1024 - end - - def max_allowed_sidekiq_rss - [ENV["UNICORN_SIDEKIQ_MAX_RSS"].to_i, 500].max.megabytes - end - - def force_kill_rogue_sidekiq - info = `ps -eo pid,rss,args | grep sidekiq | grep -v grep | awk '{print $1,$2}'` - info - .split("\n") - .each do |row| - pid, mem = row.split(" ").map(&:to_i) - if pid > 0 && (mem * 1024) > max_allowed_sidekiq_rss - Rails.logger.warn "Detected rogue Sidekiq pid #{pid} mem #{mem * 1024}, killing" - begin - Process.kill("KILL", pid) - rescue StandardError - nil - end - end - end - end - - def check_sidekiq_heartbeat - @sidekiq_heartbeat_interval ||= 30.minutes - @sidekiq_next_heartbeat_check ||= Time.now.to_i + @sidekiq_heartbeat_interval - - if @sidekiq_next_heartbeat_check < Time.now.to_i - last_heartbeat = Jobs::RunHeartbeat.last_heartbeat - restart = false - - sidekiq_rss = max_sidekiq_rss - if sidekiq_rss > max_allowed_sidekiq_rss - Rails.logger.warn( - "Sidekiq is consuming too much memory (using: %0.2fM) for '%s', restarting" % - [(sidekiq_rss.to_f / 1.megabyte), ENV["DISCOURSE_HOSTNAME"]], - ) - - restart = true - end - - if last_heartbeat < Time.now.to_i - @sidekiq_heartbeat_interval - Rails.logger.warn "Sidekiq heartbeat test failed, restarting" - - restart = true - end - @sidekiq_next_heartbeat_check = Time.now.to_i + @sidekiq_heartbeat_interval - - if restart - Demon::Sidekiq.restart - sleep 10 - force_kill_rogue_sidekiq - end - Discourse.redis.close - end - end - def max_email_sync_rss return 0 if Demon::EmailSync.demons.empty? @@ -273,23 +234,6 @@ before_fork do |server, worker| Demon::EmailSync.restart if restart end - - def master_sleep(sec) - sidekiqs = ENV["UNICORN_SIDEKIQS"].to_i - if sidekiqs > 0 - Demon::Sidekiq.ensure_running - check_sidekiq_heartbeat - end - - if ENV["DISCOURSE_ENABLE_EMAIL_SYNC_DEMON"] == "true" - Demon::EmailSync.ensure_running - check_email_sync_heartbeat - end - - DiscoursePluginRegistry.demon_processes.each { |demon_class| demon_class.ensure_running } - - master_sleep_orig(sec) - end end end diff --git a/lib/demon/base.rb b/lib/demon/base.rb index 61b7682e6e5..4ebc5ade908 100644 --- a/lib/demon/base.rb +++ b/lib/demon/base.rb @@ -9,6 +9,20 @@ class Demon::Base @demons end + if Rails.env.test? + def self.set_demons(demons) + @demons = demons + end + + def self.reset_demons + @demons = {} + end + + def set_pid(pid) + @pid = pid + end + end + def self.start(count = 1, verbose: false, logger: nil) @demons ||= {} count.times { |i| (@demons["#{prefix}_#{i}"] ||= new(i, verbose:, logger:)).start } @@ -21,10 +35,7 @@ class Demon::Base def self.restart return unless @demons - @demons.values.each do |demon| - demon.stop - demon.start - end + @demons.values.each { |demon| demon.restart } end def self.ensure_running @@ -75,6 +86,11 @@ class Demon::Base "HUP" end + def restart + stop + start + end + def stop @started = false @@ -105,7 +121,9 @@ class Demon::Base wait_for_stop.call if alive? - log("Process would not terminate cleanly, force quitting. pid: #{@pid} #{self.class}") + log( + "Process would not terminate cleanly, force quitting. pid: #{@pid} #{self.class}\n#{caller.join("\n")}", + ) Process.kill("KILL", @pid) end diff --git a/lib/demon/sidekiq.rb b/lib/demon/sidekiq.rb index 2c90676cc72..df689e5db05 100644 --- a/lib/demon/sidekiq.rb +++ b/lib/demon/sidekiq.rb @@ -3,6 +3,8 @@ require "demon/base" class Demon::Sidekiq < ::Demon::Base + HOSTNAME = Socket.gethostname + def self.prefix "sidekiq" end @@ -11,6 +13,65 @@ class Demon::Sidekiq < ::Demon::Base blk ? (@blk = blk) : @blk end + # By default Sidekiq does a heartbeat check every 5 seconds. If the processes misses 20 heartbeat checks, we consider it + # dead and kill the process. + SIDEKIQ_HEARTBEAT_CHECK_MISS_THRESHOLD_SECONDS = 5.seconds * 20 + + def self.heartbeat_check + sidekiq_processes_for_current_hostname = {} + + Sidekiq::ProcessSet.new.each do |process| + if process["hostname"] == HOSTNAME + sidekiq_processes_for_current_hostname[process["pid"]] = process + end + end + + Demon::Sidekiq.demons.values.each do |daemon| + next if !daemon.already_running? + + running_sidekiq_process = sidekiq_processes_for_current_hostname[daemon.pid] + + if !running_sidekiq_process || + (Time.now.to_i - running_sidekiq_process["beat"]) > + SIDEKIQ_HEARTBEAT_CHECK_MISS_THRESHOLD_SECONDS + Rails.logger.warn("Sidekiq heartbeat test failed for #{daemon.pid}, restarting") + daemon.restart + end + end + end + + SIDEKIQ_RSS_MEMORY_CHECK_INTERVAL_SECONDS = 30.minutes + + def self.rss_memory_check + if defined?(@@last_sidekiq_rss_memory_check) && @@last_sidekiq_rss_memory_check && + @@last_sidekiq_rss_memory_check > Time.now.to_i - SIDEKIQ_RSS_MEMORY_CHECK_INTERVAL_SECONDS + return @@last_sidekiq_rss_memory_check + end + + Demon::Sidekiq.demons.values.each do |daemon| + next if !daemon.already_running? + + daemon_rss_bytes = (`ps -o rss= -p #{daemon.pid}`.chomp.to_i || 0) * 1024 + + if daemon_rss_bytes > max_allowed_sidekiq_rss_bytes + Rails.logger.warn( + "Sidekiq is consuming too much memory (using: %0.2fM) for '%s', restarting" % + [(daemon_rss_bytes.to_f / 1.megabyte), HOSTNAME], + ) + + daemon.restart + end + end + + @@last_sidekiq_rss_memory_check = Time.now.to_i + end + + DEFAULT_MAX_ALLOWED_SIDEKIQ_RSS_MEGABYTES = 500 + + def self.max_allowed_sidekiq_rss_bytes + [ENV["UNICORN_SIDEKIQ_MAX_RSS"].to_i, DEFAULT_MAX_ALLOWED_SIDEKIQ_RSS_MEGABYTES].max.megabytes + end + private def suppress_stdout diff --git a/lib/sidekiq/pausable.rb b/lib/sidekiq/pausable.rb index eb06d5dcdaf..6f1a2e901f6 100644 --- a/lib/sidekiq/pausable.rb +++ b/lib/sidekiq/pausable.rb @@ -125,7 +125,7 @@ class Sidekiq::Pausable end def call(worker, msg, queue) - if sidekiq_paused?(msg) && !(Jobs::RunHeartbeat === worker) + if sidekiq_paused?(msg) worker.class.perform_in(@delay, *msg["args"]) else start = Process.clock_gettime(Process::CLOCK_MONOTONIC) diff --git a/spec/jobs/heartbeat_spec.rb b/spec/jobs/heartbeat_spec.rb deleted file mode 100644 index 983b8832c54..00000000000 --- a/spec/jobs/heartbeat_spec.rb +++ /dev/null @@ -1,16 +0,0 @@ -# frozen_string_literal: true - -RSpec.describe ::Jobs::Heartbeat do - after { Discourse.disable_readonly_mode } - - it "still enqueues heartbeats in readonly mode" do - freeze_time 1.week.from_now - - Discourse.enable_readonly_mode - - Sidekiq::Testing.inline! do - ::Jobs::Heartbeat.new.perform(nil) - expect(::Jobs::RunHeartbeat.last_heartbeat).to eq(Time.now.to_i) - end - end -end diff --git a/spec/jobs/jobs_spec.rb b/spec/jobs/jobs_spec.rb index c26af358cfe..cb20f3b7664 100644 --- a/spec/jobs/jobs_spec.rb +++ b/spec/jobs/jobs_spec.rb @@ -126,6 +126,11 @@ RSpec.describe Jobs do end describe "cancel_scheduled_job" do + class Jobs::SomeJob < ::Jobs::Base + def execute(args) + end + end + let(:scheduled_jobs) { Sidekiq::ScheduledSet.new } after { scheduled_jobs.clear } @@ -135,18 +140,18 @@ RSpec.describe Jobs do scheduled_jobs.clear expect(scheduled_jobs.size).to eq(0) - Jobs.enqueue_in(1.year, :run_heartbeat, topic_id: 123) - Jobs.enqueue_in(2.years, :run_heartbeat, topic_id: 456) - Jobs.enqueue_in(3.years, :run_heartbeat, topic_id: 123, current_site_id: "foo") - Jobs.enqueue_in(4.years, :run_heartbeat, topic_id: 123, current_site_id: "bar") + Jobs.enqueue_in(1.year, :some_job, topic_id: 123) + Jobs.enqueue_in(2.years, :some_job, topic_id: 456) + Jobs.enqueue_in(3.years, :some_job, topic_id: 123, current_site_id: "foo") + Jobs.enqueue_in(4.years, :some_job, topic_id: 123, current_site_id: "bar") expect(scheduled_jobs.size).to eq(4) - Jobs.cancel_scheduled_job(:run_heartbeat, topic_id: 123) + Jobs.cancel_scheduled_job(:some_job, topic_id: 123) expect(scheduled_jobs.size).to eq(3) - Jobs.cancel_scheduled_job(:run_heartbeat, topic_id: 123, all_sites: true) + Jobs.cancel_scheduled_job(:some_job, topic_id: 123, all_sites: true) expect(scheduled_jobs.size).to eq(1) end diff --git a/spec/lib/demon/sidekiq_spec.rb b/spec/lib/demon/sidekiq_spec.rb new file mode 100644 index 00000000000..b7ca5c930e7 --- /dev/null +++ b/spec/lib/demon/sidekiq_spec.rb @@ -0,0 +1,67 @@ +# frozen_string_literal: true + +RSpec.describe Demon::Sidekiq do + describe ".heartbeat_check" do + it "should restart sidekiq daemons when daemon cannot be match to an entry in Sidekiq::ProcessSet or when heartbeat check has been missed" do + running_sidekiq_daemon = described_class.new(1) + running_sidekiq_daemon.set_pid(1) + missing_sidekiq_daemon = described_class.new(2) + missing_sidekiq_daemon.set_pid(2) + missed_heartbeat_sidekiq_daemon = described_class.new(3) + missed_heartbeat_sidekiq_daemon.set_pid(3) + + Sidekiq::ProcessSet.expects(:new).returns( + [ + { "hostname" => described_class::HOSTNAME, "pid" => 1, "beat" => Time.now.to_i }, + { + "hostname" => described_class::HOSTNAME, + "pid" => 3, + "beat" => + Time.now.to_i - described_class::SIDEKIQ_HEARTBEAT_CHECK_MISS_THRESHOLD_SECONDS - 1, + }, + ], + ) + + described_class.set_demons( + { + "running_sidekiq_daemon" => running_sidekiq_daemon, + "missing_sidekiq_daemon" => missing_sidekiq_daemon, + "missed_heartbeat_sidekiq_daemon" => missed_heartbeat_sidekiq_daemon, + }, + ) + + running_sidekiq_daemon.expects(:already_running?).returns(true) + missing_sidekiq_daemon.expects(:already_running?).returns(true) + missed_heartbeat_sidekiq_daemon.expects(:already_running?).returns(true) + + running_sidekiq_daemon.expects(:restart).never + missing_sidekiq_daemon.expects(:restart) + missed_heartbeat_sidekiq_daemon.expects(:restart) + + described_class.heartbeat_check + ensure + described_class.reset_demons + end + end + + describe ".rss_memory_check" do + it "should restart sidekiq daemons when daemon's RSS memory exceeds the maximum allowed RSS memory" do + stub_const(described_class, "SIDEKIQ_RSS_MEMORY_CHECK_INTERVAL_SECONDS", 0) do + # Set to a negative value to fake that the process has exceeded the maximum allowed RSS memory + stub_const(described_class, "DEFAULT_MAX_ALLOWED_SIDEKIQ_RSS_MEGABYTES", -1) do + sidekiq_daemon = described_class.new(1) + sidekiq_daemon.set_pid(1) + + described_class.set_demons({ "sidekiq_daemon" => sidekiq_daemon }) + + sidekiq_daemon.expects(:already_running?).returns(true) + sidekiq_daemon.expects(:restart) + + described_class.rss_memory_check + end + end + ensure + described_class.reset_demons + end + end +end diff --git a/spec/multisite/pausable_spec.rb b/spec/multisite/pausable_spec.rb deleted file mode 100644 index 1b6a5778bc9..00000000000 --- a/spec/multisite/pausable_spec.rb +++ /dev/null @@ -1,20 +0,0 @@ -# frozen_string_literal: true - -RSpec.describe Sidekiq::Pausable do - after { Sidekiq.unpause_all! } - - it "can still run heartbeats when paused" do - Sidekiq.pause! - - freeze_time 1.week.from_now - - jobs = Sidekiq::ScheduledSet.new - jobs.clear - middleware = Sidekiq::Pausable.new - - middleware.call(Jobs::RunHeartbeat.new, { "args" => [{}] }, "critical") { "done" } - - jobs = Sidekiq::ScheduledSet.new - expect(jobs.size).to eq(0) - end -end