mirror of
https://github.com/discourse/discourse.git
synced 2024-11-25 08:43:25 +08:00
PERF: distributed cache class to help sync caches between processes
This commit is contained in:
parent
a2ba9a735e
commit
c55fa9d5c8
116
lib/distributed_cache.rb
Normal file
116
lib/distributed_cache.rb
Normal file
|
@ -0,0 +1,116 @@
|
|||
# Like a hash, just does its best to stay in sync across the farm
|
||||
# On boot all instances are blank, but they populate as various processes
|
||||
# fill it up
|
||||
#
|
||||
|
||||
require 'weakref'
|
||||
|
||||
class DistributedCache
|
||||
@subscribers = []
|
||||
@subscribed = false
|
||||
@lock = Mutex.new
|
||||
|
||||
attr_reader :key
|
||||
|
||||
def self.process_message(message)
|
||||
i = @subscribers.length-1
|
||||
|
||||
payload = message.data
|
||||
|
||||
while i >= 0
|
||||
begin
|
||||
current = @subscribers[i]
|
||||
|
||||
next if payload["origin"] == current.object_id
|
||||
hash = current.hash(message.site_id)
|
||||
case payload["op"]
|
||||
when "set" then hash[payload["key"]] = payload["value"]
|
||||
when "delete" then hash.delete(payload["key"])
|
||||
when "clear" then hash.clear
|
||||
end
|
||||
|
||||
rescue WeakRef::RefError
|
||||
@subscribers.delete_at(i)
|
||||
end
|
||||
i -= 1
|
||||
end
|
||||
end
|
||||
|
||||
def self.channel_name
|
||||
"/distributed_hash".freeze
|
||||
end
|
||||
|
||||
def self.ensure_subscribe!
|
||||
return if @subscribed
|
||||
@lock.synchronize do
|
||||
return if @subscribed
|
||||
MessageBus.subscribe(channel_name) do |message|
|
||||
@lock.synchronize do
|
||||
process_message(message)
|
||||
end
|
||||
end
|
||||
@subscribed = true
|
||||
end
|
||||
end
|
||||
|
||||
def self.publish(hash, message)
|
||||
message[:origin] = hash.object_id
|
||||
MessageBus.publish(channel_name, message)
|
||||
end
|
||||
|
||||
def self.set(hash, key, value)
|
||||
publish(hash, { op: :set, key: key, value: value })
|
||||
end
|
||||
|
||||
def self.delete(hash, key)
|
||||
publish(hash, { op: :delete, key: key})
|
||||
end
|
||||
|
||||
def self.clear(hash)
|
||||
publish(hash, {op: :clear})
|
||||
end
|
||||
|
||||
def self.register(hash)
|
||||
@lock.synchronize do
|
||||
@subscribers << WeakRef.new(hash)
|
||||
end
|
||||
end
|
||||
|
||||
def initialize(key)
|
||||
DistributedCache.ensure_subscribe!
|
||||
DistributedCache.register(self)
|
||||
|
||||
@key = key
|
||||
@data = {}
|
||||
end
|
||||
|
||||
|
||||
def []=(k,v)
|
||||
k = k.to_s if Symbol === k
|
||||
DistributedCache.set(self, k, v)
|
||||
hash[k] = v
|
||||
end
|
||||
|
||||
def [](k)
|
||||
k = k.to_s if Symbol === k
|
||||
hash[k]
|
||||
end
|
||||
|
||||
def delete(k)
|
||||
k = k.to_s if Symbol === k
|
||||
DistributedCache.delete(self, k)
|
||||
hash.delete(k)
|
||||
end
|
||||
|
||||
def clear
|
||||
DistributedCache.clear(self)
|
||||
hash.clear
|
||||
end
|
||||
|
||||
|
||||
def hash(db = nil)
|
||||
db ||= RailsMultisite::ConnectionManagement.current_db
|
||||
@data[db] ||= ThreadSafe::Hash.new
|
||||
end
|
||||
|
||||
end
|
|
@ -1,36 +0,0 @@
|
|||
# Like a hash, just does its best to stay in sync across the farm
|
||||
#
|
||||
# Redis backed with an allowance for a certain amount of latency
|
||||
|
||||
|
||||
class DistributedHash
|
||||
|
||||
@lock = Mutex.new
|
||||
|
||||
def self.ensure_subscribed
|
||||
@lock.synchronize do
|
||||
unless @subscribed
|
||||
|
||||
end
|
||||
@subscribed = true
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
def initialize(key, options={})
|
||||
@key = key
|
||||
end
|
||||
|
||||
def []=(k,v)
|
||||
end
|
||||
|
||||
def [](k)
|
||||
end
|
||||
|
||||
def delete(k)
|
||||
end
|
||||
|
||||
def clear
|
||||
end
|
||||
|
||||
end
|
72
spec/components/distributed_cache_spec.rb
Normal file
72
spec/components/distributed_cache_spec.rb
Normal file
|
@ -0,0 +1,72 @@
|
|||
require 'spec_helper'
|
||||
require 'distributed_cache'
|
||||
|
||||
describe DistributedCache do
|
||||
|
||||
def wait_for(&blk)
|
||||
i = 0
|
||||
result = false
|
||||
while !result && i < 300
|
||||
result = blk.call
|
||||
i += 1
|
||||
sleep 0.001
|
||||
end
|
||||
|
||||
result.should == true
|
||||
end
|
||||
|
||||
let! :cache1 do
|
||||
DistributedCache.new("test")
|
||||
end
|
||||
|
||||
let! :cache2 do
|
||||
DistributedCache.new("test")
|
||||
end
|
||||
|
||||
it 'allows coerces symbol keys to strings' do
|
||||
cache1[:key] = "test"
|
||||
cache1["key"].should == "test"
|
||||
|
||||
wait_for do
|
||||
cache2[:key] == "test"
|
||||
end
|
||||
cache2["key"].should == "test"
|
||||
end
|
||||
|
||||
it 'sets other caches' do
|
||||
cache1["test"] = "world"
|
||||
wait_for do
|
||||
cache2["test"] == "world"
|
||||
end
|
||||
end
|
||||
|
||||
it 'deletes from other caches' do
|
||||
cache1["foo"] = "bar"
|
||||
|
||||
wait_for do
|
||||
cache2["foo"] == "bar"
|
||||
end
|
||||
|
||||
cache1.delete("foo")
|
||||
cache1["foo"].should == nil
|
||||
|
||||
wait_for do
|
||||
cache2["foo"] == nil
|
||||
end
|
||||
end
|
||||
|
||||
it 'clears cache on request' do
|
||||
cache1["foo"] = "bar"
|
||||
|
||||
wait_for do
|
||||
cache2["foo"] == "bar"
|
||||
end
|
||||
|
||||
cache1.clear
|
||||
cache1["foo"].should == nil
|
||||
wait_for do
|
||||
cache2["boom"] == nil
|
||||
end
|
||||
end
|
||||
|
||||
end
|
|
@ -1,12 +0,0 @@
|
|||
require 'spec_helper'
|
||||
require 'distributed_hash'
|
||||
|
||||
describe DiscoursePluginRegistry do
|
||||
# it 'should sync the sets across instances' do
|
||||
# h1 = DistributedHash.new(:hash)
|
||||
# h2 = DistributedHash.new(:hash)
|
||||
|
||||
# h1[:hello] = "world"
|
||||
# h2[:hello].should == "world"
|
||||
# end
|
||||
end
|
Loading…
Reference in New Issue
Block a user