mirror of
https://github.com/discourse/discourse.git
synced 2024-11-22 12:38:26 +08:00
DEV: Log Unicorn worker timeout backtraces to Rails.logger
(#27257)
This commit introduces the following changes: 1. Introduce the `SignalTrapLogger` singleton which starts a single thread that polls a queue to log messages with the specified logger. This thread is necessary becasue most loggers cannot be used inside the `Signal.trap` context as they rely on mutexes which are not allowed within the context. 2. Moves the monkey patch in `freedom_patches/unicorn_http_server_patch.rb` to `config/unicorn.config.rb` which is already monkey patching `Unicorn::HttpServer`. 3. `Unicorn::HttpServer` will now automatically send a `USR2` signal to a unicorn worker 2 seconds before the worker is timed out by the Unicorn master. 4. When a Unicorn worker receives a `USR2` signal, it will now log only the main thread's backtraces to `Rails.logger`. Previously, it was `put`ing the backtraces to `STDOUT` which most people wouldn't read. Logging it via `Rails.logger` will make the backtraces easily accessible via `/logs`.
This commit is contained in:
parent
4b2bd4d682
commit
23c38cbf11
|
@ -84,14 +84,8 @@ before_fork do |server, worker|
|
||||||
server.logger.info "starting #{sidekiqs} supervised sidekiqs"
|
server.logger.info "starting #{sidekiqs} supervised sidekiqs"
|
||||||
|
|
||||||
require "demon/sidekiq"
|
require "demon/sidekiq"
|
||||||
Demon::Sidekiq.logger = server.logger
|
|
||||||
Demon::Sidekiq.after_fork { DiscourseEvent.trigger(:sidekiq_fork_started) }
|
Demon::Sidekiq.after_fork { DiscourseEvent.trigger(:sidekiq_fork_started) }
|
||||||
Demon::Sidekiq.start(sidekiqs)
|
Demon::Sidekiq.start(sidekiqs, logger: server.logger)
|
||||||
|
|
||||||
Signal.trap("SIGTSTP") do
|
|
||||||
Demon::Sidekiq.log("Issuing stop to Sidekiq")
|
|
||||||
Demon::Sidekiq.stop
|
|
||||||
end
|
|
||||||
|
|
||||||
# Trap USR1, so we can re-issue to sidekiq workers
|
# Trap USR1, so we can re-issue to sidekiq workers
|
||||||
# but chain the default unicorn implementation as well
|
# but chain the default unicorn implementation as well
|
||||||
|
@ -104,21 +98,48 @@ before_fork do |server, worker|
|
||||||
|
|
||||||
if ENV["DISCOURSE_ENABLE_EMAIL_SYNC_DEMON"] == "true"
|
if ENV["DISCOURSE_ENABLE_EMAIL_SYNC_DEMON"] == "true"
|
||||||
server.logger.info "starting up EmailSync demon"
|
server.logger.info "starting up EmailSync demon"
|
||||||
Demon::EmailSync.start
|
Demon::EmailSync.start(1, logger: server.logger)
|
||||||
Signal.trap("SIGTSTP") do
|
|
||||||
STDERR.puts "#{Time.now}: Issuing stop to EmailSync"
|
|
||||||
Demon::EmailSync.stop
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
|
|
||||||
DiscoursePluginRegistry.demon_processes.each do |demon_class|
|
DiscoursePluginRegistry.demon_processes.each do |demon_class|
|
||||||
server.logger.info "starting #{demon_class.prefix} demon"
|
server.logger.info "starting #{demon_class.prefix} demon"
|
||||||
demon_class.start
|
demon_class.start(1, logger: server.logger)
|
||||||
end
|
end
|
||||||
|
|
||||||
class ::Unicorn::HttpServer
|
class ::Unicorn::HttpServer
|
||||||
alias master_sleep_orig master_sleep
|
alias master_sleep_orig master_sleep
|
||||||
|
|
||||||
|
# Original source: https://github.com/defunkt/unicorn/blob/6c9c442fb6aa12fd871237bc2bb5aec56c5b3538/lib/unicorn/http_server.rb#L477-L496
|
||||||
|
def murder_lazy_workers
|
||||||
|
next_sleep = @timeout - 1
|
||||||
|
now = time_now.to_i
|
||||||
|
@workers.dup.each_pair do |wpid, worker|
|
||||||
|
tick = worker.tick
|
||||||
|
0 == tick and next # skip workers that haven't processed any clients
|
||||||
|
diff = now - tick
|
||||||
|
tmp = @timeout - diff
|
||||||
|
|
||||||
|
# START MONKEY PATCH
|
||||||
|
if tmp < 2
|
||||||
|
logger.error "worker=#{worker.nr} PID:#{wpid} running too long " \
|
||||||
|
"(#{diff}s), sending USR2 to dump thread backtraces"
|
||||||
|
kill_worker(:USR2, wpid)
|
||||||
|
end
|
||||||
|
# END MONKEY PATCH
|
||||||
|
|
||||||
|
if tmp >= 0
|
||||||
|
next_sleep > tmp and next_sleep = tmp
|
||||||
|
next
|
||||||
|
end
|
||||||
|
next_sleep = 0
|
||||||
|
logger.error "worker=#{worker.nr} PID:#{wpid} timeout " \
|
||||||
|
"(#{diff}s > #{@timeout}s), killing"
|
||||||
|
|
||||||
|
kill_worker(:KILL, wpid) # take no prisoners for timeout violations
|
||||||
|
end
|
||||||
|
next_sleep <= 0 ? 1 : next_sleep
|
||||||
|
end
|
||||||
|
|
||||||
def max_sidekiq_rss
|
def max_sidekiq_rss
|
||||||
rss =
|
rss =
|
||||||
`ps -eo rss,args | grep sidekiq | grep -v grep | awk '{print $1}'`.split("\n")
|
`ps -eo rss,args | grep sidekiq | grep -v grep | awk '{print $1}'`.split("\n")
|
||||||
|
@ -270,9 +291,14 @@ end
|
||||||
after_fork do |server, worker|
|
after_fork do |server, worker|
|
||||||
DiscourseEvent.trigger(:web_fork_started)
|
DiscourseEvent.trigger(:web_fork_started)
|
||||||
Discourse.after_fork
|
Discourse.after_fork
|
||||||
|
SignalTrapLogger.instance.after_fork
|
||||||
|
|
||||||
Signal.trap("USR2") { puts <<~MSG }
|
Signal.trap("USR2") do
|
||||||
[#{Time.now.utc.strftime("%Y-%m-%dT%H:%M:%S.%6N")} ##{Process.pid}] Received USR2 signal, dumping backtrace for all threads
|
message = <<~MSG
|
||||||
#{Thread.list.map { |t| "#{t.backtrace&.join("\n")}" }.join("\n\n")}
|
Unicorn worker received USR2 signal indicating it is about to timeout, dumping backtrace for main thread
|
||||||
|
#{Thread.current.backtrace&.join("\n")}
|
||||||
MSG
|
MSG
|
||||||
|
|
||||||
|
SignalTrapLogger.instance.log(Rails.logger, message, level: :warn)
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -9,9 +9,9 @@ class Demon::Base
|
||||||
@demons
|
@demons
|
||||||
end
|
end
|
||||||
|
|
||||||
def self.start(count = 1, verbose: false)
|
def self.start(count = 1, verbose: false, logger: nil)
|
||||||
@demons ||= {}
|
@demons ||= {}
|
||||||
count.times { |i| (@demons["#{prefix}_#{i}"] ||= new(i, verbose: verbose)).start }
|
count.times { |i| (@demons["#{prefix}_#{i}"] ||= new(i, verbose:, logger:)).start }
|
||||||
end
|
end
|
||||||
|
|
||||||
def self.stop
|
def self.stop
|
||||||
|
@ -39,7 +39,7 @@ class Demon::Base
|
||||||
attr_reader :pid, :parent_pid, :started, :index
|
attr_reader :pid, :parent_pid, :started, :index
|
||||||
attr_accessor :stop_timeout
|
attr_accessor :stop_timeout
|
||||||
|
|
||||||
def initialize(index, rails_root: nil, parent_pid: nil, verbose: false)
|
def initialize(index, rails_root: nil, parent_pid: nil, verbose: false, logger: nil)
|
||||||
@index = index
|
@index = index
|
||||||
@pid = nil
|
@pid = nil
|
||||||
@parent_pid = parent_pid || Process.pid
|
@parent_pid = parent_pid || Process.pid
|
||||||
|
@ -47,6 +47,11 @@ class Demon::Base
|
||||||
@stop_timeout = 10
|
@stop_timeout = 10
|
||||||
@rails_root = rails_root || Rails.root
|
@rails_root = rails_root || Rails.root
|
||||||
@verbose = verbose
|
@verbose = verbose
|
||||||
|
@logger = logger || Logger.new(STDERR)
|
||||||
|
end
|
||||||
|
|
||||||
|
def log(message, level: :info)
|
||||||
|
@logger.public_send(level, message)
|
||||||
end
|
end
|
||||||
|
|
||||||
def pid_file
|
def pid_file
|
||||||
|
@ -72,6 +77,7 @@ class Demon::Base
|
||||||
|
|
||||||
def stop
|
def stop
|
||||||
@started = false
|
@started = false
|
||||||
|
|
||||||
if @pid
|
if @pid
|
||||||
Process.kill(stop_signal, @pid)
|
Process.kill(stop_signal, @pid)
|
||||||
|
|
||||||
|
@ -99,7 +105,7 @@ class Demon::Base
|
||||||
wait_for_stop.call
|
wait_for_stop.call
|
||||||
|
|
||||||
if alive?
|
if alive?
|
||||||
STDERR.puts "Process would not terminate cleanly, force quitting. pid: #{@pid} #{self.class}"
|
log("Process would not terminate cleanly, force quitting. pid: #{@pid} #{self.class}")
|
||||||
Process.kill("KILL", @pid)
|
Process.kill("KILL", @pid)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -125,8 +131,9 @@ class Demon::Base
|
||||||
rescue StandardError
|
rescue StandardError
|
||||||
-1
|
-1
|
||||||
end
|
end
|
||||||
|
|
||||||
if dead
|
if dead
|
||||||
STDERR.puts "Detected dead worker #{@pid}, restarting..."
|
log("Detected dead worker #{@pid}, restarting...")
|
||||||
@pid = nil
|
@pid = nil
|
||||||
@started = false
|
@started = false
|
||||||
start
|
start
|
||||||
|
@ -138,7 +145,7 @@ class Demon::Base
|
||||||
|
|
||||||
if existing = already_running?
|
if existing = already_running?
|
||||||
# should not happen ... so kill violently
|
# should not happen ... so kill violently
|
||||||
STDERR.puts "Attempting to kill pid #{existing}"
|
log("Attempting to kill pid #{existing}")
|
||||||
Process.kill("TERM", existing)
|
Process.kill("TERM", existing)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -199,7 +206,7 @@ class Demon::Base
|
||||||
Process.kill "KILL", Process.pid
|
Process.kill "KILL", Process.pid
|
||||||
end
|
end
|
||||||
rescue => e
|
rescue => e
|
||||||
STDERR.puts "URGENT monitoring thread had an exception #{e}"
|
log("URGENT monitoring thread had an exception #{e}")
|
||||||
end
|
end
|
||||||
sleep 1
|
sleep 1
|
||||||
end
|
end
|
||||||
|
|
|
@ -77,14 +77,14 @@ class Demon::EmailSync < ::Demon::Base
|
||||||
end
|
end
|
||||||
|
|
||||||
def after_fork
|
def after_fork
|
||||||
puts "[EmailSync] Loading EmailSync in process id #{Process.pid}"
|
log("[EmailSync] Loading EmailSync in process id #{Process.pid}")
|
||||||
|
|
||||||
loop do
|
loop do
|
||||||
break if Discourse.redis.set(HEARTBEAT_KEY, Time.now.to_i, ex: HEARTBEAT_INTERVAL, nx: true)
|
break if Discourse.redis.set(HEARTBEAT_KEY, Time.now.to_i, ex: HEARTBEAT_INTERVAL, nx: true)
|
||||||
sleep HEARTBEAT_INTERVAL
|
sleep HEARTBEAT_INTERVAL
|
||||||
end
|
end
|
||||||
|
|
||||||
puts "[EmailSync] Starting EmailSync main thread"
|
log("[EmailSync] Starting EmailSync main thread")
|
||||||
|
|
||||||
@running = true
|
@running = true
|
||||||
@sync_data = {}
|
@sync_data = {}
|
||||||
|
@ -158,18 +158,20 @@ class Demon::EmailSync < ::Demon::Base
|
||||||
Discourse.redis.del(HEARTBEAT_KEY)
|
Discourse.redis.del(HEARTBEAT_KEY)
|
||||||
exit 0
|
exit 0
|
||||||
rescue => e
|
rescue => e
|
||||||
STDERR.puts e.message
|
log("#{e.message}: #{e.backtrace.join("\n")}")
|
||||||
STDERR.puts e.backtrace.join("\n")
|
|
||||||
exit 1
|
exit 1
|
||||||
end
|
end
|
||||||
|
|
||||||
def kill_and_disconnect!(data)
|
def kill_and_disconnect!(data)
|
||||||
data[:thread].kill
|
data[:thread].kill
|
||||||
data[:thread].join
|
data[:thread].join
|
||||||
|
|
||||||
begin
|
begin
|
||||||
data[:syncer]&.disconnect!
|
data[:syncer]&.disconnect!
|
||||||
rescue Net::IMAP::ResponseError => err
|
rescue Net::IMAP::ResponseError => err
|
||||||
puts "[EmailSync] Encountered a response error when disconnecting: #{err}"
|
log(
|
||||||
|
"[EmailSync] Encountered a response error when disconnecting: #{err}\n#{err.backtrace.join("\n")}",
|
||||||
|
)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -3,8 +3,6 @@
|
||||||
require "demon/base"
|
require "demon/base"
|
||||||
|
|
||||||
class Demon::Sidekiq < ::Demon::Base
|
class Demon::Sidekiq < ::Demon::Base
|
||||||
cattr_accessor :logger
|
|
||||||
|
|
||||||
def self.prefix
|
def self.prefix
|
||||||
"sidekiq"
|
"sidekiq"
|
||||||
end
|
end
|
||||||
|
@ -13,37 +11,6 @@ class Demon::Sidekiq < ::Demon::Base
|
||||||
blk ? (@blk = blk) : @blk
|
blk ? (@blk = blk) : @blk
|
||||||
end
|
end
|
||||||
|
|
||||||
def self.format(message)
|
|
||||||
"[#{Time.now.utc.strftime("%Y-%m-%dT%H:%M:%S.%6N")} ##{Process.pid}] #{message}"
|
|
||||||
end
|
|
||||||
|
|
||||||
def self.log(message, level: :info)
|
|
||||||
# We use an IO pipe and log messages using the logger in a seperate thread to avoid the `log writing failed. can't be called from trap context`
|
|
||||||
# error message that is raised when trying to log from within a `Signal.trap` block.
|
|
||||||
if logger
|
|
||||||
if !defined?(@logger_read_pipe)
|
|
||||||
@logger_read_pipe, @logger_write_pipe = IO.pipe
|
|
||||||
|
|
||||||
@logger_thread =
|
|
||||||
Thread.new do
|
|
||||||
begin
|
|
||||||
while readable_io = IO.select([@logger_read_pipe])
|
|
||||||
logger.public_send(level, readable_io.first[0].gets.strip)
|
|
||||||
end
|
|
||||||
rescue => e
|
|
||||||
STDOUT.puts self.format(
|
|
||||||
"Error in Sidekiq demon logger thread: #{e.message}\n#{e.backtrace.join("\n")}",
|
|
||||||
)
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
@logger_write_pipe.puts(message)
|
|
||||||
else
|
|
||||||
STDOUT.puts self.format(message)
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
private
|
private
|
||||||
|
|
||||||
def suppress_stdout
|
def suppress_stdout
|
||||||
|
@ -54,12 +21,13 @@ class Demon::Sidekiq < ::Demon::Base
|
||||||
false
|
false
|
||||||
end
|
end
|
||||||
|
|
||||||
def log(message, level: :info)
|
def log_in_trap(message, level: :info)
|
||||||
self.class.log(message, level:)
|
SignalTrapLogger.instance.log(@logger, message, level: level)
|
||||||
end
|
end
|
||||||
|
|
||||||
def after_fork
|
def after_fork
|
||||||
Demon::Sidekiq.after_fork&.call
|
Demon::Sidekiq.after_fork&.call
|
||||||
|
SignalTrapLogger.instance.after_fork
|
||||||
|
|
||||||
log("Loading Sidekiq in process id #{Process.pid}")
|
log("Loading Sidekiq in process id #{Process.pid}")
|
||||||
require "sidekiq/cli"
|
require "sidekiq/cli"
|
||||||
|
@ -67,9 +35,9 @@ class Demon::Sidekiq < ::Demon::Base
|
||||||
|
|
||||||
# Unicorn uses USR1 to indicate that log files have been rotated
|
# Unicorn uses USR1 to indicate that log files have been rotated
|
||||||
Signal.trap("USR1") do
|
Signal.trap("USR1") do
|
||||||
log("Sidekiq reopening logs...")
|
log_in_trap("Sidekiq reopening logs...")
|
||||||
Unicorn::Util.reopen_logs
|
Unicorn::Util.reopen_logs
|
||||||
log("Sidekiq done reopening logs...")
|
log_in_trap("Sidekiq done reopening logs...")
|
||||||
end
|
end
|
||||||
|
|
||||||
options = ["-c", GlobalSetting.sidekiq_workers.to_s]
|
options = ["-c", GlobalSetting.sidekiq_workers.to_s]
|
||||||
|
@ -91,8 +59,7 @@ class Demon::Sidekiq < ::Demon::Base
|
||||||
load Rails.root + "config/initializers/100-sidekiq.rb"
|
load Rails.root + "config/initializers/100-sidekiq.rb"
|
||||||
cli.run
|
cli.run
|
||||||
rescue => e
|
rescue => e
|
||||||
STDERR.puts e.message
|
log("Error encountered while starting Sidekiq: #{e.message}\n#{e.backtrace.join("\n")}")
|
||||||
STDERR.puts e.backtrace.join("\n")
|
|
||||||
exit 1
|
exit 1
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -1,38 +0,0 @@
|
||||||
# frozen_string_literal: true
|
|
||||||
|
|
||||||
if ENV["DISCOURSE_DUMP_BACKTRACES_ON_UNICORN_WORKER_TIMEOUT"] && defined?(Unicorn::HttpServer)
|
|
||||||
module UnicornHTTPServerPatch
|
|
||||||
# Original source: https://github.com/defunkt/unicorn/blob/6c9c442fb6aa12fd871237bc2bb5aec56c5b3538/lib/unicorn/http_server.rb#L477-L496
|
|
||||||
def murder_lazy_workers
|
|
||||||
next_sleep = @timeout - 1
|
|
||||||
now = time_now.to_i
|
|
||||||
@workers.dup.each_pair do |wpid, worker|
|
|
||||||
tick = worker.tick
|
|
||||||
0 == tick and next # skip workers that haven't processed any clients
|
|
||||||
diff = now - tick
|
|
||||||
tmp = @timeout - diff
|
|
||||||
|
|
||||||
# START MONKEY PATCH
|
|
||||||
if tmp < 2
|
|
||||||
logger.error "worker=#{worker.nr} PID:#{wpid} running too long " \
|
|
||||||
"(#{diff}s), sending USR2 to dump thread backtraces"
|
|
||||||
kill_worker(:USR2, wpid)
|
|
||||||
end
|
|
||||||
# END MONKEY PATCH
|
|
||||||
|
|
||||||
if tmp >= 0
|
|
||||||
next_sleep > tmp and next_sleep = tmp
|
|
||||||
next
|
|
||||||
end
|
|
||||||
next_sleep = 0
|
|
||||||
logger.error "worker=#{worker.nr} PID:#{wpid} timeout " \
|
|
||||||
"(#{diff}s > #{@timeout}s), killing"
|
|
||||||
|
|
||||||
kill_worker(:KILL, wpid) # take no prisoners for timeout violations
|
|
||||||
end
|
|
||||||
next_sleep <= 0 ? 1 : next_sleep
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
Unicorn::HttpServer.prepend(UnicornHTTPServerPatch)
|
|
||||||
end
|
|
49
lib/signal_trap_logger.rb
Normal file
49
lib/signal_trap_logger.rb
Normal file
|
@ -0,0 +1,49 @@
|
||||||
|
# frozen_string_literal: true
|
||||||
|
|
||||||
|
# This class is used to log messages to a specified logger from within a `Signal.trap` context. Most loggers rely on
|
||||||
|
# methods that are prohibited within a `Signal.trap` context, so this class is used to queue up log messages and then
|
||||||
|
# log them from a separate thread outside of the `Signal.trap` context.
|
||||||
|
#
|
||||||
|
# Example:
|
||||||
|
# Signal.trap("USR1") do
|
||||||
|
# SignalTrapLogger.instance.log(Rails.logger, "Received USR1 signal")
|
||||||
|
# end
|
||||||
|
#
|
||||||
|
# Do note that you need to call `SignalTrapLogger.instance.after_fork` after forking a new process to ensure that the
|
||||||
|
# logging thread is running in the new process.
|
||||||
|
class SignalTrapLogger
|
||||||
|
include Singleton
|
||||||
|
|
||||||
|
def initialize
|
||||||
|
@queue = Queue.new
|
||||||
|
ensure_logging_thread_running
|
||||||
|
end
|
||||||
|
|
||||||
|
def log(logger, message, level: :info)
|
||||||
|
@queue << { logger:, message:, level: }
|
||||||
|
end
|
||||||
|
|
||||||
|
def after_fork
|
||||||
|
ensure_logging_thread_running
|
||||||
|
end
|
||||||
|
|
||||||
|
private
|
||||||
|
|
||||||
|
def ensure_logging_thread_running
|
||||||
|
return if @thread&.alive?
|
||||||
|
|
||||||
|
@thread =
|
||||||
|
Thread.new do
|
||||||
|
loop do
|
||||||
|
begin
|
||||||
|
log_entry = @queue.pop
|
||||||
|
log_entry[:logger].public_send(log_entry[:level], log_entry[:message])
|
||||||
|
rescue => error
|
||||||
|
Rails.logger.error(
|
||||||
|
"Error in SignalTrapLogger thread: #{error.message}\n#{error.backtrace.join("\n")}",
|
||||||
|
)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
Loading…
Reference in New Issue
Block a user