PERF: limit anonymization to 1 per cluster (#21992)

Anonymization is among the most expensive operations we can perform with
extreme potential to impact the database. To mitigate risk we only allow a
single anonymization across the entire cluster concurrently.

This commit introduces support for `cluster_concurrency 1`. When you set that on a Job it will only allow 1 concurrent execution per cluster.
This commit is contained in:
Sam 2023-06-14 08:30:23 +10:00 committed by GitHub
parent 1a27e715ee
commit eb603b246b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 92 additions and 0 deletions

View File

@ -161,6 +161,15 @@ module Jobs
include Sidekiq::Worker
def self.cluster_concurrency(val)
raise ArgumentError, "cluster_concurrency must be 1 or nil" if val != 1 && val != nil
@cluster_concurrency = val
end
def self.get_cluster_concurrency
@cluster_concurrency
end
def log(*args)
args.each do |arg|
Rails.logger.info "#{Time.now.to_formatted_s(:db)}: [#{self.class.name.upcase}] #{arg}"
@ -216,7 +225,41 @@ module Jobs
end
end
def self.cluster_concurrency_redis_key
"cluster_concurrency:#{self}"
end
def self.clear_cluster_concurrency_lock!
Discourse.redis.without_namespace.del(cluster_concurrency_redis_key)
end
def self.acquire_cluster_concurrency_lock!
!!Discourse.redis.without_namespace.set(cluster_concurrency_redis_key, 0, nx: true, ex: 120)
end
def perform(*args)
requeued = false
keepalive_thread = nil
finished = false
if self.class.get_cluster_concurrency
if !self.class.acquire_cluster_concurrency_lock!
self.class.perform_in(10.seconds, *args)
requeued = true
return
end
parent_thread = Thread.current
cluster_concurrency_redis_key = self.class.cluster_concurrency_redis_key
keepalive_thread =
Thread.new do
while parent_thread.alive? && !finished
Discourse.redis.without_namespace.expire(cluster_concurrency_redis_key, 120)
sleep 60
end
end
end
opts = args.extract_options!.with_indifferent_access
Sidekiq.redis { |r| r.set("last_job_perform_at", Time.now.to_i) } if ::Jobs.run_later?
@ -278,6 +321,12 @@ module Jobs
nil
ensure
if self.class.get_cluster_concurrency && !requeued
finished = true
keepalive_thread.wakeup
keepalive_thread.join
self.class.clear_cluster_concurrency_lock!
end
ActiveRecord::Base.connection_handler.clear_active_connections!
end
end

View File

@ -3,6 +3,9 @@
module Jobs
class AnonymizeUser < ::Jobs::Base
sidekiq_options queue: "low"
# this is an extremely expensive job
# we are limiting it so only 1 per cluster runs
cluster_concurrency 1
def execute(args)
@user_id = args[:user_id]

View File

@ -22,6 +22,46 @@ RSpec.describe ::Jobs::Base do
end
end
class ConcurrentJob < ::Jobs::Base
cluster_concurrency 1
def self.running?
@running
end
def self.running=(val)
@running = val
end
def execute(args)
self.class.running = true
sleep 20
ensure
self.class.running = false
end
end
it "handles job concurrency" do
ConcurrentJob.clear_cluster_concurrency_lock!
expect(ConcurrentJob.get_cluster_concurrency).to eq(1)
expect(BadJob.get_cluster_concurrency).to eq(nil)
expect(Sidekiq::Queues["default"].size).to eq(0)
thread = Thread.new { ConcurrentJob.new.perform({ "test" => 100 }) }
wait_for { ConcurrentJob.running? }
ConcurrentJob.new.perform({ "test" => 100 })
expect(Sidekiq::Queues["default"].size).to eq(1)
expect(Sidekiq::Queues["default"][0]["args"][0]).to eq("test" => 100)
thread.wakeup
thread.join
end
it "handles correct jobs" do
job = GoodJob.new
job.perform({})