mirror of
https://github.com/discourse/discourse.git
synced 2025-01-18 21:12:45 +08:00
654959faa4
Forcing distributed muted to raise when a notify reviewable job is running leads to excessive errors in the logs under many conditions. The new pattern 1. Optimises the counting of reviewables so it is a lot faster 2. Holds the distributed lock for 2 minutes (max) The downside is the job queue can get blocked up when tons of notify reviewables are running at the same time. However this should be very rare in the real world, as we only notify when stuff is flagged which is fairly infrequent. This also give a fair bit more time for the notifications which may be a little slow on large sites with tons of mods.
106 lines
2.7 KiB
Ruby
106 lines
2.7 KiB
Ruby
# frozen_string_literal: true
|
|
|
|
# Cross-process locking using Redis.
|
|
# Expiration happens when the current time is greater than the expire time
|
|
class DistributedMutex
|
|
DEFAULT_VALIDITY = 60
|
|
CHECK_READONLY_ATTEMPTS = 5
|
|
|
|
LOCK_SCRIPT = DiscourseRedis::EvalHelper.new <<~LUA
|
|
local now = redis.call("time")[1]
|
|
local expire_time = now + ARGV[1]
|
|
local current_expire_time = redis.call("get", KEYS[1])
|
|
|
|
if current_expire_time and tonumber(now) <= tonumber(current_expire_time) then
|
|
return nil
|
|
else
|
|
local result = redis.call("setex", KEYS[1], ARGV[1] + 1, tostring(expire_time))
|
|
return expire_time
|
|
end
|
|
LUA
|
|
|
|
UNLOCK_SCRIPT = DiscourseRedis::EvalHelper.new <<~LUA
|
|
local current_expire_time = redis.call("get", KEYS[1])
|
|
|
|
if current_expire_time == ARGV[1] then
|
|
local result = redis.call("del", KEYS[1])
|
|
return result ~= nil
|
|
else
|
|
return false
|
|
end
|
|
LUA
|
|
|
|
def self.synchronize(key, redis: nil, validity: DEFAULT_VALIDITY, &blk)
|
|
self.new(key, redis: redis, validity: validity).synchronize(&blk)
|
|
end
|
|
|
|
def initialize(key, redis: nil, validity: DEFAULT_VALIDITY)
|
|
@key = key
|
|
@using_global_redis = true if !redis
|
|
@redis = redis || Discourse.redis
|
|
@mutex = Mutex.new
|
|
@validity = validity
|
|
end
|
|
|
|
# NOTE wrapped in mutex to maintain its semantics
|
|
def synchronize
|
|
result = nil
|
|
|
|
@mutex.synchronize do
|
|
expire_time = get_lock
|
|
|
|
begin
|
|
result = yield
|
|
ensure
|
|
current_time = redis.time[0]
|
|
if current_time > expire_time
|
|
warn(
|
|
"held for too long, expected max: #{@validity} secs, took an extra #{current_time - expire_time} secs",
|
|
)
|
|
end
|
|
|
|
unlocked = UNLOCK_SCRIPT.eval(redis, [prefixed_key], [expire_time.to_s])
|
|
if !unlocked && current_time <= expire_time
|
|
warn("the redis key appears to have been tampered with before expiration")
|
|
end
|
|
end
|
|
end
|
|
|
|
result
|
|
end
|
|
|
|
private
|
|
|
|
attr_reader :key
|
|
attr_reader :redis
|
|
attr_reader :validity
|
|
|
|
def get_lock
|
|
attempts = 0
|
|
|
|
while true
|
|
expire_time = LOCK_SCRIPT.eval(redis, [prefixed_key], [validity])
|
|
|
|
return expire_time if expire_time
|
|
|
|
# Exponential backoff, max duration 1s
|
|
interval = attempts < 10 ? (0.001 * 2**attempts) : 1
|
|
sleep interval
|
|
attempts += 1
|
|
|
|
# in readonly we will never be able to get a lock
|
|
if @using_global_redis && Discourse.recently_readonly? && attempts > CHECK_READONLY_ATTEMPTS
|
|
raise Discourse::ReadOnly
|
|
end
|
|
end
|
|
end
|
|
|
|
def prefixed_key
|
|
@prefixed_key ||= redis.respond_to?(:namespace_key) ? redis.namespace_key(key) : key
|
|
end
|
|
|
|
def warn(msg)
|
|
Rails.logger.warn("DistributedMutex(#{key.inspect}): #{msg}")
|
|
end
|
|
end
|