mirror of
https://github.com/discourse/discourse.git
synced 2025-01-16 03:32:40 +08:00
e4e5db57f0
This is a follow-up to 9812407f76
246 lines
6.8 KiB
Ruby
246 lines
6.8 KiB
Ruby
# frozen_string_literal: true
|
|
|
|
require "demon/base"
|
|
|
|
class Demon::EmailSync < ::Demon::Base
|
|
HEARTBEAT_KEY = "email_sync_heartbeat"
|
|
HEARTBEAT_INTERVAL = 60.seconds
|
|
|
|
# Name prefix that identifies this demon type.
#
# @return [String] always "email_sync"
def self.prefix
  "email_sync"
end
|
|
|
|
# Largest resident set size among the email sync demon processes.
#
# `ps` is asked for exactly the demon pids (`-p`) and only the RSS column
# (`-o rss=`; the trailing `=` suppresses the header line). Selecting by pid
# avoids text-matching the pids against whole `ps` lines, which breaks with
# more than one pid (a plain `grep 'a|b'` treats `|` literally) and can match
# a pid inside unrelated pids, RSS values or command arguments.
#
# @return [Integer] RSS in bytes (the value reported by `ps` multiplied by
#   1024), or 0 when there are no demons, no pids, or nothing could be read
def self.max_email_sync_rss
  return 0 if demons.empty?

  # A demon without a pid has nothing to measure; drop it so it cannot end up
  # as an empty entry in the pid list.
  email_sync_pids = demons.map { |_uid, demon| demon.pid }.compact
  return 0 if email_sync_pids.empty?

  ps_output =
    begin
      # Array form: the command is executed directly, without a shell.
      IO.popen(["ps", "-o", "rss=", "-p", email_sync_pids.join(",")], err: File::NULL, &:read)
    rescue Errno::ENOENT
      # No `ps` binary available: report "nothing measured" instead of raising.
      ""
    end

  largest_rss = ps_output.to_s.split("\n").map(&:to_i).max

  (largest_rss || 0) * 1024
end
|
|
private_class_method :max_email_sync_rss
|
|
|
|
# Lower bound, in megabytes, for the memory limit applied to the email sync
# process.
DEFAULT_MAX_ALLOWED_EMAIL_SYNC_RSS_MEGABYTES = 500

# Memory limit for the email sync process.
#
# The limit is the larger of the UNICORN_EMAIL_SYNC_MAX_RSS environment
# variable (read as megabytes; unset or non-numeric values count as 0) and
# DEFAULT_MAX_ALLOWED_EMAIL_SYNC_RSS_MEGABYTES, so the environment variable can
# raise the limit but never lower it below the default.
#
# @return [Integer] the limit converted with `Integer#megabytes`
def self.max_allowed_email_sync_rss
  configured_megabytes = ENV["UNICORN_EMAIL_SYNC_MAX_RSS"].to_i

  [configured_megabytes, DEFAULT_MAX_ALLOWED_EMAIL_SYNC_RSS_MEGABYTES].max.megabytes
end
|
|
private_class_method :max_allowed_email_sync_rss
|
|
|
|
if Rails.env.test?
|
|
# Clears the throttle timestamp used by check_email_sync_heartbeat so that the
# next call performs its checks again instead of returning early.
def self.test_cleanup
  @@email_sync_next_heartbeat_check = nil
end
|
|
end
|
|
|
|
# Restarts the email sync demon when it looks unresponsive or uses too much
# memory.
#
# The checks run at most once per HEARTBEAT_INTERVAL: each run stores the
# earliest time of the next run in @@email_sync_next_heartbeat_check and calls
# made before that time return immediately.
#
# Two conditions trigger `restart`:
# * the timestamp stored under HEARTBEAT_KEY is older than HEARTBEAT_INTERVAL
#   (a missing value converts to 0 through `to_i`, so it counts as very old);
# * max_email_sync_rss exceeds max_allowed_email_sync_rss. This one is only
#   evaluated when the heartbeat check did not already ask for a restart.
def self.check_email_sync_heartbeat
  if defined?(@@email_sync_next_heartbeat_check) && @@email_sync_next_heartbeat_check &&
       @@email_sync_next_heartbeat_check > Time.now.to_i
    return
  end

  @@email_sync_next_heartbeat_check = (Time.now + HEARTBEAT_INTERVAL).to_i

  should_restart = false

  # Restart process if it does not respond anymore
  last_heartbeat_ago = Time.now.to_i - Discourse.redis.get(HEARTBEAT_KEY).to_i

  if last_heartbeat_ago > HEARTBEAT_INTERVAL.to_i
    Rails.logger.warn(
      "EmailSync heartbeat test failed (last heartbeat was #{last_heartbeat_ago}s ago), restarting",
    )

    should_restart = true
  end

  # Restart process if memory usage is too high
  if !should_restart && (email_sync_rss = max_email_sync_rss) > max_allowed_email_sync_rss
    Rails.logger.warn(
      "EmailSync is consuming too much memory (using: %0.2fM) for '%s', restarting" %
        [(email_sync_rss.to_f / 1.megabyte), HOSTNAME],
    )

    should_restart = true
  end

  restart if should_restart
end
|
|
|
|
private
|
|
|
|
# @return [Boolean] always false
def suppress_stdout
  false
end
|
|
|
|
# @return [Boolean] always false
def suppress_stderr
  false
end
|
|
|
|
# Starts the worker thread that keeps one group's IMAP mailbox in sync.
#
# The thread runs inside the multisite connection for `db`. It builds an
# Imap::Sync for the group, stores it in @sync_data[db][group.id][:syncer]
# (under @sync_lock) and then calls `syncer.process` repeatedly while @running
# is set and the reloaded group still has an imap_mailbox_name. When the loop
# ends the syncer is disconnected.
#
# If creating the syncer raises Net::IMAP::NoResponseError, the message is
# saved to the group's imap_last_error and the thread exits.
#
# @param db [String] multisite database name
# @param group [Group] group whose mailbox is synced
# @return [Thread] the started thread
def start_thread(db, group)
  Thread.new do
    RailsMultisite::ConnectionManagement.with_connection(db) do
      ImapSyncLog.debug("Thread started for group #{group.name} in db #{db}", group, db: false)

      begin
        syncer = Imap::Sync.new(group)
      rescue Net::IMAP::NoResponseError => e
        group.update(imap_last_error: e.message)
        Thread.exit
      end

      @sync_lock.synchronize { @sync_data[db][group.id][:syncer] = syncer }

      # Result of the previous `process` call; nil before the first pass.
      status = nil

      while @running && group.reload.imap_mailbox_name.present?
        ImapSyncLog.debug("Processing mailbox for group #{group.name} in db #{db}", group)

        # Idle is only requested when the syncer supports it and the previous
        # pass reported nothing remaining. While the previous pass reported
        # remaining emails, old_emails_limit is 0; otherwise it is left nil.
        status =
          syncer.process(
            idle: syncer.can_idle? && status && status[:remaining] == 0,
            old_emails_limit: (status && status[:remaining] > 0) ? 0 : nil,
          )

        if !syncer.can_idle? && status[:remaining] == 0
          ImapSyncLog.debug(
            "Going to sleep for group #{group.name} in db #{db} to wait for new emails",
            group,
            db: false,
          )

          # Thread goes into sleep for a bit so it is better to return any
          # connection back to the pool.
          ActiveRecord::Base.connection_handler.clear_active_connections!

          sleep SiteSetting.imap_polling_period_mins.minutes
        end
      end

      syncer.disconnect!
    end
  end
end
|
|
|
|
# Stops every sync thread, disconnects its syncer and terminates the process
# with exit status 0. This method never returns to its caller.
def kill_threads
  # This is not really safe so the caller should ensure it happens in a
  # thread-safe context.
  # It should be safe when called from within a `trap` (there are no
  # synchronization primitives available anyway).
  @running = false

  # @sync_data maps db => { group_id => { thread:, syncer: } }
  @sync_data.each_value do |groups|
    groups.each_value { |data| kill_and_disconnect!(data) }
  end

  exit 0
end
|
|
|
|
# Main body of the email sync process.
#
# 1. Waits until HEARTBEAT_KEY can be created (`nx: true`), retrying every
#    HEARTBEAT_INTERVAL. Presumably this keeps a new process from starting
#    while a previous one still has a live heartbeat; confirm against the
#    code that manages the demon.
# 2. Installs INT/TERM/HUP handlers that call kill_threads.
# 3. Loops while @running: refreshes the heartbeat, drops threads for
#    databases that no longer exist, and for every site with IMAP enabled
#    kills threads of groups that are no longer synced (or whose thread died or
#    whose syncer disconnected) and starts threads for groups without one.
#
# Any StandardError is logged and the process exits with status 1.
def after_fork
  log("[EmailSync] Loading EmailSync in process id #{Process.pid}")

  loop do
    break if Discourse.redis.set(HEARTBEAT_KEY, Time.now.to_i, ex: HEARTBEAT_INTERVAL, nx: true)
    sleep HEARTBEAT_INTERVAL
  end

  log("[EmailSync] Starting EmailSync main thread")

  @running = true
  @sync_data = {}
  @sync_lock = Mutex.new

  %w[INT TERM HUP].each { |signal| trap(signal) { kill_threads } }

  while @running
    Discourse.redis.set(HEARTBEAT_KEY, Time.now.to_i, ex: HEARTBEAT_INTERVAL)

    # Kill all threads for databases that no longer exist
    all_dbs = Set.new(RailsMultisite::ConnectionManagement.all_dbs)
    @sync_data.filter! do |db, sync_data|
      next true if all_dbs.include?(db)

      sync_data.each { |_, data| kill_and_disconnect!(data) }

      false
    end

    RailsMultisite::ConnectionManagement.each_connection do |db|
      next if !SiteSetting.enable_imap

      groups = Group.with_imap_configured.map { |group| [group.id, group] }.to_h

      # Worker threads write their syncer into @sync_data under the same lock,
      # so a freshly started thread cannot touch its entry before it has been
      # stored below.
      @sync_lock.synchronize do
        @sync_data[db] ||= {}

        # Kill threads for group's mailbox that are no longer synchronized.
        @sync_data[db].filter! do |group_id, data|
          next true if groups[group_id] && data[:thread]&.alive? && !data[:syncer]&.disconnected?

          if !groups[group_id]
            ImapSyncLog.warn(
              "Killing thread for group because mailbox is no longer synced",
              group_id,
            )
          else
            ImapSyncLog.warn("Thread for group is dead", group_id)
          end

          kill_and_disconnect!(data)
          false
        end

        # Spawn new threads for groups that are now synchronized.
        groups.each do |group_id, group|
          if !@sync_data[db][group_id]
            ImapSyncLog.debug(
              "Starting thread for group #{group.name} mailbox #{group.imap_mailbox_name}",
              group,
              db: false,
            )

            @sync_data[db][group_id] = { thread: start_thread(db, group), syncer: nil }
          end
        end
      end
    end

    # Thread goes into sleep for a bit so it is better to return any
    # connection back to the pool.
    ActiveRecord::Base.connection_handler.clear_active_connections!

    sleep 5
  end

  # kill_threads finishes with `exit 0`, so the heartbeat key has to be removed
  # before calling it; anything placed after the call would never run.
  Discourse.redis.del(HEARTBEAT_KEY)
  @sync_lock.synchronize { kill_threads }
  exit 0
rescue => e
  log("#{e.message}: #{e.backtrace.join("\n")}")
  exit 1
end
|
|
|
|
# Stops one sync worker: kills its thread, waits for it to finish and then
# disconnects its syncer.
#
# Either entry may be nil (the syncer is nil until the worker thread has
# created it); missing entries are skipped. A Net::IMAP::ResponseError raised
# while disconnecting is logged and not re-raised.
#
# @param data [Hash] entry with the keys :thread and :syncer
def kill_and_disconnect!(data)
  worker = data[:thread]
  worker&.kill
  worker&.join

  begin
    data[:syncer]&.disconnect!
  rescue Net::IMAP::ResponseError => err
    log(
      "[EmailSync] Encountered a response error when disconnecting: #{err}\n#{err.backtrace.join("\n")}",
    )
  end
end
|
|
end
|