mirror of
https://github.com/discourse/discourse.git
synced 2024-11-22 09:12:45 +08:00
FEATURE: Optional detailed performance logging for Sidekiq jobs (#7091)
By default, this does nothing. Two environment variables are available: - `DISCOURSE_LOG_SIDEKIQ` Set to `"1"` to enable logging. This will log all completed jobs to `log/rails/sidekiq.log`, along with various db/redis/network statistics. This is useful to track down poorly performing jobs. - `DISCOURSE_LOG_SIDEKIQ_INTERVAL` (seconds) Check running jobs periodically, and log their current duration. They will appear in the logs with `status:pending`. This is useful to track down jobs which take a long time, then crash sidekiq before completing.
This commit is contained in:
parent
73e4204d20
commit
8963f1af30
112
app/jobs/base.rb
112
app/jobs/base.rb
|
@ -16,35 +16,102 @@ module Jobs
|
|||
end
|
||||
|
||||
class Base
|
||||
class JobInstrumenter
|
||||
def initialize(job_class:, opts:, db:)
|
||||
return unless enabled?
|
||||
@data = {}
|
||||
|
||||
class Instrumenter
|
||||
@data["hostname"] = `hostname`.strip # Hostname
|
||||
@data["pid"] = Process.pid # Pid
|
||||
@data["database"] = db # DB name - multisite db name it ran on
|
||||
@data["job_name"] = job_class.name # Job Name - eg: Jobs::AboutStats
|
||||
@data["job_type"] = job_class.try(:scheduled?) ? "scheduled" : "regular" # Job Type - either s for scheduled or r for regular
|
||||
@data["opts"] = opts # Params - json encoded params for the job
|
||||
|
||||
def self.stats
|
||||
Thread.current[:db_stats] ||= Stats.new
|
||||
@data["status"] = 'pending'
|
||||
@start_timestamp = Process.clock_gettime(Process::CLOCK_MONOTONIC)
|
||||
|
||||
self.class.ensure_interval_logging!
|
||||
@@active_jobs ||= []
|
||||
@@active_jobs << self
|
||||
|
||||
MethodProfiler.ensure_discourse_instrumentation!
|
||||
MethodProfiler.start
|
||||
end
|
||||
|
||||
class Stats
|
||||
attr_accessor :query_count, :duration_ms
|
||||
def stop(exception:)
|
||||
return unless enabled?
|
||||
|
||||
def initialize
|
||||
@query_count = 0
|
||||
@duration_ms = 0
|
||||
profile = MethodProfiler.stop
|
||||
|
||||
@@active_jobs.delete(self)
|
||||
|
||||
@data["duration"] = profile[:total_duration] # Duration - length in seconds it took to run
|
||||
@data["sql_duration"] = profile.dig(:sql, :duration) || 0 # Sql Duration (s)
|
||||
@data["sql_calls"] = profile.dig(:sql, :calls) || 0 # Sql Statements - how many statements ran
|
||||
@data["redis_duration"] = profile.dig(:redis, :duration) || 0 # Redis Duration (s)
|
||||
@data["redis_calls"] = profile.dig(:redis, :calls) || 0 # Redis commands
|
||||
@data["net_duration"] = profile.dig(:net, :duration) || 0 # Redis Duration (s)
|
||||
@data["net_calls"] = profile.dig(:net, :calls) || 0 # Redis commands
|
||||
|
||||
if exception.present?
|
||||
@data["exception"] = exception # Exception - if job fails a json encoded exception
|
||||
@data["status"] = 'failed'
|
||||
else
|
||||
@data["status"] = 'success' # Status - fail, success, pending
|
||||
end
|
||||
|
||||
write_to_log
|
||||
end
|
||||
|
||||
def call(name, start, finish, message_id, values)
|
||||
stats = Instrumenter.stats
|
||||
stats.query_count += 1
|
||||
stats.duration_ms += (((finish - start).to_f) * 1000).to_i
|
||||
def self.raw_log(message)
|
||||
@@logger ||= Logger.new("#{Rails.root}/log/sidekiq.log")
|
||||
@@log_queue ||= Queue.new
|
||||
unless @log_thread&.alive?
|
||||
@@log_thread = Thread.new do
|
||||
begin
|
||||
loop { @@logger << @@log_queue.pop }
|
||||
rescue Exception => e
|
||||
Discourse.warn_exception(e, message: "Sidekiq logging thread terminated unexpectedly")
|
||||
end
|
||||
end
|
||||
end
|
||||
@@log_queue.push(message)
|
||||
end
|
||||
|
||||
def current_duration
|
||||
Process.clock_gettime(Process::CLOCK_MONOTONIC) - @start_timestamp
|
||||
end
|
||||
|
||||
def write_to_log
|
||||
return unless enabled?
|
||||
@data["@timestamp"] = Time.now
|
||||
@data["duration"] = current_duration if @data["status"] == "pending"
|
||||
self.class.raw_log("#{@data.to_json}\n")
|
||||
end
|
||||
|
||||
def enabled?
|
||||
ENV["DISCOURSE_LOG_SIDEKIQ"] == "1"
|
||||
end
|
||||
|
||||
def self.ensure_interval_logging!
|
||||
interval = ENV["DISCOURSE_LOG_SIDEKIQ_INTERVAL"]
|
||||
return if !interval
|
||||
@@interval_thread ||= Thread.new do
|
||||
begin
|
||||
loop do
|
||||
sleep interval.to_i
|
||||
@@active_jobs.each { |j| j.write_to_log if j.current_duration > interval }
|
||||
end
|
||||
rescue Exception => e
|
||||
Discourse.warn_exception(e, message: "Sidekiq interval logging thread terminated unexpectedly")
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
include Sidekiq::Worker
|
||||
|
||||
def initialize
|
||||
@db_duration = 0
|
||||
end
|
||||
|
||||
def log(*args)
|
||||
args.each do |arg|
|
||||
Rails.logger.info "#{Time.now.to_formatted_s(:db)}: [#{self.class.name.upcase}] #{arg}"
|
||||
|
@ -81,16 +148,7 @@ module Jobs
|
|||
@db_duration || 0
|
||||
end
|
||||
|
||||
def ensure_db_instrumented
|
||||
@@instrumented ||= begin
|
||||
ActiveSupport::Notifications.subscribe('sql.active_record', Instrumenter.new)
|
||||
true
|
||||
end
|
||||
end
|
||||
|
||||
def perform(*args)
|
||||
total_db_time = 0
|
||||
ensure_db_instrumented
|
||||
opts = args.extract_options!.with_indifferent_access
|
||||
|
||||
if SiteSetting.queue_jobs?
|
||||
|
@ -125,6 +183,7 @@ module Jobs
|
|||
exception = {}
|
||||
|
||||
RailsMultisite::ConnectionManagement.with_connection(db) do
|
||||
job_instrumenter = JobInstrumenter.new(job_class: self.class, opts: opts, db: db)
|
||||
begin
|
||||
I18n.locale = SiteSetting.default_locale || "en"
|
||||
I18n.ensure_all_loaded!
|
||||
|
@ -144,7 +203,7 @@ module Jobs
|
|||
exception[:message] = "While establishing database connection to #{db}"
|
||||
exception[:other] = { problem_db: db }
|
||||
ensure
|
||||
total_db_time += Instrumenter.stats.duration_ms
|
||||
job_instrumenter.stop(exception: exception)
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -164,7 +223,6 @@ module Jobs
|
|||
nil
|
||||
ensure
|
||||
ActiveRecord::Base.connection_handler.clear_active_connections!
|
||||
@db_duration = total_db_time
|
||||
end
|
||||
|
||||
end
|
||||
|
|
|
@ -61,4 +61,26 @@ class MethodProfiler
|
|||
end
|
||||
data
|
||||
end
|
||||
|
||||
def self.ensure_discourse_instrumentation!
|
||||
@@instrumentation_setup ||= begin
|
||||
MethodProfiler.patch(PG::Connection, [
|
||||
:exec, :async_exec, :exec_prepared, :send_query_prepared, :query, :exec_params
|
||||
], :sql)
|
||||
|
||||
MethodProfiler.patch(Redis::Client, [
|
||||
:call, :call_pipeline
|
||||
], :redis)
|
||||
|
||||
MethodProfiler.patch(Net::HTTP, [
|
||||
:request
|
||||
], :net, no_recurse: true)
|
||||
|
||||
MethodProfiler.patch(Excon::Connection, [
|
||||
:request
|
||||
], :net)
|
||||
true
|
||||
end
|
||||
end
|
||||
|
||||
end
|
||||
|
|
|
@ -15,26 +15,7 @@ class Middleware::RequestTracker
|
|||
# # do stuff with env and data
|
||||
# end
|
||||
def self.register_detailed_request_logger(callback)
|
||||
|
||||
unless @patched_instrumentation
|
||||
MethodProfiler.patch(PG::Connection, [
|
||||
:exec, :async_exec, :exec_prepared, :send_query_prepared, :query, :exec_params
|
||||
], :sql)
|
||||
|
||||
MethodProfiler.patch(Redis::Client, [
|
||||
:call, :call_pipeline
|
||||
], :redis)
|
||||
|
||||
MethodProfiler.patch(Net::HTTP, [
|
||||
:request
|
||||
], :net, no_recurse: true)
|
||||
|
||||
MethodProfiler.patch(Excon::Connection, [
|
||||
:request
|
||||
], :net)
|
||||
@patched_instrumentation = true
|
||||
end
|
||||
|
||||
MethodProfiler.ensure_discourse_instrumentation!
|
||||
(@@detailed_request_loggers ||= []) << callback
|
||||
end
|
||||
|
||||
|
|
Loading…
Reference in New Issue
Block a user