mirror of
https://github.com/discourse/discourse.git
synced 2024-11-25 09:42:07 +08:00
REFACTOR: distributed_cache is moved to the message_bus gem
This commit is contained in:
parent
99d1ded3b3
commit
d166c38ab7
|
@ -1,153 +1,14 @@
|
|||
# frozen_string_literal: true
|
||||
|
||||
# Like a hash, just does its best to stay in sync across the farm
|
||||
# On boot all instances are blank, but they populate as various processes
|
||||
# fill it up
|
||||
|
||||
require 'weakref'
|
||||
require 'base64'
|
||||
|
||||
class DistributedCache
|
||||
|
||||
class Manager
|
||||
CHANNEL_NAME ||= '/distributed_hash'.freeze
|
||||
|
||||
def initialize(message_bus = nil)
|
||||
@subscribers = []
|
||||
@subscribed = false
|
||||
@lock = Mutex.new
|
||||
@message_bus = message_bus || MessageBus
|
||||
end
|
||||
|
||||
def subscribers
|
||||
@subscribers
|
||||
end
|
||||
|
||||
def process_message(message)
|
||||
i = @subscribers.length - 1
|
||||
|
||||
payload = message.data
|
||||
|
||||
while i >= 0
|
||||
begin
|
||||
current = @subscribers[i]
|
||||
|
||||
next if payload["origin"] == current.identity && !Rails.env.test?
|
||||
next if current.key != payload["hash_key"]
|
||||
next if payload["discourse_version"] != Discourse.git_version
|
||||
|
||||
hash = current.hash(message.site_id)
|
||||
|
||||
case payload["op"]
|
||||
when "set" then hash[payload["key"]] = payload["marshalled"] ? Marshal.load(Base64.decode64(payload["value"])) : payload["value"]
|
||||
when "delete" then hash.delete(payload["key"])
|
||||
when "clear" then hash.clear
|
||||
end
|
||||
|
||||
rescue WeakRef::RefError
|
||||
@subscribers.delete_at(i)
|
||||
ensure
|
||||
i -= 1
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def ensure_subscribe!
|
||||
return if @subscribed
|
||||
@lock.synchronize do
|
||||
return if @subscribed
|
||||
@message_bus.subscribe(CHANNEL_NAME) do |message|
|
||||
@lock.synchronize do
|
||||
process_message(message)
|
||||
end
|
||||
end
|
||||
@subscribed = true
|
||||
end
|
||||
end
|
||||
|
||||
def publish(hash, message)
|
||||
message[:origin] = hash.identity
|
||||
message[:hash_key] = hash.key
|
||||
message[:discourse_version] = Discourse.git_version
|
||||
@message_bus.publish(CHANNEL_NAME, message, user_ids: [-1])
|
||||
end
|
||||
|
||||
def set(hash, key, value)
|
||||
# special support for set
|
||||
marshal = (Set === value || Hash === value || Array === value)
|
||||
value = Base64.encode64(Marshal.dump(value)) if marshal
|
||||
publish(hash, op: :set, key: key, value: value, marshalled: marshal)
|
||||
end
|
||||
|
||||
def delete(hash, key)
|
||||
publish(hash, op: :delete, key: key)
|
||||
end
|
||||
|
||||
def clear(hash)
|
||||
publish(hash, op: :clear)
|
||||
end
|
||||
|
||||
def register(hash)
|
||||
@lock.synchronize do
|
||||
@subscribers << WeakRef.new(hash)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
@default_manager = Manager.new
|
||||
|
||||
def self.default_manager
|
||||
@default_manager
|
||||
end
|
||||
|
||||
attr_reader :key
|
||||
require 'message_bus/distributed_cache'
|
||||
|
||||
class DistributedCache < MessageBus::DistributedCache
|
||||
def initialize(key, manager: nil, namespace: true)
|
||||
@key = key
|
||||
@data = {}
|
||||
@manager = manager || DistributedCache.default_manager
|
||||
@namespace = namespace
|
||||
|
||||
@manager.ensure_subscribe!
|
||||
@manager.register(self)
|
||||
super(
|
||||
key,
|
||||
manager: manager,
|
||||
namespace: namespace,
|
||||
app_version: Discourse.git_version
|
||||
)
|
||||
end
|
||||
|
||||
def identity
|
||||
# fork resilient / multi machine identity
|
||||
(@seed_id ||= SecureRandom.hex) + "#{Process.pid}"
|
||||
end
|
||||
|
||||
def []=(k, v)
|
||||
k = k.to_s if Symbol === k
|
||||
@manager.set(self, k, v)
|
||||
hash[k] = v
|
||||
end
|
||||
|
||||
def [](k)
|
||||
k = k.to_s if Symbol === k
|
||||
hash[k]
|
||||
end
|
||||
|
||||
def delete(k, publish: true)
|
||||
k = k.to_s if Symbol === k
|
||||
@manager.delete(self, k) if publish
|
||||
hash.delete(k)
|
||||
end
|
||||
|
||||
def clear
|
||||
@manager.clear(self)
|
||||
hash.clear
|
||||
end
|
||||
|
||||
def hash(db = nil)
|
||||
db =
|
||||
if @namespace
|
||||
db || RailsMultisite::ConnectionManagement.current_db
|
||||
else
|
||||
RailsMultisite::ConnectionManagement::DEFAULT
|
||||
end
|
||||
|
||||
@data[db] ||= ThreadSafe::Hash.new
|
||||
end
|
||||
|
||||
end
|
||||
|
|
|
@ -1,134 +0,0 @@
|
|||
require 'rails_helper'
|
||||
require 'distributed_cache'
|
||||
|
||||
describe DistributedCache do
|
||||
|
||||
before :all do
|
||||
@bus = MessageBus::Instance.new
|
||||
@bus.configure(backend: :memory)
|
||||
@manager = DistributedCache::Manager.new(@bus)
|
||||
end
|
||||
|
||||
after :all do
|
||||
@bus.destroy
|
||||
end
|
||||
|
||||
def cache(name)
|
||||
DistributedCache.new(name, manager: @manager)
|
||||
end
|
||||
|
||||
let :cache_name do
|
||||
SecureRandom.hex
|
||||
end
|
||||
|
||||
let! :cache1 do
|
||||
cache(cache_name)
|
||||
end
|
||||
|
||||
let! :cache2 do
|
||||
cache(cache_name)
|
||||
end
|
||||
|
||||
it 'supports arrays with hashes' do
|
||||
|
||||
c1 = cache("test1")
|
||||
c2 = cache("test1")
|
||||
|
||||
c1["test"] = [{ test: :test }]
|
||||
|
||||
wait_for do
|
||||
c2["test"] == [{ test: :test }]
|
||||
end
|
||||
|
||||
expect(c2[:test]).to eq([{ test: :test }])
|
||||
end
|
||||
|
||||
it 'allows us to store Set' do
|
||||
c1 = cache("test1")
|
||||
c2 = cache("test1")
|
||||
|
||||
set = Set.new
|
||||
set << 1
|
||||
set << "b"
|
||||
set << 92803984
|
||||
set << 93739739873973
|
||||
|
||||
c1["cats"] = set
|
||||
|
||||
wait_for do
|
||||
c2["cats"] == set
|
||||
end
|
||||
|
||||
expect(c2["cats"]).to eq(set)
|
||||
|
||||
set << 5
|
||||
|
||||
c2["cats"] = set
|
||||
|
||||
wait_for do
|
||||
c1["cats"] == set
|
||||
end
|
||||
|
||||
expect(c1["cats"]).to eq(set)
|
||||
end
|
||||
|
||||
it 'does not leak state across caches' do
|
||||
c2 = cache("test1")
|
||||
c3 = cache("test1")
|
||||
c2["hi"] = "hi"
|
||||
wait_for do
|
||||
c3["hi"] == "hi"
|
||||
end
|
||||
|
||||
Thread.pass
|
||||
expect(cache1["hi"]).to eq(nil)
|
||||
|
||||
end
|
||||
|
||||
it 'allows coerces symbol keys to strings' do
|
||||
cache1[:key] = "test"
|
||||
expect(cache1["key"]).to eq("test")
|
||||
|
||||
wait_for do
|
||||
cache2[:key] == "test"
|
||||
end
|
||||
expect(cache2["key"]).to eq("test")
|
||||
end
|
||||
|
||||
it 'sets other caches' do
|
||||
cache1["test"] = "world"
|
||||
wait_for do
|
||||
cache2["test"] == "world"
|
||||
end
|
||||
end
|
||||
|
||||
it 'deletes from other caches' do
|
||||
cache1["foo"] = "bar"
|
||||
|
||||
wait_for do
|
||||
cache2["foo"] == "bar"
|
||||
end
|
||||
|
||||
cache1.delete("foo")
|
||||
expect(cache1["foo"]).to eq(nil)
|
||||
|
||||
wait_for do
|
||||
cache2["foo"] == nil
|
||||
end
|
||||
end
|
||||
|
||||
it 'clears cache on request' do
|
||||
cache1["foo"] = "bar"
|
||||
|
||||
wait_for do
|
||||
cache2["foo"] == "bar"
|
||||
end
|
||||
|
||||
cache1.clear
|
||||
expect(cache1["foo"]).to eq(nil)
|
||||
wait_for do
|
||||
cache2["boom"] == nil
|
||||
end
|
||||
end
|
||||
|
||||
end
|
Loading…
Reference in New Issue
Block a user