diff --git a/lib/distributed_mutex.rb b/lib/distributed_mutex.rb index e17377436ff..420c5bc1436 100644 --- a/lib/distributed_mutex.rb +++ b/lib/distributed_mutex.rb @@ -24,10 +24,43 @@ class DistributedMutex # NOTE wrapped in mutex to maintain its semantics def synchronize - @mutex.lock + @mutex.synchronize do + expire_time = get_lock + + begin + yield + ensure + current_time = redis.time[0] + unless current_time < expire_time + warn("held for too long") + end + + unless unlock(expire_time) + warn("didn't unlock cleanly") + end + end + end + end + + private + + attr_reader :key + attr_reader :redis + attr_reader :validity + + def warn(msg) + Rails.logger.warn("DistributedMutex(#{key.inspect}): #{msg}") + end + + def get_lock attempts = 0 - while !try_to_get_lock + while true + got_lock, expire_time = try_to_get_lock + if got_lock + return expire_time + end + sleep 0.001 # in readonly we will never be able to get a lock if @using_global_redis && Discourse.recently_readonly? @@ -38,38 +71,48 @@ class DistributedMutex end end end - - yield - - ensure - @redis.del @key - @mutex.unlock end - private - def try_to_get_lock got_lock = false - if @redis.setnx @key, Time.now.to_i + @validity - @redis.expire @key, @validity - got_lock = true - else - begin - @redis.watch @key - time = @redis.get @key + now = redis.time[0] + expire_time = now + validity - if time && time.to_i < Time.now.to_i - got_lock = @redis.multi do - @redis.set @key, Time.now.to_i + @validity - end + redis.watch key + + current_expire_time = redis.get key + + if current_expire_time.present? && current_expire_time.to_i > now + redis.unwatch + + got_lock = false + else + result = + redis.multi do + redis.set key, expire_time.to_s + redis.expire key, validity end - ensure - @redis.unwatch - end + + got_lock = result.present? end - got_lock + [got_lock, expire_time] end + def unlock(expire_time) + redis.watch key + current_expire_time = redis.get key + + if current_expire_time == expire_time.to_s + result = + redis.multi do + redis.del key + end + return result.present? + else + redis.unwatch + return false + end + end end diff --git a/lib/tasks/scheduler.rake b/lib/tasks/scheduler.rake index 4b38e4d69bc..6bfb28eceaa 100644 --- a/lib/tasks/scheduler.rake +++ b/lib/tasks/scheduler.rake @@ -1,5 +1,7 @@ # frozen_string_literal: true +require 'benchmark' + desc "This task is called by the Heroku scheduler add-on" task enqueue_digest_emails: :environment do @@ -18,16 +20,11 @@ task version_check: :environment do Jobs::VersionCheck.new.execute(nil) end -def time - start = Time.now - yield - puts "Elapsed #{((Time.now - start) * 1000).to_i}ms" -end - desc "run every task the scheduler knows about in that order, use only for debugging" task 'scheduler:run_all' => :environment do MiniScheduler::Manager.discover_schedules.each do |schedule| puts "Running #{schedule}" - time { schedule.new.execute({}) } + elapsed = Benchmark.realtime { schedule.new.execute({}) } + puts "Elapsed #{(elapsed * 1000).to_i}ms" end end diff --git a/spec/components/distributed_mutex_spec.rb b/spec/components/distributed_mutex_spec.rb index 569c35eb90d..95588c1267a 100644 --- a/spec/components/distributed_mutex_spec.rb +++ b/spec/components/distributed_mutex_spec.rb @@ -98,4 +98,47 @@ describe DistributedMutex do end end + context "executions" do + it "should not allow critical sections to overlap" do + connections = (0...3).map { DiscourseRedis.new } + + scenario = + Concurrency::Scenario.new do |execution| + locked = false + + $redis.del('mutex_key') + + connections.each do |connection| + connection.unwatch + end + + 3.times do |i| + execution.spawn do + begin + redis = + Concurrency::RedisWrapper.new( + connections[i], + execution + ) + + 2.times do + DistributedMutex.synchronize('mutex_key', redis: redis) do + raise "already locked #{execution.path}" if locked + locked = true + + execution.yield + + raise "already unlocked #{execution.path}" unless locked + locked = false + end + end + rescue Redis::ConnectionError + end + end + end + end + + scenario.run(runs: 10) + end + end end diff --git a/spec/support/concurrency.rb b/spec/support/concurrency.rb new file mode 100644 index 00000000000..fdc0f796ccd --- /dev/null +++ b/spec/support/concurrency.rb @@ -0,0 +1,253 @@ +# frozen_string_literal: true + +require 'fiber' + +module Concurrency + module Logic + class DeadEnd < StandardError; end + + module Complete + class Path + def initialize + @path = [] + @index = 0 + end + + def to_s + "#" + end + + def choose(*options) + raise DeadEnd if options.empty? + + @path << [options.size, 0] unless @index < @path.size + + pair = @path[@index] + raise "non-determinism" unless pair[0] == options.size + + @index += 1 + options[pair[1]] + end + + def choose_with_weights(*options) + choose(options.map(&:first)) + end + + def dead_end + raise DeadEnd + end + + def guard(condition) + dead_end unless condition + end + + def next + @index = 0 + + until @path.empty? + pair = @path.last + pair[1] += 1 + if pair[1] < pair[0] + break + else + @path.pop + end + end + + !@path.empty? + end + end + + def self.run(&blk) + path = Path.new + possibilities = [] + + while true + begin + possibilities << blk.call(path) + rescue DeadEnd + end + + break unless path.next + end + + possibilities + end + end + + module Sampling + class Path + def initialize(random) + @random = random + end + + def to_s + "#" + end + + def choose(*options) + options.sample(random: @random) + end + + def choose_with_weights(*options) + position = @random.rand + options.each do |(option, weight)| + if position <= weight + return option + else + position -= weight + end + end + raise "weights don't add up" + end + + def dead_end + raise DeadEnd + end + + def guard(condition) + dead_end unless condition + end + end + + def self.run(seed, runs, &blk) + seed = seed.to_i + possibilities = [] + + runs.times do |i| + path = Path.new(Random.new(seed + i)) + + begin + possibilities << blk.call(path) + rescue DeadEnd + end + end + + possibilities + end + end + + def self.run(seed: nil, runs: nil, &blk) + if runs.present? + Sampling.run(seed, runs, &blk) + else + Complete.run(&blk) + end + end + end + + class Scenario + def initialize(&blk) + @blk = blk + end + + class Execution + attr_reader :path + + def initialize(path) + @path = path + @tasks = [] + end + + def yield + Fiber.yield + end + + def choose(*options) + @path.choose(*options) + end + + def choose_with_weights(*options) + @path.choose_with_weights(*options) + end + + def spawn(&blk) + @tasks << Fiber.new(&blk) + end + + def run + until @tasks.empty? + task = @path.choose(*@tasks) + task.resume + unless task.alive? + @tasks.delete(task) + end + end + end + end + + def run_with_path(path) + execution = Execution.new(path) + result = @blk.call(execution) + execution.run + result + end + + def run(**opts) + Logic.run(**opts, &method(:run_with_path)) + end + end + + class RedisWrapper + def initialize(redis, execution) + @redis = redis + @execution = execution + @in_transaction = false + end + + def multi(&blk) + with_possible_failure do + with_in_transaction do + @redis.multi(&blk) + end + end + end + + def method_missing(method, *args, &blk) + if @in_transaction + @redis.send(method, *args, &blk) + else + with_possible_failure do + @redis.send(method, *args, &blk) + end + end + end + + private + + def with_in_transaction + previous_value, @in_transaction = @in_transaction, true + + begin + return yield + ensure + @in_transaction = previous_value + end + end + + def with_possible_failure + outcome = + @execution.choose_with_weights( + [:succeed, 0.96], + [:fail_before, 0.02], + [:fail_after, 0.02] + ) + + @execution.yield + + if outcome == :fail_before + raise Redis::ConnectionError + end + + result = yield + + @execution.yield + + if outcome == :fail_after + raise Redis::ConnectionError + end + + result + end + end +end