discourse/lib/distributed_cache.rb

117 lines
2.2 KiB
Ruby

# Like a hash, just does its best to stay in sync across the farm
# On boot all instances are blank, but they populate as various processes
# fill it up
#
require 'weakref'
class DistributedCache
@subscribers = []
@subscribed = false
@lock = Mutex.new
attr_reader :key
def self.process_message(message)
i = @subscribers.length-1
payload = message.data
while i >= 0
begin
current = @subscribers[i]
next if payload["origin"] == current.object_id
hash = current.hash(message.site_id)
case payload["op"]
when "set" then hash[payload["key"]] = payload["value"]
when "delete" then hash.delete(payload["key"])
when "clear" then hash.clear
end
rescue WeakRef::RefError
@subscribers.delete_at(i)
end
i -= 1
end
end
def self.channel_name
"/distributed_hash".freeze
end
def self.ensure_subscribe!
return if @subscribed
@lock.synchronize do
return if @subscribed
MessageBus.subscribe(channel_name) do |message|
@lock.synchronize do
process_message(message)
end
end
@subscribed = true
end
end
def self.publish(hash, message)
message[:origin] = hash.object_id
MessageBus.publish(channel_name, message, {user_ids: [-1]})
end
def self.set(hash, key, value)
publish(hash, { op: :set, key: key, value: value })
end
def self.delete(hash, key)
publish(hash, { op: :delete, key: key})
end
def self.clear(hash)
publish(hash, {op: :clear})
end
def self.register(hash)
@lock.synchronize do
@subscribers << WeakRef.new(hash)
end
end
def initialize(key)
DistributedCache.ensure_subscribe!
DistributedCache.register(self)
@key = key
@data = {}
end
def []=(k,v)
k = k.to_s if Symbol === k
DistributedCache.set(self, k, v)
hash[k] = v
end
def [](k)
k = k.to_s if Symbol === k
hash[k]
end
def delete(k)
k = k.to_s if Symbol === k
DistributedCache.delete(self, k)
hash.delete(k)
end
def clear
DistributedCache.clear(self)
hash.clear
end
def hash(db = nil)
db ||= RailsMultisite::ConnectionManagement.current_db
@data[db] ||= ThreadSafe::Hash.new
end
end