diff --git a/app/jobs/base.rb b/app/jobs/base.rb index 102d1e8e081..29189bc98dc 100644 --- a/app/jobs/base.rb +++ b/app/jobs/base.rb @@ -368,7 +368,7 @@ module Jobs extend MiniScheduler::Schedule def perform(*args) - super if (::Jobs::Heartbeat === self) || !Discourse.readonly_mode? + super if !Discourse.readonly_mode? end end diff --git a/app/jobs/regular/run_heartbeat.rb b/app/jobs/regular/run_heartbeat.rb index 549c57ac965..888c8773220 100644 --- a/app/jobs/regular/run_heartbeat.rb +++ b/app/jobs/regular/run_heartbeat.rb @@ -1,19 +1,14 @@ # frozen_string_literal: true +# This job is deprecated and will be removed in the future. It exists only to delete the stale heartbeat key from Redis. module Jobs class RunHeartbeat < ::Jobs::Base - sidekiq_options queue: "critical" - def self.heartbeat_key "heartbeat_last_run" end def execute(args) - Discourse.redis.set(self.class.heartbeat_key, Time.now.to_i.to_s) - end - - def self.last_heartbeat - Discourse.redis.get(heartbeat_key).to_i + Discourse.redis.del(self.class.heartbeat_key) end end end diff --git a/app/jobs/scheduled/heartbeat.rb b/app/jobs/scheduled/heartbeat.rb index 04b807ea8e2..c22dbffe889 100644 --- a/app/jobs/scheduled/heartbeat.rb +++ b/app/jobs/scheduled/heartbeat.rb @@ -1,9 +1,10 @@ # frozen_string_literal: true +# This job is deprecated and will be removed in the future. It exists only to enqueue the run_heartbeat cleanup job.
module Jobs # used to ensure at least 1 sidekiq is running correctly class Heartbeat < ::Jobs::Scheduled - every 3.minute + every 24.hours def execute(args) ::Jobs.enqueue(:run_heartbeat, {}) diff --git a/config/unicorn.conf.rb b/config/unicorn.conf.rb index 62f06167e84..5260334dca7 100644 --- a/config/unicorn.conf.rb +++ b/config/unicorn.conf.rb @@ -69,6 +69,7 @@ before_fork do |server, worker| initialized = true supervisor = ENV["UNICORN_SUPERVISOR_PID"].to_i + if supervisor > 0 Thread.new do while true @@ -82,6 +83,7 @@ before_fork do |server, worker| end sidekiqs = ENV["UNICORN_SIDEKIQS"].to_i + if sidekiqs > 0 server.logger.info "starting #{sidekiqs} supervised sidekiqs" @@ -107,7 +109,9 @@ before_fork do |server, worker| end end - if ENV["DISCOURSE_ENABLE_EMAIL_SYNC_DEMON"] == "true" + enable_email_sync_demon = ENV["DISCOURSE_ENABLE_EMAIL_SYNC_DEMON"] == "true" + + if enable_email_sync_demon server.logger.info "starting up EmailSync demon" Demon::EmailSync.start(1, logger: server.logger) end @@ -117,9 +121,32 @@ before_fork do |server, worker| demon_class.start(1, logger: server.logger) end - class ::Unicorn::HttpServer - alias master_sleep_orig master_sleep + Thread.new do + while true + begin + sleep 60 + if sidekiqs > 0 + Demon::Sidekiq.ensure_running + Demon::Sidekiq.heartbeat_check + Demon::Sidekiq.rss_memory_check + end + + if enable_email_sync_demon + Demon::EmailSync.ensure_running + server.check_email_sync_heartbeat + end + + DiscoursePluginRegistry.demon_processes.each { |demon_class| demon_class.ensure_running } + rescue => e + Rails.logger.warn( + "Error in demon processes heartbeat check: #{e}\n#{e.backtrace.join("\n")}", + ) + end + end + end + + class ::Unicorn::HttpServer # Original source: https://github.com/defunkt/unicorn/blob/6c9c442fb6aa12fd871237bc2bb5aec56c5b3538/lib/unicorn/http_server.rb#L477-L496 def murder_lazy_workers next_sleep = @timeout - 1 @@ -154,72 +181,6 @@ before_fork do |server, worker| next_sleep <= 0 ? 
1 : next_sleep end - def max_sidekiq_rss - rss = - `ps -eo rss,args | grep sidekiq | grep -v grep | awk '{print $1}'`.split("\n") - .map(&:to_i) - .max - - rss ||= 0 - - rss * 1024 - end - - def max_allowed_sidekiq_rss - [ENV["UNICORN_SIDEKIQ_MAX_RSS"].to_i, 500].max.megabytes - end - - def force_kill_rogue_sidekiq - info = `ps -eo pid,rss,args | grep sidekiq | grep -v grep | awk '{print $1,$2}'` - info - .split("\n") - .each do |row| - pid, mem = row.split(" ").map(&:to_i) - if pid > 0 && (mem * 1024) > max_allowed_sidekiq_rss - Rails.logger.warn "Detected rogue Sidekiq pid #{pid} mem #{mem * 1024}, killing" - begin - Process.kill("KILL", pid) - rescue StandardError - nil - end - end - end - end - - def check_sidekiq_heartbeat - @sidekiq_heartbeat_interval ||= 30.minutes - @sidekiq_next_heartbeat_check ||= Time.now.to_i + @sidekiq_heartbeat_interval - - if @sidekiq_next_heartbeat_check < Time.now.to_i - last_heartbeat = Jobs::RunHeartbeat.last_heartbeat - restart = false - - sidekiq_rss = max_sidekiq_rss - if sidekiq_rss > max_allowed_sidekiq_rss - Rails.logger.warn( - "Sidekiq is consuming too much memory (using: %0.2fM) for '%s', restarting" % - [(sidekiq_rss.to_f / 1.megabyte), ENV["DISCOURSE_HOSTNAME"]], - ) - - restart = true - end - - if last_heartbeat < Time.now.to_i - @sidekiq_heartbeat_interval - Rails.logger.warn "Sidekiq heartbeat test failed, restarting" - - restart = true - end - @sidekiq_next_heartbeat_check = Time.now.to_i + @sidekiq_heartbeat_interval - - if restart - Demon::Sidekiq.restart - sleep 10 - force_kill_rogue_sidekiq - end - Discourse.redis.close - end - end - def max_email_sync_rss return 0 if Demon::EmailSync.demons.empty? 
@@ -273,23 +234,6 @@ before_fork do |server, worker| Demon::EmailSync.restart if restart end - - def master_sleep(sec) - sidekiqs = ENV["UNICORN_SIDEKIQS"].to_i - if sidekiqs > 0 - Demon::Sidekiq.ensure_running - check_sidekiq_heartbeat - end - - if ENV["DISCOURSE_ENABLE_EMAIL_SYNC_DEMON"] == "true" - Demon::EmailSync.ensure_running - check_email_sync_heartbeat - end - - DiscoursePluginRegistry.demon_processes.each { |demon_class| demon_class.ensure_running } - - master_sleep_orig(sec) - end end end diff --git a/lib/demon/base.rb b/lib/demon/base.rb index 61b7682e6e5..4ebc5ade908 100644 --- a/lib/demon/base.rb +++ b/lib/demon/base.rb @@ -9,6 +9,20 @@ class Demon::Base @demons end + if Rails.env.test? + def self.set_demons(demons) + @demons = demons + end + + def self.reset_demons + @demons = {} + end + + def set_pid(pid) + @pid = pid + end + end + def self.start(count = 1, verbose: false, logger: nil) @demons ||= {} count.times { |i| (@demons["#{prefix}_#{i}"] ||= new(i, verbose:, logger:)).start } @@ -21,10 +35,7 @@ class Demon::Base def self.restart return unless @demons - @demons.values.each do |demon| - demon.stop - demon.start - end + @demons.values.each { |demon| demon.restart } end def self.ensure_running @@ -75,6 +86,11 @@ class Demon::Base "HUP" end + def restart + stop + start + end + def stop @started = false @@ -105,7 +121,9 @@ class Demon::Base wait_for_stop.call if alive? - log("Process would not terminate cleanly, force quitting. pid: #{@pid} #{self.class}") + log( + "Process would not terminate cleanly, force quitting. pid: #{@pid} #{self.class}\n#{caller.join("\n")}", + ) Process.kill("KILL", @pid) end diff --git a/lib/demon/sidekiq.rb b/lib/demon/sidekiq.rb index 2c90676cc72..df689e5db05 100644 --- a/lib/demon/sidekiq.rb +++ b/lib/demon/sidekiq.rb @@ -3,6 +3,8 @@ require "demon/base" class Demon::Sidekiq < ::Demon::Base + HOSTNAME = Socket.gethostname + def self.prefix "sidekiq" end @@ -11,6 +13,65 @@ class Demon::Sidekiq < ::Demon::Base blk ? 
(@blk = blk) : @blk end + # By default Sidekiq does a heartbeat check every 5 seconds. If a process misses 20 heartbeat checks, we consider it + # dead and kill the process. + SIDEKIQ_HEARTBEAT_CHECK_MISS_THRESHOLD_SECONDS = 5.seconds * 20 + + def self.heartbeat_check + sidekiq_processes_for_current_hostname = {} + + Sidekiq::ProcessSet.new.each do |process| + if process["hostname"] == HOSTNAME + sidekiq_processes_for_current_hostname[process["pid"]] = process + end + end + + Demon::Sidekiq.demons.values.each do |daemon| + next if !daemon.already_running? + + running_sidekiq_process = sidekiq_processes_for_current_hostname[daemon.pid] + + if !running_sidekiq_process || + (Time.now.to_i - running_sidekiq_process["beat"]) > + SIDEKIQ_HEARTBEAT_CHECK_MISS_THRESHOLD_SECONDS + Rails.logger.warn("Sidekiq heartbeat test failed for #{daemon.pid}, restarting") + daemon.restart + end + end + end + + SIDEKIQ_RSS_MEMORY_CHECK_INTERVAL_SECONDS = 30.minutes + + def self.rss_memory_check + if @last_sidekiq_rss_memory_check && + @last_sidekiq_rss_memory_check > Time.now.to_i - SIDEKIQ_RSS_MEMORY_CHECK_INTERVAL_SECONDS + return @last_sidekiq_rss_memory_check + end + + Demon::Sidekiq.demons.values.each do |daemon| + next if !daemon.already_running? 
+ + daemon_rss_bytes = `ps -o rss= -p #{daemon.pid}`.to_i * 1024 + + if daemon_rss_bytes > max_allowed_sidekiq_rss_bytes + Rails.logger.warn( + "Sidekiq is consuming too much memory (using: %0.2fM) for '%s', restarting" % + [(daemon_rss_bytes.to_f / 1.megabyte), HOSTNAME], + ) + + daemon.restart + end + end + + @last_sidekiq_rss_memory_check = Time.now.to_i + end + + DEFAULT_MAX_ALLOWED_SIDEKIQ_RSS_MEGABYTES = 500 + + def self.max_allowed_sidekiq_rss_bytes + [ENV["UNICORN_SIDEKIQ_MAX_RSS"].to_i, DEFAULT_MAX_ALLOWED_SIDEKIQ_RSS_MEGABYTES].max.megabytes + end + private def suppress_stdout diff --git a/lib/sidekiq/pausable.rb b/lib/sidekiq/pausable.rb index eb06d5dcdaf..6f1a2e901f6 100644 --- a/lib/sidekiq/pausable.rb +++ b/lib/sidekiq/pausable.rb @@ -125,7 +125,7 @@ class Sidekiq::Pausable end def call(worker, msg, queue) - if sidekiq_paused?(msg) && !(Jobs::RunHeartbeat === worker) + if sidekiq_paused?(msg) worker.class.perform_in(@delay, *msg["args"]) else start = Process.clock_gettime(Process::CLOCK_MONOTONIC) diff --git a/spec/jobs/heartbeat_spec.rb b/spec/jobs/heartbeat_spec.rb deleted file mode 100644 index 983b8832c54..00000000000 --- a/spec/jobs/heartbeat_spec.rb +++ /dev/null @@ -1,16 +0,0 @@ -# frozen_string_literal: true - -RSpec.describe ::Jobs::Heartbeat do - after { Discourse.disable_readonly_mode } - - it "still enqueues heartbeats in readonly mode" do - freeze_time 1.week.from_now - - Discourse.enable_readonly_mode - - Sidekiq::Testing.inline! 
do - ::Jobs::Heartbeat.new.perform(nil) - expect(::Jobs::RunHeartbeat.last_heartbeat).to eq(Time.now.to_i) - end - end -end diff --git a/spec/jobs/jobs_spec.rb b/spec/jobs/jobs_spec.rb index c26af358cfe..cb20f3b7664 100644 --- a/spec/jobs/jobs_spec.rb +++ b/spec/jobs/jobs_spec.rb @@ -126,6 +126,11 @@ RSpec.describe Jobs do end describe "cancel_scheduled_job" do + class Jobs::SomeJob < ::Jobs::Base + def execute(args) + end + end + let(:scheduled_jobs) { Sidekiq::ScheduledSet.new } after { scheduled_jobs.clear } @@ -135,18 +140,18 @@ RSpec.describe Jobs do scheduled_jobs.clear expect(scheduled_jobs.size).to eq(0) - Jobs.enqueue_in(1.year, :run_heartbeat, topic_id: 123) - Jobs.enqueue_in(2.years, :run_heartbeat, topic_id: 456) - Jobs.enqueue_in(3.years, :run_heartbeat, topic_id: 123, current_site_id: "foo") - Jobs.enqueue_in(4.years, :run_heartbeat, topic_id: 123, current_site_id: "bar") + Jobs.enqueue_in(1.year, :some_job, topic_id: 123) + Jobs.enqueue_in(2.years, :some_job, topic_id: 456) + Jobs.enqueue_in(3.years, :some_job, topic_id: 123, current_site_id: "foo") + Jobs.enqueue_in(4.years, :some_job, topic_id: 123, current_site_id: "bar") expect(scheduled_jobs.size).to eq(4) - Jobs.cancel_scheduled_job(:run_heartbeat, topic_id: 123) + Jobs.cancel_scheduled_job(:some_job, topic_id: 123) expect(scheduled_jobs.size).to eq(3) - Jobs.cancel_scheduled_job(:run_heartbeat, topic_id: 123, all_sites: true) + Jobs.cancel_scheduled_job(:some_job, topic_id: 123, all_sites: true) expect(scheduled_jobs.size).to eq(1) end diff --git a/spec/lib/demon/sidekiq_spec.rb b/spec/lib/demon/sidekiq_spec.rb new file mode 100644 index 00000000000..b7ca5c930e7 --- /dev/null +++ b/spec/lib/demon/sidekiq_spec.rb @@ -0,0 +1,67 @@ +# frozen_string_literal: true + +RSpec.describe Demon::Sidekiq do + describe ".heartbeat_check" do + it "should restart sidekiq daemons when daemon cannot be match to an entry in Sidekiq::ProcessSet or when heartbeat check has been missed" do + running_sidekiq_daemon 
= described_class.new(1) + running_sidekiq_daemon.set_pid(1) + missing_sidekiq_daemon = described_class.new(2) + missing_sidekiq_daemon.set_pid(2) + missed_heartbeat_sidekiq_daemon = described_class.new(3) + missed_heartbeat_sidekiq_daemon.set_pid(3) + + Sidekiq::ProcessSet.expects(:new).returns( + [ + { "hostname" => described_class::HOSTNAME, "pid" => 1, "beat" => Time.now.to_i }, + { + "hostname" => described_class::HOSTNAME, + "pid" => 3, + "beat" => + Time.now.to_i - described_class::SIDEKIQ_HEARTBEAT_CHECK_MISS_THRESHOLD_SECONDS - 1, + }, + ], + ) + + described_class.set_demons( + { + "running_sidekiq_daemon" => running_sidekiq_daemon, + "missing_sidekiq_daemon" => missing_sidekiq_daemon, + "missed_heartbeat_sidekiq_daemon" => missed_heartbeat_sidekiq_daemon, + }, + ) + + running_sidekiq_daemon.expects(:already_running?).returns(true) + missing_sidekiq_daemon.expects(:already_running?).returns(true) + missed_heartbeat_sidekiq_daemon.expects(:already_running?).returns(true) + + running_sidekiq_daemon.expects(:restart).never + missing_sidekiq_daemon.expects(:restart) + missed_heartbeat_sidekiq_daemon.expects(:restart) + + described_class.heartbeat_check + ensure + described_class.reset_demons + end + end + + describe ".rss_memory_check" do + it "should restart sidekiq daemons when daemon's RSS memory exceeds the maximum allowed RSS memory" do + stub_const(described_class, "SIDEKIQ_RSS_MEMORY_CHECK_INTERVAL_SECONDS", 0) do + # Set to a negative value to fake that the process has exceeded the maximum allowed RSS memory + stub_const(described_class, "DEFAULT_MAX_ALLOWED_SIDEKIQ_RSS_MEGABYTES", -1) do + sidekiq_daemon = described_class.new(1) + sidekiq_daemon.set_pid(1) + + described_class.set_demons({ "sidekiq_daemon" => sidekiq_daemon }) + + sidekiq_daemon.expects(:already_running?).returns(true) + sidekiq_daemon.expects(:restart) + + described_class.rss_memory_check + end + end + ensure + described_class.reset_demons + end + end +end diff --git 
a/spec/multisite/pausable_spec.rb b/spec/multisite/pausable_spec.rb deleted file mode 100644 index 1b6a5778bc9..00000000000 --- a/spec/multisite/pausable_spec.rb +++ /dev/null @@ -1,20 +0,0 @@ -# frozen_string_literal: true - -RSpec.describe Sidekiq::Pausable do - after { Sidekiq.unpause_all! } - - it "can still run heartbeats when paused" do - Sidekiq.pause! - - freeze_time 1.week.from_now - - jobs = Sidekiq::ScheduledSet.new - jobs.clear - middleware = Sidekiq::Pausable.new - - middleware.call(Jobs::RunHeartbeat.new, { "args" => [{}] }, "critical") { "done" } - - jobs = Sidekiq::ScheduledSet.new - expect(jobs.size).to eq(0) - end -end