discourse/app/jobs/base.rb
Alan Guo Xiang Tan 9812407f76
FIX: Redo Sidekiq monitoring to restart stuck sidekiq processes (#30198)
This commit reimplements how we monitor Sidekiq processes that are
forked from the Unicorn master process. Prior to this change, we rely on
`Jobs::Heartbeat` to enqueue a `Jobs::RunHeartbeat` job every 3 minutes.
The `Jobs::RunHeartbeat` job then sets a Redis key with a timestamp. In
the Unicorn master process, we then fetch the timestamp that has been set
by the job from Redis every 30 minutes. If the timestamp has not been
updated for more than 30 minutes, we restart the Sidekiq process. The
fundamental flaw with this approach is that it fails to consider
deployments with multiple hosts and multiple Sidekiq processes. A
sidekiq process on a host may be in a bad state but the heartbeat check
will not restart the process because the `Jobs::RunHeartbeat` job is
still being executed by the working Sidekiq processes on other hosts.

In order to properly ensure that stuck Sidekiq processs are restarted,
we now rely on the [Sidekiq::ProcessSet](https://github.com/sidekiq/sidekiq/wiki/API#processes)
API that is supported by Sidekiq. The API provides us with "near real-time (updated every 5 sec)
info about the current set of Sidekiq processes running". The API
provides useful information like the hostname, pid and also when Sidekiq
last did its own heartbeat check. With that information, we can easily
determine if a Sidekiq process needs to be restarted from the Unicorn
master process.
2024-12-18 12:48:50 +08:00

484 lines
14 KiB
Ruby

# frozen_string_literal: true
module Jobs
def self.queued
Sidekiq::Stats.new.enqueued
end
def self.run_later?
!@run_immediately
end
def self.run_immediately?
!!@run_immediately
end
def self.run_immediately!
@run_immediately = true
end
def self.run_later!
@run_immediately = false
end
def self.with_immediate_jobs
prior = @run_immediately
run_immediately!
yield
ensure
@run_immediately = prior
end
def self.last_job_performed_at
Sidekiq.redis do |r|
int = r.get("last_job_perform_at")
int ? Time.at(int.to_i) : nil
end
end
def self.num_email_retry_jobs
Sidekiq::RetrySet.new.count { |job| job.klass =~ /Email\z/ }
end
class Base
class JobInstrumenter
def initialize(job_class:, opts:, db:, jid:)
return unless enabled?
self.class.mutex.synchronize do
@data = {}
@data["hostname"] = Discourse.os_hostname
@data["pid"] = Process.pid # Pid
@data["database"] = db # DB name - multisite db name it ran on
@data["job_id"] = jid # Job unique ID
@data["job_name"] = job_class.name # Job Name - eg: Jobs::AboutStats
@data["job_type"] = job_class.try(:scheduled?) ? "scheduled" : "regular" # Job Type - either s for scheduled or r for regular
@data["opts"] = opts.to_json # Params - json encoded params for the job
if ENV["DISCOURSE_LOG_SIDEKIQ_INTERVAL"]
@data["status"] = "starting"
write_to_log
end
@data["status"] = "pending"
@start_timestamp = Process.clock_gettime(Process::CLOCK_MONOTONIC)
self.class.ensure_interval_logging!
@@active_jobs ||= []
@@active_jobs << self
MethodProfiler.ensure_discourse_instrumentation!
MethodProfiler.start
@data["live_slots_start"] = GC.stat[:heap_live_slots]
end
end
def stop(exception:)
return unless enabled?
self.class.mutex.synchronize do
profile = MethodProfiler.stop
@@active_jobs.delete(self)
@data["duration"] = profile[:total_duration] # Duration - length in seconds it took to run
@data["sql_duration"] = profile.dig(:sql, :duration) || 0 # Sql Duration (s)
@data["sql_calls"] = profile.dig(:sql, :calls) || 0 # Sql Statements - how many statements ran
@data["redis_duration"] = profile.dig(:redis, :duration) || 0 # Redis Duration (s)
@data["redis_calls"] = profile.dig(:redis, :calls) || 0 # Redis commands
@data["net_duration"] = profile.dig(:net, :duration) || 0 # Redis Duration (s)
@data["net_calls"] = profile.dig(:net, :calls) || 0 # Redis commands
@data["live_slots_finish"] = GC.stat[:heap_live_slots]
@data["live_slots"] = @data["live_slots_finish"] - @data["live_slots_start"]
if exception.present?
@data["exception"] = exception # Exception - if job fails a json encoded exception
@data["status"] = "failed"
else
@data["status"] = "success" # Status - fail, success, pending
end
write_to_log
end
end
def self.raw_log(message)
begin
logger << message
rescue => e
Discourse.warn_exception(e, message: "Exception encountered while logging Sidekiq job")
end
end
# For test environment only
def self.set_log_path(path)
@@log_path = path
@@logger = nil
end
# For test environment only
def self.reset_log_path
@@log_path = nil
@@logger = nil
end
def self.log_path
@@log_path ||= "#{Rails.root}/log/sidekiq.log"
end
def self.logger
@@logger ||=
begin
FileUtils.touch(log_path) if !File.exist?(log_path)
Logger.new(log_path)
end
end
def current_duration
Process.clock_gettime(Process::CLOCK_MONOTONIC) - @start_timestamp
end
def write_to_log
return unless enabled?
@data["@timestamp"] = Time.now
@data["duration"] = current_duration if @data["status"] == "pending"
self.class.raw_log("#{@data.to_json}\n")
end
def enabled?
Discourse.enable_sidekiq_logging?
end
def self.mutex
@@mutex ||= Mutex.new
end
def self.ensure_interval_logging!
interval = ENV["DISCOURSE_LOG_SIDEKIQ_INTERVAL"]
return if !interval
interval = interval.to_i
@@interval_thread ||=
Thread.new do
begin
loop do
sleep interval
mutex.synchronize do
@@active_jobs.each { |j| j.write_to_log if j.current_duration > interval }
end
end
rescue Exception => e
Discourse.warn_exception(
e,
message: "Sidekiq interval logging thread terminated unexpectedly",
)
end
end
end
end
include Sidekiq::Worker
def self.cluster_concurrency(val)
raise ArgumentError, "cluster_concurrency must be 1 or nil" if val != 1 && val != nil
@cluster_concurrency = val
end
def self.get_cluster_concurrency
@cluster_concurrency
end
def log(*args)
args.each do |arg|
Rails.logger.info "#{Time.now.to_formatted_s(:db)}: [#{self.class.name.upcase}] #{arg}"
end
true
end
# Construct an error context object for Discourse.handle_exception
# Subclasses are encouraged to use this!
#
# `opts` is the arguments passed to execute().
# `code_desc` is a short string describing what the code was doing (optional).
# `extra` is for any other context you logged.
# Note that, when building your `extra`, that :opts, :job, and :code are used by this method,
# and :current_db and :current_hostname are used by handle_exception.
def error_context(opts, code_desc = nil, extra = {})
ctx = {}
ctx[:opts] = opts
ctx[:job] = self.class
ctx[:message] = code_desc if code_desc
ctx.merge!(extra) if extra != nil
ctx
end
def self.delayed_perform(opts = {})
self.new.perform(opts)
end
def execute(opts = {})
raise "Overwrite me!"
end
def last_db_duration
@db_duration || 0
end
def perform_immediately(*args)
opts = args.extract_options!.with_indifferent_access
if opts.has_key?(:current_site_id) &&
opts[:current_site_id] != RailsMultisite::ConnectionManagement.current_db
raise ArgumentError.new(
"You can't connect to another database when executing a job synchronously.",
)
else
begin
retval = execute(opts)
rescue => exc
Discourse.handle_job_exception(exc, error_context(opts))
end
retval
end
end
def self.cluster_concurrency_redis_key
"cluster_concurrency:#{self}"
end
def self.clear_cluster_concurrency_lock!
Discourse.redis.without_namespace.del(cluster_concurrency_redis_key)
end
def self.acquire_cluster_concurrency_lock!
!!Discourse.redis.without_namespace.set(cluster_concurrency_redis_key, 0, nx: true, ex: 120)
end
def perform(*args)
requeued = false
keepalive_thread = nil
finished = false
if self.class.get_cluster_concurrency
if !self.class.acquire_cluster_concurrency_lock!
self.class.perform_in(10.seconds, *args)
requeued = true
return
end
parent_thread = Thread.current
cluster_concurrency_redis_key = self.class.cluster_concurrency_redis_key
keepalive_thread =
Thread.new do
while parent_thread.alive? && !finished
Discourse.redis.without_namespace.expire(cluster_concurrency_redis_key, 120)
# Sleep for 60 seconds, but wake up every second to check if the job has been completed
60.times do
break if finished
sleep 1
end
end
end
end
opts = args.extract_options!.with_indifferent_access
Sidekiq.redis { |r| r.set("last_job_perform_at", Time.now.to_i) } if ::Jobs.run_later?
dbs =
if opts[:current_site_id]
[opts[:current_site_id]]
else
RailsMultisite::ConnectionManagement.all_dbs
end
exceptions = []
dbs.each do |db|
begin
exception = {}
RailsMultisite::ConnectionManagement.with_connection(db) do
job_instrumenter =
JobInstrumenter.new(job_class: self.class, opts: opts, db: db, jid: jid)
begin
I18n.locale =
SiteSetting.default_locale || SiteSettings::DefaultsProvider::DEFAULT_LOCALE
I18n.ensure_all_loaded!
begin
logster_env = {}
Logster.add_to_env(logster_env, :job, self.class.to_s)
Logster.add_to_env(logster_env, :db, db)
Thread.current[Logster::Logger::LOGSTER_ENV] = logster_env
execute(opts)
rescue => e
exception[:ex] = e
exception[:other] = { problem_db: db }
end
rescue => e
exception[:ex] = e
exception[:message] = "While establishing database connection to #{db}"
exception[:other] = { problem_db: db }
ensure
job_instrumenter.stop(exception: exception)
end
end
exceptions << exception unless exception.empty?
end
end
Thread.current[Logster::Logger::LOGSTER_ENV] = nil
if exceptions.length > 0
exceptions.each do |exception_hash|
Discourse.handle_job_exception(
exception_hash[:ex],
error_context(opts, exception_hash[:code], exception_hash[:other]),
)
end
raise HandledExceptionWrapper.new(exceptions[0][:ex])
end
nil
ensure
if self.class.get_cluster_concurrency && !requeued
finished = true
keepalive_thread.wakeup
keepalive_thread.join
self.class.clear_cluster_concurrency_lock!
end
ActiveRecord::Base.connection_handler.clear_active_connections!
end
end
class HandledExceptionWrapper < StandardError
attr_accessor :wrapped
def initialize(ex)
super("Wrapped #{ex.class}: #{ex.message}")
@wrapped = ex
end
end
class Scheduled < Base
extend MiniScheduler::Schedule
def perform(*args)
super if !Discourse.readonly_mode?
end
end
def self.enqueue(job, opts = {})
if job.instance_of?(Class)
klass = job
else
klass = "::Jobs::#{job.to_s.camelcase}".constantize
end
# Unless we want to work on all sites
unless opts.delete(:all_sites)
opts[:current_site_id] ||= RailsMultisite::ConnectionManagement.current_db
end
delay = opts.delete(:delay_for)
queue = opts.delete(:queue)
# Only string keys are allowed in JSON. We call `.with_indifferent_access`
# in Jobs::Base#perform, so this is invisible to developers
opts = opts.deep_stringify_keys
# Simulate the args being dumped/parsed through JSON
parsed_opts = JSON.parse(JSON.dump(opts))
if opts != parsed_opts
Discourse.deprecate(<<~TEXT.squish, since: "2.9", drop_from: "3.0", output_in_test: true)
#{klass.name} was enqueued with argument values which do not cleanly serialize to/from JSON.
This means that the job will be run with slightly different values than the ones supplied to `enqueue`.
Argument values should be strings, booleans, numbers, or nil (or arrays/hashes of those value types).
TEXT
end
opts = parsed_opts
if ::Jobs.run_later?
hash = { "class" => klass, "args" => [opts] }
if delay
hash["at"] = Time.now.to_f + delay.to_f if delay.to_f > 0
end
hash["queue"] = queue if queue
DB.after_commit { klass.client_push(hash) }
else
if Rails.env == "development"
Scheduler::Defer.later("job") { klass.new.perform(opts) }
else
# Run the job synchronously
# But never run a job inside another job
# That could cause deadlocks during test runs
queue = Thread.current[:discourse_nested_job_queue]
outermost_job = !queue
if outermost_job
queue = Queue.new
Thread.current[:discourse_nested_job_queue] = queue
end
queue.push([klass, opts])
if outermost_job
# responsible for executing the queue
begin
until queue.empty?
queued_klass, queued_opts = queue.pop(true)
queued_klass.new.perform_immediately(queued_opts)
end
ensure
Thread.current[:discourse_nested_job_queue] = nil
end
end
end
end
end
def self.enqueue_in(secs, job_name, opts = {})
enqueue(job_name, opts.merge!(delay_for: secs))
end
def self.enqueue_at(datetime, job_name, opts = {})
secs = [datetime.to_f - Time.zone.now.to_f, 0].max
enqueue_in(secs, job_name, opts)
end
def self.cancel_scheduled_job(job_name, opts = {})
scheduled_for(job_name, opts).each(&:delete)
end
def self.scheduled_for(job_name, opts = {})
opts = opts.with_indifferent_access
unless opts.delete(:all_sites)
opts[:current_site_id] ||= RailsMultisite::ConnectionManagement.current_db
end
job_class = "Jobs::#{job_name.to_s.camelcase}"
Sidekiq::ScheduledSet.new.select do |scheduled_job|
if scheduled_job.klass.to_s == job_class
matched = true
job_params = scheduled_job.item["args"][0].with_indifferent_access
opts.each do |key, value|
if job_params[key] != value
matched = false
break
end
end
matched
else
false
end
end
end
end