diff --git a/app/jobs/base.rb b/app/jobs/base.rb index 4d1d2529e5c..1f6839ad8d6 100644 --- a/app/jobs/base.rb +++ b/app/jobs/base.rb @@ -161,6 +161,15 @@ module Jobs include Sidekiq::Worker + def self.cluster_concurrency(val) + raise ArgumentError, "cluster_concurrency must be 1 or nil" if val != 1 && val != nil + @cluster_concurrency = val + end + + def self.get_cluster_concurrency + @cluster_concurrency + end + def log(*args) args.each do |arg| Rails.logger.info "#{Time.now.to_formatted_s(:db)}: [#{self.class.name.upcase}] #{arg}" @@ -216,7 +225,41 @@ module Jobs end end + def self.cluster_concurrency_redis_key + "cluster_concurrency:#{self}" + end + + def self.clear_cluster_concurrency_lock! + Discourse.redis.without_namespace.del(cluster_concurrency_redis_key) + end + + def self.acquire_cluster_concurrency_lock! + !!Discourse.redis.without_namespace.set(cluster_concurrency_redis_key, 0, nx: true, ex: 120) + end + def perform(*args) + requeued = false + keepalive_thread = nil + finished = false + + if self.class.get_cluster_concurrency + if !self.class.acquire_cluster_concurrency_lock! + self.class.perform_in(10.seconds, *args) + requeued = true + return + end + parent_thread = Thread.current + cluster_concurrency_redis_key = self.class.cluster_concurrency_redis_key + + keepalive_thread = + Thread.new do + while parent_thread.alive? && !finished + Discourse.redis.without_namespace.expire(cluster_concurrency_redis_key, 120) + sleep 60 + end + end + end + opts = args.extract_options!.with_indifferent_access Sidekiq.redis { |r| r.set("last_job_perform_at", Time.now.to_i) } if ::Jobs.run_later? @@ -278,6 +321,12 @@ module Jobs nil ensure + if self.class.get_cluster_concurrency && !requeued + finished = true + keepalive_thread.wakeup + keepalive_thread.join + self.class.clear_cluster_concurrency_lock! + end ActiveRecord::Base.connection_handler.clear_active_connections! end end diff --git a/app/jobs/regular/anonymize_user.rb b/app/jobs/regular/anonymize_user.rb index 7ae14c11732..71069f2c79c 100644 --- a/app/jobs/regular/anonymize_user.rb +++ b/app/jobs/regular/anonymize_user.rb @@ -3,6 +3,9 @@ module Jobs class AnonymizeUser < ::Jobs::Base sidekiq_options queue: "low" + # this is an extremely expensive job + # we are limiting it so only 1 per cluster runs + cluster_concurrency 1 def execute(args) @user_id = args[:user_id] diff --git a/spec/jobs/jobs_base_spec.rb b/spec/jobs/jobs_base_spec.rb index a76fed09e3b..c08d33d6393 100644 --- a/spec/jobs/jobs_base_spec.rb +++ b/spec/jobs/jobs_base_spec.rb @@ -22,6 +22,46 @@ RSpec.describe ::Jobs::Base do end end + class ConcurrentJob < ::Jobs::Base + cluster_concurrency 1 + + def self.running? + @running + end + + def self.running=(val) + @running = val + end + + def execute(args) + self.class.running = true + sleep 20 + ensure + self.class.running = false + end + end + + it "handles job concurrency" do + ConcurrentJob.clear_cluster_concurrency_lock! + + expect(ConcurrentJob.get_cluster_concurrency).to eq(1) + expect(BadJob.get_cluster_concurrency).to eq(nil) + + expect(Sidekiq::Queues["default"].size).to eq(0) + + thread = Thread.new { ConcurrentJob.new.perform({ "test" => 100 }) } + + wait_for { ConcurrentJob.running? } + + ConcurrentJob.new.perform({ "test" => 100 }) + expect(Sidekiq::Queues["default"].size).to eq(1) + + expect(Sidekiq::Queues["default"][0]["args"][0]).to eq("test" => 100) + + thread.wakeup + thread.join + end + it "handles correct jobs" do job = GoodJob.new job.perform({})