discourse/lib/sidekiq/pausable.rb

89 lines
1.5 KiB
Ruby
Raw Normal View History

require 'thread'
class SidekiqPauser
def initialize
@mutex = Mutex.new
end
def pause!
2014-08-19 13:50:17 +08:00
redis.setex paused_key, 60, "paused"
@mutex.synchronize do
2014-08-19 13:50:17 +08:00
@extend_lease_thread ||= extend_lease_thread
sleep 0.001 while !paused?
end
2014-02-13 12:27:04 +08:00
true
end
def paused?
2014-08-19 13:50:17 +08:00
!!redis.get(paused_key)
2014-02-13 12:27:04 +08:00
end
def unpause!
@mutex.synchronize do
2014-08-19 18:56:25 +08:00
@extend_lease_thread = nil
end
2014-08-19 13:50:17 +08:00
redis.del(paused_key)
2014-02-13 12:27:04 +08:00
true
end
private
2014-08-19 13:50:17 +08:00
def extend_lease_thread
Thread.new do
2014-08-19 13:50:17 +08:00
while true do
2014-08-19 18:56:25 +08:00
break unless @mutex.synchronize { @extend_lease_thread }
2014-08-19 13:50:17 +08:00
redis.expire paused_key, 60
sleep(Rails.env.test? ? 0.01 : 30)
end
end
end
2014-08-19 13:50:17 +08:00
def redis
$redis.without_namespace
end
def paused_key
"sidekiq_is_paused_v2"
2014-02-13 12:27:04 +08:00
end
end
module Sidekiq
@pauser = SidekiqPauser.new
def self.pause!
@pauser.pause!
end
def self.paused?
@pauser.paused?
end
def self.unpause!
@pauser.unpause!
end
end
2014-02-13 12:27:04 +08:00
# server middleware that will reschedule work whenever Sidekiq is paused
class Sidekiq::Pausable
def initialize(delay = 5.seconds)
@delay = delay
end
def call(worker, msg, queue)
if Sidekiq.paused? && !(Jobs::RunHeartbeat === worker)
worker.class.perform_in(@delay, *msg['args'])
2014-02-13 12:27:04 +08:00
else
start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
result = yield
duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
DiscourseEvent.trigger(:sidekiq_job_ran, worker, msg, queue, duration)
result
2014-02-13 12:27:04 +08:00
end
end
end