diff --git a/Gemfile b/Gemfile
index 3ec355c0a8f..fe489cb02ce 100644
--- a/Gemfile
+++ b/Gemfile
@@ -88,6 +88,7 @@ gem 'thor', require: false
gem 'rinku'
gem 'sanitize'
gem 'sidekiq'
+gem 'mini_scheduler'
# for sidekiq web
gem 'tilt', require: false
diff --git a/Gemfile.lock b/Gemfile.lock
index 2118599739d..f9b8ddc561e 100644
--- a/Gemfile.lock
+++ b/Gemfile.lock
@@ -200,6 +200,7 @@ GEM
mini_portile2 (2.3.0)
mini_racer (0.2.0)
libv8 (>= 6.3)
+ mini_scheduler (0.8.1)
mini_sql (0.1.10)
mini_suffix (0.3.0)
ffi (~> 1.9)
@@ -489,6 +490,7 @@ DEPENDENCIES
message_bus
mini_mime
mini_racer
+ mini_scheduler
mini_sql
mini_suffix
minitest
diff --git a/app/jobs/base.rb b/app/jobs/base.rb
index fa16e3179b1..18281145131 100644
--- a/app/jobs/base.rb
+++ b/app/jobs/base.rb
@@ -1,5 +1,3 @@
-require 'scheduler/scheduler'
-
module Jobs
def self.queued
@@ -173,7 +171,7 @@ module Jobs
end
class Scheduled < Base
- extend Scheduler::Schedule
+ extend MiniScheduler::Schedule
def perform(*args)
return if Discourse.readonly_mode?
diff --git a/config/initializers/100-sidekiq.rb b/config/initializers/100-sidekiq.rb
index f361c9fdeef..64b24e1f517 100644
--- a/config/initializers/100-sidekiq.rb
+++ b/config/initializers/100-sidekiq.rb
@@ -1,4 +1,5 @@
require "sidekiq/pausable"
+require "sidekiq/web"
Sidekiq.configure_client do |config|
config.redis = Discourse.sidekiq_redis_config
@@ -12,6 +13,24 @@ Sidekiq.configure_server do |config|
end
end
+MiniScheduler.configure do |config|
+
+ config.redis = $redis
+
+ config.job_exception_handler do |ex, context|
+ Discourse.handle_job_exception(ex, context)
+ end
+
+ config.job_ran do |stat|
+ DiscourseEvent.trigger(:scheduled_job_ran, stat)
+ end
+
+ config.before_sidekiq_web_request do
+ RailsMultisite::ConnectionManagement.establish_connection(db: 'default')
+ end
+
+end
+
if Sidekiq.server?
# defer queue should simply run in sidekiq
Scheduler::Defer.async = false
@@ -27,22 +46,7 @@ if Sidekiq.server?
scheduler_hostname = ENV["UNICORN_SCHEDULER_HOSTNAME"]
if !scheduler_hostname || scheduler_hostname.split(',').include?(`hostname`.strip)
- require 'scheduler/scheduler'
- manager = Scheduler::Manager.new($redis.without_namespace)
- Scheduler::Manager.discover_schedules.each do |schedule|
- manager.ensure_schedule!(schedule)
- end
- Thread.new do
- while true
- begin
- manager.tick
- rescue => e
- # the show must go on
- Discourse.handle_job_exception(e, message: "While ticking scheduling manager")
- end
- sleep 1
- end
- end
+ MiniScheduler.start
end
end
end
diff --git a/config/routes.rb b/config/routes.rb
index a9c0aa4a1ae..c140d73538f 100644
--- a/config/routes.rb
+++ b/config/routes.rb
@@ -1,5 +1,5 @@
require "sidekiq/web"
-require_dependency "scheduler/web"
+require "mini_scheduler/web"
require_dependency "admin_constraint"
require_dependency "staff_constraint"
require_dependency "homepage_constraint"
diff --git a/lib/discourse.rb b/lib/discourse.rb
index 9160e887f04..a00aaea1e44 100644
--- a/lib/discourse.rb
+++ b/lib/discourse.rb
@@ -43,6 +43,8 @@ module Discourse
# other desired context.
# See app/jobs/base.rb for the error_context function.
def self.handle_job_exception(ex, context = {}, parent_logger = nil)
+ return if ex.class == Jobs::HandledExceptionWrapper
+
context ||= {}
parent_logger ||= SidekiqExceptionHandler
diff --git a/lib/scheduler/manager.rb b/lib/scheduler/manager.rb
deleted file mode 100644
index e9d98b9abd3..00000000000
--- a/lib/scheduler/manager.rb
+++ /dev/null
@@ -1,360 +0,0 @@
-# Initially we used sidetiq, this was a problem:
-#
-# 1. No mechnism to add "randomisation" into job execution
-# 2. No stats about previous runs or failures
-# 3. Dependency on ice_cube gem causes runaway CPU
-
-require_dependency 'distributed_mutex'
-
-module Scheduler
- class Manager
- attr_accessor :random_ratio, :redis, :enable_stats
-
- class Runner
- def initialize(manager)
- @stopped = false
- @mutex = Mutex.new
- @queue = Queue.new
- @manager = manager
- @reschedule_orphans_thread = Thread.new do
- while !@stopped
- sleep 1.minute
- @mutex.synchronize do
- reschedule_orphans
- end
- end
- end
- @keep_alive_thread = Thread.new do
- while !@stopped
- @mutex.synchronize do
- keep_alive
- end
- sleep (@manager.keep_alive_duration / 2)
- end
- end
- @thread = Thread.new do
- while !@stopped
- process_queue
- end
- end
- end
-
- def keep_alive
- @manager.keep_alive
- rescue => ex
- Discourse.handle_job_exception(ex, message: "Scheduling manager keep-alive")
- end
-
- def reschedule_orphans
- @manager.reschedule_orphans!
- rescue => ex
- Discourse.handle_job_exception(ex, message: "Scheduling manager orphan rescheduler")
- end
-
- def hostname
- @hostname ||= begin
- `hostname`
- rescue
- "unknown"
- end
- end
-
- def process_queue
-
- klass = @queue.deq
- return unless klass
-
- # hack alert, I need to both deq and set @running atomically.
- @running = true
- failed = false
- start = Time.now.to_f
- info = @mutex.synchronize { @manager.schedule_info(klass) }
- stat = nil
- error = nil
-
- begin
- info.prev_result = "RUNNING"
- @mutex.synchronize { info.write! }
-
- if @manager.enable_stats
- RailsMultisite::ConnectionManagement.with_connection("default") do
- stat = SchedulerStat.create!(
- name: klass.to_s,
- hostname: hostname,
- pid: Process.pid,
- started_at: Time.zone.now,
- live_slots_start: GC.stat[:heap_live_slots]
- )
- end
- end
-
- klass.new.perform
- rescue => e
- if e.class != Jobs::HandledExceptionWrapper
- Discourse.handle_job_exception(e, message: "Running a scheduled job", job: klass)
- end
-
- error = "#{e.class}: #{e.message} #{e.backtrace.join("\n")}"
- failed = true
- end
- duration = ((Time.now.to_f - start) * 1000).to_i
- info.prev_duration = duration
- info.prev_result = failed ? "FAILED" : "OK"
- info.current_owner = nil
- if stat
- RailsMultisite::ConnectionManagement.with_connection("default") do
- stat.update!(
- duration_ms: duration,
- live_slots_finish: GC.stat[:heap_live_slots],
- success: !failed,
- error: error
- )
- DiscourseEvent.trigger(:scheduled_job_ran, stat)
- end
- end
- attempts(3) do
- @mutex.synchronize { info.write! }
- end
- rescue => ex
- Discourse.handle_job_exception(ex, message: "Processing scheduled job queue")
- ensure
- @running = false
- ActiveRecord::Base.connection_handler.clear_active_connections!
- end
-
- def stop!
- return if @stopped
-
- @mutex.synchronize do
- @stopped = true
-
- @keep_alive_thread.kill
- @reschedule_orphans_thread.kill
-
- @keep_alive_thread.join
- @reschedule_orphans_thread.join
-
- enq(nil)
-
- kill_thread = Thread.new do
- sleep 0.5
- @thread.kill
- end
-
- @thread.join
- kill_thread.kill
- kill_thread.join
- end
- end
-
- def enq(klass)
- @queue << klass
- end
-
- def wait_till_done
- while !@queue.empty? && !(@queue.num_waiting > 0)
- sleep 0.001
- end
- # this is a hack, but is only used for test anyway
- sleep 0.001
- while @running
- sleep 0.001
- end
- end
-
- def attempts(n)
- n.times {
- begin
- yield; break
- rescue
- sleep Random.rand
- end
- }
- end
-
- end
-
- def self.without_runner(redis = nil)
- self.new(redis, skip_runner: true)
- end
-
- def initialize(redis = nil, options = nil)
- @redis = $redis || redis
- @random_ratio = 0.1
- unless options && options[:skip_runner]
- @runner = Runner.new(self)
- self.class.current = self
- end
-
- @hostname = options && options[:hostname]
- @manager_id = SecureRandom.hex
-
- if options && options.key?(:enable_stats)
- @enable_stats = options[:enable_stats]
- else
- @enable_stats = true
- end
- end
-
- def self.current
- @current
- end
-
- def self.current=(manager)
- @current = manager
- end
-
- def hostname
- @hostname ||= `hostname`.strip
- end
-
- def schedule_info(klass)
- ScheduleInfo.new(klass, self)
- end
-
- def next_run(klass)
- schedule_info(klass).next_run
- end
-
- def ensure_schedule!(klass)
- lock do
- schedule_info(klass).schedule!
- end
- end
-
- def remove(klass)
- lock do
- schedule_info(klass).del!
- end
- end
-
- def reschedule_orphans!
- lock do
- reschedule_orphans_on!
- reschedule_orphans_on!(hostname)
- end
- end
-
- def reschedule_orphans_on!(hostname = nil)
- redis.zrange(Manager.queue_key(hostname), 0, -1).each do |key|
- klass = get_klass(key)
- next unless klass
- info = schedule_info(klass)
-
- if ['QUEUED', 'RUNNING'].include?(info.prev_result) &&
- (info.current_owner.blank? || !redis.get(info.current_owner))
- info.prev_result = 'ORPHAN'
- info.next_run = Time.now.to_i
- info.write!
- end
- end
- end
-
- def get_klass(name)
- name.constantize
- rescue NameError
- nil
- end
-
- def tick
- lock do
- schedule_next_job
- schedule_next_job(hostname)
- end
- end
-
- def schedule_next_job(hostname = nil)
- (key, due), _ = redis.zrange Manager.queue_key(hostname), 0, 0, withscores: true
- return unless key
-
- if due.to_i <= Time.now.to_i
- klass = get_klass(key)
- unless klass
- # corrupt key, nuke it (renamed job or something)
- redis.zrem Manager.queue_key(hostname), key
- return
- end
- info = schedule_info(klass)
- info.prev_run = Time.now.to_i
- info.prev_result = "QUEUED"
- info.prev_duration = -1
- info.next_run = nil
- info.current_owner = identity_key
- info.schedule!
- @runner.enq(klass)
- end
- end
-
- def blocking_tick
- tick
- @runner.wait_till_done
- end
-
- def stop!
- @runner.stop!
- self.class.current = nil
- end
-
- def keep_alive_duration
- 60
- end
-
- def keep_alive
- redis.setex identity_key, keep_alive_duration, ""
- end
-
- def lock
- DistributedMutex.new(Manager.lock_key).synchronize do
- yield
- end
- end
-
- def self.discover_schedules
- # hack for developemnt reloader is crazytown
- # multiple classes with same name can be in
- # object space
- unique = Set.new
- schedules = []
- ObjectSpace.each_object(Scheduler::Schedule) do |schedule|
- if schedule.scheduled?
- next if unique.include?(schedule.to_s)
- schedules << schedule
- unique << schedule.to_s
- end
- end
- schedules
- end
-
- @mutex = Mutex.new
- def self.seq
- @mutex.synchronize do
- @i ||= 0
- @i += 1
- end
- end
-
- def identity_key
- @identity_key ||= "_scheduler_#{hostname}:#{Process.pid}:#{self.class.seq}:#{SecureRandom.hex}"
- end
-
- def self.lock_key
- "_scheduler_lock_"
- end
-
- def self.queue_key(hostname = nil)
- if hostname
- "_scheduler_queue_#{hostname}_"
- else
- "_scheduler_queue_"
- end
- end
-
- def self.schedule_key(klass, hostname = nil)
- if hostname
- "_scheduler_#{klass}_#{hostname}"
- else
- "_scheduler_#{klass}"
- end
- end
- end
-end
diff --git a/lib/scheduler/schedule.rb b/lib/scheduler/schedule.rb
deleted file mode 100644
index 05c8085bc1f..00000000000
--- a/lib/scheduler/schedule.rb
+++ /dev/null
@@ -1,37 +0,0 @@
-module Scheduler::Schedule
-
- def daily(options = nil)
- if options
- @daily = options
- end
- @daily
- end
-
- def every(duration = nil)
- if duration
- @every = duration
- if manager = Scheduler::Manager.current
- manager.ensure_schedule!(self)
- end
- end
- @every
- end
-
- # schedule job indepndently on each host (looking at hostname)
- def per_host
- @per_host = true
- end
-
- def is_per_host
- @per_host
- end
-
- def schedule_info
- manager = Scheduler::Manager.without_runner
- manager.schedule_info self
- end
-
- def scheduled?
- !!@every || !!@daily
- end
-end
diff --git a/lib/scheduler/schedule_info.rb b/lib/scheduler/schedule_info.rb
deleted file mode 100644
index d9348b2ea6b..00000000000
--- a/lib/scheduler/schedule_info.rb
+++ /dev/null
@@ -1,138 +0,0 @@
-module Scheduler
- class ScheduleInfo
- attr_accessor :next_run,
- :prev_run,
- :prev_duration,
- :prev_result,
- :current_owner
-
- def initialize(klass, manager)
- @klass = klass
- @manager = manager
-
- data = nil
-
- if data = @manager.redis.get(key)
- data = JSON.parse(data)
- end
-
- if data
- @next_run = data["next_run"]
- @prev_run = data["prev_run"]
- @prev_result = data["prev_result"]
- @prev_duration = data["prev_duration"]
- @current_owner = data["current_owner"]
- end
- rescue
- # corrupt redis
- @next_run = @prev_run = @prev_result = @prev_duration = @current_owner = nil
- end
-
- # this means the schedule is going to fire, it is setup correctly
- # invalid schedules are fixed by running "schedule!"
- # this happens automatically after if fire by the manager
- def valid?
- return false unless @next_run
- (!@prev_run && @next_run < Time.now.to_i + 5.minutes) || valid_every? || valid_daily?
- end
-
- def valid_every?
- return false unless @klass.every
- !!@prev_run &&
- @prev_run <= Time.now.to_i &&
- @next_run < @prev_run + @klass.every * (1 + @manager.random_ratio)
- end
-
- def valid_daily?
- return false unless @klass.daily
- return true if !@prev_run && @next_run && @next_run <= (Time.zone.now + 1.day).to_i
- !!@prev_run &&
- @prev_run <= Time.zone.now.to_i &&
- @next_run < @prev_run + 1.day
- end
-
- def schedule_every!
- if !valid? && @prev_run
- mixup = @klass.every * @manager.random_ratio
- mixup = (mixup * Random.rand - mixup / 2).to_i
- @next_run = @prev_run + mixup + @klass.every
- end
-
- if !valid?
- @next_run = Time.now.to_i + 5.minutes * Random.rand
- end
- end
-
- def schedule_daily!
- return if valid?
-
- at = @klass.daily[:at] || 0
- today_begin = Time.zone.now.midnight.to_i
- today_offset = DateTime.now.seconds_since_midnight
-
- # If it's later today
- if at > today_offset
- @next_run = today_begin + at
- else
- # Otherwise do it tomorrow
- @next_run = today_begin + 1.day + at
- end
- end
-
- def schedule!
- if @klass.every
- schedule_every!
- elsif @klass.daily
- schedule_daily!
- end
-
- write!
- end
-
- def write!
-
- clear!
- redis.set key, {
- next_run: @next_run,
- prev_run: @prev_run,
- prev_duration: @prev_duration,
- prev_result: @prev_result,
- current_owner: @current_owner
- }.to_json
-
- redis.zadd queue_key, @next_run, @klass if @next_run
- end
-
- def del!
- clear!
- @next_run = @prev_run = @prev_result = @prev_duration = @current_owner = nil
- end
-
- def key
- if @klass.is_per_host
- Manager.schedule_key(@klass, @manager.hostname)
- else
- Manager.schedule_key(@klass)
- end
- end
-
- def queue_key
- if @klass.is_per_host
- Manager.queue_key(@manager.hostname)
- else
- Manager.queue_key
- end
- end
-
- def redis
- @manager.redis
- end
-
- private
- def clear!
- redis.del key
- redis.zrem queue_key, @klass
- end
-
- end
-end
diff --git a/lib/scheduler/scheduler.rb b/lib/scheduler/scheduler.rb
deleted file mode 100644
index e9f389194f0..00000000000
--- a/lib/scheduler/scheduler.rb
+++ /dev/null
@@ -1,7 +0,0 @@
-module Scheduler
-end
-
-require_dependency 'scheduler/schedule'
-require_dependency 'scheduler/schedule_info'
-require_dependency 'scheduler/manager'
-require_dependency 'scheduler/defer'
diff --git a/lib/scheduler/views/history.erb b/lib/scheduler/views/history.erb
deleted file mode 100644
index c1cc026ba71..00000000000
--- a/lib/scheduler/views/history.erb
+++ /dev/null
@@ -1,47 +0,0 @@
-
-
-
Scheduler History
-
-
-
-
-
-
- <% if @scheduler_stats.length > 0 %>
-
-
-
- Job Name |
- Hostname:Pid |
- Live Slots delta |
- Started At |
- Duration |
- |
-
-
- <% @scheduler_stats.each do |stat| %>
-
- <%= stat.name %> |
- <%= stat.hostname %>:<%= stat.pid %> |
-
- <% if stat.live_slots_start && stat.live_slots_finish %>
- <%= stat.live_slots_finish - stat.live_slots_start %>
- <% end %>
- |
- <%= sane_time stat.started_at %> |
- <%= sane_duration stat.duration_ms %> |
-
- <% if stat.success.nil? %>
- RUNNING
- <% elsif !stat.success %>
- FAILED
- <% end %>
- |
-
- <% end %>
-
-
- <% end %>
-
-
-
diff --git a/lib/scheduler/views/scheduler.erb b/lib/scheduler/views/scheduler.erb
deleted file mode 100644
index 85fb6da8d97..00000000000
--- a/lib/scheduler/views/scheduler.erb
+++ /dev/null
@@ -1,73 +0,0 @@
-
- <% if Sidekiq.paused? %>
-
-
-
SIDEKIQ IS PAUSED!
-
-
- <% end %>
-
-
-
-
-
-
-
- <% if @schedules.length > 0 %>
-
-
- Worker |
- Last Run |
- Last Result |
- Last Duration |
- Last Owner |
- Next Run Due |
- Actions |
-
- <% @schedules.each do |schedule| %>
- <% @info = schedule.schedule_info %>
-
-
- <%= schedule %>
- |
- <% prev = @info.prev_run %>
- <% if prev.nil? %>
- Never
- <% else %>
- <%= relative_time(Time.at(prev)) %>
- <% end %>
- |
-
- <%= @info.prev_result %>
- |
-
- <%= sane_duration @info.prev_duration %>
- |
-
- <%= @info.current_owner %>
- |
-
- <% next_run = @info.next_run %>
- <% if next_run.nil? %>
- Not Scheduled Yet
- <% else %>
- <%= relative_time(Time.at(next_run)) %>
- <% end %>
- |
-
-
- |
-
- <% end %>
-
- <% else %>
-
No recurring jobs found.
- <% end %>
-
-
-
diff --git a/lib/scheduler/web.rb b/lib/scheduler/web.rb
deleted file mode 100644
index b2deac7bf6e..00000000000
--- a/lib/scheduler/web.rb
+++ /dev/null
@@ -1,65 +0,0 @@
-# Based off sidetiq https://github.com/tobiassvn/sidetiq/blob/master/lib/sidetiq/web.rb
-module Scheduler
- module Web
- VIEWS = File.expand_path('views', File.dirname(__FILE__)) unless defined? VIEWS
-
- def self.registered(app)
-
- app.helpers do
- def sane_time(time)
- return unless time
- time
- end
-
- def sane_duration(duration)
- return unless duration
- if duration < 1000
- "#{duration}ms"
- elsif duration < 60 * 1000
- "#{'%.2f' % (duration / 1000.0)} secs"
- end
- end
- end
-
- app.get "/scheduler" do
- RailsMultisite::ConnectionManagement.with_connection("default") do
- @manager = Scheduler::Manager.without_runner
- @schedules = Scheduler::Manager.discover_schedules.sort do |a, b|
- a_next = a.schedule_info.next_run
- b_next = b.schedule_info.next_run
- if a_next && b_next
- a_next <=> b_next
- elsif a_next
- -1
- else
- 1
- end
- end
- erb File.read(File.join(VIEWS, 'scheduler.erb')), locals: { view_path: VIEWS }
- end
- end
-
- app.get "/scheduler/history" do
- @scheduler_stats = SchedulerStat.order('started_at desc').limit(200)
- erb File.read(File.join(VIEWS, 'history.erb')), locals: { view_path: VIEWS }
- end
-
- app.post "/scheduler/:name/trigger" do
- halt 404 unless (name = params[:name])
-
- RailsMultisite::ConnectionManagement.with_connection("default") do
- klass = name.constantize
- info = klass.schedule_info
- info.next_run = Time.now.to_i
- info.write!
-
- redirect "#{root_path}scheduler"
- end
- end
-
- end
- end
-end
-
-Sidekiq::Web.register(Scheduler::Web)
-Sidekiq::Web.tabs["Scheduler"] = "scheduler"
diff --git a/lib/tasks/scheduler.rake b/lib/tasks/scheduler.rake
index 14e40188281..23b7cacf691 100644
--- a/lib/tasks/scheduler.rake
+++ b/lib/tasks/scheduler.rake
@@ -28,7 +28,7 @@ end
desc "run every task the scheduler knows about in that order, use only for debugging"
task 'scheduler:run_all' => :environment do
- Scheduler::Manager.discover_schedules.each do |schedule|
+ MiniScheduler::Manager.discover_schedules.each do |schedule|
puts "Running #{schedule}"
time { schedule.new.execute({}) }
end
diff --git a/spec/components/scheduler/manager_spec.rb b/spec/components/scheduler/manager_spec.rb
deleted file mode 100644
index aeaa9a3e648..00000000000
--- a/spec/components/scheduler/manager_spec.rb
+++ /dev/null
@@ -1,255 +0,0 @@
-# encoding: utf-8
-require 'rails_helper'
-require 'scheduler/scheduler'
-
-describe Scheduler::Manager do
-
- module Testing
- class RandomJob
- extend ::Scheduler::Schedule
-
- def self.runs=(val)
- @runs = val
- end
-
- def self.runs
- @runs ||= 0
- end
-
- every 5.minutes
-
- def perform
- self.class.runs += 1
- sleep 0.001
- end
- end
-
- class SuperLongJob
- extend ::Scheduler::Schedule
-
- every 10.minutes
-
- def perform
- sleep 1000
- end
- end
-
- class PerHostJob
- extend ::Scheduler::Schedule
-
- per_host
- every 10.minutes
-
- def self.runs=(val)
- @runs = val
- end
-
- def self.runs
- @runs ||= 0
- end
-
- def perform
- self.class.runs += 1
- end
- end
- end
-
- let(:manager) {
- Scheduler::Manager.new(DiscourseRedis.new, enable_stats: false)
- }
-
- before do
- expect(ActiveRecord::Base.connection_pool.connections.length).to eq(1)
- @thread_count = Thread.list.count
-
- @backtraces = {}
- Thread.list.each do |t|
- @backtraces[t.object_id] = t.backtrace
- end
- end
-
- after do
- manager.stop!
- manager.remove(Testing::RandomJob)
- manager.remove(Testing::SuperLongJob)
- manager.remove(Testing::PerHostJob)
- $redis.flushall
-
- # connections that are not in use must be removed
- # otherwise active record gets super confused
- ActiveRecord::Base.connection_pool.connections.reject { |c| c.in_use? }.each do |c|
- ActiveRecord::Base.connection_pool.remove(c)
- end
- expect(ActiveRecord::Base.connection_pool.connections.length).to (be <= 1)
-
- on_thread_mismatch = lambda do
- current = Thread.list.map { |t| t.object_id }
-
- old_threads = @backtraces.keys
- extra = current - old_threads
-
- missing = old_threads - current
-
- if missing.length > 0
- STDERR.puts "\nMissing Threads #{missing.length} thread/s"
- missing.each do |id|
- STDERR.puts @backtraces[id]
- STDERR.puts
- end
- end
-
- if extra.length > 0
- Thread.list.each do |thread|
- if extra.include?(thread.object_id)
- STDERR.puts "\nExtra Thread Backtrace:"
- STDERR.puts thread.backtrace
- STDERR.puts
- end
- end
- end
- end
-
- wait_for(on_fail: on_thread_mismatch) do
- @thread_count == Thread.list.count
- end
- end
-
- it 'can disable stats' do
- manager = Scheduler::Manager.new(DiscourseRedis.new, enable_stats: false)
- expect(manager.enable_stats).to eq(false)
- manager.stop!
-
- manager = Scheduler::Manager.new(DiscourseRedis.new)
- expect(manager.enable_stats).to eq(true)
- manager.stop!
- end
-
- describe 'per host jobs' do
- it "correctly schedules on multiple hosts" do
-
- freeze_time
-
- Testing::PerHostJob.runs = 0
-
- hosts = ['a', 'b', 'c']
-
- hosts.map do |host|
-
- manager = Scheduler::Manager.new(DiscourseRedis.new, hostname: host, enable_stats: false)
- manager.ensure_schedule!(Testing::PerHostJob)
-
- info = manager.schedule_info(Testing::PerHostJob)
- info.next_run = Time.now.to_i - 10
- info.write!
-
- manager
-
- end.each do |manager|
-
- manager.blocking_tick
- manager.stop!
-
- end
-
- expect(Testing::PerHostJob.runs).to eq(3)
- end
- end
-
- describe '#sync' do
-
- it 'increases' do
- expect(Scheduler::Manager.seq).to eq(Scheduler::Manager.seq - 1)
- end
- end
-
- describe '#tick' do
-
- it 'should nuke missing jobs' do
- $redis.zadd Scheduler::Manager.queue_key, Time.now.to_i - 1000, "BLABLA"
- manager.tick
- expect($redis.zcard(Scheduler::Manager.queue_key)).to eq(0)
- end
-
- it 'should recover from crashed manager' do
-
- info = manager.schedule_info(Testing::SuperLongJob)
- info.next_run = Time.now.to_i - 1
- info.write!
-
- manager.tick
- manager.stop!
-
- $redis.del manager.identity_key
-
- manager = Scheduler::Manager.new(DiscourseRedis.new, enable_stats: false)
- manager.reschedule_orphans!
-
- info = manager.schedule_info(Testing::SuperLongJob)
- expect(info.next_run).to be <= Time.now.to_i
-
- manager.stop!
- end
-
- it 'should log when job finishes running' do
-
- Testing::RandomJob.runs = 0
-
- info = manager.schedule_info(Testing::RandomJob)
- info.next_run = Time.now.to_i - 1
- info.write!
-
- # with stats so we must be careful to cleanup
- manager = Scheduler::Manager.new(DiscourseRedis.new)
- manager.blocking_tick
- manager.stop!
-
- stat = SchedulerStat.first
- expect(stat).to be_present
- expect(stat.duration_ms).to be > 0
- expect(stat.success).to be true
- SchedulerStat.destroy_all
- end
-
- it 'should only run pending job once' do
-
- Testing::RandomJob.runs = 0
-
- info = manager.schedule_info(Testing::RandomJob)
- info.next_run = Time.now.to_i - 1
- info.write!
-
- (0..5).map do
- Thread.new do
- manager = Scheduler::Manager.new(DiscourseRedis.new, enable_stats: false)
- manager.blocking_tick
- manager.stop!
- end
- end.map(&:join)
-
- expect(Testing::RandomJob.runs).to eq(1)
-
- info = manager.schedule_info(Testing::RandomJob)
- expect(info.prev_run).to be <= Time.now.to_i
- expect(info.prev_duration).to be > 0
- expect(info.prev_result).to eq("OK")
- end
-
- end
-
- describe '#discover_schedules' do
- it 'Discovers Testing::RandomJob' do
- expect(Scheduler::Manager.discover_schedules).to include(Testing::RandomJob)
- end
- end
-
- describe '#next_run' do
- it 'should be within the next 5 mins if it never ran' do
-
- manager.remove(Testing::RandomJob)
- manager.ensure_schedule!(Testing::RandomJob)
-
- expect(manager.next_run(Testing::RandomJob))
- .to be_within(5.minutes.to_i).of(Time.now.to_i + 5.minutes)
- end
- end
-end
diff --git a/spec/components/scheduler/schedule_info_spec.rb b/spec/components/scheduler/schedule_info_spec.rb
deleted file mode 100644
index 75fc3bc1bbf..00000000000
--- a/spec/components/scheduler/schedule_info_spec.rb
+++ /dev/null
@@ -1,103 +0,0 @@
-# encoding: utf-8
-require 'rails_helper'
-require 'scheduler/scheduler'
-
-describe Scheduler::ScheduleInfo do
-
- let(:manager) { Scheduler::Manager.new }
-
- context "every" do
- class RandomJob
- extend ::Scheduler::Schedule
-
- every 1.hour
-
- def perform
- # work_it
- end
- end
-
- before do
- @info = manager.schedule_info(RandomJob)
- @info.del!
- end
-
- after do
- manager.stop!
- $redis.del manager.class.queue_key
- end
-
- it "is a scheduled job" do
- expect(RandomJob).to be_scheduled
- end
-
- it 'starts off invalid' do
- expect(@info.valid?).to eq(false)
- end
-
- it 'will have a due date in the next 5 minutes if it was blank' do
- @info.schedule!
- expect(@info.valid?).to eq(true)
- expect(@info.next_run).to be_within(5.minutes).of(Time.now.to_i)
- end
-
- it 'will have a due date within the next hour if it just ran' do
- @info.prev_run = Time.now.to_i
- @info.schedule!
- expect(@info.valid?).to eq(true)
- expect(@info.next_run).to be_within(1.hour * manager.random_ratio).of(Time.now.to_i + 1.hour)
- end
-
- it 'is invalid if way in the future' do
- @info.next_run = Time.now.to_i + 1.year
- expect(@info.valid?).to eq(false)
- end
- end
-
- context "daily" do
-
- class DailyJob
- extend ::Scheduler::Schedule
- daily at: 11.hours
-
- def perform
- end
- end
-
- before do
- freeze_time Time.parse("2010-01-10 10:00:00")
-
- @info = manager.schedule_info(DailyJob)
- @info.del!
- end
-
- after do
- manager.stop!
- $redis.del manager.class.queue_key
- end
-
- it "is a scheduled job" do
- expect(DailyJob).to be_scheduled
- end
-
- it "starts off invalid" do
- expect(@info.valid?).to eq(false)
- end
-
- it "will have a due date at the appropriate time if blank" do
- expect(@info.next_run).to eq(nil)
- @info.schedule!
-
- expect(JSON.parse($redis.get(@info.key))["next_run"])
- .to eq((Time.zone.now.midnight + 11.hours).to_i)
-
- expect(@info.valid?).to eq(true)
- end
-
- it 'is invalid if way in the future' do
- @info.next_run = Time.now.to_i + 1.year
- expect(@info.valid?).to eq(false)
- end
- end
-
-end