diff --git a/lib/distributed_cache.rb b/lib/distributed_cache.rb index 203bb2ae842..31cb69ffc25 100644 --- a/lib/distributed_cache.rb +++ b/lib/distributed_cache.rb @@ -1,153 +1,14 @@ # frozen_string_literal: true -# Like a hash, just does its best to stay in sync across the farm -# On boot all instances are blank, but they populate as various processes -# fill it up - -require 'weakref' -require 'base64' - -class DistributedCache - - class Manager - CHANNEL_NAME ||= '/distributed_hash'.freeze - - def initialize(message_bus = nil) - @subscribers = [] - @subscribed = false - @lock = Mutex.new - @message_bus = message_bus || MessageBus - end - - def subscribers - @subscribers - end - - def process_message(message) - i = @subscribers.length - 1 - - payload = message.data - - while i >= 0 - begin - current = @subscribers[i] - - next if payload["origin"] == current.identity && !Rails.env.test? - next if current.key != payload["hash_key"] - next if payload["discourse_version"] != Discourse.git_version - - hash = current.hash(message.site_id) - - case payload["op"] - when "set" then hash[payload["key"]] = payload["marshalled"] ? Marshal.load(Base64.decode64(payload["value"])) : payload["value"] - when "delete" then hash.delete(payload["key"]) - when "clear" then hash.clear - end - - rescue WeakRef::RefError - @subscribers.delete_at(i) - ensure - i -= 1 - end - end - end - - def ensure_subscribe! - return if @subscribed - @lock.synchronize do - return if @subscribed - @message_bus.subscribe(CHANNEL_NAME) do |message| - @lock.synchronize do - process_message(message) - end - end - @subscribed = true - end - end - - def publish(hash, message) - message[:origin] = hash.identity - message[:hash_key] = hash.key - message[:discourse_version] = Discourse.git_version - @message_bus.publish(CHANNEL_NAME, message, user_ids: [-1]) - end - - def set(hash, key, value) - # special support for set - marshal = (Set === value || Hash === value || Array === value) - value = Base64.encode64(Marshal.dump(value)) if marshal - publish(hash, op: :set, key: key, value: value, marshalled: marshal) - end - - def delete(hash, key) - publish(hash, op: :delete, key: key) - end - - def clear(hash) - publish(hash, op: :clear) - end - - def register(hash) - @lock.synchronize do - @subscribers << WeakRef.new(hash) - end - end - end - - @default_manager = Manager.new - - def self.default_manager - @default_manager - end - - attr_reader :key +require 'message_bus/distributed_cache' +class DistributedCache < MessageBus::DistributedCache def initialize(key, manager: nil, namespace: true) - @key = key - @data = {} - @manager = manager || DistributedCache.default_manager - @namespace = namespace - - @manager.ensure_subscribe! - @manager.register(self) + super( + key, + manager: manager, + namespace: namespace, + app_version: Discourse.git_version + ) end - - def identity - # fork resilient / multi machine identity - (@seed_id ||= SecureRandom.hex) + "#{Process.pid}" - end - - def []=(k, v) - k = k.to_s if Symbol === k - @manager.set(self, k, v) - hash[k] = v - end - - def [](k) - k = k.to_s if Symbol === k - hash[k] - end - - def delete(k, publish: true) - k = k.to_s if Symbol === k - @manager.delete(self, k) if publish - hash.delete(k) - end - - def clear - @manager.clear(self) - hash.clear - end - - def hash(db = nil) - db = - if @namespace - db || RailsMultisite::ConnectionManagement.current_db - else - RailsMultisite::ConnectionManagement::DEFAULT - end - - @data[db] ||= ThreadSafe::Hash.new - end - end diff --git a/spec/components/distributed_cache_spec.rb b/spec/components/distributed_cache_spec.rb deleted file mode 100644 index 4ff420c96e4..00000000000 --- a/spec/components/distributed_cache_spec.rb +++ /dev/null @@ -1,134 +0,0 @@ -require 'rails_helper' -require 'distributed_cache' - -describe DistributedCache do - - before :all do - @bus = MessageBus::Instance.new - @bus.configure(backend: :memory) - @manager = DistributedCache::Manager.new(@bus) - end - - after :all do - @bus.destroy - end - - def cache(name) - DistributedCache.new(name, manager: @manager) - end - - let :cache_name do - SecureRandom.hex - end - - let! :cache1 do - cache(cache_name) - end - - let! :cache2 do - cache(cache_name) - end - - it 'supports arrays with hashes' do - - c1 = cache("test1") - c2 = cache("test1") - - c1["test"] = [{ test: :test }] - - wait_for do - c2["test"] == [{ test: :test }] - end - - expect(c2[:test]).to eq([{ test: :test }]) - end - - it 'allows us to store Set' do - c1 = cache("test1") - c2 = cache("test1") - - set = Set.new - set << 1 - set << "b" - set << 92803984 - set << 93739739873973 - - c1["cats"] = set - - wait_for do - c2["cats"] == set - end - - expect(c2["cats"]).to eq(set) - - set << 5 - - c2["cats"] = set - - wait_for do - c1["cats"] == set - end - - expect(c1["cats"]).to eq(set) - end - - it 'does not leak state across caches' do - c2 = cache("test1") - c3 = cache("test1") - c2["hi"] = "hi" - wait_for do - c3["hi"] == "hi" - end - - Thread.pass - expect(cache1["hi"]).to eq(nil) - - end - - it 'allows coerces symbol keys to strings' do - cache1[:key] = "test" - expect(cache1["key"]).to eq("test") - - wait_for do - cache2[:key] == "test" - end - expect(cache2["key"]).to eq("test") - end - - it 'sets other caches' do - cache1["test"] = "world" - wait_for do - cache2["test"] == "world" - end - end - - it 'deletes from other caches' do - cache1["foo"] = "bar" - - wait_for do - cache2["foo"] == "bar" - end - - cache1.delete("foo") - expect(cache1["foo"]).to eq(nil) - - wait_for do - cache2["foo"] == nil - end - end - - it 'clears cache on request' do - cache1["foo"] = "bar" - - wait_for do - cache2["foo"] == "bar" - end - - cache1.clear - expect(cache1["foo"]).to eq(nil) - wait_for do - cache2["boom"] == nil - end - end - -end