discourse/lib/sidekiq/pausable.rb
Sam Saffron 30990006a9 DEV: enable frozen string literal on all files
This reduces chances of errors where consumers of strings mutate inputs
and reduces memory usage of the app.

Test suite passes now, but there may be some stuff left, so we will run
a few sites on a branch prior to merging
2019-05-13 09:31:32 +08:00

156 lines
3.1 KiB
Ruby

# frozen_string_literal: true
require 'thread'
class SidekiqPauser
TTL = 60
PAUSED_KEY = "sidekiq_is_paused_v2"
def initialize
@mutex = Mutex.new
@dbs ||= Set.new
end
def pause!(value = "paused")
$redis.setex PAUSED_KEY, TTL, value
extend_lease_thread
true
end
def paused?
!!$redis.get(PAUSED_KEY)
end
def unpause_all!
@mutex.synchronize do
@dbs = Set.new
stop_extend_lease_thread
end
RailsMultisite::ConnectionManagement.each_connection do
unpause! if paused?
end
end
def paused_dbs
dbs = []
RailsMultisite::ConnectionManagement.each_connection do
dbs << RailsMultisite::ConnectionManagement.current_db if paused?
end
dbs
end
def unpause!
@mutex.synchronize do
@dbs.delete(RailsMultisite::ConnectionManagement.current_db)
stop_extend_lease_thread if @dbs.size == 0
end
$redis.del(PAUSED_KEY)
true
end
private
def stop_extend_lease_thread
# should always be called from a mutex
if t = @extend_lease_thread
@extend_lease_thread = nil
while t.alive?
begin
t.wakeup
rescue ThreadError => e
unless e.message =~ /killed thread/
raise e
end
end
sleep 0
end
end
end
def extend_lease_thread
@mutex.synchronize do
@dbs << RailsMultisite::ConnectionManagement.current_db
@extend_lease_thread ||= Thread.new do
while true do
break if !@extend_lease_thread
@mutex.synchronize do
@dbs.each do |db|
RailsMultisite::ConnectionManagement.with_connection(db) do
if !$redis.expire(PAUSED_KEY, TTL)
# if it was unpaused in another process we got to remove the
# bad key
@dbs.delete(db)
end
end
end
end
sleep(Rails.env.test? ? 0.01 : TTL / 2)
end
end
end
end
end
module Sidekiq
@pauser = SidekiqPauser.new
def self.pause!(key = nil)
key ? @pauser.pause!(key) : @pauser.pause!
end
def self.paused?
@pauser.paused?
end
def self.unpause!
@pauser.unpause!
end
def self.unpause_all!
@pauser.unpause_all!
end
def self.paused_dbs
@pauser.paused_dbs
end
end
# server middleware that will reschedule work whenever Sidekiq is paused
class Sidekiq::Pausable
def initialize(delay = 5.seconds)
@delay = delay
end
def call(worker, msg, queue)
if sidekiq_paused?(msg) && !(Jobs::RunHeartbeat === worker)
worker.class.perform_in(@delay, *msg['args'])
else
start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
result = yield
duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
DiscourseEvent.trigger(:sidekiq_job_ran, worker, msg, queue, duration)
result
end
end
private
def sidekiq_paused?(msg)
if site_id = msg["args"]&.first&.dig("current_site_id")
RailsMultisite::ConnectionManagement.with_connection(site_id) do
Sidekiq.paused?
end
end
end
end