mirror of
https://github.com/discourse/discourse.git
synced 2024-11-28 07:33:40 +08:00
2422ca0e67
Polling every 0.001s can cause extreme load on the redis instance, especially in scenarios where multiple app instances are waiting on the same lock. This commit introduces an exponential backoff starting from 0.001s and reaching a maximum interval of 1s. Previously `CHECK_READONLY_ATTEMPTS` was 10, and resulted in a block for 0.01s. Under the new logic, 10 attempts take more than 1s. Therefore CHECK_READONLY_ATTEMPTS is reduced to 5, bringing its total time to around 0.031s
111 lines
2.7 KiB
Ruby
111 lines
2.7 KiB
Ruby
# frozen_string_literal: true
|
|
|
|
# Cross-process locking using Redis.
|
|
# Expiration happens when the current time is greater than the expire time
|
|
class DistributedMutex
|
|
DEFAULT_VALIDITY = 60
|
|
CHECK_READONLY_ATTEMPTS = 5
|
|
|
|
LOCK_SCRIPT = DiscourseRedis::EvalHelper.new <<~LUA
|
|
local now = redis.call("time")[1]
|
|
local expire_time = now + ARGV[1]
|
|
local current_expire_time = redis.call("get", KEYS[1])
|
|
|
|
if current_expire_time and tonumber(now) <= tonumber(current_expire_time) then
|
|
return nil
|
|
else
|
|
local result = redis.call("setex", KEYS[1], ARGV[1] + 1, tostring(expire_time))
|
|
return expire_time
|
|
end
|
|
LUA
|
|
|
|
UNLOCK_SCRIPT = DiscourseRedis::EvalHelper.new <<~LUA
|
|
local current_expire_time = redis.call("get", KEYS[1])
|
|
|
|
if current_expire_time == ARGV[1] then
|
|
local result = redis.call("del", KEYS[1])
|
|
return result ~= nil
|
|
else
|
|
return false
|
|
end
|
|
LUA
|
|
|
|
def self.synchronize(key, redis: nil, validity: DEFAULT_VALIDITY, &blk)
|
|
self.new(
|
|
key,
|
|
redis: redis,
|
|
validity: validity
|
|
).synchronize(&blk)
|
|
end
|
|
|
|
def initialize(key, redis: nil, validity: DEFAULT_VALIDITY)
|
|
@key = key
|
|
@using_global_redis = true if !redis
|
|
@redis = redis || Discourse.redis
|
|
@mutex = Mutex.new
|
|
@validity = validity
|
|
end
|
|
|
|
# NOTE wrapped in mutex to maintain its semantics
|
|
def synchronize
|
|
result = nil
|
|
|
|
@mutex.synchronize do
|
|
expire_time = get_lock
|
|
|
|
begin
|
|
result = yield
|
|
ensure
|
|
current_time = redis.time[0]
|
|
if current_time > expire_time
|
|
warn("held for too long, expected max: #{@validity} secs, took an extra #{current_time - expire_time} secs")
|
|
end
|
|
|
|
unlocked = UNLOCK_SCRIPT.eval(redis, [prefixed_key], [expire_time.to_s])
|
|
if !unlocked && current_time <= expire_time
|
|
warn("the redis key appears to have been tampered with before expiration")
|
|
end
|
|
end
|
|
end
|
|
|
|
result
|
|
end
|
|
|
|
private
|
|
|
|
attr_reader :key
|
|
attr_reader :redis
|
|
attr_reader :validity
|
|
|
|
def get_lock
|
|
attempts = 0
|
|
|
|
while true
|
|
expire_time = LOCK_SCRIPT.eval(redis, [prefixed_key], [validity])
|
|
|
|
return expire_time if expire_time
|
|
|
|
# Exponential backoff, max duration 1s
|
|
interval = attempts < 10 ? (0.001 * 2**attempts) : 1
|
|
sleep interval
|
|
|
|
# in readonly we will never be able to get a lock
|
|
if @using_global_redis && Discourse.recently_readonly?
|
|
attempts += 1
|
|
|
|
if attempts > CHECK_READONLY_ATTEMPTS
|
|
raise Discourse::ReadOnly
|
|
end
|
|
end
|
|
end
|
|
end
|
|
|
|
def prefixed_key
|
|
@prefixed_key ||= redis.respond_to?(:namespace_key) ? redis.namespace_key(key) : key
|
|
end
|
|
|
|
def warn(msg)
|
|
Rails.logger.warn("DistributedMutex(#{key.inspect}): #{msg}")
|
|
end
|
|
end
|