FIX: DistributedMutex (#7953)

This commit is contained in:
Daniel Waterworth 2019-08-01 09:12:05 +01:00 committed by GitHub
parent 3b575b82f3
commit 20bc4a38a5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 368 additions and 32 deletions

View File

@ -24,10 +24,43 @@ class DistributedMutex
# NOTE wrapped in mutex to maintain its semantics # NOTE wrapped in mutex to maintain its semantics
def synchronize def synchronize
@mutex.lock @mutex.synchronize do
expire_time = get_lock
begin
yield
ensure
current_time = redis.time[0]
unless current_time < expire_time
warn("held for too long")
end
unless unlock(expire_time)
warn("didn't unlock cleanly")
end
end
end
end
private
attr_reader :key
attr_reader :redis
attr_reader :validity
def warn(msg)
Rails.logger.warn("DistributedMutex(#{key.inspect}): #{msg}")
end
def get_lock
attempts = 0 attempts = 0
while !try_to_get_lock while true
got_lock, expire_time = try_to_get_lock
if got_lock
return expire_time
end
sleep 0.001 sleep 0.001
# in readonly we will never be able to get a lock # in readonly we will never be able to get a lock
if @using_global_redis && Discourse.recently_readonly? if @using_global_redis && Discourse.recently_readonly?
@ -38,38 +71,48 @@ class DistributedMutex
end end
end end
end end
yield
ensure
@redis.del @key
@mutex.unlock
end end
private
def try_to_get_lock def try_to_get_lock
got_lock = false got_lock = false
if @redis.setnx @key, Time.now.to_i + @validity now = redis.time[0]
@redis.expire @key, @validity expire_time = now + validity
got_lock = true
else
begin
@redis.watch @key
time = @redis.get @key
if time && time.to_i < Time.now.to_i redis.watch key
got_lock = @redis.multi do
@redis.set @key, Time.now.to_i + @validity current_expire_time = redis.get key
end
if current_expire_time.present? && current_expire_time.to_i > now
redis.unwatch
got_lock = false
else
result =
redis.multi do
redis.set key, expire_time.to_s
redis.expire key, validity
end end
ensure
@redis.unwatch got_lock = result.present?
end
end end
got_lock [got_lock, expire_time]
end end
def unlock(expire_time)
redis.watch key
current_expire_time = redis.get key
if current_expire_time == expire_time.to_s
result =
redis.multi do
redis.del key
end
return result.present?
else
redis.unwatch
return false
end
end
end end

View File

@ -1,5 +1,7 @@
# frozen_string_literal: true # frozen_string_literal: true
require 'benchmark'
desc "This task is called by the Heroku scheduler add-on" desc "This task is called by the Heroku scheduler add-on"
task enqueue_digest_emails: :environment do task enqueue_digest_emails: :environment do
@ -18,16 +20,11 @@ task version_check: :environment do
Jobs::VersionCheck.new.execute(nil) Jobs::VersionCheck.new.execute(nil)
end end
def time
start = Time.now
yield
puts "Elapsed #{((Time.now - start) * 1000).to_i}ms"
end
desc "run every task the scheduler knows about in that order, use only for debugging" desc "run every task the scheduler knows about in that order, use only for debugging"
task 'scheduler:run_all' => :environment do task 'scheduler:run_all' => :environment do
MiniScheduler::Manager.discover_schedules.each do |schedule| MiniScheduler::Manager.discover_schedules.each do |schedule|
puts "Running #{schedule}" puts "Running #{schedule}"
time { schedule.new.execute({}) } elapsed = Benchmark.realtime { schedule.new.execute({}) }
puts "Elapsed #{(elapsed * 1000).to_i}ms"
end end
end end

View File

@ -98,4 +98,47 @@ describe DistributedMutex do
end end
end end
context "executions" do
it "should not allow critical sections to overlap" do
connections = (0...3).map { DiscourseRedis.new }
scenario =
Concurrency::Scenario.new do |execution|
locked = false
$redis.del('mutex_key')
connections.each do |connection|
connection.unwatch
end
3.times do |i|
execution.spawn do
begin
redis =
Concurrency::RedisWrapper.new(
connections[i],
execution
)
2.times do
DistributedMutex.synchronize('mutex_key', redis: redis) do
raise "already locked #{execution.path}" if locked
locked = true
execution.yield
raise "already unlocked #{execution.path}" unless locked
locked = false
end
end
rescue Redis::ConnectionError
end
end
end
end
scenario.run(runs: 10)
end
end
end end

253
spec/support/concurrency.rb Normal file
View File

@ -0,0 +1,253 @@
# frozen_string_literal: true
require 'fiber'
module Concurrency
module Logic
class DeadEnd < StandardError; end
module Complete
class Path
def initialize
@path = []
@index = 0
end
def to_s
"#<Logic::Complete::Path path=#{@path}>"
end
def choose(*options)
raise DeadEnd if options.empty?
@path << [options.size, 0] unless @index < @path.size
pair = @path[@index]
raise "non-determinism" unless pair[0] == options.size
@index += 1
options[pair[1]]
end
def choose_with_weights(*options)
choose(options.map(&:first))
end
def dead_end
raise DeadEnd
end
def guard(condition)
dead_end unless condition
end
def next
@index = 0
until @path.empty?
pair = @path.last
pair[1] += 1
if pair[1] < pair[0]
break
else
@path.pop
end
end
!@path.empty?
end
end
def self.run(&blk)
path = Path.new
possibilities = []
while true
begin
possibilities << blk.call(path)
rescue DeadEnd
end
break unless path.next
end
possibilities
end
end
module Sampling
class Path
def initialize(random)
@random = random
end
def to_s
"#<Logic::Sampling::Path seed=#{@random.seed}>"
end
def choose(*options)
options.sample(random: @random)
end
def choose_with_weights(*options)
position = @random.rand
options.each do |(option, weight)|
if position <= weight
return option
else
position -= weight
end
end
raise "weights don't add up"
end
def dead_end
raise DeadEnd
end
def guard(condition)
dead_end unless condition
end
end
def self.run(seed, runs, &blk)
seed = seed.to_i
possibilities = []
runs.times do |i|
path = Path.new(Random.new(seed + i))
begin
possibilities << blk.call(path)
rescue DeadEnd
end
end
possibilities
end
end
def self.run(seed: nil, runs: nil, &blk)
if runs.present?
Sampling.run(seed, runs, &blk)
else
Complete.run(&blk)
end
end
end
class Scenario
def initialize(&blk)
@blk = blk
end
class Execution
attr_reader :path
def initialize(path)
@path = path
@tasks = []
end
def yield
Fiber.yield
end
def choose(*options)
@path.choose(*options)
end
def choose_with_weights(*options)
@path.choose_with_weights(*options)
end
def spawn(&blk)
@tasks << Fiber.new(&blk)
end
def run
until @tasks.empty?
task = @path.choose(*@tasks)
task.resume
unless task.alive?
@tasks.delete(task)
end
end
end
end
def run_with_path(path)
execution = Execution.new(path)
result = @blk.call(execution)
execution.run
result
end
def run(**opts)
Logic.run(**opts, &method(:run_with_path))
end
end
class RedisWrapper
def initialize(redis, execution)
@redis = redis
@execution = execution
@in_transaction = false
end
def multi(&blk)
with_possible_failure do
with_in_transaction do
@redis.multi(&blk)
end
end
end
def method_missing(method, *args, &blk)
if @in_transaction
@redis.send(method, *args, &blk)
else
with_possible_failure do
@redis.send(method, *args, &blk)
end
end
end
private
def with_in_transaction
previous_value, @in_transaction = @in_transaction, true
begin
return yield
ensure
@in_transaction = previous_value
end
end
def with_possible_failure
outcome =
@execution.choose_with_weights(
[:succeed, 0.96],
[:fail_before, 0.02],
[:fail_after, 0.02]
)
@execution.yield
if outcome == :fail_before
raise Redis::ConnectionError
end
result = yield
@execution.yield
if outcome == :fail_after
raise Redis::ConnectionError
end
result
end
end
end