discourse/app/jobs/base.rb
David Taylor 8a3d9d7036
DEV: Run jobs sequentially in test mode (#9897)
When running jobs in tests, we use `Jobs.run_immediately!`. This means that jobs are run synchronously when they are enqueued. Jobs sometimes enqueue other jobs, which are also executed synchronously. This means that the outermost job will block until the inner jobs have finished executing. In some cases (e.g. process_post with hotlinked images) this can lead to a deadlock.

This commit changes the behavior slightly. Now we will never run jobs inside other jobs. Instead, we will queue them up and run them sequentially in the order they were enqueued. As a whole, they are still executed synchronously. Consider the example

```ruby
class Jobs::InnerJob < Jobs::Base
  def execute(args)
    puts "Running inner job"
  end
end

class Jobs::OuterJob < Jobs::Base
  def execute(args)
    puts "Starting outer job"
    Jobs.enqueue(:inner_job)
    puts "Finished outer job"
  end
end

Jobs.enqueue(:outer_job)
puts "All jobs complete"
```

The old behavior would result in:

```
Starting outer job
Running inner job
Finished outer job
All jobs complete
```

The new behavior will result in:
```
Starting outer job
Finished outer job
Running inner job
All jobs complete
```
2020-05-28 12:52:27 +01:00

392 lines
11 KiB
Ruby

# frozen_string_literal: true
module Jobs
def self.queued
Sidekiq::Stats.new.enqueued
end
def self.run_later?
!@run_immediately
end
def self.run_immediately?
!!@run_immediately
end
def self.run_immediately!
@run_immediately = true
end
def self.run_later!
@run_immediately = false
end
def self.last_job_performed_at
Sidekiq.redis do |r|
int = r.get('last_job_perform_at')
int ? Time.at(int.to_i) : nil
end
end
def self.num_email_retry_jobs
Sidekiq::RetrySet.new.count { |job| job.klass =~ /Email$/ }
end
class Base
class JobInstrumenter
def initialize(job_class:, opts:, db:, jid:)
return unless enabled?
self.class.mutex.synchronize do
@data = {}
@data["hostname"] = Discourse.os_hostname
@data["pid"] = Process.pid # Pid
@data["database"] = db # DB name - multisite db name it ran on
@data["job_id"] = jid # Job unique ID
@data["job_name"] = job_class.name # Job Name - eg: Jobs::AboutStats
@data["job_type"] = job_class.try(:scheduled?) ? "scheduled" : "regular" # Job Type - either s for scheduled or r for regular
@data["opts"] = opts.to_json # Params - json encoded params for the job
if ENV["DISCOURSE_LOG_SIDEKIQ_INTERVAL"]
@data["status"] = "starting"
write_to_log
end
@data["status"] = "pending"
@start_timestamp = Process.clock_gettime(Process::CLOCK_MONOTONIC)
self.class.ensure_interval_logging!
@@active_jobs ||= []
@@active_jobs << self
MethodProfiler.ensure_discourse_instrumentation!
MethodProfiler.start
end
end
def stop(exception:)
return unless enabled?
self.class.mutex.synchronize do
profile = MethodProfiler.stop
@@active_jobs.delete(self)
@data["duration"] = profile[:total_duration] # Duration - length in seconds it took to run
@data["sql_duration"] = profile.dig(:sql, :duration) || 0 # Sql Duration (s)
@data["sql_calls"] = profile.dig(:sql, :calls) || 0 # Sql Statements - how many statements ran
@data["redis_duration"] = profile.dig(:redis, :duration) || 0 # Redis Duration (s)
@data["redis_calls"] = profile.dig(:redis, :calls) || 0 # Redis commands
@data["net_duration"] = profile.dig(:net, :duration) || 0 # Redis Duration (s)
@data["net_calls"] = profile.dig(:net, :calls) || 0 # Redis commands
if exception.present?
@data["exception"] = exception # Exception - if job fails a json encoded exception
@data["status"] = 'failed'
else
@data["status"] = 'success' # Status - fail, success, pending
end
write_to_log
end
end
def self.raw_log(message)
@@logger ||= begin
f = File.open "#{Rails.root}/log/sidekiq.log", "a"
f.sync = true
Logger.new f
end
@@log_queue ||= Queue.new
@@log_thread ||= Thread.new do
begin
loop { @@logger << @@log_queue.pop }
rescue Exception => e
Discourse.warn_exception(e, message: "Sidekiq logging thread terminated unexpectedly")
end
end
@@log_queue.push(message)
end
def current_duration
Process.clock_gettime(Process::CLOCK_MONOTONIC) - @start_timestamp
end
def write_to_log
return unless enabled?
@data["@timestamp"] = Time.now
@data["duration"] = current_duration if @data["status"] == "pending"
self.class.raw_log("#{@data.to_json}\n")
end
def enabled?
ENV["DISCOURSE_LOG_SIDEKIQ"] == "1"
end
def self.mutex
@@mutex ||= Mutex.new
end
def self.ensure_interval_logging!
interval = ENV["DISCOURSE_LOG_SIDEKIQ_INTERVAL"]
return if !interval
interval = interval.to_i
@@interval_thread ||= Thread.new do
begin
loop do
sleep interval
mutex.synchronize do
@@active_jobs.each { |j| j.write_to_log if j.current_duration > interval }
end
end
rescue Exception => e
Discourse.warn_exception(e, message: "Sidekiq interval logging thread terminated unexpectedly")
end
end
end
end
include Sidekiq::Worker
def log(*args)
args.each do |arg|
Rails.logger.info "#{Time.now.to_formatted_s(:db)}: [#{self.class.name.upcase}] #{arg}"
end
true
end
# Construct an error context object for Discourse.handle_exception
# Subclasses are encouraged to use this!
#
# `opts` is the arguments passed to execute().
# `code_desc` is a short string describing what the code was doing (optional).
# `extra` is for any other context you logged.
# Note that, when building your `extra`, that :opts, :job, and :code are used by this method,
# and :current_db and :current_hostname are used by handle_exception.
def error_context(opts, code_desc = nil, extra = {})
ctx = {}
ctx[:opts] = opts
ctx[:job] = self.class
ctx[:message] = code_desc if code_desc
ctx.merge!(extra) if extra != nil
ctx
end
def self.delayed_perform(opts = {})
self.new.perform(opts)
end
def execute(opts = {})
raise "Overwrite me!"
end
def last_db_duration
@db_duration || 0
end
def perform(*args)
opts = args.extract_options!.with_indifferent_access
if ::Jobs.run_later?
Sidekiq.redis do |r|
r.set('last_job_perform_at', Time.now.to_i)
end
end
if opts.delete(:sync_exec)
if opts.has_key?(:current_site_id) && opts[:current_site_id] != RailsMultisite::ConnectionManagement.current_db
raise ArgumentError.new("You can't connect to another database when executing a job synchronously.")
else
begin
retval = execute(opts)
rescue => exc
Discourse.handle_job_exception(exc, error_context(opts))
end
return retval
end
end
dbs =
if opts[:current_site_id]
[opts[:current_site_id]]
else
RailsMultisite::ConnectionManagement.all_dbs
end
exceptions = []
dbs.each do |db|
begin
exception = {}
RailsMultisite::ConnectionManagement.with_connection(db) do
job_instrumenter = JobInstrumenter.new(job_class: self.class, opts: opts, db: db, jid: jid)
begin
I18n.locale = SiteSetting.default_locale || SiteSettings::DefaultsProvider::DEFAULT_LOCALE
I18n.ensure_all_loaded!
begin
logster_env = {}
Logster.add_to_env(logster_env, :job, self.class.to_s)
Logster.add_to_env(logster_env, :db, db)
Thread.current[Logster::Logger::LOGSTER_ENV] = logster_env
execute(opts)
rescue => e
exception[:ex] = e
exception[:other] = { problem_db: db }
end
rescue => e
exception[:ex] = e
exception[:message] = "While establishing database connection to #{db}"
exception[:other] = { problem_db: db }
ensure
job_instrumenter.stop(exception: exception)
end
end
exceptions << exception unless exception.empty?
end
end
Thread.current[Logster::Logger::LOGSTER_ENV] = nil
if exceptions.length > 0
exceptions.each do |exception_hash|
Discourse.handle_job_exception(exception_hash[:ex], error_context(opts, exception_hash[:code], exception_hash[:other]))
end
raise HandledExceptionWrapper.new(exceptions[0][:ex])
end
nil
ensure
ActiveRecord::Base.connection_handler.clear_active_connections!
end
end
class HandledExceptionWrapper < StandardError
attr_accessor :wrapped
def initialize(ex)
super("Wrapped #{ex.class}: #{ex.message}")
@wrapped = ex
end
end
class Scheduled < Base
extend MiniScheduler::Schedule
def perform(*args)
if (::Jobs::Heartbeat === self) || !Discourse.readonly_mode?
super
end
end
end
def self.enqueue(job, opts = {})
if job.instance_of?(Class)
klass = job
else
klass = "::Jobs::#{job.to_s.camelcase}".constantize
end
# Unless we want to work on all sites
unless opts.delete(:all_sites)
opts[:current_site_id] ||= RailsMultisite::ConnectionManagement.current_db
end
# If we are able to queue a job, do it
if ::Jobs.run_later?
hash = {
'class' => klass,
'args' => [opts]
}
if delay = opts.delete(:delay_for)
if delay.to_f > 0
hash['at'] = Time.now.to_f + delay.to_f
end
end
if queue = opts.delete(:queue)
hash['queue'] = queue
end
klass.client_push(hash)
else
# Otherwise execute the job right away
opts.delete(:delay_for)
opts.delete(:queue)
opts[:sync_exec] = true
if Rails.env == "development"
Scheduler::Defer.later("job") do
klass.new.perform(opts)
end
else
# Run the job synchronously
# But never run a job inside another job
# That could cause deadlocks during test runs
queue = Thread.current[:discourse_nested_job_queue]
outermost_job = !queue
if outermost_job
queue = Queue.new
Thread.current[:discourse_nested_job_queue] = queue
end
queue.push([klass, opts])
if outermost_job
# responsible for executing the queue
begin
until queue.empty?
queued_klass, queued_opts = queue.pop(true)
queued_klass.new.perform(queued_opts)
end
ensure
Thread.current[:discourse_nested_job_queue] = nil
end
end
end
end
end
def self.enqueue_in(secs, job_name, opts = {})
enqueue(job_name, opts.merge!(delay_for: secs))
end
def self.enqueue_at(datetime, job_name, opts = {})
secs = [(datetime - Time.zone.now).to_i, 0].max
enqueue_in(secs, job_name, opts)
end
def self.cancel_scheduled_job(job_name, opts = {})
scheduled_for(job_name, opts).each(&:delete)
end
def self.scheduled_for(job_name, opts = {})
opts = opts.with_indifferent_access
unless opts.delete(:all_sites)
opts[:current_site_id] ||= RailsMultisite::ConnectionManagement.current_db
end
job_class = "Jobs::#{job_name.to_s.camelcase}"
Sidekiq::ScheduledSet.new.select do |scheduled_job|
if scheduled_job.klass.to_s == job_class
matched = true
job_params = scheduled_job.item["args"][0].with_indifferent_access
opts.each do |key, value|
if job_params[key] != value
matched = false
break
end
end
matched
else
false
end
end
end
end