mirror of
https://github.com/discourse/discourse.git
synced 2025-01-22 08:12:50 +08:00
30990006a9
This reduces chances of errors where consumers of strings mutate inputs and reduces memory usage of the app. Test suite passes now, but there may be some stuff left, so we will run a few sites on a branch prior to merging
156 lines
3.1 KiB
Ruby
156 lines
3.1 KiB
Ruby
# frozen_string_literal: true
|
|
|
|
require 'thread'
|
|
|
|
class SidekiqPauser
|
|
TTL = 60
|
|
PAUSED_KEY = "sidekiq_is_paused_v2"
|
|
|
|
def initialize
|
|
@mutex = Mutex.new
|
|
@dbs ||= Set.new
|
|
end
|
|
|
|
def pause!(value = "paused")
|
|
$redis.setex PAUSED_KEY, TTL, value
|
|
extend_lease_thread
|
|
true
|
|
end
|
|
|
|
def paused?
|
|
!!$redis.get(PAUSED_KEY)
|
|
end
|
|
|
|
def unpause_all!
|
|
@mutex.synchronize do
|
|
@dbs = Set.new
|
|
stop_extend_lease_thread
|
|
end
|
|
|
|
RailsMultisite::ConnectionManagement.each_connection do
|
|
unpause! if paused?
|
|
end
|
|
end
|
|
|
|
def paused_dbs
|
|
dbs = []
|
|
|
|
RailsMultisite::ConnectionManagement.each_connection do
|
|
dbs << RailsMultisite::ConnectionManagement.current_db if paused?
|
|
end
|
|
|
|
dbs
|
|
end
|
|
|
|
def unpause!
|
|
@mutex.synchronize do
|
|
@dbs.delete(RailsMultisite::ConnectionManagement.current_db)
|
|
stop_extend_lease_thread if @dbs.size == 0
|
|
end
|
|
|
|
$redis.del(PAUSED_KEY)
|
|
true
|
|
end
|
|
|
|
private
|
|
|
|
def stop_extend_lease_thread
|
|
# should always be called from a mutex
|
|
if t = @extend_lease_thread
|
|
@extend_lease_thread = nil
|
|
while t.alive?
|
|
begin
|
|
t.wakeup
|
|
rescue ThreadError => e
|
|
unless e.message =~ /killed thread/
|
|
raise e
|
|
end
|
|
end
|
|
|
|
sleep 0
|
|
end
|
|
end
|
|
end
|
|
|
|
def extend_lease_thread
|
|
@mutex.synchronize do
|
|
@dbs << RailsMultisite::ConnectionManagement.current_db
|
|
|
|
@extend_lease_thread ||= Thread.new do
|
|
while true do
|
|
break if !@extend_lease_thread
|
|
|
|
@mutex.synchronize do
|
|
@dbs.each do |db|
|
|
RailsMultisite::ConnectionManagement.with_connection(db) do
|
|
if !$redis.expire(PAUSED_KEY, TTL)
|
|
# if it was unpaused in another process we got to remove the
|
|
# bad key
|
|
@dbs.delete(db)
|
|
end
|
|
end
|
|
end
|
|
end
|
|
|
|
sleep(Rails.env.test? ? 0.01 : TTL / 2)
|
|
end
|
|
end
|
|
end
|
|
end
|
|
end
|
|
|
|
module Sidekiq
|
|
@pauser = SidekiqPauser.new
|
|
|
|
def self.pause!(key = nil)
|
|
key ? @pauser.pause!(key) : @pauser.pause!
|
|
end
|
|
|
|
def self.paused?
|
|
@pauser.paused?
|
|
end
|
|
|
|
def self.unpause!
|
|
@pauser.unpause!
|
|
end
|
|
|
|
def self.unpause_all!
|
|
@pauser.unpause_all!
|
|
end
|
|
|
|
def self.paused_dbs
|
|
@pauser.paused_dbs
|
|
end
|
|
end
|
|
|
|
# server middleware that will reschedule work whenever Sidekiq is paused
|
|
class Sidekiq::Pausable
|
|
|
|
def initialize(delay = 5.seconds)
|
|
@delay = delay
|
|
end
|
|
|
|
def call(worker, msg, queue)
|
|
if sidekiq_paused?(msg) && !(Jobs::RunHeartbeat === worker)
|
|
worker.class.perform_in(@delay, *msg['args'])
|
|
else
|
|
start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
|
|
result = yield
|
|
duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
|
|
DiscourseEvent.trigger(:sidekiq_job_ran, worker, msg, queue, duration)
|
|
result
|
|
end
|
|
end
|
|
|
|
private
|
|
|
|
def sidekiq_paused?(msg)
|
|
if site_id = msg["args"]&.first&.dig("current_site_id")
|
|
RailsMultisite::ConnectionManagement.with_connection(site_id) do
|
|
Sidekiq.paused?
|
|
end
|
|
end
|
|
end
|
|
|
|
end
|