discourse/spec/jobs/jobs_spec.rb
Alan Guo Xiang Tan 9812407f76
FIX: Redo Sidekiq monitoring to restart stuck sidekiq processes (#30198)
This commit reimplements how we monitor Sidekiq processes that are
forked from the Unicorn master process. Prior to this change, we relied on
`Jobs::Heartbeat` to enqueue a `Jobs::RunHeartbeat` job every 3 minutes.
The `Jobs::RunHeartbeat` job then set a Redis key with a timestamp. In
the Unicorn master process, we then fetched the timestamp that had been set
by the job from Redis every 30 minutes. If the timestamp had not been
updated for more than 30 minutes, we restarted the Sidekiq process. The
fundamental flaw with this approach is that it fails to consider
deployments with multiple hosts and multiple Sidekiq processes. A
sidekiq process on a host may be in a bad state but the heartbeat check
will not restart the process because the `Jobs::RunHeartbeat` job is
still being executed by the working Sidekiq processes on other hosts.

In order to properly ensure that stuck Sidekiq processes are restarted,
we now rely on the [Sidekiq::ProcessSet](https://github.com/sidekiq/sidekiq/wiki/API#processes)
API that is supported by Sidekiq. The API provides us with "near real-time (updated every 5 sec)
info about the current set of Sidekiq processes running". The API
provides useful information like the hostname, pid and also when Sidekiq
last did its own heartbeat check. With that information, we can easily
determine if a Sidekiq process needs to be restarted from the Unicorn
master process.
2024-12-18 12:48:50 +08:00

179 lines
5.1 KiB
Ruby

# frozen_string_literal: true
RSpec.describe Jobs do
describe "enqueue" do
describe "run_later!" do
  before do
    Jobs.run_later!
  end

  # Inside Sidekiq::Testing.fake!, empties the recorded ProcessPost jobs,
  # enqueues one job with +enqueue_opts+ and checks that exactly one payload
  # was recorded whose "class", "args" and "queue" entries match, with
  # +expected_args+ as the single argument hash.
  def expect_single_process_post_payload(enqueue_opts, expected_args)
    Sidekiq::Testing.fake! do
      recorded = Jobs::ProcessPost.jobs
      recorded.clear

      Jobs.enqueue(:process_post, **enqueue_opts)
      expect(recorded.length).to eq(1)

      payload = recorded.first
      wanted = {
        "class" => "Jobs::ProcessPost",
        "args" => [expected_args],
        "queue" => "default",
      }
      expect(payload.slice("class", "args", "queue")).to eq(wanted)
    end
  end

  it "enqueues a job in sidekiq" do
    expect_single_process_post_payload(
      { post_id: 1 },
      { "post_id" => 1, "current_site_id" => "default" },
    )
  end

  it "enqueues the job after the current transaction has committed" do
    recorded = Jobs::ProcessPost.jobs
    enqueue_one = -> { Jobs.enqueue(:process_post, post_id: 1) }

    # With no surrounding transaction the job is recorded straight away.
    expect { enqueue_one.call }.to change { recorded.length }.from(0).to(1)

    # Inside a transaction nothing new is recorded until the block finishes.
    expect {
      ActiveRecord::Base.transaction do
        enqueue_one.call
        expect(recorded.length).to eq(1)
      end
    }.to change { recorded.length }.from(1).to(2)

    # Failed transaction: the count stays where it was.
    expect {
      ActiveRecord::Base.transaction do
        enqueue_one.call
        raise ActiveRecord::Rollback
      end
    }.not_to change { recorded.length }.from(2)
  end

  it "does not pass current_site_id when 'all_sites' is present" do
    expect_single_process_post_payload(
      { post_id: 1, all_sites: true },
      { "post_id" => 1 },
    )
  end

  it "doesn't execute the job" do
    Sidekiq::Client.stubs(:enqueue)
    Jobs::ProcessPost
      .any_instance
      .expects(:perform)
      .never

    Jobs.enqueue(:process_post, post_id: 1)
  end

  it "should enqueue with the correct database id when the current_site_id option is given" do
    expect_single_process_post_payload(
      { post_id: 1, current_site_id: "test_db" },
      { "post_id" => 1, "current_site_id" => "test_db" },
    )
  end
end
describe "run_immediately!" do
  before do
    Jobs.run_immediately!
  end

  it "doesn't enqueue in sidekiq" do
    Sidekiq::Client
      .expects(:enqueue)
      .with(Jobs::ProcessPost, {})
      .never

    Jobs.enqueue(:process_post, post_id: 1)
  end

  it "executes the job right away" do
    # The job instance is expected to receive string keys, including the
    # "current_site_id" entry that the caller did not pass.
    immediate_args = { "post_id" => 1, "current_site_id" => "default" }
    any_process_post = Jobs::ProcessPost.any_instance
    any_process_post.expects(:perform_immediately).with(immediate_args)

    Jobs.enqueue(:process_post, post_id: 1)
  end

  context "when current_site_id option is given and does not match the current connection" do
    before do
      Sidekiq::Client.stubs(:enqueue)

      any_process_post = Jobs::ProcessPost.any_instance
      any_process_post.stubs(:execute).returns(true)
    end

    it "should raise an exception" do
      # Neither the job body nor a connection switch may happen.
      Jobs::ProcessPost.any_instance.expects(:execute).never
      RailsMultisite::ConnectionManagement
        .expects(:establish_connection)
        .never

      expect do
        Jobs.enqueue(:process_post, post_id: 1, current_site_id: "test_db")
      end.to raise_error(ArgumentError)
    end
  end
end
end
describe "cancel_scheduled_job" do
  # Minimal job class with an empty body, used only as a scheduling target
  # for the :some_job name below.
  class Jobs::SomeJob < ::Jobs::Base
    def execute(args)
    end
  end

  let(:scheduled_jobs) { Sidekiq::ScheduledSet.new }

  after do
    scheduled_jobs.clear
  end

  it "deletes the matching job" do
    Sidekiq::Testing.disable! do
      scheduled_jobs.clear
      expect(scheduled_jobs.size).to eq(0)

      # Four scheduled jobs: two without an explicit site (topics 123 and
      # 456) and two for topic 123 with explicit current_site_id values.
      [
        [1.year, { topic_id: 123 }],
        [2.years, { topic_id: 456 }],
        [3.years, { topic_id: 123, current_site_id: "foo" }],
        [4.years, { topic_id: 123, current_site_id: "bar" }],
      ].each do |delay, job_opts|
        Jobs.enqueue_in(delay, :some_job, **job_opts)
      end
      expect(scheduled_jobs.size).to eq(4)

      # Without all_sites exactly one job is removed.
      Jobs.cancel_scheduled_job(:some_job, topic_id: 123)
      expect(scheduled_jobs.size).to eq(3)

      # With all_sites: true only one job is left afterwards.
      Jobs.cancel_scheduled_job(:some_job, topic_id: 123, all_sites: true)
      expect(scheduled_jobs.size).to eq(1)
    end
  end
end
describe "enqueue_at" do
  # Both examples work against a frozen clock so the timestamps compare equal.
  before { freeze_time }

  it "calls enqueue_in for you" do
    run_at = 3.hours.from_now

    expect_enqueued_with(job: :process_post, at: run_at) do
      Jobs.enqueue_at(run_at, :process_post, {})
    end
  end

  it "handles datetimes that are in the past" do
    frozen_now = Time.zone.now
    overdue = 3.hours.ago

    # A target time in the past is expected to be scheduled at the current time.
    expect_enqueued_with(job: :process_post, at: frozen_now) do
      Jobs.enqueue_at(overdue, :process_post, {})
    end
  end
end
end