discourse/lib/sidekiq/pausable.rb
Alan Guo Xiang Tan 9812407f76
FIX: Redo Sidekiq monitoring to restart stuck sidekiq processes (#30198)
This commit reimplements how we monitor Sidekiq processes that are
forked from the Unicorn master process. Prior to this change, we rely on
`Jobs::Heartbeat` to enqueue a `Jobs::RunHeartbeat` job every 3 minutes.
The `Jobs::RunHeartbeat` job then sets a Redis key with a timestamp. In
the Unicorn master process, we then fetch the timestamp that has been set
by the job from Redis every 30 minutes. If the timestamp has not been
updated for more than 30 minutes, we restart the Sidekiq process. The
fundamental flaw with this approach is that it fails to consider
deployments with multiple hosts and multiple Sidekiq processes. A
sidekiq process on a host may be in a bad state but the heartbeat check
will not restart the process because the `Jobs::RunHeartbeat` job is
still being executed by the working Sidekiq processes on other hosts.

In order to properly ensure that stuck Sidekiq processes are restarted,
we now rely on the [Sidekiq::ProcessSet](https://github.com/sidekiq/sidekiq/wiki/API#processes)
API that is supported by Sidekiq. The API provides us with "near real-time (updated every 5 sec)
info about the current set of Sidekiq processes running". The API
provides useful information like the hostname, pid and also when Sidekiq
last did its own heartbeat check. With that information, we can easily
determine if a Sidekiq process needs to be restarted from the Unicorn
master process.
2024-12-18 12:48:50 +08:00

147 lines
3.1 KiB
Ruby

# frozen_string_literal: true
# Pauses Sidekiq by writing a short-lived key through Discourse.redis and
# keeps that key alive from a background "lease" thread for as long as this
# process has at least one database paused.
#
# The database that was current when pause! was called is remembered in @dbs;
# the lease thread re-enters each remembered database with
# RailsMultisite::ConnectionManagement.with_connection before refreshing the key.
class SidekiqPauser
  # Lifetime of the pause key in seconds. The lease thread refreshes it every
  # TTL / 2 seconds (every 0.01s when Rails.env.test?).
  TTL = 60
  PAUSED_KEY = "sidekiq_is_paused_v2"

  def initialize
    @mutex = Mutex.new
    @dbs = Set.new
  end

  # Marks the current database as paused and makes sure the lease thread runs.
  #
  # @param value the value stored under PAUSED_KEY
  # @return [true]
  def pause!(value = "paused")
    Discourse.redis.setex PAUSED_KEY, TTL, value
    extend_lease_thread
    true
  end

  # @return [Boolean] whether PAUSED_KEY currently exists for the current connection
  def paused?
    !!Discourse.redis.get(PAUSED_KEY)
  end

  # Forgets every database tracked by this process, stops the lease thread and
  # then unpauses each connection that still reports itself as paused.
  def unpause_all!
    @mutex.synchronize do
      @dbs = Set.new
      stop_extend_lease_thread
    end

    RailsMultisite::ConnectionManagement.each_connection { unpause! if paused? }
  end

  # @return [Array] the current_db of every connection for which paused? is true
  def paused_dbs
    dbs = []
    RailsMultisite::ConnectionManagement.each_connection do
      dbs << RailsMultisite::ConnectionManagement.current_db if paused?
    end
    dbs
  end

  # Unpauses the current database. The lease thread is stopped once no
  # database is tracked any more.
  #
  # @return [true]
  def unpause!
    @mutex.synchronize do
      @dbs.delete(RailsMultisite::ConnectionManagement.current_db)
      stop_extend_lease_thread if @dbs.empty?
    end

    Discourse.redis.del(PAUSED_KEY)
    true
  end

  private

  # Stops the lease thread and waits until it is dead.
  # Must be called with @mutex held.
  def stop_extend_lease_thread
    thread = @extend_lease_thread
    return unless thread

    # Clearing the reference is the stop signal: the thread exits as soon as
    # it notices it is no longer the registered lease thread.
    @extend_lease_thread = nil

    while thread.alive?
      begin
        thread.wakeup
      rescue ThreadError => e
        # the thread may die between the alive? check and the wakeup
        raise e unless e.message.match?(/killed thread/)
      end

      # Release the mutex while yielding. The lease thread needs the mutex to
      # observe the stop signal; spinning here with the mutex held would hang
      # forever if the thread is already blocked waiting for it.
      @mutex.sleep(0)
    end
  end

  # Registers the current database and starts the lease thread when none is
  # running. A thread that has died is replaced, so a failed refresh does not
  # silently let later pauses expire.
  def extend_lease_thread
    @mutex.synchronize do
      @dbs << RailsMultisite::ConnectionManagement.current_db
      @extend_lease_thread = spawn_lease_thread unless @extend_lease_thread&.alive?
    end
  end

  # Builds the background thread that periodically refreshes the pause keys.
  # It is created while @mutex is held, so its first pass cannot start before
  # the caller has stored it in @extend_lease_thread.
  def spawn_lease_thread
    Thread.new do
      loop do
        break unless @mutex.synchronize { renew_leases }
        sleep(Rails.env.test? ? 0.01 : TTL / 2)
      end
    end
  end

  # Refreshes the TTL of the pause key for every tracked database.
  # Must be called with @mutex held, from the lease thread.
  #
  # @return [Boolean] false when the calling thread is no longer the
  #   registered lease thread and should exit
  def renew_leases
    return false unless @extend_lease_thread.equal?(Thread.current)

    # iterate over a snapshot because entries may be removed below
    @dbs.to_a.each do |db|
      RailsMultisite::ConnectionManagement.with_connection(db) do
        # if it was unpaused in another process we got to remove the bad key
        @dbs.delete(db) unless Discourse.redis.expire(PAUSED_KEY, TTL)
      end
    end

    true
  end
end
# Module-level pause API: every method forwards to one SidekiqPauser instance
# that is created when this file is loaded.
module Sidekiq
  @pauser = SidekiqPauser.new

  class << self
    # Pauses through the shared pauser. A truthy +key+ is passed on as the
    # value to store; otherwise the pauser's own default value is used.
    def pause!(key = nil)
      return @pauser.pause! unless key

      @pauser.pause!(key)
    end

    # Whether the shared pauser reports the current connection as paused.
    def paused?
      @pauser.paused?
    end

    # Unpauses the current connection through the shared pauser.
    def unpause!
      @pauser.unpause!
    end

    # Unpauses every connection through the shared pauser.
    def unpause_all!
      @pauser.unpause_all!
    end

    # Lists the databases the shared pauser reports as paused.
    def paused_dbs
      @pauser.paused_dbs
    end
  end
end
# server middleware that will reschedule work whenever Sidekiq is paused
class Sidekiq::Pausable
  # @param delay how far into the future a job is pushed (via perform_in)
  #   when it arrives while Sidekiq is paused
  def initialize(delay = 5.seconds)
    @delay = delay
  end

  # Runs the job unless Sidekiq is paused for the job's site, in which case
  # the job is re-enqueued with its original args after @delay instead.
  #
  # When the job runs, its duration is measured with the monotonic clock and
  # reported through the :sidekiq_job_ran DiscourseEvent; the job's own result
  # is returned. No event is triggered if the job raises.
  def call(worker, msg, queue)
    if sidekiq_paused?(msg)
      worker.class.perform_in(@delay, *msg["args"])
    else
      started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      result = yield
      duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
      DiscourseEvent.trigger(:sidekiq_job_ran, worker, msg, queue, duration)
      result
    end
  end

  private

  # Whether Sidekiq is paused for the site named by "current_site_id" in the
  # job's first argument. Jobs without args, whose first argument is not a
  # Hash, or that carry no site id are never treated as paused.
  def sidekiq_paused?(msg)
    first_arg = msg["args"]&.first
    # a non-Hash first argument (String, Integer, Array, ...) cannot carry a
    # site id; looking one up on it would raise instead of running the job
    return false unless first_arg.is_a?(Hash)

    site_id = first_arg["current_site_id"]
    return false unless site_id

    RailsMultisite::ConnectionManagement.with_connection(site_id) { Sidekiq.paused? }
  end
end