FEATURE: automatic orphan recovery

BUGFIX: improve scheduler robustness, in case redis is disconnected during operation

If sidekiq is terminated while task is running, it will be picked up and ran again
New owner on tasks to help debugging
better #stop semantics for tests
This commit is contained in:
Sam 2014-02-12 13:32:34 +11:00
parent 2e5413437d
commit 71a38542a4
4 changed files with 177 additions and 26 deletions

View File

@ -12,32 +12,78 @@ module Scheduler
class Runner
def initialize(manager)
@mutex = Mutex.new
@queue = Queue.new
@manager = manager
@reschedule_orphans_thread = Thread.new do
while true
sleep 1.minute
@mutex.synchronize do
reschedule_orphans
end
end
end
@keep_alive_thread = Thread.new do
while true
@mutex.synchronize do
keep_alive
end
sleep (@manager.keep_alive_duration / 2)
end
end
@thread = Thread.new do
while true
klass = @queue.deq
failed = false
start = Time.now.to_f
info = @manager.schedule_info(klass)
begin
info.prev_result = "RUNNING"
info.write!
klass.new.perform
rescue => e
Scheduler::Manager.handle_exception(e)
failed = true
end
duration = ((Time.now.to_f - start) * 1000).to_i
info.prev_duration = duration
info.prev_result = failed ? "FAILED" : "OK"
info.write!
process_queue
end
end
end
def keep_alive
@manager.keep_alive
rescue => ex
Scheduler::Manager.handle_exception(ex)
end
def reschedule_orphans
@manager.reschedule_orphans!
rescue => ex
Scheduler::Manager.handle_exception(ex)
end
def process_queue
klass = @queue.deq
# hack alert, I need to both deq and set @running atomically.
@running = true
failed = false
start = Time.now.to_f
info = @mutex.synchronize { @manager.schedule_info(klass) }
begin
info.prev_result = "RUNNING"
@mutex.synchronize { info.write! }
klass.new.perform
rescue => e
Scheduler::Manager.handle_exception(e)
failed = true
end
duration = ((Time.now.to_f - start) * 1000).to_i
info.prev_duration = duration
info.prev_result = failed ? "FAILED" : "OK"
info.current_owner = nil
attempts(3) do
@mutex.synchronize { info.write! }
end
rescue => ex
Scheduler::Manager.handle_exception(ex)
ensure
@running = false
end
def stop!
@thread.kill
@mutex.synchronize do
@thread.kill
@keep_alive_thread.kill
@reschedule_orphans_thread.kill
end
end
def enq(klass)
@ -48,7 +94,23 @@ module Scheduler
while !@queue.empty? && !(@queue.num_waiting > 0)
sleep 0.001
end
# this is a hack, but is only used for test anyway
sleep 0.001
while @running
sleep 0.001
end
end
def attempts(n)
n.times {
begin
yield; break
rescue
sleep Random.rand
end
}
end
end
def self.without_runner(redis=nil)
@ -94,22 +156,42 @@ module Scheduler
end
end
def reschedule_orphans!
lock do
redis.zrange(Manager.queue_key, 0, -1).each do |key|
klass = get_klass(key)
next unless klass
info = schedule_info(klass)
if ['QUEUED', 'RUNNING'].include?(info.prev_result) &&
(info.current_owner.blank? || !redis.get(info.current_owner))
info.prev_result = 'ORPHAN'
info.next_run = Time.now.to_i
info.write!
end
end
end
end
def get_klass(name)
name.constantize
rescue NameError
nil
end
def tick
lock do
(key, due), _ = redis.zrange Manager.queue_key, 0, 0, withscores: true
return unless key
if due.to_i <= Time.now.to_i
klass = begin
key.constantize
rescue NameError
nil
end
klass = get_klass(key)
return unless klass
info = schedule_info(klass)
info.prev_run = Time.now.to_i
info.prev_result = "QUEUED"
info.prev_duration = -1
info.next_run = nil
info.current_owner = identity_key
info.schedule!
@runner.enq(klass)
end
@ -126,6 +208,13 @@ module Scheduler
self.class.current = nil
end
def keep_alive_duration
60
end
def keep_alive
redis.setex identity_key, keep_alive_duration, ""
end
def lock
got_lock = false
@ -157,6 +246,7 @@ module Scheduler
redis.del Manager.lock_key
end
def self.discover_schedules
schedules = []
ObjectSpace.each_object(Scheduler::Schedule) do |schedule|
@ -165,6 +255,18 @@ module Scheduler
schedules
end
@mutex = Mutex.new
def self.seq
@mutex.synchronize do
@i ||= 0
@i += 1
end
end
def identity_key
@identity_key ||= "_scheduler_#{`hostname`}:#{Process.pid}:#{self.class.seq}"
end
def self.lock_key
"_scheduler_lock_"
end

View File

@ -3,7 +3,8 @@ module Scheduler
attr_accessor :next_run,
:prev_run,
:prev_duration,
:prev_result
:prev_result,
:current_owner
def initialize(klass, manager)
@klass = klass
@ -21,10 +22,11 @@ module Scheduler
@prev_run = data["prev_run"]
@prev_result = data["prev_result"]
@prev_duration = data["prev_duration"]
@current_owner = data["current_owner"]
end
rescue
# corrupt redis
@next_run = @prev_run = @prev_result = @prev_duration = nil
@next_run = @prev_run = @prev_result = @prev_duration = @current_owner = nil
end
def valid?
@ -57,14 +59,15 @@ module Scheduler
next_run: @next_run,
prev_run: @prev_run,
prev_duration: @prev_duration,
prev_result: @prev_result
prev_result: @prev_result,
current_owner: @current_owner
}.to_json
redis.zadd Manager.queue_key, @next_run , @klass
end
def del!
clear!
@next_run = @prev_run = @prev_result = @prev_duration = nil
@next_run = @prev_run = @prev_result = @prev_duration = @current_owner = nil
end
def key

View File

@ -15,6 +15,7 @@
<th style="width: 15%">Last Run</th>
<th style="width: 15%">Last Result</th>
<th style="width: 15%">Last Duration</th>
<th style="width: 15%">Last Owner</th>
<th style="width: 15%">Next Run Due</th>
<th style="width: 10%">Actions</th>
</thead>
@ -37,6 +38,9 @@
<td>
<%= @info.prev_duration %>
</td>
<td>
<%= @info.current_owner %>
</td>
<td>
<% next_run = @info.next_run %>
<% if next_run.nil? %>

View File

@ -23,19 +23,60 @@ describe Scheduler::Manager do
sleep 0.001
end
end
class SuperLongJob
extend ::Scheduler::Schedule
every 10.minutes
def perform
sleep 1000
end
end
end
let(:manager) { Scheduler::Manager.new(DiscourseRedis.new) }
before do
$redis.del manager.class.lock_key
$redis.del manager.class.queue_key
manager.remove(Testing::RandomJob)
manager.remove(Testing::SuperLongJob)
end
after do
manager.stop!
manager.remove(Testing::RandomJob)
manager.remove(Testing::SuperLongJob)
end
describe '#sync' do
it 'increases' do
Scheduler::Manager.seq.should == Scheduler::Manager.seq - 1
end
end
describe '#tick' do
it 'should recover from crashed manager' do
info = manager.schedule_info(Testing::SuperLongJob)
info.next_run = Time.now.to_i - 1
info.write!
manager.tick
manager.stop!
$redis.del manager.identity_key
manager = Scheduler::Manager.new(DiscourseRedis.new)
manager.reschedule_orphans!
info = manager.schedule_info(Testing::SuperLongJob)
info.next_run.should <= Time.now.to_i
end
it 'should only run pending job once' do
Testing::RandomJob.runs = 0
@ -48,6 +89,7 @@ describe Scheduler::Manager do
Thread.new do
manager = Scheduler::Manager.new(DiscourseRedis.new)
manager.blocking_tick
manager.stop!
end
end.map(&:join)