From 04eac9f14a7e812442cfaf5aaafad21f7a5c879b Mon Sep 17 00:00:00 2001 From: Sam Date: Mon, 24 Jul 2017 18:35:20 -0400 Subject: [PATCH] lets attempt to get these specs working! --- lib/scheduler/manager.rb | 1 - spec/components/scheduler/manager_spec.rb | 413 +++++++++++----------- 2 files changed, 211 insertions(+), 203 deletions(-) diff --git a/lib/scheduler/manager.rb b/lib/scheduler/manager.rb index bbdc14eea9e..48466a6687c 100644 --- a/lib/scheduler/manager.rb +++ b/lib/scheduler/manager.rb @@ -120,7 +120,6 @@ module Scheduler Discourse.handle_job_exception(ex, {message: "Processing scheduled job queue"}) ensure @running = false - ActiveRecord::Base.connection_handler.clear_active_connections! end diff --git a/spec/components/scheduler/manager_spec.rb b/spec/components/scheduler/manager_spec.rb index 61b4e1b4161..90f44f3fefb 100644 --- a/spec/components/scheduler/manager_spec.rb +++ b/spec/components/scheduler/manager_spec.rb @@ -4,206 +4,215 @@ require 'scheduler/scheduler' describe Scheduler::Manager do - # SAM: working on this at the moment, will take me a few days to clean up - # - # module Testing - # class RandomJob - # extend ::Scheduler::Schedule - # - # def self.runs=(val) - # @runs = val - # end - # - # def self.runs - # @runs ||= 0 - # end - # - # every 5.minutes - # - # def perform - # self.class.runs+=1 - # sleep 0.001 - # end - # end - # - # class SuperLongJob - # extend ::Scheduler::Schedule - # - # every 10.minutes - # - # def perform - # sleep 1000 - # end - # end - # - # class PerHostJob - # extend ::Scheduler::Schedule - # - # per_host - # every 10.minutes - # - # def self.runs=(val) - # @runs = val - # end - # - # def self.runs - # @runs ||= 0 - # end - # - # def perform - # self.class.runs += 1 - # end - # end - # end - # - # let(:manager) { - # Scheduler::Manager.new(DiscourseRedis.new, enable_stats: false) - # } - # - # before do - # expect(ActiveRecord::Base.connection_pool.connections.reject{|c| !c.in_use?}.length).to eq(1) - # end - # - # after do - # manager.stop! - # manager.remove(Testing::RandomJob) - # manager.remove(Testing::SuperLongJob) - # manager.remove(Testing::PerHostJob) - # $redis.flushall - # expect(ActiveRecord::Base.connection_pool.connections.reject{|c| !c.in_use?}.length).to eq(1) - # end - # - # it 'can disable stats' do - # manager = Scheduler::Manager.new(DiscourseRedis.new, enable_stats: false) - # expect(manager.enable_stats).to eq(false) - # - # manager = Scheduler::Manager.new(DiscourseRedis.new) - # expect(manager.enable_stats).to eq(true) - # end - # - # describe 'per host jobs' do - # it "correctly schedules on multiple hosts" do - # Testing::PerHostJob.runs = 0 - # - # hosts = ['a','b','c'] - # - # hosts.map do |host| - # - # manager = Scheduler::Manager.new(DiscourseRedis.new, hostname: host, enable_stats: false) - # manager.ensure_schedule!(Testing::PerHostJob) - # - # info = manager.schedule_info(Testing::PerHostJob) - # info.next_run = Time.now.to_i - 1 - # info.write! - # - # manager - # - # end.each do |manager| - # - # manager.blocking_tick - # manager.stop! - # - # end - # - # expect(Testing::PerHostJob.runs).to eq(3) - # - # end - # end - # - # describe '#sync' do - # - # it 'increases' do - # expect(Scheduler::Manager.seq).to eq(Scheduler::Manager.seq - 1) - # end - # end - # - # describe '#tick' do - # - # it 'should nuke missing jobs' do - # $redis.zadd Scheduler::Manager.queue_key, Time.now.to_i - 1000, "BLABLA" - # manager.tick - # expect($redis.zcard(Scheduler::Manager.queue_key)).to eq(0) - # end - # - # it 'should recover from crashed manager' do - # - # info = manager.schedule_info(Testing::SuperLongJob) - # info.next_run = Time.now.to_i - 1 - # info.write! - # - # manager.tick - # manager.stop! - # - # $redis.del manager.identity_key - # - # manager = Scheduler::Manager.new(DiscourseRedis.new, enable_stats: false) - # manager.reschedule_orphans! - # - # info = manager.schedule_info(Testing::SuperLongJob) - # expect(info.next_run).to be <= Time.now.to_i - # end - # - # # something about logging jobs causing a leak in connection pool in test - # it 'should log when job finishes running' do - # - # Testing::RandomJob.runs = 0 - # - # info = manager.schedule_info(Testing::RandomJob) - # info.next_run = Time.now.to_i - 1 - # info.write! - # - # # with stats so we must be careful to cleanup - # manager = Scheduler::Manager.new(DiscourseRedis.new) - # manager.blocking_tick - # manager.stop! - # - # stat = SchedulerStat.first - # expect(stat).to be_present - # expect(stat.duration_ms).to be > 0 - # expect(stat.success).to be true - # SchedulerStat.destroy_all - # end - # - # it 'should only run pending job once' do - # - # Testing::RandomJob.runs = 0 - # - # info = manager.schedule_info(Testing::RandomJob) - # info.next_run = Time.now.to_i - 1 - # info.write! - # - # (0..5).map do - # Thread.new do - # manager = Scheduler::Manager.new(DiscourseRedis.new, enable_stats: false) - # manager.blocking_tick - # manager.stop! - # end - # end.map(&:join) - # - # expect(Testing::RandomJob.runs).to eq(1) - # - # info = manager.schedule_info(Testing::RandomJob) - # expect(info.prev_run).to be <= Time.now.to_i - # expect(info.prev_duration).to be > 0 - # expect(info.prev_result).to eq("OK") - # end - # - # end - # - # describe '#discover_schedules' do - # it 'Discovers Testing::RandomJob' do - # expect(Scheduler::Manager.discover_schedules).to include(Testing::RandomJob) - # end - # end - # - # describe '#next_run' do - # it 'should be within the next 5 mins if it never ran' do - # - # manager.remove(Testing::RandomJob) - # manager.ensure_schedule!(Testing::RandomJob) - # - # expect(manager.next_run(Testing::RandomJob)) - # .to be_within(5.minutes.to_i).of(Time.now.to_i + 5.minutes) - # end - # end + module Testing + class RandomJob + extend ::Scheduler::Schedule + + def self.runs=(val) + @runs = val + end + + def self.runs + @runs ||= 0 + end + + every 5.minutes + + def perform + self.class.runs+=1 + sleep 0.001 + end + end + + class SuperLongJob + extend ::Scheduler::Schedule + + every 10.minutes + + def perform + sleep 1000 + end + end + + class PerHostJob + extend ::Scheduler::Schedule + + per_host + every 10.minutes + + def self.runs=(val) + @runs = val + end + + def self.runs + @runs ||= 0 + end + + def perform + self.class.runs += 1 + end + end + end + + let(:manager) { + Scheduler::Manager.new(DiscourseRedis.new, enable_stats: false) + } + + before do + expect(ActiveRecord::Base.connection_pool.connections.length).to eq(1) + @thread_count = Thread.list.count + end + + after do + manager.stop! + manager.remove(Testing::RandomJob) + manager.remove(Testing::SuperLongJob) + manager.remove(Testing::PerHostJob) + $redis.flushall + expect(ActiveRecord::Base.connection_pool.connections.reject{|c| !c.in_use?}.length).to eq(1) + + # connections that are not in use must be removed + # otherwise active record gets super confused + ActiveRecord::Base.connection_pool.connections.reject{|c| c.in_use?}.each do |c| + ActiveRecord::Base.connection_pool.remove(c) + end + expect(ActiveRecord::Base.connection_pool.connections.length).to eq(1) + expect(@thread_count).to eq(Thread.list.count) + end + + it 'can disable stats' do + manager = Scheduler::Manager.new(DiscourseRedis.new, enable_stats: false) + expect(manager.enable_stats).to eq(false) + manager.stop! + + manager = Scheduler::Manager.new(DiscourseRedis.new) + expect(manager.enable_stats).to eq(true) + manager.stop! + end + + describe 'per host jobs' do + it "correctly schedules on multiple hosts" do + Testing::PerHostJob.runs = 0 + + hosts = ['a','b','c'] + + hosts.map do |host| + + manager = Scheduler::Manager.new(DiscourseRedis.new, hostname: host, enable_stats: false) + manager.ensure_schedule!(Testing::PerHostJob) + + info = manager.schedule_info(Testing::PerHostJob) + info.next_run = Time.now.to_i - 1 + info.write! + + manager + + end.each do |manager| + + manager.blocking_tick + manager.stop! + + end + expect(Testing::PerHostJob.runs).to eq(3) + end + end + + describe '#sync' do + + it 'increases' do + expect(Scheduler::Manager.seq).to eq(Scheduler::Manager.seq - 1) + end + end + + describe '#tick' do + + it 'should nuke missing jobs' do + $redis.zadd Scheduler::Manager.queue_key, Time.now.to_i - 1000, "BLABLA" + manager.tick + expect($redis.zcard(Scheduler::Manager.queue_key)).to eq(0) + end + + it 'should recover from crashed manager' do + + info = manager.schedule_info(Testing::SuperLongJob) + info.next_run = Time.now.to_i - 1 + info.write! + + manager.tick + manager.stop! + + $redis.del manager.identity_key + + manager = Scheduler::Manager.new(DiscourseRedis.new, enable_stats: false) + manager.reschedule_orphans! + + info = manager.schedule_info(Testing::SuperLongJob) + expect(info.next_run).to be <= Time.now.to_i + + manager.stop! + end + + # something about logging jobs causing a leak in connection pool in test + it 'should log when job finishes running' do + + Testing::RandomJob.runs = 0 + + info = manager.schedule_info(Testing::RandomJob) + info.next_run = Time.now.to_i - 1 + info.write! + + # with stats so we must be careful to cleanup + manager = Scheduler::Manager.new(DiscourseRedis.new) + manager.blocking_tick + manager.stop! + + stat = SchedulerStat.first + expect(stat).to be_present + expect(stat.duration_ms).to be > 0 + expect(stat.success).to be true + SchedulerStat.destroy_all + end + + it 'should only run pending job once' do + + Testing::RandomJob.runs = 0 + + info = manager.schedule_info(Testing::RandomJob) + info.next_run = Time.now.to_i - 1 + info.write! + + (0..5).map do + Thread.new do + manager = Scheduler::Manager.new(DiscourseRedis.new, enable_stats: false) + manager.blocking_tick + manager.stop! + end + end.map(&:join) + + expect(Testing::RandomJob.runs).to eq(1) + + info = manager.schedule_info(Testing::RandomJob) + expect(info.prev_run).to be <= Time.now.to_i + expect(info.prev_duration).to be > 0 + expect(info.prev_result).to eq("OK") + end + + end + + describe '#discover_schedules' do + it 'Discovers Testing::RandomJob' do + expect(Scheduler::Manager.discover_schedules).to include(Testing::RandomJob) + end + end + + describe '#next_run' do + it 'should be within the next 5 mins if it never ran' do + + manager.remove(Testing::RandomJob) + manager.ensure_schedule!(Testing::RandomJob) + + expect(manager.next_run(Testing::RandomJob)) + .to be_within(5.minutes.to_i).of(Time.now.to_i + 5.minutes) + end + end end