mirror of
https://github.com/discourse/discourse.git
synced 2024-11-22 15:52:11 +08:00
DEV: Implement distributed mutex in lua (#16228)
The rationale behind this was mostly to stop using `redis.synchronize` (now removed in redis gem 4.6)
This commit is contained in:
parent
6487179dec
commit
87353faac6
|
@ -1,10 +1,34 @@
|
|||
# frozen_string_literal: true
|
||||
|
||||
# Cross-process locking using Redis.
|
||||
#
|
||||
# Expiration happens when the current time is greater than the expire time
|
||||
class DistributedMutex
|
||||
DEFAULT_VALIDITY ||= 60
|
||||
DEFAULT_VALIDITY = 60
|
||||
CHECK_READONLY_ATTEMPTS = 10
|
||||
|
||||
LOCK_SCRIPT = DiscourseRedis::EvalHelper.new <<~LUA
|
||||
local now = redis.call("time")[1]
|
||||
local expire_time = now + ARGV[1]
|
||||
local current_expire_time = redis.call("get", KEYS[1])
|
||||
|
||||
if current_expire_time and tonumber(now) <= tonumber(current_expire_time) then
|
||||
return nil
|
||||
else
|
||||
local result = redis.call("setex", KEYS[1], ARGV[1] + 1, tostring(expire_time))
|
||||
return expire_time
|
||||
end
|
||||
LUA
|
||||
|
||||
UNLOCK_SCRIPT = DiscourseRedis::EvalHelper.new <<~LUA
|
||||
local current_expire_time = redis.call("get", KEYS[1])
|
||||
|
||||
if current_expire_time == ARGV[1] then
|
||||
local result = redis.call("del", KEYS[1])
|
||||
return result ~= nil
|
||||
else
|
||||
return false
|
||||
end
|
||||
LUA
|
||||
|
||||
def self.synchronize(key, redis: nil, validity: DEFAULT_VALIDITY, &blk)
|
||||
self.new(
|
||||
|
@ -22,26 +46,29 @@ class DistributedMutex
|
|||
@validity = validity
|
||||
end
|
||||
|
||||
CHECK_READONLY_ATTEMPT ||= 10
|
||||
|
||||
# NOTE wrapped in mutex to maintain its semantics
|
||||
def synchronize
|
||||
result = nil
|
||||
|
||||
@mutex.synchronize do
|
||||
expire_time = get_lock
|
||||
|
||||
begin
|
||||
yield
|
||||
result = yield
|
||||
ensure
|
||||
current_time = redis.time[0]
|
||||
if current_time > expire_time
|
||||
warn("held for too long, expected max: #{@validity} secs, took an extra #{current_time - expire_time} secs")
|
||||
end
|
||||
|
||||
if !unlock(expire_time) && current_time <= expire_time
|
||||
unlocked = UNLOCK_SCRIPT.eval(redis, [prefixed_key], [expire_time.to_s])
|
||||
if !unlocked && current_time <= expire_time
|
||||
warn("the redis key appears to have been tampered with before expiration")
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
result
|
||||
end
|
||||
|
||||
private
|
||||
|
@ -50,79 +77,32 @@ class DistributedMutex
|
|||
attr_reader :redis
|
||||
attr_reader :validity
|
||||
|
||||
def warn(msg)
|
||||
Rails.logger.warn("DistributedMutex(#{key.inspect}): #{msg}")
|
||||
end
|
||||
|
||||
def get_lock
|
||||
attempts = 0
|
||||
|
||||
while true
|
||||
got_lock, expire_time = try_to_get_lock
|
||||
if got_lock
|
||||
return expire_time
|
||||
end
|
||||
expire_time = LOCK_SCRIPT.eval(redis, [prefixed_key], [validity])
|
||||
|
||||
return expire_time if expire_time
|
||||
|
||||
sleep 0.001
|
||||
|
||||
# in readonly we will never be able to get a lock
|
||||
if @using_global_redis && Discourse.recently_readonly?
|
||||
attempts += 1
|
||||
|
||||
if attempts > CHECK_READONLY_ATTEMPT
|
||||
if attempts > CHECK_READONLY_ATTEMPTS
|
||||
raise Discourse::ReadOnly
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def try_to_get_lock
|
||||
got_lock = false
|
||||
|
||||
now = redis.time[0]
|
||||
expire_time = now + validity
|
||||
|
||||
redis.synchronize do
|
||||
redis.unwatch
|
||||
redis.watch key
|
||||
|
||||
current_expire_time = redis.get key
|
||||
|
||||
if current_expire_time && now <= current_expire_time.to_i
|
||||
redis.unwatch
|
||||
|
||||
got_lock = false
|
||||
else
|
||||
result =
|
||||
redis.multi do |transaction|
|
||||
transaction.set key, expire_time.to_s
|
||||
transaction.expireat key, expire_time + 1
|
||||
def prefixed_key
|
||||
@prefixed_key ||= redis.respond_to?(:namespace_key) ? redis.namespace_key(key) : key
|
||||
end
|
||||
|
||||
got_lock = !result.nil?
|
||||
end
|
||||
|
||||
[got_lock, expire_time]
|
||||
end
|
||||
end
|
||||
|
||||
def unlock(expire_time)
|
||||
redis.synchronize do
|
||||
redis.unwatch
|
||||
redis.watch key
|
||||
current_expire_time = redis.get key
|
||||
|
||||
if current_expire_time == expire_time.to_s
|
||||
# MULTI is the way redis ensures the watched key
|
||||
# has not changed by the time it is deleted
|
||||
result =
|
||||
redis.multi do |transaction|
|
||||
transaction.del key
|
||||
end
|
||||
return !result.nil?
|
||||
else
|
||||
redis.unwatch
|
||||
return false
|
||||
end
|
||||
end
|
||||
def warn(msg)
|
||||
Rails.logger.warn("DistributedMutex(#{key.inspect}): #{msg}")
|
||||
end
|
||||
end
|
||||
|
|
|
@ -31,38 +31,28 @@ describe DistributedMutex do
|
|||
|
||||
Discourse.redis.setnx key, Time.now.to_i - 1
|
||||
|
||||
start = Time.now.to_i
|
||||
start = Time.now
|
||||
m.synchronize do
|
||||
"nop"
|
||||
end
|
||||
|
||||
# no longer than a second
|
||||
expect(Time.now.to_i).to be <= start + 1
|
||||
expect(Time.now).to be <= start + 1
|
||||
end
|
||||
|
||||
# expected: 1574200319
|
||||
# got: 1574200320
|
||||
#
|
||||
# (compared using ==)
|
||||
# ./spec/components/distributed_mutex_spec.rb:60:in `block (3 levels) in <main>'
|
||||
# ./lib/distributed_mutex.rb:33:in `block in synchronize'
|
||||
xit 'allows the validity of the lock to be configured' do
|
||||
freeze_time
|
||||
|
||||
it "allows the validity of the lock to be configured" do
|
||||
mutex = DistributedMutex.new(key, validity: 2)
|
||||
|
||||
mutex.synchronize do
|
||||
expect(Discourse.redis.ttl(key)).to eq(2)
|
||||
expect(Discourse.redis.get(key).to_i).to eq(Time.now.to_i + 2)
|
||||
expect(Discourse.redis.ttl(key)).to be <= 3
|
||||
expect(Discourse.redis.get(key).to_i).to be_within(1.second).of(Time.now.to_i + 2)
|
||||
end
|
||||
|
||||
mutex = DistributedMutex.new(key)
|
||||
|
||||
mutex.synchronize do
|
||||
expect(Discourse.redis.ttl(key)).to eq(DistributedMutex::DEFAULT_VALIDITY)
|
||||
|
||||
expect(Discourse.redis.get(key).to_i)
|
||||
.to eq(Time.now.to_i + DistributedMutex::DEFAULT_VALIDITY)
|
||||
expect(Discourse.redis.ttl(key)).to be <= DistributedMutex::DEFAULT_VALIDITY + 1
|
||||
expect(Discourse.redis.get(key).to_i).to be_within(1.second).of(Time.now.to_i + DistributedMutex::DEFAULT_VALIDITY)
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -78,7 +68,7 @@ describe DistributedMutex do
|
|||
|
||||
context "readonly redis" do
|
||||
before do
|
||||
Discourse.redis.slaveof "127.0.0.1", "99991"
|
||||
Discourse.redis.slaveof "127.0.0.1", "65534"
|
||||
end
|
||||
|
||||
after do
|
||||
|
@ -97,7 +87,7 @@ describe DistributedMutex do
|
|||
}.to raise_error(Discourse::ReadOnly)
|
||||
|
||||
expect(done).to eq(false)
|
||||
expect(Time.now - start).to be <= 1.second
|
||||
expect(Time.now).to be <= start + 1
|
||||
end
|
||||
end
|
||||
|
||||
|
|
Loading…
Reference in New Issue
Block a user