Mirror of https://github.com/discourse/discourse.git (synced 2025-03-14 10:55:30 +08:00)
FIX: Redo Sidekiq monitoring to restart stuck sidekiq processes (#30198)
This commit reimplements how we monitor Sidekiq processes that are forked from the Unicorn master process.

Prior to this change, we relied on `Jobs::Heartbeat` to enqueue a `Jobs::RunHeartbeat` job every 3 minutes. The `Jobs::RunHeartbeat` job then set a Redis key with a timestamp. The Unicorn master process fetched that timestamp from Redis every 30 minutes, and if it had not been updated for more than 30 minutes, the Sidekiq process was restarted.

The fundamental flaw with this approach is that it fails to consider deployments with multiple hosts and multiple Sidekiq processes. A Sidekiq process on one host may be in a bad state, yet the heartbeat check will never restart it because the `Jobs::RunHeartbeat` job is still being executed by the healthy Sidekiq processes on other hosts.

To properly ensure that stuck Sidekiq processes are restarted, we now rely on the [Sidekiq::ProcessSet](https://github.com/sidekiq/sidekiq/wiki/API#processes) API supported by Sidekiq. It provides "near real-time (updated every 5 sec) info about the current set of Sidekiq processes running", including the hostname, the pid and the time of each process's last heartbeat. With that information, the Unicorn master process can easily determine whether a Sidekiq process needs to be restarted.
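As a rough sketch of the idea (not the exact code introduced in this commit — the helper name and the 100-second threshold below are illustrative only), the master process can ask `Sidekiq::ProcessSet` for the processes running on its own host and flag any whose last reported heartbeat is too old:

```ruby
require "sidekiq/api"
require "socket"

# Illustrative threshold: treat a process as stuck if Sidekiq's own
# heartbeat timestamp ("beat", a Unix epoch) is older than this.
STALE_AFTER_SECONDS = 100

# Returns the PIDs of Sidekiq processes on this host with a stale heartbeat.
def stale_sidekiq_pids
  hostname = Socket.gethostname

  Sidekiq::ProcessSet.new.filter_map do |process|
    next unless process["hostname"] == hostname

    process["pid"] if Time.now.to_i - process["beat"].to_i > STALE_AFTER_SECONDS
  end
end

# A supervisor such as the Unicorn master could call this periodically and
# restart any PID it returns.
```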
parent f8837e1a8a
commit 9812407f76
@@ -368,7 +368,7 @@ module Jobs
     extend MiniScheduler::Schedule

     def perform(*args)
-      super if (::Jobs::Heartbeat === self) || !Discourse.readonly_mode?
+      super if !Discourse.readonly_mode?
     end
   end

@@ -1,19 +1,14 @@
 # frozen_string_literal: true

+# This job is deprecated and will be removed in the future. The only reason it exists is for clean up purposes.
 module Jobs
   class RunHeartbeat < ::Jobs::Base
     sidekiq_options queue: "critical"

     def self.heartbeat_key
       "heartbeat_last_run"
     end

     def execute(args)
-      Discourse.redis.set(self.class.heartbeat_key, Time.now.to_i.to_s)
-    end
-
-    def self.last_heartbeat
-      Discourse.redis.get(heartbeat_key).to_i
+      Discourse.redis.del(self.class.heartbeat_key)
     end
   end
 end
@@ -1,9 +1,10 @@
 # frozen_string_literal: true

+# This job is deprecated and will be removed in the future. The only reason it exists is for clean up purposes.
 module Jobs
   # used to ensure at least 1 sidekiq is running correctly
   class Heartbeat < ::Jobs::Scheduled
-    every 3.minute
+    every 24.hours

     def execute(args)
       ::Jobs.enqueue(:run_heartbeat, {})
@@ -69,6 +69,7 @@ before_fork do |server, worker|
   initialized = true

   supervisor = ENV["UNICORN_SUPERVISOR_PID"].to_i
+
   if supervisor > 0
     Thread.new do
       while true
@@ -82,6 +83,7 @@ before_fork do |server, worker|
   end

   sidekiqs = ENV["UNICORN_SIDEKIQS"].to_i
+
   if sidekiqs > 0
     server.logger.info "starting #{sidekiqs} supervised sidekiqs"

@@ -107,7 +109,9 @@ before_fork do |server, worker|
     end
   end

-  if ENV["DISCOURSE_ENABLE_EMAIL_SYNC_DEMON"] == "true"
+  enable_email_sync_demon = ENV["DISCOURSE_ENABLE_EMAIL_SYNC_DEMON"] == "true"
+
+  if enable_email_sync_demon
     server.logger.info "starting up EmailSync demon"
     Demon::EmailSync.start(1, logger: server.logger)
   end
@@ -117,9 +121,32 @@ before_fork do |server, worker|
     demon_class.start(1, logger: server.logger)
   end

-  class ::Unicorn::HttpServer
-    alias master_sleep_orig master_sleep
-
+  Thread.new do
+    while true
+      begin
+        sleep 60
+
+        if sidekiqs > 0
+          Demon::Sidekiq.ensure_running
+          Demon::Sidekiq.heartbeat_check
+          Demon::Sidekiq.rss_memory_check
+        end
+
+        if enable_email_sync_demon
+          Demon::EmailSync.ensure_running
+          check_email_sync_heartbeat
+        end
+
+        DiscoursePluginRegistry.demon_processes.each { |demon_class| demon_class.ensure_running }
+      rescue => e
+        Rails.logger.warn(
+          "Error in demon processes heartbeat check: #{e}\n#{e.backtrace.join("\n")}",
+        )
+      end
+    end
+  end
+
+  class ::Unicorn::HttpServer
     # Original source: https://github.com/defunkt/unicorn/blob/6c9c442fb6aa12fd871237bc2bb5aec56c5b3538/lib/unicorn/http_server.rb#L477-L496
     def murder_lazy_workers
       next_sleep = @timeout - 1
@@ -154,72 +181,6 @@ before_fork do |server, worker|
       next_sleep <= 0 ? 1 : next_sleep
     end

-    def max_sidekiq_rss
-      rss =
-        `ps -eo rss,args | grep sidekiq | grep -v grep | awk '{print $1}'`.split("\n")
-          .map(&:to_i)
-          .max
-
-      rss ||= 0
-
-      rss * 1024
-    end
-
-    def max_allowed_sidekiq_rss
-      [ENV["UNICORN_SIDEKIQ_MAX_RSS"].to_i, 500].max.megabytes
-    end
-
-    def force_kill_rogue_sidekiq
-      info = `ps -eo pid,rss,args | grep sidekiq | grep -v grep | awk '{print $1,$2}'`
-      info
-        .split("\n")
-        .each do |row|
-          pid, mem = row.split(" ").map(&:to_i)
-          if pid > 0 && (mem * 1024) > max_allowed_sidekiq_rss
-            Rails.logger.warn "Detected rogue Sidekiq pid #{pid} mem #{mem * 1024}, killing"
-            begin
-              Process.kill("KILL", pid)
-            rescue StandardError
-              nil
-            end
-          end
-        end
-    end
-
-    def check_sidekiq_heartbeat
-      @sidekiq_heartbeat_interval ||= 30.minutes
-      @sidekiq_next_heartbeat_check ||= Time.now.to_i + @sidekiq_heartbeat_interval
-
-      if @sidekiq_next_heartbeat_check < Time.now.to_i
-        last_heartbeat = Jobs::RunHeartbeat.last_heartbeat
-        restart = false
-
-        sidekiq_rss = max_sidekiq_rss
-        if sidekiq_rss > max_allowed_sidekiq_rss
-          Rails.logger.warn(
-            "Sidekiq is consuming too much memory (using: %0.2fM) for '%s', restarting" %
-              [(sidekiq_rss.to_f / 1.megabyte), ENV["DISCOURSE_HOSTNAME"]],
-          )
-
-          restart = true
-        end
-
-        if last_heartbeat < Time.now.to_i - @sidekiq_heartbeat_interval
-          Rails.logger.warn "Sidekiq heartbeat test failed, restarting"
-
-          restart = true
-        end
-        @sidekiq_next_heartbeat_check = Time.now.to_i + @sidekiq_heartbeat_interval
-
-        if restart
-          Demon::Sidekiq.restart
-          sleep 10
-          force_kill_rogue_sidekiq
-        end
-        Discourse.redis.close
-      end
-    end
-
    def max_email_sync_rss
      return 0 if Demon::EmailSync.demons.empty?

@@ -273,23 +234,6 @@ before_fork do |server, worker|

       Demon::EmailSync.restart if restart
     end
-
-    def master_sleep(sec)
-      sidekiqs = ENV["UNICORN_SIDEKIQS"].to_i
-      if sidekiqs > 0
-        Demon::Sidekiq.ensure_running
-        check_sidekiq_heartbeat
-      end
-
-      if ENV["DISCOURSE_ENABLE_EMAIL_SYNC_DEMON"] == "true"
-        Demon::EmailSync.ensure_running
-        check_email_sync_heartbeat
-      end
-
-      DiscoursePluginRegistry.demon_processes.each { |demon_class| demon_class.ensure_running }
-
-      master_sleep_orig(sec)
-    end
   end
 end

@@ -9,6 +9,20 @@ class Demon::Base
     @demons
   end

+  if Rails.env.test?
+    def self.set_demons(demons)
+      @demons = demons
+    end
+
+    def self.reset_demons
+      @demons = {}
+    end
+
+    def set_pid(pid)
+      @pid = pid
+    end
+  end
+
   def self.start(count = 1, verbose: false, logger: nil)
     @demons ||= {}
     count.times { |i| (@demons["#{prefix}_#{i}"] ||= new(i, verbose:, logger:)).start }
@@ -21,10 +35,7 @@ class Demon::Base

   def self.restart
     return unless @demons
-    @demons.values.each do |demon|
-      demon.stop
-      demon.start
-    end
+    @demons.values.each { |demon| demon.restart }
   end

   def self.ensure_running
@@ -75,6 +86,11 @@ class Demon::Base
     "HUP"
   end

+  def restart
+    stop
+    start
+  end
+
   def stop
     @started = false

@@ -105,7 +121,9 @@ class Demon::Base
     wait_for_stop.call

     if alive?
-      log("Process would not terminate cleanly, force quitting. pid: #{@pid} #{self.class}")
+      log(
+        "Process would not terminate cleanly, force quitting. pid: #{@pid} #{self.class}\n#{caller.join("\n")}",
+      )
       Process.kill("KILL", @pid)
     end

@@ -3,6 +3,8 @@
 require "demon/base"

 class Demon::Sidekiq < ::Demon::Base
+  HOSTNAME = Socket.gethostname
+
   def self.prefix
     "sidekiq"
   end
@@ -11,6 +13,65 @@ class Demon::Sidekiq < ::Demon::Base
     blk ? (@blk = blk) : @blk
   end

+  # By default Sidekiq does a heartbeat check every 5 seconds. If the process misses 20 heartbeat checks, we consider it
+  # dead and kill the process.
+  SIDEKIQ_HEARTBEAT_CHECK_MISS_THRESHOLD_SECONDS = 5.seconds * 20
+
+  def self.heartbeat_check
+    sidekiq_processes_for_current_hostname = {}
+
+    Sidekiq::ProcessSet.new.each do |process|
+      if process["hostname"] == HOSTNAME
+        sidekiq_processes_for_current_hostname[process["pid"]] = process
+      end
+    end
+
+    Demon::Sidekiq.demons.values.each do |daemon|
+      next if !daemon.already_running?
+
+      running_sidekiq_process = sidekiq_processes_for_current_hostname[daemon.pid]
+
+      if !running_sidekiq_process ||
+           (Time.now.to_i - running_sidekiq_process["beat"]) >
+             SIDEKIQ_HEARTBEAT_CHECK_MISS_THRESHOLD_SECONDS
+        Rails.logger.warn("Sidekiq heartbeat test failed for #{daemon.pid}, restarting")
+        daemon.restart
+      end
+    end
+  end
+
+  SIDEKIQ_RSS_MEMORY_CHECK_INTERVAL_SECONDS = 30.minutes
+
+  def self.rss_memory_check
+    if defined?(@@last_sidekiq_rss_memory_check) && @@last_sidekiq_rss_memory_check &&
+         @@last_sidekiq_rss_memory_check > Time.now.to_i - SIDEKIQ_RSS_MEMORY_CHECK_INTERVAL_SECONDS
+      return @@last_sidekiq_rss_memory_check
+    end
+
+    Demon::Sidekiq.demons.values.each do |daemon|
+      next if !daemon.already_running?
+
+      daemon_rss_bytes = (`ps -o rss= -p #{daemon.pid}`.chomp.to_i || 0) * 1024
+
+      if daemon_rss_bytes > max_allowed_sidekiq_rss_bytes
+        Rails.logger.warn(
+          "Sidekiq is consuming too much memory (using: %0.2fM) for '%s', restarting" %
+            [(daemon_rss_bytes.to_f / 1.megabyte), HOSTNAME],
+        )
+
+        daemon.restart
+      end
+    end
+
+    @@last_sidekiq_rss_memory_check = Time.now.to_i
+  end
+
+  DEFAULT_MAX_ALLOWED_SIDEKIQ_RSS_MEGABYTES = 500
+
+  def self.max_allowed_sidekiq_rss_bytes
+    [ENV["UNICORN_SIDEKIQ_MAX_RSS"].to_i, DEFAULT_MAX_ALLOWED_SIDEKIQ_RSS_MEGABYTES].max.megabytes
+  end
+
   private

   def suppress_stdout
@@ -125,7 +125,7 @@ class Sidekiq::Pausable
   end

   def call(worker, msg, queue)
-    if sidekiq_paused?(msg) && !(Jobs::RunHeartbeat === worker)
+    if sidekiq_paused?(msg)
       worker.class.perform_in(@delay, *msg["args"])
     else
       start = Process.clock_gettime(Process::CLOCK_MONOTONIC)

@@ -1,16 +0,0 @@
-# frozen_string_literal: true
-
-RSpec.describe ::Jobs::Heartbeat do
-  after { Discourse.disable_readonly_mode }
-
-  it "still enqueues heartbeats in readonly mode" do
-    freeze_time 1.week.from_now
-
-    Discourse.enable_readonly_mode
-
-    Sidekiq::Testing.inline! do
-      ::Jobs::Heartbeat.new.perform(nil)
-      expect(::Jobs::RunHeartbeat.last_heartbeat).to eq(Time.now.to_i)
-    end
-  end
-end
@@ -126,6 +126,11 @@ RSpec.describe Jobs do
   end

   describe "cancel_scheduled_job" do
+    class Jobs::SomeJob < ::Jobs::Base
+      def execute(args)
+      end
+    end
+
     let(:scheduled_jobs) { Sidekiq::ScheduledSet.new }

     after { scheduled_jobs.clear }
@@ -135,18 +140,18 @@ RSpec.describe Jobs do
       scheduled_jobs.clear
       expect(scheduled_jobs.size).to eq(0)

-      Jobs.enqueue_in(1.year, :run_heartbeat, topic_id: 123)
-      Jobs.enqueue_in(2.years, :run_heartbeat, topic_id: 456)
-      Jobs.enqueue_in(3.years, :run_heartbeat, topic_id: 123, current_site_id: "foo")
-      Jobs.enqueue_in(4.years, :run_heartbeat, topic_id: 123, current_site_id: "bar")
+      Jobs.enqueue_in(1.year, :some_job, topic_id: 123)
+      Jobs.enqueue_in(2.years, :some_job, topic_id: 456)
+      Jobs.enqueue_in(3.years, :some_job, topic_id: 123, current_site_id: "foo")
+      Jobs.enqueue_in(4.years, :some_job, topic_id: 123, current_site_id: "bar")

       expect(scheduled_jobs.size).to eq(4)

-      Jobs.cancel_scheduled_job(:run_heartbeat, topic_id: 123)
+      Jobs.cancel_scheduled_job(:some_job, topic_id: 123)

       expect(scheduled_jobs.size).to eq(3)

-      Jobs.cancel_scheduled_job(:run_heartbeat, topic_id: 123, all_sites: true)
+      Jobs.cancel_scheduled_job(:some_job, topic_id: 123, all_sites: true)

       expect(scheduled_jobs.size).to eq(1)
     end

spec/lib/demon/sidekiq_spec.rb (new file, 67 lines)
@@ -0,0 +1,67 @@
+# frozen_string_literal: true
+
+RSpec.describe Demon::Sidekiq do
+  describe ".heartbeat_check" do
+    it "should restart sidekiq daemons when daemon cannot be matched to an entry in Sidekiq::ProcessSet or when heartbeat check has been missed" do
+      running_sidekiq_daemon = described_class.new(1)
+      running_sidekiq_daemon.set_pid(1)
+      missing_sidekiq_daemon = described_class.new(2)
+      missing_sidekiq_daemon.set_pid(2)
+      missed_heartbeat_sidekiq_daemon = described_class.new(3)
+      missed_heartbeat_sidekiq_daemon.set_pid(3)
+
+      Sidekiq::ProcessSet.expects(:new).returns(
+        [
+          { "hostname" => described_class::HOSTNAME, "pid" => 1, "beat" => Time.now.to_i },
+          {
+            "hostname" => described_class::HOSTNAME,
+            "pid" => 3,
+            "beat" =>
+              Time.now.to_i - described_class::SIDEKIQ_HEARTBEAT_CHECK_MISS_THRESHOLD_SECONDS - 1,
+          },
+        ],
+      )
+
+      described_class.set_demons(
+        {
+          "running_sidekiq_daemon" => running_sidekiq_daemon,
+          "missing_sidekiq_daemon" => missing_sidekiq_daemon,
+          "missed_heartbeat_sidekiq_daemon" => missed_heartbeat_sidekiq_daemon,
+        },
+      )
+
+      running_sidekiq_daemon.expects(:already_running?).returns(true)
+      missing_sidekiq_daemon.expects(:already_running?).returns(true)
+      missed_heartbeat_sidekiq_daemon.expects(:already_running?).returns(true)
+
+      running_sidekiq_daemon.expects(:restart).never
+      missing_sidekiq_daemon.expects(:restart)
+      missed_heartbeat_sidekiq_daemon.expects(:restart)
+
+      described_class.heartbeat_check
+    ensure
+      described_class.reset_demons
+    end
+  end
+
+  describe ".rss_memory_check" do
+    it "should restart sidekiq daemons when daemon's RSS memory exceeds the maximum allowed RSS memory" do
+      stub_const(described_class, "SIDEKIQ_RSS_MEMORY_CHECK_INTERVAL_SECONDS", 0) do
+        # Set to a negative value to fake that the process has exceeded the maximum allowed RSS memory
+        stub_const(described_class, "DEFAULT_MAX_ALLOWED_SIDEKIQ_RSS_MEGABYTES", -1) do
+          sidekiq_daemon = described_class.new(1)
+          sidekiq_daemon.set_pid(1)
+
+          described_class.set_demons({ "sidekiq_daemon" => sidekiq_daemon })
+
+          sidekiq_daemon.expects(:already_running?).returns(true)
+          sidekiq_daemon.expects(:restart)
+
+          described_class.rss_memory_check
+        end
+      end
+    ensure
+      described_class.reset_demons
+    end
+  end
+end
@@ -1,20 +0,0 @@
-# frozen_string_literal: true
-
-RSpec.describe Sidekiq::Pausable do
-  after { Sidekiq.unpause_all! }
-
-  it "can still run heartbeats when paused" do
-    Sidekiq.pause!
-
-    freeze_time 1.week.from_now
-
-    jobs = Sidekiq::ScheduledSet.new
-    jobs.clear
-    middleware = Sidekiq::Pausable.new
-
-    middleware.call(Jobs::RunHeartbeat.new, { "args" => [{}] }, "critical") { "done" }
-
-    jobs = Sidekiq::ScheduledSet.new
-    expect(jobs.size).to eq(0)
-  end
-end