diff --git a/lib/distributed_cache.rb b/lib/distributed_cache.rb new file mode 100644 index 00000000000..59644db40e9 --- /dev/null +++ b/lib/distributed_cache.rb @@ -0,0 +1,116 @@ +# Like a hash, just does its best to stay in sync across the farm +# On boot all instances are blank, but they populate as various processes +# fill it up +# + +require 'weakref' + +class DistributedCache + @subscribers = [] + @subscribed = false + @lock = Mutex.new + + attr_reader :key + + def self.process_message(message) + i = @subscribers.length-1 + + payload = message.data + + while i >= 0 + begin + current = @subscribers[i] + + next if payload["origin"] == current.object_id + hash = current.hash(message.site_id) + case payload["op"] + when "set" then hash[payload["key"]] = payload["value"] + when "delete" then hash.delete(payload["key"]) + when "clear" then hash.clear + end + + rescue WeakRef::RefError + @subscribers.delete_at(i) + end + i -= 1 + end + end + + def self.channel_name + "/distributed_hash".freeze + end + + def self.ensure_subscribe! + return if @subscribed + @lock.synchronize do + return if @subscribed + MessageBus.subscribe(channel_name) do |message| + @lock.synchronize do + process_message(message) + end + end + @subscribed = true + end + end + + def self.publish(hash, message) + message[:origin] = hash.object_id + MessageBus.publish(channel_name, message) + end + + def self.set(hash, key, value) + publish(hash, { op: :set, key: key, value: value }) + end + + def self.delete(hash, key) + publish(hash, { op: :delete, key: key}) + end + + def self.clear(hash) + publish(hash, {op: :clear}) + end + + def self.register(hash) + @lock.synchronize do + @subscribers << WeakRef.new(hash) + end + end + + def initialize(key) + DistributedCache.ensure_subscribe! + DistributedCache.register(self) + + @key = key + @data = {} + end + + + def []=(k,v) + k = k.to_s if Symbol === k + DistributedCache.set(self, k, v) + hash[k] = v + end + + def [](k) + k = k.to_s if Symbol === k + hash[k] + end + + def delete(k) + k = k.to_s if Symbol === k + DistributedCache.delete(self, k) + hash.delete(k) + end + + def clear + DistributedCache.clear(self) + hash.clear + end + + + def hash(db = nil) + db ||= RailsMultisite::ConnectionManagement.current_db + @data[db] ||= ThreadSafe::Hash.new + end + +end diff --git a/lib/distributed_hash.rb b/lib/distributed_hash.rb deleted file mode 100644 index b53f8472db3..00000000000 --- a/lib/distributed_hash.rb +++ /dev/null @@ -1,36 +0,0 @@ -# Like a hash, just does its best to stay in sync across the farm -# -# Redis backed with an allowance for a certain amount of latency - - -class DistributedHash - - @lock = Mutex.new - - def self.ensure_subscribed - @lock.synchronize do - unless @subscribed - - end - @subscribed = true - end - end - - - def initialize(key, options={}) - @key = key - end - - def []=(k,v) - end - - def [](k) - end - - def delete(k) - end - - def clear - end - -end diff --git a/spec/components/distributed_cache_spec.rb b/spec/components/distributed_cache_spec.rb new file mode 100644 index 00000000000..f41e2221c89 --- /dev/null +++ b/spec/components/distributed_cache_spec.rb @@ -0,0 +1,72 @@ +require 'spec_helper' +require 'distributed_cache' + +describe DistributedCache do + + def wait_for(&blk) + i = 0 + result = false + while !result && i < 300 + result = blk.call + i += 1 + sleep 0.001 + end + + result.should == true + end + + let! :cache1 do + DistributedCache.new("test") + end + + let! :cache2 do + DistributedCache.new("test") + end + + it 'allows coerces symbol keys to strings' do + cache1[:key] = "test" + cache1["key"].should == "test" + + wait_for do + cache2[:key] == "test" + end + cache2["key"].should == "test" + end + + it 'sets other caches' do + cache1["test"] = "world" + wait_for do + cache2["test"] == "world" + end + end + + it 'deletes from other caches' do + cache1["foo"] = "bar" + + wait_for do + cache2["foo"] == "bar" + end + + cache1.delete("foo") + cache1["foo"].should == nil + + wait_for do + cache2["foo"] == nil + end + end + + it 'clears cache on request' do + cache1["foo"] = "bar" + + wait_for do + cache2["foo"] == "bar" + end + + cache1.clear + cache1["foo"].should == nil + wait_for do + cache2["boom"] == nil + end + end + +end diff --git a/spec/components/distributed_hash_spec.rb b/spec/components/distributed_hash_spec.rb deleted file mode 100644 index 8df49fa4760..00000000000 --- a/spec/components/distributed_hash_spec.rb +++ /dev/null @@ -1,12 +0,0 @@ -require 'spec_helper' -require 'distributed_hash' - -describe DiscoursePluginRegistry do - # it 'should sync the sets across instances' do - # h1 = DistributedHash.new(:hash) - # h2 = DistributedHash.new(:hash) - - # h1[:hello] = "world" - # h2[:hello].should == "world" - # end -end