mirror of
https://github.com/discourse/discourse.git
synced 2024-11-26 19:13:39 +08:00
correct some test concurrency bugs
This commit is contained in:
parent
c9dcffe434
commit
cc088956bc
|
@ -8,7 +8,7 @@ require_dependency 'distributed_mutex'
|
|||
|
||||
module Scheduler
|
||||
class Manager
|
||||
attr_accessor :random_ratio, :redis
|
||||
attr_accessor :random_ratio, :redis, :enable_stats
|
||||
|
||||
class Runner
|
||||
def initialize(manager)
|
||||
|
@ -69,13 +69,15 @@ module Scheduler
|
|||
begin
|
||||
info.prev_result = "RUNNING"
|
||||
@mutex.synchronize { info.write! }
|
||||
stat = SchedulerStat.create!(
|
||||
name: klass.to_s,
|
||||
hostname: hostname,
|
||||
pid: Process.pid,
|
||||
started_at: Time.zone.now,
|
||||
live_slots_start: GC.stat[:heap_live_slots]
|
||||
)
|
||||
if @manager.enable_stats
|
||||
stat = SchedulerStat.create!(
|
||||
name: klass.to_s,
|
||||
hostname: hostname,
|
||||
pid: Process.pid,
|
||||
started_at: Time.zone.now,
|
||||
live_slots_start: GC.stat[:heap_live_slots]
|
||||
)
|
||||
end
|
||||
klass.new.perform
|
||||
rescue Jobs::HandledExceptionWrapper
|
||||
# Discourse.handle_exception was already called, and we don't have any extra info to give
|
||||
|
@ -88,11 +90,13 @@ module Scheduler
|
|||
info.prev_duration = duration
|
||||
info.prev_result = failed ? "FAILED" : "OK"
|
||||
info.current_owner = nil
|
||||
stat.update_columns(
|
||||
duration_ms: duration,
|
||||
live_slots_finish: GC.stat[:heap_live_slots],
|
||||
success: !failed
|
||||
)
|
||||
if stat
|
||||
stat.update_columns(
|
||||
duration_ms: duration,
|
||||
live_slots_finish: GC.stat[:heap_live_slots],
|
||||
success: !failed
|
||||
)
|
||||
end
|
||||
attempts(3) do
|
||||
@mutex.synchronize { info.write! }
|
||||
end
|
||||
|
@ -151,6 +155,12 @@ module Scheduler
|
|||
|
||||
@hostname = options && options[:hostname]
|
||||
@manager_id = SecureRandom.hex
|
||||
|
||||
if options && options.key?(:enable_stats)
|
||||
@enable_stats = options[:enable_stats]
|
||||
else
|
||||
@enable_stats = true
|
||||
end
|
||||
end
|
||||
|
||||
def self.current
|
||||
|
|
|
@ -54,7 +54,9 @@ describe Scheduler::Manager do
|
|||
end
|
||||
end
|
||||
|
||||
let(:manager) { Scheduler::Manager.new(DiscourseRedis.new) }
|
||||
let(:manager) {
|
||||
Scheduler::Manager.new(DiscourseRedis.new, enable_stats: false)
|
||||
}
|
||||
|
||||
before do
|
||||
$redis.del manager.class.lock_key
|
||||
|
@ -79,7 +81,7 @@ describe Scheduler::Manager do
|
|||
|
||||
hosts.map do |host|
|
||||
|
||||
manager = Scheduler::Manager.new(DiscourseRedis.new, hostname: host)
|
||||
manager = Scheduler::Manager.new(DiscourseRedis.new, hostname: host, enable_stats: false)
|
||||
manager.ensure_schedule!(Testing::PerHostJob)
|
||||
|
||||
info = manager.schedule_info(Testing::PerHostJob)
|
||||
|
@ -126,7 +128,7 @@ describe Scheduler::Manager do
|
|||
|
||||
$redis.del manager.identity_key
|
||||
|
||||
manager = Scheduler::Manager.new(DiscourseRedis.new)
|
||||
manager = Scheduler::Manager.new(DiscourseRedis.new, enable_stats: false)
|
||||
manager.reschedule_orphans!
|
||||
|
||||
info = manager.schedule_info(Testing::SuperLongJob)
|
||||
|
@ -174,7 +176,7 @@ describe Scheduler::Manager do
|
|||
|
||||
(0..5).map do
|
||||
Thread.new do
|
||||
manager = Scheduler::Manager.new(DiscourseRedis.new)
|
||||
manager = Scheduler::Manager.new(DiscourseRedis.new, enable_stats: false)
|
||||
manager.blocking_tick
|
||||
manager.stop!
|
||||
end
|
||||
|
|
Loading…
Reference in New Issue
Block a user