From 4ad7ce70ce03043c5149ea309e16362b8514dc1f Mon Sep 17 00:00:00 2001 From: Neil Lalonde Date: Tue, 31 Jul 2018 17:12:55 -0400 Subject: [PATCH] REFACTOR: extract scheduler to the mini_scheduler gem --- Gemfile | 1 + Gemfile.lock | 2 + app/jobs/base.rb | 4 +- config/initializers/100-sidekiq.rb | 36 +- config/routes.rb | 2 +- lib/discourse.rb | 2 + lib/scheduler/manager.rb | 360 ------------------ lib/scheduler/schedule.rb | 37 -- lib/scheduler/schedule_info.rb | 138 ------- lib/scheduler/scheduler.rb | 7 - lib/scheduler/views/history.erb | 47 --- lib/scheduler/views/scheduler.erb | 73 ---- lib/scheduler/web.rb | 65 ---- lib/tasks/scheduler.rake | 2 +- spec/components/scheduler/manager_spec.rb | 255 ------------- .../scheduler/schedule_info_spec.rb | 103 ----- 16 files changed, 28 insertions(+), 1106 deletions(-) delete mode 100644 lib/scheduler/manager.rb delete mode 100644 lib/scheduler/schedule.rb delete mode 100644 lib/scheduler/schedule_info.rb delete mode 100644 lib/scheduler/scheduler.rb delete mode 100644 lib/scheduler/views/history.erb delete mode 100644 lib/scheduler/views/scheduler.erb delete mode 100644 lib/scheduler/web.rb delete mode 100644 spec/components/scheduler/manager_spec.rb delete mode 100644 spec/components/scheduler/schedule_info_spec.rb diff --git a/Gemfile b/Gemfile index 3ec355c0a8f..fe489cb02ce 100644 --- a/Gemfile +++ b/Gemfile @@ -88,6 +88,7 @@ gem 'thor', require: false gem 'rinku' gem 'sanitize' gem 'sidekiq' +gem 'mini_scheduler' # for sidekiq web gem 'tilt', require: false diff --git a/Gemfile.lock b/Gemfile.lock index 2118599739d..f9b8ddc561e 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -200,6 +200,7 @@ GEM mini_portile2 (2.3.0) mini_racer (0.2.0) libv8 (>= 6.3) + mini_scheduler (0.8.1) mini_sql (0.1.10) mini_suffix (0.3.0) ffi (~> 1.9) @@ -489,6 +490,7 @@ DEPENDENCIES message_bus mini_mime mini_racer + mini_scheduler mini_sql mini_suffix minitest diff --git a/app/jobs/base.rb b/app/jobs/base.rb index fa16e3179b1..18281145131 100644 --- a/app/jobs/base.rb +++ b/app/jobs/base.rb @@ -1,5 +1,3 @@ -require 'scheduler/scheduler' - module Jobs def self.queued @@ -173,7 +171,7 @@ module Jobs end class Scheduled < Base - extend Scheduler::Schedule + extend MiniScheduler::Schedule def perform(*args) return if Discourse.readonly_mode? diff --git a/config/initializers/100-sidekiq.rb b/config/initializers/100-sidekiq.rb index f361c9fdeef..64b24e1f517 100644 --- a/config/initializers/100-sidekiq.rb +++ b/config/initializers/100-sidekiq.rb @@ -1,4 +1,5 @@ require "sidekiq/pausable" +require "sidekiq/web" Sidekiq.configure_client do |config| config.redis = Discourse.sidekiq_redis_config @@ -12,6 +13,24 @@ Sidekiq.configure_server do |config| end end +MiniScheduler.configure do |config| + + config.redis = $redis + + config.job_exception_handler do |ex, context| + Discourse.handle_job_exception(ex, context) + end + + config.job_ran do |stat| + DiscourseEvent.trigger(:scheduled_job_ran, stat) + end + + config.before_sidekiq_web_request do + RailsMultisite::ConnectionManagement.establish_connection(db: 'default') + end + +end + if Sidekiq.server? # defer queue should simply run in sidekiq Scheduler::Defer.async = false @@ -27,22 +46,7 @@ if Sidekiq.server? scheduler_hostname = ENV["UNICORN_SCHEDULER_HOSTNAME"] if !scheduler_hostname || scheduler_hostname.split(',').include?(`hostname`.strip) - require 'scheduler/scheduler' - manager = Scheduler::Manager.new($redis.without_namespace) - Scheduler::Manager.discover_schedules.each do |schedule| - manager.ensure_schedule!(schedule) - end - Thread.new do - while true - begin - manager.tick - rescue => e - # the show must go on - Discourse.handle_job_exception(e, message: "While ticking scheduling manager") - end - sleep 1 - end - end + MiniScheduler.start end end end diff --git a/config/routes.rb b/config/routes.rb index a9c0aa4a1ae..c140d73538f 100644 --- a/config/routes.rb +++ b/config/routes.rb @@ -1,5 +1,5 @@ require "sidekiq/web" -require_dependency "scheduler/web" +require "mini_scheduler/web" require_dependency "admin_constraint" require_dependency "staff_constraint" require_dependency "homepage_constraint" diff --git a/lib/discourse.rb b/lib/discourse.rb index 9160e887f04..a00aaea1e44 100644 --- a/lib/discourse.rb +++ b/lib/discourse.rb @@ -43,6 +43,8 @@ module Discourse # other desired context. # See app/jobs/base.rb for the error_context function. def self.handle_job_exception(ex, context = {}, parent_logger = nil) + return if ex.class == Jobs::HandledExceptionWrapper + context ||= {} parent_logger ||= SidekiqExceptionHandler diff --git a/lib/scheduler/manager.rb b/lib/scheduler/manager.rb deleted file mode 100644 index e9d98b9abd3..00000000000 --- a/lib/scheduler/manager.rb +++ /dev/null @@ -1,360 +0,0 @@ -# Initially we used sidetiq, this was a problem: -# -# 1. No mechnism to add "randomisation" into job execution -# 2. No stats about previous runs or failures -# 3. Dependency on ice_cube gem causes runaway CPU - -require_dependency 'distributed_mutex' - -module Scheduler - class Manager - attr_accessor :random_ratio, :redis, :enable_stats - - class Runner - def initialize(manager) - @stopped = false - @mutex = Mutex.new - @queue = Queue.new - @manager = manager - @reschedule_orphans_thread = Thread.new do - while !@stopped - sleep 1.minute - @mutex.synchronize do - reschedule_orphans - end - end - end - @keep_alive_thread = Thread.new do - while !@stopped - @mutex.synchronize do - keep_alive - end - sleep (@manager.keep_alive_duration / 2) - end - end - @thread = Thread.new do - while !@stopped - process_queue - end - end - end - - def keep_alive - @manager.keep_alive - rescue => ex - Discourse.handle_job_exception(ex, message: "Scheduling manager keep-alive") - end - - def reschedule_orphans - @manager.reschedule_orphans! - rescue => ex - Discourse.handle_job_exception(ex, message: "Scheduling manager orphan rescheduler") - end - - def hostname - @hostname ||= begin - `hostname` - rescue - "unknown" - end - end - - def process_queue - - klass = @queue.deq - return unless klass - - # hack alert, I need to both deq and set @running atomically. - @running = true - failed = false - start = Time.now.to_f - info = @mutex.synchronize { @manager.schedule_info(klass) } - stat = nil - error = nil - - begin - info.prev_result = "RUNNING" - @mutex.synchronize { info.write! } - - if @manager.enable_stats - RailsMultisite::ConnectionManagement.with_connection("default") do - stat = SchedulerStat.create!( - name: klass.to_s, - hostname: hostname, - pid: Process.pid, - started_at: Time.zone.now, - live_slots_start: GC.stat[:heap_live_slots] - ) - end - end - - klass.new.perform - rescue => e - if e.class != Jobs::HandledExceptionWrapper - Discourse.handle_job_exception(e, message: "Running a scheduled job", job: klass) - end - - error = "#{e.class}: #{e.message} #{e.backtrace.join("\n")}" - failed = true - end - duration = ((Time.now.to_f - start) * 1000).to_i - info.prev_duration = duration - info.prev_result = failed ? "FAILED" : "OK" - info.current_owner = nil - if stat - RailsMultisite::ConnectionManagement.with_connection("default") do - stat.update!( - duration_ms: duration, - live_slots_finish: GC.stat[:heap_live_slots], - success: !failed, - error: error - ) - DiscourseEvent.trigger(:scheduled_job_ran, stat) - end - end - attempts(3) do - @mutex.synchronize { info.write! } - end - rescue => ex - Discourse.handle_job_exception(ex, message: "Processing scheduled job queue") - ensure - @running = false - ActiveRecord::Base.connection_handler.clear_active_connections! - end - - def stop! - return if @stopped - - @mutex.synchronize do - @stopped = true - - @keep_alive_thread.kill - @reschedule_orphans_thread.kill - - @keep_alive_thread.join - @reschedule_orphans_thread.join - - enq(nil) - - kill_thread = Thread.new do - sleep 0.5 - @thread.kill - end - - @thread.join - kill_thread.kill - kill_thread.join - end - end - - def enq(klass) - @queue << klass - end - - def wait_till_done - while !@queue.empty? && !(@queue.num_waiting > 0) - sleep 0.001 - end - # this is a hack, but is only used for test anyway - sleep 0.001 - while @running - sleep 0.001 - end - end - - def attempts(n) - n.times { - begin - yield; break - rescue - sleep Random.rand - end - } - end - - end - - def self.without_runner(redis = nil) - self.new(redis, skip_runner: true) - end - - def initialize(redis = nil, options = nil) - @redis = $redis || redis - @random_ratio = 0.1 - unless options && options[:skip_runner] - @runner = Runner.new(self) - self.class.current = self - end - - @hostname = options && options[:hostname] - @manager_id = SecureRandom.hex - - if options && options.key?(:enable_stats) - @enable_stats = options[:enable_stats] - else - @enable_stats = true - end - end - - def self.current - @current - end - - def self.current=(manager) - @current = manager - end - - def hostname - @hostname ||= `hostname`.strip - end - - def schedule_info(klass) - ScheduleInfo.new(klass, self) - end - - def next_run(klass) - schedule_info(klass).next_run - end - - def ensure_schedule!(klass) - lock do - schedule_info(klass).schedule! - end - end - - def remove(klass) - lock do - schedule_info(klass).del! - end - end - - def reschedule_orphans! - lock do - reschedule_orphans_on! - reschedule_orphans_on!(hostname) - end - end - - def reschedule_orphans_on!(hostname = nil) - redis.zrange(Manager.queue_key(hostname), 0, -1).each do |key| - klass = get_klass(key) - next unless klass - info = schedule_info(klass) - - if ['QUEUED', 'RUNNING'].include?(info.prev_result) && - (info.current_owner.blank? || !redis.get(info.current_owner)) - info.prev_result = 'ORPHAN' - info.next_run = Time.now.to_i - info.write! - end - end - end - - def get_klass(name) - name.constantize - rescue NameError - nil - end - - def tick - lock do - schedule_next_job - schedule_next_job(hostname) - end - end - - def schedule_next_job(hostname = nil) - (key, due), _ = redis.zrange Manager.queue_key(hostname), 0, 0, withscores: true - return unless key - - if due.to_i <= Time.now.to_i - klass = get_klass(key) - unless klass - # corrupt key, nuke it (renamed job or something) - redis.zrem Manager.queue_key(hostname), key - return - end - info = schedule_info(klass) - info.prev_run = Time.now.to_i - info.prev_result = "QUEUED" - info.prev_duration = -1 - info.next_run = nil - info.current_owner = identity_key - info.schedule! - @runner.enq(klass) - end - end - - def blocking_tick - tick - @runner.wait_till_done - end - - def stop! - @runner.stop! - self.class.current = nil - end - - def keep_alive_duration - 60 - end - - def keep_alive - redis.setex identity_key, keep_alive_duration, "" - end - - def lock - DistributedMutex.new(Manager.lock_key).synchronize do - yield - end - end - - def self.discover_schedules - # hack for developemnt reloader is crazytown - # multiple classes with same name can be in - # object space - unique = Set.new - schedules = [] - ObjectSpace.each_object(Scheduler::Schedule) do |schedule| - if schedule.scheduled? - next if unique.include?(schedule.to_s) - schedules << schedule - unique << schedule.to_s - end - end - schedules - end - - @mutex = Mutex.new - def self.seq - @mutex.synchronize do - @i ||= 0 - @i += 1 - end - end - - def identity_key - @identity_key ||= "_scheduler_#{hostname}:#{Process.pid}:#{self.class.seq}:#{SecureRandom.hex}" - end - - def self.lock_key - "_scheduler_lock_" - end - - def self.queue_key(hostname = nil) - if hostname - "_scheduler_queue_#{hostname}_" - else - "_scheduler_queue_" - end - end - - def self.schedule_key(klass, hostname = nil) - if hostname - "_scheduler_#{klass}_#{hostname}" - else - "_scheduler_#{klass}" - end - end - end -end diff --git a/lib/scheduler/schedule.rb b/lib/scheduler/schedule.rb deleted file mode 100644 index 05c8085bc1f..00000000000 --- a/lib/scheduler/schedule.rb +++ /dev/null @@ -1,37 +0,0 @@ -module Scheduler::Schedule - - def daily(options = nil) - if options - @daily = options - end - @daily - end - - def every(duration = nil) - if duration - @every = duration - if manager = Scheduler::Manager.current - manager.ensure_schedule!(self) - end - end - @every - end - - # schedule job indepndently on each host (looking at hostname) - def per_host - @per_host = true - end - - def is_per_host - @per_host - end - - def schedule_info - manager = Scheduler::Manager.without_runner - manager.schedule_info self - end - - def scheduled? - !!@every || !!@daily - end -end diff --git a/lib/scheduler/schedule_info.rb b/lib/scheduler/schedule_info.rb deleted file mode 100644 index d9348b2ea6b..00000000000 --- a/lib/scheduler/schedule_info.rb +++ /dev/null @@ -1,138 +0,0 @@ -module Scheduler - class ScheduleInfo - attr_accessor :next_run, - :prev_run, - :prev_duration, - :prev_result, - :current_owner - - def initialize(klass, manager) - @klass = klass - @manager = manager - - data = nil - - if data = @manager.redis.get(key) - data = JSON.parse(data) - end - - if data - @next_run = data["next_run"] - @prev_run = data["prev_run"] - @prev_result = data["prev_result"] - @prev_duration = data["prev_duration"] - @current_owner = data["current_owner"] - end - rescue - # corrupt redis - @next_run = @prev_run = @prev_result = @prev_duration = @current_owner = nil - end - - # this means the schedule is going to fire, it is setup correctly - # invalid schedules are fixed by running "schedule!" - # this happens automatically after if fire by the manager - def valid? - return false unless @next_run - (!@prev_run && @next_run < Time.now.to_i + 5.minutes) || valid_every? || valid_daily? - end - - def valid_every? - return false unless @klass.every - !!@prev_run && - @prev_run <= Time.now.to_i && - @next_run < @prev_run + @klass.every * (1 + @manager.random_ratio) - end - - def valid_daily? - return false unless @klass.daily - return true if !@prev_run && @next_run && @next_run <= (Time.zone.now + 1.day).to_i - !!@prev_run && - @prev_run <= Time.zone.now.to_i && - @next_run < @prev_run + 1.day - end - - def schedule_every! - if !valid? && @prev_run - mixup = @klass.every * @manager.random_ratio - mixup = (mixup * Random.rand - mixup / 2).to_i - @next_run = @prev_run + mixup + @klass.every - end - - if !valid? - @next_run = Time.now.to_i + 5.minutes * Random.rand - end - end - - def schedule_daily! - return if valid? - - at = @klass.daily[:at] || 0 - today_begin = Time.zone.now.midnight.to_i - today_offset = DateTime.now.seconds_since_midnight - - # If it's later today - if at > today_offset - @next_run = today_begin + at - else - # Otherwise do it tomorrow - @next_run = today_begin + 1.day + at - end - end - - def schedule! - if @klass.every - schedule_every! - elsif @klass.daily - schedule_daily! - end - - write! - end - - def write! - - clear! - redis.set key, { - next_run: @next_run, - prev_run: @prev_run, - prev_duration: @prev_duration, - prev_result: @prev_result, - current_owner: @current_owner - }.to_json - - redis.zadd queue_key, @next_run, @klass if @next_run - end - - def del! - clear! - @next_run = @prev_run = @prev_result = @prev_duration = @current_owner = nil - end - - def key - if @klass.is_per_host - Manager.schedule_key(@klass, @manager.hostname) - else - Manager.schedule_key(@klass) - end - end - - def queue_key - if @klass.is_per_host - Manager.queue_key(@manager.hostname) - else - Manager.queue_key - end - end - - def redis - @manager.redis - end - - private - def clear! - redis.del key - redis.zrem queue_key, @klass - end - - end -end diff --git a/lib/scheduler/scheduler.rb b/lib/scheduler/scheduler.rb deleted file mode 100644 index e9f389194f0..00000000000 --- a/lib/scheduler/scheduler.rb +++ /dev/null @@ -1,7 +0,0 @@ -module Scheduler -end - -require_dependency 'scheduler/schedule' -require_dependency 'scheduler/schedule_info' -require_dependency 'scheduler/manager' -require_dependency 'scheduler/defer' diff --git a/lib/scheduler/views/history.erb b/lib/scheduler/views/history.erb deleted file mode 100644 index c1cc026ba71..00000000000 --- a/lib/scheduler/views/history.erb +++ /dev/null @@ -1,47 +0,0 @@ -
-
-

Scheduler History

-
-
- -
-
-
- <% if @scheduler_stats.length > 0 %> - - - - - - - - - - - - <% @scheduler_stats.each do |stat| %> - - - - - - - - - <% end %> - -
Job NameHostname:PidLive Slots deltaStarted AtDuration
<%= stat.name %><%= stat.hostname %>:<%= stat.pid %> - <% if stat.live_slots_start && stat.live_slots_finish %> - <%= stat.live_slots_finish - stat.live_slots_start %> - <% end %> - <%= sane_time stat.started_at %><%= sane_duration stat.duration_ms %> - <% if stat.success.nil? %> - RUNNING - <% elsif !stat.success %> - FAILED - <% end %> -
- <% end %> -
-
-
diff --git a/lib/scheduler/views/scheduler.erb b/lib/scheduler/views/scheduler.erb deleted file mode 100644 index 85fb6da8d97..00000000000 --- a/lib/scheduler/views/scheduler.erb +++ /dev/null @@ -1,73 +0,0 @@ -
- <% if Sidekiq.paused? %> -
-
-

SIDEKIQ IS PAUSED!

-
-
- <% end %> -
-

Recurring Jobs history

-
-
- -
-
- -
- <% if @schedules.length > 0 %> - - - - - - - - - - - <% @schedules.each do |schedule| %> - <% @info = schedule.schedule_info %> - - - - - - - - - <% end %> -
WorkerLast RunLast ResultLast DurationLast OwnerNext Run DueActions
- <%= schedule %> - - <% prev = @info.prev_run %> - <% if prev.nil? %> - Never - <% else %> - <%= relative_time(Time.at(prev)) %> - <% end %> - - <%= @info.prev_result %> - - <%= sane_duration @info.prev_duration %> - - <%= @info.current_owner %> - - <% next_run = @info.next_run %> - <% if next_run.nil? %> - Not Scheduled Yet - <% else %> - <%= relative_time(Time.at(next_run)) %> - <% end %> - -
" method="post"> - <%= csrf_tag if respond_to?(:csrf_tag) %> - -
-
- <% else %> -
No recurring jobs found.
- <% end %> -
-
-
diff --git a/lib/scheduler/web.rb b/lib/scheduler/web.rb deleted file mode 100644 index b2deac7bf6e..00000000000 --- a/lib/scheduler/web.rb +++ /dev/null @@ -1,65 +0,0 @@ -# Based off sidetiq https://github.com/tobiassvn/sidetiq/blob/master/lib/sidetiq/web.rb -module Scheduler - module Web - VIEWS = File.expand_path('views', File.dirname(__FILE__)) unless defined? VIEWS - - def self.registered(app) - - app.helpers do - def sane_time(time) - return unless time - time - end - - def sane_duration(duration) - return unless duration - if duration < 1000 - "#{duration}ms" - elsif duration < 60 * 1000 - "#{'%.2f' % (duration / 1000.0)} secs" - end - end - end - - app.get "/scheduler" do - RailsMultisite::ConnectionManagement.with_connection("default") do - @manager = Scheduler::Manager.without_runner - @schedules = Scheduler::Manager.discover_schedules.sort do |a, b| - a_next = a.schedule_info.next_run - b_next = b.schedule_info.next_run - if a_next && b_next - a_next <=> b_next - elsif a_next - -1 - else - 1 - end - end - erb File.read(File.join(VIEWS, 'scheduler.erb')), locals: { view_path: VIEWS } - end - end - - app.get "/scheduler/history" do - @scheduler_stats = SchedulerStat.order('started_at desc').limit(200) - erb File.read(File.join(VIEWS, 'history.erb')), locals: { view_path: VIEWS } - end - - app.post "/scheduler/:name/trigger" do - halt 404 unless (name = params[:name]) - - RailsMultisite::ConnectionManagement.with_connection("default") do - klass = name.constantize - info = klass.schedule_info - info.next_run = Time.now.to_i - info.write! - - redirect "#{root_path}scheduler" - end - end - - end - end -end - -Sidekiq::Web.register(Scheduler::Web) -Sidekiq::Web.tabs["Scheduler"] = "scheduler" diff --git a/lib/tasks/scheduler.rake b/lib/tasks/scheduler.rake index 14e40188281..23b7cacf691 100644 --- a/lib/tasks/scheduler.rake +++ b/lib/tasks/scheduler.rake @@ -28,7 +28,7 @@ end desc "run every task the scheduler knows about in that order, use only for debugging" task 'scheduler:run_all' => :environment do - Scheduler::Manager.discover_schedules.each do |schedule| + MiniScheduler::Manager.discover_schedules.each do |schedule| puts "Running #{schedule}" time { schedule.new.execute({}) } end diff --git a/spec/components/scheduler/manager_spec.rb b/spec/components/scheduler/manager_spec.rb deleted file mode 100644 index aeaa9a3e648..00000000000 --- a/spec/components/scheduler/manager_spec.rb +++ /dev/null @@ -1,255 +0,0 @@ -# encoding: utf-8 -require 'rails_helper' -require 'scheduler/scheduler' - -describe Scheduler::Manager do - - module Testing - class RandomJob - extend ::Scheduler::Schedule - - def self.runs=(val) - @runs = val - end - - def self.runs - @runs ||= 0 - end - - every 5.minutes - - def perform - self.class.runs += 1 - sleep 0.001 - end - end - - class SuperLongJob - extend ::Scheduler::Schedule - - every 10.minutes - - def perform - sleep 1000 - end - end - - class PerHostJob - extend ::Scheduler::Schedule - - per_host - every 10.minutes - - def self.runs=(val) - @runs = val - end - - def self.runs - @runs ||= 0 - end - - def perform - self.class.runs += 1 - end - end - end - - let(:manager) { - Scheduler::Manager.new(DiscourseRedis.new, enable_stats: false) - } - - before do - expect(ActiveRecord::Base.connection_pool.connections.length).to eq(1) - @thread_count = Thread.list.count - - @backtraces = {} - Thread.list.each do |t| - @backtraces[t.object_id] = t.backtrace - end - end - - after do - manager.stop! - manager.remove(Testing::RandomJob) - manager.remove(Testing::SuperLongJob) - manager.remove(Testing::PerHostJob) - $redis.flushall - - # connections that are not in use must be removed - # otherwise active record gets super confused - ActiveRecord::Base.connection_pool.connections.reject { |c| c.in_use? }.each do |c| - ActiveRecord::Base.connection_pool.remove(c) - end - expect(ActiveRecord::Base.connection_pool.connections.length).to (be <= 1) - - on_thread_mismatch = lambda do - current = Thread.list.map { |t| t.object_id } - - old_threads = @backtraces.keys - extra = current - old_threads - - missing = old_threads - current - - if missing.length > 0 - STDERR.puts "\nMissing Threads #{missing.length} thread/s" - missing.each do |id| - STDERR.puts @backtraces[id] - STDERR.puts - end - end - - if extra.length > 0 - Thread.list.each do |thread| - if extra.include?(thread.object_id) - STDERR.puts "\nExtra Thread Backtrace:" - STDERR.puts thread.backtrace - STDERR.puts - end - end - end - end - - wait_for(on_fail: on_thread_mismatch) do - @thread_count == Thread.list.count - end - end - - it 'can disable stats' do - manager = Scheduler::Manager.new(DiscourseRedis.new, enable_stats: false) - expect(manager.enable_stats).to eq(false) - manager.stop! - - manager = Scheduler::Manager.new(DiscourseRedis.new) - expect(manager.enable_stats).to eq(true) - manager.stop! - end - - describe 'per host jobs' do - it "correctly schedules on multiple hosts" do - - freeze_time - - Testing::PerHostJob.runs = 0 - - hosts = ['a', 'b', 'c'] - - hosts.map do |host| - - manager = Scheduler::Manager.new(DiscourseRedis.new, hostname: host, enable_stats: false) - manager.ensure_schedule!(Testing::PerHostJob) - - info = manager.schedule_info(Testing::PerHostJob) - info.next_run = Time.now.to_i - 10 - info.write! - - manager - - end.each do |manager| - - manager.blocking_tick - manager.stop! - - end - - expect(Testing::PerHostJob.runs).to eq(3) - end - end - - describe '#sync' do - - it 'increases' do - expect(Scheduler::Manager.seq).to eq(Scheduler::Manager.seq - 1) - end - end - - describe '#tick' do - - it 'should nuke missing jobs' do - $redis.zadd Scheduler::Manager.queue_key, Time.now.to_i - 1000, "BLABLA" - manager.tick - expect($redis.zcard(Scheduler::Manager.queue_key)).to eq(0) - end - - it 'should recover from crashed manager' do - - info = manager.schedule_info(Testing::SuperLongJob) - info.next_run = Time.now.to_i - 1 - info.write! - - manager.tick - manager.stop! - - $redis.del manager.identity_key - - manager = Scheduler::Manager.new(DiscourseRedis.new, enable_stats: false) - manager.reschedule_orphans! - - info = manager.schedule_info(Testing::SuperLongJob) - expect(info.next_run).to be <= Time.now.to_i - - manager.stop! - end - - it 'should log when job finishes running' do - - Testing::RandomJob.runs = 0 - - info = manager.schedule_info(Testing::RandomJob) - info.next_run = Time.now.to_i - 1 - info.write! - - # with stats so we must be careful to cleanup - manager = Scheduler::Manager.new(DiscourseRedis.new) - manager.blocking_tick - manager.stop! - - stat = SchedulerStat.first - expect(stat).to be_present - expect(stat.duration_ms).to be > 0 - expect(stat.success).to be true - SchedulerStat.destroy_all - end - - it 'should only run pending job once' do - - Testing::RandomJob.runs = 0 - - info = manager.schedule_info(Testing::RandomJob) - info.next_run = Time.now.to_i - 1 - info.write! - - (0..5).map do - Thread.new do - manager = Scheduler::Manager.new(DiscourseRedis.new, enable_stats: false) - manager.blocking_tick - manager.stop! - end - end.map(&:join) - - expect(Testing::RandomJob.runs).to eq(1) - - info = manager.schedule_info(Testing::RandomJob) - expect(info.prev_run).to be <= Time.now.to_i - expect(info.prev_duration).to be > 0 - expect(info.prev_result).to eq("OK") - end - - end - - describe '#discover_schedules' do - it 'Discovers Testing::RandomJob' do - expect(Scheduler::Manager.discover_schedules).to include(Testing::RandomJob) - end - end - - describe '#next_run' do - it 'should be within the next 5 mins if it never ran' do - - manager.remove(Testing::RandomJob) - manager.ensure_schedule!(Testing::RandomJob) - - expect(manager.next_run(Testing::RandomJob)) - .to be_within(5.minutes.to_i).of(Time.now.to_i + 5.minutes) - end - end -end diff --git a/spec/components/scheduler/schedule_info_spec.rb b/spec/components/scheduler/schedule_info_spec.rb deleted file mode 100644 index 75fc3bc1bbf..00000000000 --- a/spec/components/scheduler/schedule_info_spec.rb +++ /dev/null @@ -1,103 +0,0 @@ -# encoding: utf-8 -require 'rails_helper' -require 'scheduler/scheduler' - -describe Scheduler::ScheduleInfo do - - let(:manager) { Scheduler::Manager.new } - - context "every" do - class RandomJob - extend ::Scheduler::Schedule - - every 1.hour - - def perform - # work_it - end - end - - before do - @info = manager.schedule_info(RandomJob) - @info.del! - end - - after do - manager.stop! - $redis.del manager.class.queue_key - end - - it "is a scheduled job" do - expect(RandomJob).to be_scheduled - end - - it 'starts off invalid' do - expect(@info.valid?).to eq(false) - end - - it 'will have a due date in the next 5 minutes if it was blank' do - @info.schedule! - expect(@info.valid?).to eq(true) - expect(@info.next_run).to be_within(5.minutes).of(Time.now.to_i) - end - - it 'will have a due date within the next hour if it just ran' do - @info.prev_run = Time.now.to_i - @info.schedule! - expect(@info.valid?).to eq(true) - expect(@info.next_run).to be_within(1.hour * manager.random_ratio).of(Time.now.to_i + 1.hour) - end - - it 'is invalid if way in the future' do - @info.next_run = Time.now.to_i + 1.year - expect(@info.valid?).to eq(false) - end - end - - context "daily" do - - class DailyJob - extend ::Scheduler::Schedule - daily at: 11.hours - - def perform - end - end - - before do - freeze_time Time.parse("2010-01-10 10:00:00") - - @info = manager.schedule_info(DailyJob) - @info.del! - end - - after do - manager.stop! - $redis.del manager.class.queue_key - end - - it "is a scheduled job" do - expect(DailyJob).to be_scheduled - end - - it "starts off invalid" do - expect(@info.valid?).to eq(false) - end - - it "will have a due date at the appropriate time if blank" do - expect(@info.next_run).to eq(nil) - @info.schedule! - - expect(JSON.parse($redis.get(@info.key))["next_run"]) - .to eq((Time.zone.now.midnight + 11.hours).to_i) - - expect(@info.valid?).to eq(true) - end - - it 'is invalid if way in the future' do - @info.next_run = Time.now.to_i + 1.year - expect(@info.valid?).to eq(false) - end - end - -end