discourse/app/jobs/base.rb

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

489 lines
14 KiB
Ruby
Raw Normal View History

# frozen_string_literal: true
2013-02-06 03:16:51 +08:00
module Jobs
# Returns the `enqueued` figure reported by a fresh Sidekiq::Stats snapshot.
def self.queued
  stats = Sidekiq::Stats.new
  stats.enqueued
end
# True unless immediate (inline) execution has been switched on.
def self.run_later?
  @run_immediately ? false : true
end
# True only when immediate (inline) execution has been switched on.
# Always returns a strict boolean.
def self.run_immediately?
  @run_immediately ? true : false
end
# Switches the module into immediate mode: `run_immediately?` becomes true
# and `run_later?` becomes false. Returns true.
def self.run_immediately!
@run_immediately = true
end
# Switches the module back to deferred mode: `run_later?` becomes true
# and `run_immediately?` becomes false. Returns false.
def self.run_later!
@run_immediately = false
end
# Runs the given block with immediate mode enabled and restores whatever
# mode was active before, even when the block raises.
# Returns the block's value.
def self.with_immediate_jobs
  previous_mode = @run_immediately
  begin
    run_immediately!
    yield
  ensure
    @run_immediately = previous_mode
  end
end
# Reads the "last_job_perform_at" key through Sidekiq's Redis connection and
# converts it to a Time. Returns nil when the key has no value.
def self.last_job_performed_at
  Sidekiq.redis do |conn|
    raw = conn.get("last_job_perform_at")
    Time.at(raw.to_i) if raw
  end
end
# Counts the entries in Sidekiq's retry set whose job class name ends in "Email".
def self.num_email_retry_jobs
  email_suffix = /Email\z/
  Sidekiq::RetrySet.new.count { |retry_job| email_suffix.match?(retry_job.klass) }
end
2013-02-06 03:16:51 +08:00
class Base
class JobInstrumenter
# Captures identifying metadata for a job run and starts profiling it.
# Does nothing when Sidekiq logging is disabled (see `enabled?`).
#
# job_class - class of the job being run (its `name` is logged)
# opts      - options the job was invoked with (logged via `to_json`)
# db        - database name the job runs against
# jid       - job id
def initialize(job_class:, opts:, db:, jid:)
return unless enabled?
# All bookkeeping happens under the class-wide mutex.
self.class.mutex.synchronize do
@data = {}
@data["hostname"] = Discourse.os_hostname
@data["pid"] = Process.pid # Pid
@data["database"] = db # DB name - multisite db name it ran on
@data["job_id"] = jid # Job unique ID
@data["job_name"] = job_class.name # Job Name - eg: Jobs::AboutStats
@data["job_type"] = job_class.try(:scheduled?) ? "scheduled" : "regular" # Job Type - either "scheduled" or "regular"
@data["opts"] = opts.to_json # Params - json encoded params for the job
# With interval logging configured, an extra "starting" entry is written up front.
if ENV["DISCOURSE_LOG_SIDEKIQ_INTERVAL"]
@data["status"] = "starting"
write_to_log
end
@data["status"] = "pending"
# Monotonic clock, so current_duration is unaffected by wall-clock changes.
@start_timestamp = Process.clock_gettime(Process::CLOCK_MONOTONIC)
self.class.ensure_interval_logging!
# Register this instrumenter so the interval thread can report on it.
@@active_jobs ||= []
@@active_jobs << self
MethodProfiler.ensure_discourse_instrumentation!
MethodProfiler.start
@data["live_slots_start"] = GC.stat[:heap_live_slots]
end
end
# Finalises instrumentation for the job: stops the profiler, records the
# timing/call counters and the outcome in @data, then writes the log entry.
# Does nothing when Sidekiq logging is disabled.
#
# exception - checked with `present?`; when present it is stored under
#             "exception" and the status becomes "failed", otherwise "success".
def stop(exception:)
  return unless enabled?

  self.class.mutex.synchronize do
    profile = MethodProfiler.stop
    @@active_jobs.delete(self)

    # Total duration as reported by the profiler.
    @data["duration"] = profile[:total_duration]

    # Per-subsystem duration and call count (SQL, Redis, network).
    # A section missing from the profile counts as zero.
    %i[sql redis net].each do |section|
      @data["#{section}_duration"] = profile.dig(section, :duration) || 0
      @data["#{section}_calls"] = profile.dig(section, :calls) || 0
    end

    @data["live_slots_finish"] = GC.stat[:heap_live_slots]

    failed = exception.present?
    @data["exception"] = exception if failed
    @data["status"] = failed ? "failed" : "success"

    write_to_log
  end
end
# Queues `message` for appending to log/sidekiq.log.
#
# The logger, the queue and the writer thread are created lazily on first use.
# The writer thread is (re)started whenever it is missing or no longer alive.
# The actual write happens on that thread, not on the caller's.
def self.raw_log(message)
@@logger ||=
begin
f = File.open "#{Rails.root}/log/sidekiq.log", "a"
# Flush every write straight to the file.
f.sync = true
Logger.new f
end
@@log_queue ||= Queue.new
if !defined?(@@log_thread) || !@@log_thread.alive?
@@log_thread =
Thread.new do
loop do
# Blocks until a message is available, then writes it unformatted.
@@logger << @@log_queue.pop
rescue Exception => e
# Any failure is reported and the loop keeps running.
Discourse.warn_exception(
e,
message: "Exception encountered while logging Sidekiq job",
)
end
end
end
@@log_queue.push(message)
end
# Seconds elapsed on the monotonic clock since @start_timestamp was recorded.
def current_duration
  now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  now - @start_timestamp
end
# Writes the current @data as one JSON line via `raw_log`.
# If a live-slot limit is configured and both slot readings are present,
# also warns when the heap grew by at least that many live slots.
# Does nothing when Sidekiq logging is disabled.
def write_to_log
  return unless enabled?

  @data["@timestamp"] = Time.now
  # A job still in flight reports how long it has been running so far.
  @data["duration"] = current_duration if @data["status"] == "pending"
  self.class.raw_log("#{@data.to_json}\n")

  return unless live_slots_limit > 0

  slots_start = @data["live_slots_start"]
  slots_finish = @data["live_slots_finish"]
  return unless slots_start.present? && slots_finish.present?

  live_slots = slots_finish - slots_start
  return unless live_slots >= live_slots_limit

  Rails.logger.warn(
    "Sidekiq Job '#{@data["job_name"]}' allocated #{live_slots} objects in the heap: #{@data.inspect}",
  )
end
# Whether job instrumentation is active; delegates to
# Discourse.enable_sidekiq_logging?.
def enabled?
Discourse.enable_sidekiq_logging?
end
2013-02-06 03:16:51 +08:00
# Threshold read from DISCOURSE_LIVE_SLOTS_SIDEKIQ_LIMIT and memoised per
# instance. An unset variable yields 0.
def live_slots_limit
  return @live_slots_limit if @live_slots_limit

  @live_slots_limit = ENV["DISCOURSE_LIVE_SLOTS_SIDEKIQ_LIMIT"].to_i
end
# Class-wide mutex, created on first use.
def self.mutex
  return @@mutex if defined?(@@mutex) && @@mutex

  @@mutex = Mutex.new
end
# Starts (once) a background thread that, every DISCOURSE_LOG_SIDEKIQ_INTERVAL
# seconds, writes a log entry for each active job that has been running
# longer than that interval. Does nothing when the variable is unset.
def self.ensure_interval_logging!
  configured = ENV["DISCOURSE_LOG_SIDEKIQ_INTERVAL"]
  return unless configured

  interval = configured.to_i
  @@interval_thread ||=
    Thread.new do
      loop do
        sleep interval
        mutex.synchronize do
          @@active_jobs.each do |job|
            job.write_to_log if job.current_duration > interval
          end
        end
      end
    rescue Exception => e
      # Any failure ends the loop; it is reported and the thread exits.
      Discourse.warn_exception(
        e,
        message: "Sidekiq interval logging thread terminated unexpectedly",
      )
    end
end
end
include Sidekiq::Worker
# Declares the cluster-wide concurrency limit for this job class.
# Only 1 or nil are accepted; anything else raises ArgumentError.
def self.cluster_concurrency(val)
  allowed = val == 1 || val == nil
  raise ArgumentError, "cluster_concurrency must be 1 or nil" unless allowed

  @cluster_concurrency = val
end
# Returns the value last set through `cluster_concurrency`, or nil if it was
# never set.
def self.get_cluster_concurrency
@cluster_concurrency
end
2013-07-01 02:00:23 +08:00
# Writes each argument to Rails.logger at info level, prefixed with a
# timestamp and the upper-cased job class name. Always returns true.
def log(*args)
  args.each do |entry|
    stamp = Time.now.to_formatted_s(:db)
    Rails.logger.info "#{stamp}: [#{self.class.name.upcase}] #{entry}"
  end

  true
end
# Construct an error context object for Discourse.handle_exception
# Subclasses are encouraged to use this!
#
# `opts` is the arguments passed to execute().
# `code_desc` is a short string describing what the code was doing (optional);
#   it is stored under :message.
# `extra` is for any other context you logged; it is merged last, so its keys
#   override the ones set here. Pass nil to skip it.
# Note that, when building your `extra`, that :opts, :job, and :message are used by this method,
# and :current_db and :current_hostname are used by handle_exception.
#
# Returns a new Hash.
def error_context(opts, code_desc = nil, extra = {})
  ctx = { opts: opts, job: self.class }
  ctx[:message] = code_desc if code_desc
  ctx.merge!(extra) unless extra.nil?
  ctx
end
2013-02-06 03:16:51 +08:00
# Builds a fresh job instance and runs `perform` on it with the given options.
def self.delayed_perform(opts = {})
  job_instance = new
  job_instance.perform(opts)
end
# Hook that concrete jobs must override with their actual work.
# The base implementation always raises RuntimeError.
def execute(opts = {})
  raise RuntimeError, "Overwrite me!"
end
# Value of @db_duration, or 0 when it has not been set.
def last_db_duration
  return 0 unless @db_duration

  @db_duration
end
DEV: Fix threading error when running jobs immediately in system tests (#19811) ``` class Jobs::DummyDelayedJob < Jobs::Base def execute(args = {}) end end RSpec.describe "Jobs.run_immediately!" do before { Jobs.run_immediately! } it "explodes" do current_user = Fabricate(:user) Jobs.enqueue_in(1.seconds, :dummy_delayed_job) sign_in(current_user) end end ``` The test above will fail with the following error if `ActiveRecord::Base.connection_handler.clear_active_connections!` is called before the configured Capybara server checks out a connection from the connection pool. ``` ActiveRecord::ActiveRecordError: Cannot expire connection, it is owned by a different thread: #<Thread:0x00007f437391df58@puma srv tp 001 /home/tgxworld/.asdf/installs/ruby/3.1.3/lib/ruby/gems/3.1.0/gems/puma-6.0.2/lib/puma/thread_pool.rb:106 sleep_forever>. Current thread: #<Thread:0x00007f437d6cfc60 run>. ``` We're not exactly sure if this is an ActiveRecord bug or not but we've invested too much time into investigating this problem. Fundamentally, we also no longer understand why `ActiveRecord::Base.connection_handler.clear_active_connections!` is being called in an ensure block within `Jobs::Base#perform` which was added in ceddb6e0da92e874b3bee9543915726f1d27d60b 10 years ago. This commit moves the logic for running jobs immediately out of the `Jobs::Base#perform` method into another `Jobs::Base#perform_immediately` method such that `ActiveRecord::Base.connection_handler.clear_active_connections!` is not called. This change will only impact the test environment.
2023-01-10 13:41:25 +08:00
# Runs the job synchronously against the current database.
#
# Unlike `perform`, this neither switches database connections nor clears
# active connections afterwards.
#
# args - job arguments; a trailing Hash is taken as the options.
#
# Returns whatever `execute` returns, or nil if `execute` raised and
# Discourse.handle_job_exception did not re-raise.
# Raises ArgumentError if opts[:current_site_id] names a database other than
# the current one.
def perform_immediately(*args)
  opts = args.extract_options!.with_indifferent_access

  if opts.has_key?(:current_site_id) &&
       opts[:current_site_id] != RailsMultisite::ConnectionManagement.current_db
    raise ArgumentError.new(
            "You can't connect to another database when executing a job synchronously.",
          )
  end

  retval = nil
  begin
    retval = execute(opts)
  rescue => exc
    Discourse.handle_job_exception(exc, error_context(opts))
  end

  retval
end
# Redis key used for this job class's cluster concurrency lock:
# "cluster_concurrency:" followed by the class's string form.
def self.cluster_concurrency_redis_key
  format("cluster_concurrency:%s", self)
end
# Deletes this job class's cluster concurrency key from the un-namespaced
# Redis connection.
def self.clear_cluster_concurrency_lock!
  redis = Discourse.redis.without_namespace
  redis.del(cluster_concurrency_redis_key)
end
# Tries to set this job class's cluster concurrency key, only if it does not
# already exist (nx), with a 120 second expiry.
# Returns true when the key was set, false otherwise.
def self.acquire_cluster_concurrency_lock!
  redis = Discourse.redis.without_namespace
  acquired = redis.set(cluster_concurrency_redis_key, 0, nx: true, ex: 120)
  acquired ? true : false
end
DEV: Fix threading error when running jobs immediately in system tests (#19811) ``` class Jobs::DummyDelayedJob < Jobs::Base def execute(args = {}) end end RSpec.describe "Jobs.run_immediately!" do before { Jobs.run_immediately! } it "explodes" do current_user = Fabricate(:user) Jobs.enqueue_in(1.seconds, :dummy_delayed_job) sign_in(current_user) end end ``` The test above will fail with the following error if `ActiveRecord::Base.connection_handler.clear_active_connections!` is called before the configured Capybara server checks out a connection from the connection pool. ``` ActiveRecord::ActiveRecordError: Cannot expire connection, it is owned by a different thread: #<Thread:0x00007f437391df58@puma srv tp 001 /home/tgxworld/.asdf/installs/ruby/3.1.3/lib/ruby/gems/3.1.0/gems/puma-6.0.2/lib/puma/thread_pool.rb:106 sleep_forever>. Current thread: #<Thread:0x00007f437d6cfc60 run>. ``` We're not exactly sure if this is an ActiveRecord bug or not but we've invested too much time into investigating this problem. Fundamentally, we also no longer understand why `ActiveRecord::Base.connection_handler.clear_active_connections!` is being called in an ensure block within `Jobs::Base#perform` which was added in ceddb6e0da92e874b3bee9543915726f1d27d60b 10 years ago. This commit moves the logic for running jobs immediately out of the `Jobs::Base#perform` method into another `Jobs::Base#perform_immediately` method such that `ActiveRecord::Base.connection_handler.clear_active_connections!` is not called. This change will only impact the test environment.
2023-01-10 13:41:25 +08:00
# Sidekiq entry point: runs `execute` once per target database.
#
# args - job arguments; a trailing Hash is taken as the options. When
#        opts[:current_site_id] is set only that database is used, otherwise
#        every database known to RailsMultisite.
#
# If the job class declares cluster concurrency, a Redis lock is taken first.
# When the lock is unavailable the job is re-enqueued to run in 10 seconds and
# this call returns without executing. While the job runs, a keepalive thread
# keeps extending the lock's expiry.
#
# Returns nil.
# Raises HandledExceptionWrapper (wrapping the first failure) after every
# failure has been reported through Discourse.handle_job_exception.
def perform(*args)
  lock_acquired = false
  keepalive_thread = nil
  finished = false

  if self.class.get_cluster_concurrency
    if !self.class.acquire_cluster_concurrency_lock!
      self.class.perform_in(10.seconds, *args)
      return
    end
    lock_acquired = true

    parent_thread = Thread.current
    cluster_concurrency_redis_key = self.class.cluster_concurrency_redis_key

    keepalive_thread =
      Thread.new do
        while parent_thread.alive? && !finished
          Discourse.redis.without_namespace.expire(cluster_concurrency_redis_key, 120)

          # Sleep for 60 seconds, but wake up every second to check if the job has been completed
          60.times do
            break if finished
            sleep 1
          end
        end
      end
  end

  opts = args.extract_options!.with_indifferent_access

  Sidekiq.redis { |r| r.set("last_job_perform_at", Time.now.to_i) } if ::Jobs.run_later?

  dbs =
    if opts[:current_site_id]
      [opts[:current_site_id]]
    else
      RailsMultisite::ConnectionManagement.all_dbs
    end

  exceptions = []
  dbs.each do |db|
    exception = {}

    RailsMultisite::ConnectionManagement.with_connection(db) do
      job_instrumenter =
        JobInstrumenter.new(job_class: self.class, opts: opts, db: db, jid: jid)
      begin
        I18n.locale =
          SiteSetting.default_locale || SiteSettings::DefaultsProvider::DEFAULT_LOCALE
        I18n.ensure_all_loaded!
        begin
          logster_env = {}
          Logster.add_to_env(logster_env, :job, self.class.to_s)
          Logster.add_to_env(logster_env, :db, db)
          Thread.current[Logster::Logger::LOGSTER_ENV] = logster_env

          execute(opts)
        rescue => e
          exception[:ex] = e
          exception[:other] = { problem_db: db }
        end
      rescue => e
        exception[:ex] = e
        exception[:message] = "While establishing database connection to #{db}"
        exception[:other] = { problem_db: db }
      ensure
        # An empty hash means the run succeeded for this database.
        job_instrumenter.stop(exception: exception)
      end
    end

    exceptions << exception unless exception.empty?
  end

  Thread.current[Logster::Logger::LOGSTER_ENV] = nil

  if exceptions.length > 0
    exceptions.each do |exception_hash|
      Discourse.handle_job_exception(
        exception_hash[:ex],
        # The description is recorded under :message above, so read it from
        # there; otherwise it never reaches the error context.
        error_context(opts, exception_hash[:message], exception_hash[:other]),
      )
    end
    raise HandledExceptionWrapper.new(exceptions[0][:ex])
  end

  nil
ensure
  begin
    # Only clean up a lock this call actually took.
    if lock_acquired
      finished = true
      begin
        if keepalive_thread
          begin
            keepalive_thread.wakeup
          rescue ThreadError
            # The thread had already exited; nothing to wake.
          end
          keepalive_thread.join
        end
      ensure
        # Release the lock even if stopping the keepalive thread failed.
        self.class.clear_cluster_concurrency_lock!
      end
    end
  ensure
    ActiveRecord::Base.connection_handler.clear_active_connections!
  end
end
end
# Raised in place of an exception that has already been reported.
# The original is kept in `wrapped`; the message names its class and message.
class HandledExceptionWrapper < StandardError
  attr_reader :wrapped
  attr_writer :wrapped

  def initialize(ex)
    @wrapped = ex
    super("Wrapped #{ex.class}: #{ex.message}")
  end
end
2013-08-08 01:25:05 +08:00
# Base class for jobs that extend MiniScheduler::Schedule.
class Scheduled < Base
  extend MiniScheduler::Schedule

  # Skips the run while Discourse is in readonly mode. The Heartbeat job is
  # the exception and always runs. Returns nil when skipped.
  def perform(*args)
    super if (::Jobs::Heartbeat === self) || !Discourse.readonly_mode?
  end
end
2013-02-06 03:16:51 +08:00
# Enqueues a job, or runs it straight away when the module is in immediate mode.
#
# job  - a job Class, or a name resolved to ::Jobs::<CamelCasedName>.
# opts - job arguments. These control keys are consumed and not passed on:
#        :all_sites - when truthy, :current_site_id is not filled in
#        :delay_for - delay before the job should run
#        :queue     - queue to push to
#        Unless :all_sites is given, :current_site_id defaults to the current
#        database.
#
# NOTE: the control keys are removed from, and :current_site_id is written to,
# the hash passed in by the caller.
def self.enqueue(job, opts = {})
  klass =
    if job.instance_of?(Class)
      job
    else
      "::Jobs::#{job.to_s.camelcase}".constantize
    end

  # Unless we want to work on all sites
  unless opts.delete(:all_sites)
    opts[:current_site_id] ||= RailsMultisite::ConnectionManagement.current_db
  end

  delay = opts.delete(:delay_for)
  queue = opts.delete(:queue)

  # Only string keys are allowed in JSON. We call `.with_indifferent_access`
  # in Jobs::Base#perform, so this is invisible to developers
  opts = opts.deep_stringify_keys

  # Simulate the args being dumped/parsed through JSON
  parsed_opts = JSON.parse(JSON.dump(opts))
  if opts != parsed_opts
    Discourse.deprecate(<<~TEXT.squish, since: "2.9", drop_from: "3.0", output_in_test: true)
      #{klass.name} was enqueued with argument values which do not cleanly serialize to/from JSON.
      This means that the job will be run with slightly different values than the ones supplied to `enqueue`.
      Argument values should be strings, booleans, numbers, or nil (or arrays/hashes of those value types).
    TEXT
  end
  opts = parsed_opts

  if ::Jobs.run_later?
    hash = { "class" => klass, "args" => [opts] }

    # A non-positive delay is ignored.
    if delay
      hash["at"] = Time.now.to_f + delay.to_f if delay.to_f > 0
    end

    hash["queue"] = queue if queue

    # Push only once the surrounding transaction (if any) has committed.
    DB.after_commit { klass.client_push(hash) }
  else
    if Rails.env == "development"
      Scheduler::Defer.later("job") { klass.new.perform(opts) }
    else
      # Run the job synchronously
      # But never run a job inside another job
      # That could cause deadlocks during test runs
      queue = Thread.current[:discourse_nested_job_queue]
      outermost_job = !queue

      if outermost_job
        queue = Queue.new
        Thread.current[:discourse_nested_job_queue] = queue
      end

      queue.push([klass, opts])

      if outermost_job
        # responsible for executing the queue
        begin
          until queue.empty?
            queued_klass, queued_opts = queue.pop(true)
            queued_klass.new.perform_immediately(queued_opts)
          end
        ensure
          Thread.current[:discourse_nested_job_queue] = nil
        end
      end
    end
  end
end
# Enqueues `job_name` to run after `secs` seconds.
#
# secs     - delay, passed to `enqueue` as :delay_for
# job_name - job class or name, as accepted by `enqueue`
# opts     - job arguments; a copy is passed on, so the caller's hash is
#            left untouched
#
# Returns whatever `enqueue` returns.
def self.enqueue_in(secs, job_name, opts = {})
  enqueue(job_name, opts.merge(delay_for: secs))
end
# Enqueues `job_name` to run at `datetime`.
#
# The delay is the difference between `datetime` and the current time in
# seconds. A time in the past is treated as a delay of 0.
#
# Returns whatever `enqueue_in` returns.
def self.enqueue_at(datetime, job_name, opts = {})
  secs = [datetime.to_f - Time.zone.now.to_f, 0].max
  enqueue_in(secs, job_name, opts)
end
# Deletes every scheduled job that `scheduled_for` returns for the given
# name and options. Returns that collection.
def self.cancel_scheduled_job(job_name, opts = {})
  matching_jobs = scheduled_for(job_name, opts)
  matching_jobs.each { |scheduled_job| scheduled_job.delete }
end
# Returns the entries of Sidekiq's scheduled set that belong to
# Jobs::<CamelCasedName> and whose first argument hash matches every
# key/value pair in `opts`.
#
# Unless :all_sites is given, :current_site_id defaults to the current
# database and so takes part in the match.
def self.scheduled_for(job_name, opts = {})
  filters = opts.with_indifferent_access
  unless filters.delete(:all_sites)
    filters[:current_site_id] ||= RailsMultisite::ConnectionManagement.current_db
  end

  wanted_class = "Jobs::#{job_name.to_s.camelcase}"

  Sidekiq::ScheduledSet.new.select do |scheduled_job|
    next false unless scheduled_job.klass.to_s == wanted_class

    job_params = scheduled_job.item["args"][0].with_indifferent_access
    filters.all? { |key, value| job_params[key] == value }
  end
end
2013-02-06 03:16:51 +08:00
end