diff --git a/lib/distributed_mutex.rb b/lib/distributed_mutex.rb index 5d85451c949..36d07e9f0e3 100644 --- a/lib/distributed_mutex.rb +++ b/lib/distributed_mutex.rb @@ -1,51 +1,48 @@ # Cross-process locking using Redis. class DistributedMutex - attr_accessor :redis - attr_reader :got_lock def initialize(key, redis=nil) @key = key @redis = redis || $redis - @got_lock = false + @mutex = Mutex.new end + # NOTE wrapped in mutex to maintain its semantics + def synchronize + @mutex.lock + while !try_to_get_lock + sleep 0.001 + end + + yield + + ensure + @redis.del @key + @mutex.unlock + end + + private + def try_to_get_lock - if redis.setnx @key, Time.now.to_i + 60 - redis.expire @key, 60 - @got_lock = true + got_lock = false + if @redis.setnx @key, Time.now.to_i + 60 + @redis.expire @key, 60 + got_lock = true else begin - redis.watch @key - time = redis.get @key + @redis.watch @key + time = @redis.get @key if time && time.to_i < Time.now.to_i - @got_lock = redis.multi do - redis.set @key, Time.now.to_i + 60 + got_lock = @redis.multi do + @redis.set @key, Time.now.to_i + 60 end end ensure - redis.unwatch + @redis.unwatch end end + + got_lock end - def get_lock - return if @got_lock - - start = Time.now - while !@got_lock - try_to_get_lock - end - end - - def release_lock - redis.del @key - @got_lock = false - end - - def synchronize - get_lock - yield - ensure - release_lock - end end diff --git a/spec/components/distributed_mutex_spec.rb b/spec/components/distributed_mutex_spec.rb index 70a00ba8ade..37cbb4e1f98 100644 --- a/spec/components/distributed_mutex_spec.rb +++ b/spec/components/distributed_mutex_spec.rb @@ -3,34 +3,47 @@ require_dependency 'distributed_mutex' describe DistributedMutex do it "allows only one mutex object to have the lock at a time" do - m1 = DistributedMutex.new("test_mutex_key") - m2 = DistributedMutex.new("test_mutex_key") - - m1.get_lock - m2.got_lock.should be_false - - t = Thread.new do - m2.get_lock + mutexes = (1..10).map do + DistributedMutex.new("test_mutex_key") end - m1.release_lock - t.join - m2.got_lock.should == true - end - - it "synchronizes correctly" do - array = [] - t = Thread.new do - DistributedMutex.new("correct_sync").synchronize do - sleep 0.01 - array.push 1 + x = 0 + mutexes.map do |m| + Thread.new do + m.synchronize do + y = x + sleep 0.001 + x = y + 1 + end end - end - sleep 0.005 - DistributedMutex.new("correct_sync").synchronize do - array.push 2 - end - t.join - array.should == [1, 2] + end.map(&:join) + + x.should == 10 end + + it "handles auto cleanup correctly" do + m = DistributedMutex.new("test_mutex_key") + + $redis.setnx "test_mutex_key", Time.now.to_i - 1 + + + start = Time.now.to_i + m.synchronize do + "nop" + end + + # no longer than a second + Time.now.to_i.should <= start + 1 + end + + it "maintains mutex semantics" do + m = DistributedMutex.new("test_mutex_key") + + lambda { + m.synchronize do + m.synchronize{} + end + }.should raise_error(ThreadError) + end + end