diff --git a/app/jobs/regular/run_heartbeat.rb b/app/jobs/regular/run_heartbeat.rb index 364343d0434..7b6bd657fdb 100644 --- a/app/jobs/regular/run_heartbeat.rb +++ b/app/jobs/regular/run_heartbeat.rb @@ -2,19 +2,8 @@ module Jobs class RunHeartbeat < Jobs::Base - - sidekiq_options queue: 'critical' - - def self.heartbeat_key - 'heartbeat_last_run' - end - def execute(args) - $redis.set(self.class.heartbeat_key, Time.new.to_i.to_s) - end - - def self.last_heartbeat - $redis.get(heartbeat_key).to_i + Demon::Sidekiq.trigger_heartbeat(args[:queue_name]) end end end diff --git a/app/jobs/scheduled/heartbeat.rb b/app/jobs/scheduled/heartbeat.rb index c1b8a8cb24c..efd82139ec7 100644 --- a/app/jobs/scheduled/heartbeat.rb +++ b/app/jobs/scheduled/heartbeat.rb @@ -7,7 +7,9 @@ module Jobs every 3.minute def execute(args) - Jobs.enqueue(:run_heartbeat, {}) + Demon::Sidekiq::QUEUE_IDS.each do |identifier| + Jobs.enqueue(:run_heartbeat, queue_name: identifier, queue: identifier) + end end end end diff --git a/config/unicorn.conf.rb b/config/unicorn.conf.rb index 85b28005f5a..20d9f6d289c 100644 --- a/config/unicorn.conf.rb +++ b/config/unicorn.conf.rb @@ -144,25 +144,32 @@ before_fork do |server, worker| @sidekiq_next_heartbeat_check ||= Time.new.to_i + @sidekiq_heartbeat_interval if @sidekiq_next_heartbeat_check < Time.new.to_i - - last_heartbeat = Jobs::RunHeartbeat.last_heartbeat - restart = false + @sidekiq_next_heartbeat_check = Time.new.to_i + @sidekiq_heartbeat_interval + restarted = false if out_of_memory? Rails.logger.warn("Sidekiq is consuming too much memory (using: %0.2fM) for '%s', restarting" % [(max_rss.to_f / 1.megabyte), ENV["DISCOURSE_HOSTNAME"]]) - restart = true - end - - if last_heartbeat < Time.new.to_i - @sidekiq_heartbeat_interval - STDERR.puts "Sidekiq heartbeat test failed, restarting" - Rails.logger.warn "Sidekiq heartbeat test failed, restarting" - - restart = true - end - @sidekiq_next_heartbeat_check = Time.new.to_i + @sidekiq_heartbeat_interval - - if restart Demon::Sidekiq.restart + restarted = true + end + + if !restarted + Demon::Sidekiq::QUEUE_IDS.each do |identifier| + last_heartbeat = Demon::Sidekiq.get_queue_last_heartbeat(identifier) + + if last_heartbeat < Time.new.to_i - @sidekiq_heartbeat_interval + if demon = Demon::Sidekiq.demons.values.find { |d| d.identifier == identifier } + STDERR.puts "Sidekiq heartbeat test for worker #{demon.pid} failed, restarting" + Rails.logger.warn "Sidekiq heartbeat test for worker #{demon.pid} failed, restarting" + demon.stop + demon.start + restarted = true + end + end + end + end + + if restarted sleep 10 force_kill_rogue_sidekiq end diff --git a/lib/demon/base.rb b/lib/demon/base.rb index d1626114d70..e1181894982 100644 --- a/lib/demon/base.rb +++ b/lib/demon/base.rb @@ -11,6 +11,7 @@ class Demon::Base def self.start(count = 1, verbose: false) @demons ||= {} + before_start(count) count.times do |i| (@demons["#{prefix}_#{i}"] ||= new(i, verbose: verbose)).start end @@ -37,7 +38,10 @@ class Demon::Base end end - attr_reader :pid, :parent_pid, :started, :index + def self.before_start(count) + end + + attr_reader :pid, :parent_pid, :started, :index, :identifier attr_accessor :stop_timeout def initialize(index, rails_root: nil, parent_pid: nil, verbose: false) diff --git a/lib/demon/sidekiq.rb b/lib/demon/sidekiq.rb index d6da3b55d6c..11fdf561985 100644 --- a/lib/demon/sidekiq.rb +++ b/lib/demon/sidekiq.rb @@ -3,6 +3,38 @@ require "demon/base" class Demon::Sidekiq < Demon::Base + RANDOM_HEX = SecureRandom.hex + QUEUE_IDS = [] + + def self.queues_last_heartbeat_hash_key + @@queues_last_heartbeat_hash_key ||= "#{RANDOM_HEX}_queues_last_heartbeat_hash" + end + + def self.trigger_heartbeat(name) + $redis.hset(queues_last_heartbeat_hash_key, name, Time.new.to_i.to_s) + extend_expiry(queues_last_heartbeat_hash_key) + end + + def self.get_queue_last_heartbeat(name) + extend_expiry(queues_last_heartbeat_hash_key) + $redis.hget(queues_last_heartbeat_hash_key, name).to_i + end + + def self.clear_heartbeat_queues! + $redis.del(queues_last_heartbeat_hash_key) + end + + def self.before_start(count) + # cleans up heartbeat queues from previous boot up + Sidekiq::Queue.all.each { |queue| queue.clear if queue.name[/^\h{32}$/] } + count.times do + QUEUE_IDS << SecureRandom.hex + end + end + + def self.extend_expiry(key) + $redis.expire(key, 60 * 60) + end def self.prefix "sidekiq" @@ -12,6 +44,11 @@ class Demon::Sidekiq < Demon::Base blk ? (@blk = blk) : @blk end + def run + @identifier = QUEUE_IDS[@index] + super + end + private def suppress_stdout @@ -36,7 +73,7 @@ class Demon::Sidekiq < Demon::Base options = ["-c", GlobalSetting.sidekiq_workers.to_s] - [['critical', 8], ['default', 4], ['low', 2], ['ultra_low', 1]].each do |queue_name, weight| + [['critical', 8], [@identifier, 8], ['default', 4], ['low', 2], ['ultra_low', 1]].each do |queue_name, weight| custom_queue_hostname = ENV["UNICORN_SIDEKIQ_#{queue_name.upcase}_QUEUE_HOSTNAME"] if !custom_queue_hostname || custom_queue_hostname.split(',').include?(`hostname`.strip) diff --git a/spec/jobs/heartbeat_spec.rb b/spec/jobs/heartbeat_spec.rb index 315fe4172bc..c132e622f9d 100644 --- a/spec/jobs/heartbeat_spec.rb +++ b/spec/jobs/heartbeat_spec.rb @@ -2,6 +2,7 @@ require 'rails_helper' require_dependency 'jobs/base' +require_dependency 'demon/sidekiq' describe Jobs::Heartbeat do after do @@ -10,12 +11,14 @@ describe Jobs::Heartbeat do it "still enqueues heartbeats in readonly mode" do freeze_time 1.week.from_now + Demon::Sidekiq.clear_heartbeat_queues! + Jobs.run_immediately! Discourse.enable_readonly_mode - Sidekiq::Testing.inline! do - Jobs::Heartbeat.new.perform(nil) - expect(Jobs::RunHeartbeat.last_heartbeat).to eq(Time.new.to_i) - end + queue = SecureRandom.hex + Demon::Sidekiq::QUEUE_IDS << queue + Jobs::Heartbeat.new.perform(nil) + expect(Demon::Sidekiq.get_queue_last_heartbeat(queue)).to eq(Time.new.to_i) end end