mirror of
https://github.com/discourse/discourse.git
synced 2025-01-27 11:25:16 +08:00
8a3d9d7036
When running jobs in tests, we use `Jobs.run_immediately!`. This means that jobs are run synchronously when they are enqueued. Jobs sometimes enqueue other jobs, which are also executed synchronously. This means that the outermost job will block until the inner jobs have finished executing. In some cases (e.g. process_post with hotlinked images) this can lead to a deadlock.

This commit changes the behavior slightly. Now we will never run jobs inside other jobs. Instead, we will queue them up and run them sequentially in the order they were enqueued. As a whole, they are still executed synchronously. Consider the example:

```ruby
class Jobs::InnerJob < Jobs::Base
  def execute(args)
    puts "Running inner job"
  end
end

class Jobs::OuterJob < Jobs::Base
  def execute(args)
    puts "Starting outer job"
    Jobs.enqueue(:inner_job)
    puts "Finished outer job"
  end
end

Jobs.enqueue(:outer_job)
puts "All jobs complete"
```

The old behavior would result in:

```
Starting outer job
Running inner job
Finished outer job
All jobs complete
```

The new behavior will result in:

```
Starting outer job
Finished outer job
Running inner job
All jobs complete
```
392 lines
11 KiB
Ruby
392 lines
11 KiB
Ruby
# frozen_string_literal: true
|
|
|
|
module Jobs
|
|
|
|
# Returns the `enqueued` figure reported by a fresh Sidekiq::Stats snapshot.
def self.queued
  stats = Sidekiq::Stats.new
  stats.enqueued
end
|
|
|
|
# True unless immediate execution has been switched on via the
# @run_immediately flag (unset counts as "run later").
def self.run_later?
  @run_immediately ? false : true
end
|
|
|
|
# Strict boolean view of the @run_immediately flag: true only when the flag
# holds a truthy value, false when it is unset, nil or false.
def self.run_immediately?
  @run_immediately ? true : false
end
|
|
|
|
# Turns on immediate execution: after this call `run_immediately?` is true
# and `run_later?` is false. Returns true.
def self.run_immediately!
  @run_immediately = true
end
|
|
|
|
# Turns off immediate execution: after this call `run_later?` is true
# and `run_immediately?` is false. Returns false.
def self.run_later!
  @run_immediately = false
end
|
|
|
|
# Reads the 'last_job_perform_at' key through Sidekiq's Redis connection and
# converts the stored epoch seconds into a Time.
# Returns nil when the key has no value.
def self.last_job_performed_at
  Sidekiq.redis do |redis|
    epoch = redis.get('last_job_perform_at')
    Time.at(epoch.to_i) if epoch
  end
end
|
|
|
|
# Counts the entries in Sidekiq's retry set whose job class name ends in "Email".
def self.num_email_retry_jobs
  retries = Sidekiq::RetrySet.new
  retries.count { |job| job.klass =~ /Email$/ }
end
|
|
|
|
class Base
|
|
# Collects per-job execution data (host, pid, database, job id/name/type,
# options, timings and SQL/Redis/net counters from MethodProfiler) and writes
# it as JSON lines to log/sidekiq.log.
#
# Every instance method is a no-op unless ENV["DISCOURSE_LOG_SIDEKIQ"] == "1".
# When ENV["DISCOURSE_LOG_SIDEKIQ_INTERVAL"] is set, a "starting" line is
# written up front and a background thread periodically re-logs jobs that
# have been running longer than the interval.
class JobInstrumenter
  # Starts instrumentation for one job run on one database.
  #
  # job_class - the job's class (its name and scheduled? flag are recorded)
  # opts      - job arguments; stored JSON-encoded
  # db        - name of the database the job runs against
  # jid       - the job's unique id
  def initialize(job_class:, opts:, db:, jid:)
    return unless enabled?

    self.class.mutex.synchronize do
      @data = {
        "hostname" => Discourse.os_hostname,
        "pid" => Process.pid,
        "database" => db,
        "job_id" => jid,
        "job_name" => job_class.name,
        "job_type" => job_class.try(:scheduled?) ? "scheduled" : "regular",
        "opts" => opts.to_json,
      }

      # Only emit a "starting" line when interval logging is configured.
      if ENV["DISCOURSE_LOG_SIDEKIQ_INTERVAL"]
        @data["status"] = "starting"
        write_to_log
      end

      @data["status"] = "pending"
      @start_timestamp = Process.clock_gettime(Process::CLOCK_MONOTONIC)

      self.class.ensure_interval_logging!
      (@@active_jobs ||= []) << self

      MethodProfiler.ensure_discourse_instrumentation!
      MethodProfiler.start
    end
  end

  # Finishes instrumentation: stops the profiler, removes this job from the
  # active list, records durations/call counts and the final status, then
  # writes the log line.
  #
  # exception - failure details; when present the job is logged as 'failed'
  #             and the value is stored under "exception", otherwise 'success'
  def stop(exception:)
    return unless enabled?

    self.class.mutex.synchronize do
      profile = MethodProfiler.stop

      @@active_jobs.delete(self)

      @data["duration"] = profile[:total_duration] # total run time
      # Per-source time spent and number of calls; missing sections count as 0.
      %i[sql redis net].each do |source|
        @data["#{source}_duration"] = profile.dig(source, :duration) || 0
        @data["#{source}_calls"] = profile.dig(source, :calls) || 0
      end

      failed = exception.present?
      @data["exception"] = exception if failed
      @data["status"] = failed ? 'failed' : 'success'

      write_to_log
    end
  end

  # Queues a raw message for the sidekiq log. The logger, the message queue
  # and the writer thread that drains the queue are created on first use.
  def self.raw_log(message)
    @@logger ||= begin
      file = File.open("#{Rails.root}/log/sidekiq.log", "a")
      file.sync = true
      Logger.new(file)
    end

    @@log_queue ||= Queue.new

    @@log_thread ||= Thread.new do
      begin
        loop do
          @@logger << @@log_queue.pop
        end
      rescue Exception => e
        Discourse.warn_exception(e, message: "Sidekiq logging thread terminated unexpectedly")
      end
    end

    @@log_queue.push(message)
  end

  # Seconds elapsed (monotonic clock) since instrumentation started.
  def current_duration
    now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    now - @start_timestamp
  end

  # Stamps the collected data with the current time and emits it as one JSON
  # line. While the job is still "pending" the duration so far is included.
  def write_to_log
    return unless enabled?

    @data["@timestamp"] = Time.now
    @data["duration"] = current_duration if @data["status"] == "pending"

    line = "#{@data.to_json}\n"
    self.class.raw_log(line)
  end

  # Whether sidekiq job logging is switched on via the environment.
  def enabled?
    ENV["DISCOURSE_LOG_SIDEKIQ"] == "1"
  end

  # Lazily created mutex used by the instrumenter's synchronized sections.
  def self.mutex
    @@mutex ||= Mutex.new
  end

  # When DISCOURSE_LOG_SIDEKIQ_INTERVAL is set, makes sure the background
  # thread exists that wakes up every `interval` seconds and logs each active
  # job that has been running for longer than the interval.
  def self.ensure_interval_logging!
    configured = ENV["DISCOURSE_LOG_SIDEKIQ_INTERVAL"]
    return unless configured

    interval = configured.to_i

    @@interval_thread ||= Thread.new do
      begin
        loop do
          sleep interval
          mutex.synchronize do
            @@active_jobs.each do |job|
              job.write_to_log if job.current_duration > interval
            end
          end
        end
      rescue Exception => e
        Discourse.warn_exception(e, message: "Sidekiq interval logging thread terminated unexpectedly")
      end
    end
  end
end
|
|
|
|
include Sidekiq::Worker
|
|
|
|
# Writes each argument to Rails.logger at info level, prefixed with the
# current time (:db format) and the upper-cased job class name.
# Always returns true.
def log(*args)
  args.each do |message|
    tag = self.class.name.upcase
    Rails.logger.info "#{Time.now.to_formatted_s(:db)}: [#{tag}] #{message}"
  end

  true
end
|
|
|
|
# Builds the error context hash handed to Discourse.handle_exception.
# Subclasses are encouraged to use this!
#
# `opts` is the arguments passed to execute().
# `code_desc` is a short string describing what the code was doing (optional);
#   when given it is stored under :message.
# `extra` is any other context to include; it is merged last, so its keys win.
#   Pass nil to skip it.
#
# Note that :opts, :job and :message are set by this method, and
# :current_db and :current_hostname are used by handle_exception.
def error_context(opts, code_desc = nil, extra = {})
  context = { opts: opts, job: self.class }
  context[:message] = code_desc if code_desc
  context.merge!(extra) unless extra.nil?
  context
end
|
|
|
|
# Instantiates the job class and calls `perform` on it with the given options.
def self.delayed_perform(opts = {})
  job = new
  job.perform(opts)
end
|
|
|
|
# The job's actual work. Concrete jobs must override this; the base
# implementation always raises a RuntimeError.
def execute(opts = {})
  raise RuntimeError, "Overwrite me!"
end
|
|
|
|
# Value of @db_duration, or 0 when it has not been set (or is falsy).
def last_db_duration
  @db_duration ? @db_duration : 0
end
|
|
|
|
# Sidekiq entry point. Runs `execute` with the job options.
#
# The trailing options hash is extracted from `args`. Two modes exist:
#
# * Synchronous (`opts[:sync_exec]` set, as done by Jobs.enqueue when jobs
#   are not run later): `execute` is called directly against the current
#   database. Raises ArgumentError if `:current_site_id` names a different
#   database. Failures are passed to Discourse.handle_job_exception. Returns
#   whatever `execute` returned (nil if it raised and the handler did not
#   re-raise).
#
# * Regular: `execute` runs once per database, either only
#   `opts[:current_site_id]` or every database known to RailsMultisite,
#   each inside its own connection and wrapped in a JobInstrumenter.
#   Failures are collected per database, each is reported through
#   Discourse.handle_job_exception, and then a HandledExceptionWrapper
#   around the first one is raised. Returns nil on success.
#
# When jobs run later, the current time is stored in Redis under
# 'last_job_perform_at'. Active record connections are always cleared on exit.
def perform(*args)
  opts = args.extract_options!.with_indifferent_access

  if ::Jobs.run_later?
    Sidekiq.redis do |r|
      r.set('last_job_perform_at', Time.now.to_i)
    end
  end

  if opts.delete(:sync_exec)
    if opts.has_key?(:current_site_id) && opts[:current_site_id] != RailsMultisite::ConnectionManagement.current_db
      raise ArgumentError.new("You can't connect to another database when executing a job synchronously.")
    else
      begin
        retval = execute(opts)
      rescue => exc
        Discourse.handle_job_exception(exc, error_context(opts))
      end
      return retval
    end
  end

  dbs =
    if opts[:current_site_id]
      [opts[:current_site_id]]
    else
      RailsMultisite::ConnectionManagement.all_dbs
    end

  exceptions = []
  dbs.each do |db|
    begin
      # Filled in only when something goes wrong for this database.
      exception = {}

      RailsMultisite::ConnectionManagement.with_connection(db) do
        job_instrumenter = JobInstrumenter.new(job_class: self.class, opts: opts, db: db, jid: jid)
        begin
          I18n.locale = SiteSetting.default_locale || SiteSettings::DefaultsProvider::DEFAULT_LOCALE
          I18n.ensure_all_loaded!
          begin
            logster_env = {}
            Logster.add_to_env(logster_env, :job, self.class.to_s)
            Logster.add_to_env(logster_env, :db, db)
            Thread.current[Logster::Logger::LOGSTER_ENV] = logster_env

            execute(opts)
          rescue => e
            # Failure inside the job itself.
            exception[:ex] = e
            exception[:other] = { problem_db: db }
          end
        rescue => e
          # Failure while preparing to run the job (before `execute`).
          exception[:ex] = e
          exception[:message] = "While establishing database connection to #{db}"
          exception[:other] = { problem_db: db }
        ensure
          job_instrumenter.stop(exception: exception)
        end
      end

      exceptions << exception unless exception.empty?
    end
  end

  Thread.current[Logster::Logger::LOGSTER_ENV] = nil

  if exceptions.length > 0
    exceptions.each do |exception_hash|
      # The description is stored under :message above; reading :code (a key
      # that is never set) would silently drop it from the error context.
      Discourse.handle_job_exception(
        exception_hash[:ex],
        error_context(opts, exception_hash[:message], exception_hash[:other])
      )
    end
    raise HandledExceptionWrapper.new(exceptions[0][:ex])
  end

  nil
ensure
  ActiveRecord::Base.connection_handler.clear_active_connections!
end
|
|
|
|
end
|
|
|
|
# Wraps another exception. The message is "Wrapped <class>: <message>" and
# the original exception stays reachable through `wrapped`.
class HandledExceptionWrapper < StandardError
  attr_accessor :wrapped

  # ex - the original exception being wrapped
  def initialize(ex)
    @wrapped = ex
    super(format("Wrapped %s: %s", ex.class, ex.message))
  end
end
|
|
|
|
# Base class for jobs that run on a schedule (extends MiniScheduler::Schedule).
class Scheduled < Base
  extend MiniScheduler::Schedule

  # Runs the job as usual, except that while the site is in readonly mode
  # only the Heartbeat job is allowed to run; every other scheduled job is
  # skipped and nil is returned.
  def perform(*args)
    return if !is_a?(::Jobs::Heartbeat) && Discourse.readonly_mode?

    super
  end
end
|
|
|
|
# Enqueues (or, when jobs are not run later, executes) a job.
#
# job  - a job class, or a name that resolves to ::Jobs::<CamelCasedName>
# opts - job arguments. Special keys, all removed before the job sees them:
#        :all_sites - when truthy, do not pin the job to the current site
#        :delay_for - seconds to wait before running (ignored when run inline)
#        :queue     - Sidekiq queue name (ignored when run inline)
#
# Note that `opts` is modified in place.
def self.enqueue(job, opts = {})
  klass = job.instance_of?(Class) ? job : "::Jobs::#{job.to_s.camelcase}".constantize

  # Pin the job to the current site unless it should work on all sites.
  all_sites = opts.delete(:all_sites)
  opts[:current_site_id] ||= RailsMultisite::ConnectionManagement.current_db unless all_sites

  # If we are able to queue a job, do it.
  if ::Jobs.run_later?
    payload = { 'class' => klass, 'args' => [opts] }

    delay = opts.delete(:delay_for)
    if delay
      seconds = delay.to_f
      payload['at'] = Time.now.to_f + seconds if seconds > 0
    end

    target_queue = opts.delete(:queue)
    payload['queue'] = target_queue if target_queue

    return klass.client_push(payload)
  end

  # Otherwise execute the job right away.
  opts.delete(:delay_for)
  opts.delete(:queue)
  opts[:sync_exec] = true

  if Rails.env == "development"
    return Scheduler::Defer.later("job") { klass.new.perform(opts) }
  end

  # Run the job synchronously, but never run a job inside another job:
  # that could cause deadlocks during test runs. Jobs enqueued while another
  # job is executing are parked on a per-thread queue and executed in order
  # by the outermost call once the running job has finished.
  pending = Thread.current[:discourse_nested_job_queue]

  if pending
    # A job is already executing on this thread; it will drain the queue.
    pending.push([klass, opts])
    return nil
  end

  pending = Queue.new
  Thread.current[:discourse_nested_job_queue] = pending
  pending.push([klass, opts])

  # The outermost call is responsible for executing the whole queue.
  begin
    until pending.empty?
      next_klass, next_opts = pending.pop(true)
      next_klass.new.perform(next_opts)
    end
  ensure
    Thread.current[:discourse_nested_job_queue] = nil
  end
end
|
|
|
|
# Enqueues `job_name` to run after `secs` seconds by setting :delay_for on
# `opts` (the hash is modified in place) and delegating to `enqueue`.
def self.enqueue_in(secs, job_name, opts = {})
  opts[:delay_for] = secs
  enqueue(job_name, opts)
end
|
|
|
|
# Enqueues `job_name` to run at `datetime`. The delay is the whole number of
# seconds between now (Time.zone.now) and `datetime`; times in the past are
# treated as a delay of 0.
def self.enqueue_at(datetime, job_name, opts = {})
  remaining = (datetime - Time.zone.now).to_i
  remaining = 0 if remaining < 0
  enqueue_in(remaining, job_name, opts)
end
|
|
|
|
# Calls `delete` on every scheduled job returned by `scheduled_for` for the
# given name and options. Returns that collection.
def self.cancel_scheduled_job(job_name, opts = {})
  scheduled_for(job_name, opts).each { |scheduled_job| scheduled_job.delete }
end
|
|
|
|
# Finds entries in Sidekiq's scheduled set for the job named `job_name`
# whose first argument hash matches every key/value pair in `opts`.
#
# Unless :all_sites is passed, the search is restricted to the current site
# by adding :current_site_id to the criteria. The caller's hash is not
# modified (an indifferent-access copy is used).
def self.scheduled_for(job_name, opts = {})
  criteria = opts.with_indifferent_access
  unless criteria.delete(:all_sites)
    criteria[:current_site_id] ||= RailsMultisite::ConnectionManagement.current_db
  end

  wanted_class = "Jobs::#{job_name.to_s.camelcase}"

  Sidekiq::ScheduledSet.new.select do |scheduled_job|
    next false unless scheduled_job.klass.to_s == wanted_class

    job_params = scheduled_job.item["args"][0].with_indifferent_access
    criteria.all? { |key, value| job_params[key] == value }
  end
end
|
|
end
|