# discourse/lib/sidekiq/pausable.rb

require 'thread'
# Pauses and unpauses Sidekiq processing through a flag key stored in Redis.
#
# The flag is written with an expiry, and a background thread keeps renewing
# that expiry for as long as this object considers itself paused. If nothing
# renews the key, it expires on its own after LEASE_SECONDS.
class SidekiqPauser
  # Expiry, in seconds, applied to the pause key when it is set or renewed.
  LEASE_SECONDS = 60
  # Seconds the lease thread sleeps between two renewals of the pause key.
  RENEW_INTERVAL = 30

  def initialize
    @mutex = Mutex.new
  end

  # Sets the pause key and makes sure a live lease thread is renewing it.
  # Blocks until the key can be read back.
  #
  # @return [true]
  def pause!
    redis.setex paused_key, LEASE_SECONDS, "paused"

    @mutex.synchronize do
      # Start a renewer when there is none, or when the previous one has died;
      # a dead thread left in place would let the pause silently expire.
      unless @extend_lease_thread && @extend_lease_thread.alive?
        @extend_lease_thread = extend_lease_thread
      end
      sleep 0.001 until paused?
    end

    true
  end

  # @return [Boolean] true when the pause key currently exists in Redis
  def paused?
    !!redis.get(paused_key)
  end

  # Clears the lease-thread slot (which tells the thread to stop) and deletes
  # the pause key.
  #
  # @return [true]
  def unpause!
    @mutex.synchronize do
      @extend_lease_thread = nil
    end

    redis.del(paused_key)
    true
  end

  private

  # Spawns the thread that renews the expiry of the pause key.
  #
  # The thread keeps running only while it is the thread stored in
  # @extend_lease_thread. It therefore stops both after unpause! clears the
  # slot and after a later pause! has installed a different thread, so a
  # quick unpause!/pause! sequence cannot leave two renewers running.
  #
  # @return [Thread]
  def extend_lease_thread
    Thread.new do
      loop do
        break unless @mutex.synchronize { @extend_lease_thread.equal?(Thread.current) }

        redis.expire paused_key, LEASE_SECONDS
        sleep RENEW_INTERVAL
      end
    end
  end

  # Redis handle taken from the global $redis via #without_namespace.
  def redis
    $redis.without_namespace
  end

  # Name of the Redis key whose presence means "paused".
  def paused_key
    "sidekiq_is_paused_v2"
  end
end
# Adds module-level pause controls to Sidekiq. Each call is forwarded to one
# SidekiqPauser instance created when this module body is evaluated.
module Sidekiq
  @pauser = SidekiqPauser.new

  class << self
    # Forwards to SidekiqPauser#pause!.
    def pause!
      @pauser.pause!
    end

    # Forwards to SidekiqPauser#paused?.
    def paused?
      @pauser.paused?
    end

    # Forwards to SidekiqPauser#unpause!.
    def unpause!
      @pauser.unpause!
    end
  end
end
# Server middleware that reschedules work whenever Sidekiq is paused.
class Sidekiq::Pausable
  # @param delay how far in the future a job is re-enqueued while paused;
  #   passed as the first argument to perform_in (defaults to 5.seconds)
  def initialize(delay = 5.seconds)
    @delay = delay
  end

  # Runs the job by yielding unless Sidekiq is paused. While paused, the job
  # is not run; it is re-enqueued on the worker's class with its original
  # arguments after the configured delay.
  #
  # @param worker the worker instance; only its class is used here
  # @param msg the job payload; only msg['args'] is read here
  # @param queue not used by this middleware
  def call(worker, msg, queue)
    if Sidekiq.paused?
      worker.class.perform_in(@delay, *msg['args'])
    else
      yield
    end
  end
end