discourse/spec/jobs/jobs_base_spec.rb
Alan Guo Xiang Tan 043ba1d179
DEV: Fix job cluster concurrency spec timing out (#25035)
Why this change?

On CI, we have been seeing the "handles job concurrency" job timing out
on CI after 45 seconds. Upon closer inspection of `Jobs::Base#perform`
when cluster concurrency has been set, we see that a thread is spun up
to extend the expiring of a redis key by 120 seconds every 60 seconds
while the job is still being executed. The thread looks like this before
the fix:

```
keepalive_thread =
  Thread.new do
    while parent_thread.alive? && !finished
      Discourse.redis.without_namespace.expire(cluster_concurrency_redis_key, 120)
      sleep 60
    end
  end
```

In an ensure block of `Jobs::Base#perform`, the thread is stop by doing
something like this:

```
finished = true
keepalive_thread.wakeup
keepalive_thread.join
```

If the thread is sleeping, `keepalive_thread.wakeup` will stop the
`sleep` method and run the next iteration causing the thread to
complete. However, there is a timing issue at play here. If
`keepalive_thread.wakeup` is called at a time when the thread is not
sleeping, it will have no effect and the thread may end up sleeping for
60 seconds which is longer than our timeout on CI of 45 seconds.

What does this change do?

1. Change `sleep 60` to sleep in intervals of 1 second checking if the
   job has been finished each time.

2. Add `use_redis_snapshotting` to `Jobs::Base` spec since Redis is
   involved in scheduling and we want to ensure we don't leak Redis
keys.

3. Add `ConcurrentJob.stop!` and `thread.join` to `ensure` block in "handles job concurrency"
   test since a failing expectation will cause us to not clean up the
thread we created in the test.
2023-12-26 14:47:03 +08:00

154 lines
3.6 KiB
Ruby

# frozen_string_literal: true
RSpec.describe ::Jobs::Base do
use_redis_snapshotting
class GoodJob < ::Jobs::Base
attr_accessor :count
def execute(args)
self.count ||= 0
self.count += 1
end
end
class BadJob < ::Jobs::Base
class BadJobError < StandardError
end
attr_accessor :fail_count
def execute(args)
@fail_count ||= 0
@fail_count += 1
raise BadJobError
end
end
class ConcurrentJob < ::Jobs::Base
cluster_concurrency 1
def self.stop!
@stop = true
end
def self.stop
@stop
end
def self.running?
@running
end
def self.running=(val)
@running = val
end
def execute(args)
self.class.running = true
sleep 0.0001 while !self.class.stop
ensure
self.class.running = false
end
end
it "handles job concurrency" do
ConcurrentJob.clear_cluster_concurrency_lock!
expect(ConcurrentJob.get_cluster_concurrency).to eq(1)
expect(BadJob.get_cluster_concurrency).to eq(nil)
expect(Sidekiq::Queues["default"].size).to eq(0)
thread = Thread.new { ConcurrentJob.new.perform({ "test" => 100 }) }
wait_for { ConcurrentJob.running? }
ConcurrentJob.new.perform({ "test" => 100 })
expect(Sidekiq::Queues["default"].size).to eq(1)
expect(Sidekiq::Queues["default"][0]["args"][0]).to eq("test" => 100)
ensure
ConcurrentJob.stop!
thread.join
end
it "handles correct jobs" do
job = GoodJob.new
job.perform({})
expect(job.count).to eq(1)
end
it "handles errors in multisite" do
RailsMultisite::ConnectionManagement.expects(:all_dbs).returns(%w[default default default])
# one exception per database
Discourse.expects(:handle_job_exception).times(3)
bad = BadJob.new
expect { bad.perform({}) }.to raise_error(Jobs::HandledExceptionWrapper)
expect(bad.fail_count).to eq(3)
end
describe "#perform" do
context "when a job raises an error" do
before { Discourse.reset_job_exception_stats! }
after { Discourse.reset_job_exception_stats! }
it "collects stats for failing jobs in Discourse.job_exception_stats" do
bad = BadJob.new
3.times do
# During test env handle_job_exception errors out
# in production this is suppressed
expect { bad.perform({}) }.to raise_error(BadJob::BadJobError)
end
expect(Discourse.job_exception_stats).to eq({ BadJob => 3 })
end
end
end
it "delegates the process call to execute" do
::Jobs::Base.any_instance.expects(:execute).with({ "hello" => "world" })
::Jobs::Base.new.perform("hello" => "world")
end
it "converts to an indifferent access hash" do
::Jobs::Base.any_instance.expects(:execute).with(instance_of(HashWithIndifferentAccess))
::Jobs::Base.new.perform("hello" => "world")
end
context "with fake jobs" do
let(:common_state) { [] }
let(:test_job_1) do
Class
.new(Jobs::Base)
.tap do |klass|
state = common_state
klass.define_method(:execute) { |args| state << "job_1_executed" }
end
end
let(:test_job_2) do
Class
.new(Jobs::Base)
.tap do |klass|
state = common_state
job_1 = test_job_1
klass.define_method(:execute) do |args|
state << "job_2_started"
Jobs.enqueue(job_1)
state << "job_2_finished"
end
end
end
it "runs jobs synchronously sequentially in tests" do
Jobs.run_immediately!
Jobs.enqueue(test_job_2)
expect(common_state).to eq(%w[job_2_started job_2_finished job_1_executed])
end
end
end