mirror of
https://github.com/discourse/discourse.git
synced 2024-11-30 14:18:12 +08:00
30990006a9
This reduces chances of errors where consumers of strings mutate inputs and reduces memory usage of the app. Test suite passes now, but there may be some stuff left, so we will run a few sites on a branch prior to merging
306 lines
7.9 KiB
Ruby
306 lines
7.9 KiB
Ruby
# frozen_string_literal: true
|
|
|
|
#
|
|
# A wrapper around redis that namespaces keys with the current site id
|
|
#
|
|
require_dependency 'cache'
|
|
|
|
class DiscourseRedis
|
|
# Process-wide (Singleton) record of whether Redis traffic should go to the
# master or to the slave, plus the background check that switches back to the
# master once it reports itself as a fully loaded master again.
class FallbackHandler
  include Singleton

  # Substrings searched for in the reply of the Redis INFO command.
  MASTER_ROLE_STATUS = "role:master".freeze
  MASTER_LOADING_STATUS = "loading:1".freeze
  MASTER_LOADED_STATUS = "loading:0".freeze
  # Values passed as TYPE to CLIENT KILL when dropping slave connections.
  # The array itself is frozen too, so the constant cannot be mutated.
  CONNECTION_TYPES = %w{normal pubsub}.each(&:freeze).freeze

  def initialize
    @master = true
    @running = false
    @mutex = Mutex.new
    @slave_config = DiscourseRedis.slave_config
    # Remembered so it can be restored after master= has zeroed it.
    @message_bus_keepalive_interval = MessageBus.keepalive_interval
  end

  # Starts the watcher thread unless one is already alive. The watcher calls
  # initiate_fallback_to_master every 5 seconds until @master is true.
  #
  # The liveness check and the thread creation happen under the same lock so
  # concurrent callers cannot start two watchers.
  def verify_master
    synchronize do
      return if @thread && @thread.alive?

      @thread = Thread.new do
        loop do
          begin
            thread = Thread.new { initiate_fallback_to_master }
            thread.join
            break if synchronize { @master }
            sleep 5
          ensure
            # thread is nil when Thread.new itself raised.
            thread&.kill
          end
        end
      end
    end
  end

  # Asks the master for INFO; when the reply contains both
  # MASTER_LOADED_STATUS and MASTER_ROLE_STATUS it marks the master as
  # active, kills the slave's client connections, restores the MessageBus
  # keepalive interval and clears Discourse's readonly state.
  #
  # Any StandardError raised along the way is logged as a warning.
  #
  # @return [Boolean] true only when every step of the switch completed
  def initiate_fallback_to_master
    success = false

    begin
      redis_config = DiscourseRedis.config.dup
      # Connect with a plain client, without the :connector entry.
      redis_config.delete(:connector)
      master_client = ::Redis::Client.new(redis_config)
      logger.warn "#{log_prefix}: Checking connection to master server..."
      info = master_client.call([:info])

      if info.include?(MASTER_LOADED_STATUS) && info.include?(MASTER_ROLE_STATUS)
        begin
          logger.warn "#{log_prefix}: Master server is active, killing all connections to slave..."

          self.master = true
          slave_client = ::Redis::Client.new(@slave_config)

          CONNECTION_TYPES.each do |connection_type|
            slave_client.call([:client, [:kill, 'type', connection_type]])
          end

          MessageBus.keepalive_interval = @message_bus_keepalive_interval
          Discourse.clear_readonly!
          Discourse.request_refresh!
          success = true
        ensure
          slave_client&.disconnect
        end
      end
    rescue => e
      logger.warn "#{log_prefix}: Connection to Master server failed with '#{e.message}'"
    ensure
      master_client&.disconnect
    end

    success
  end

  # @return [Boolean] the current master flag, read under the lock
  def master
    synchronize { @master }
  end

  # Sets the master flag under the lock.
  def master=(args)
    synchronize do
      @master = args

      # Disables MessageBus keepalive when Redis is in readonly mode
      MessageBus.keepalive_interval = 0 if !@master
    end
  end

  private

  # Runs the given block while holding @mutex.
  def synchronize
    @mutex.synchronize { yield }
  end

  def logger
    Rails.logger
  end

  def log_prefix
    "#{self.class}"
  end
end
|
|
|
|
# Connector that decides, each time it is resolved, whether a client should
# use the master options or the slave options.
class Connector < Redis::Client::Connector
  # @param options [Hash] connection options; also used to derive the slave
  #   options through DiscourseRedis.slave_config
  def initialize(options)
    super(options)
    @slave_options = DiscourseRedis.slave_config(options)
    @fallback_handler = DiscourseRedis::FallbackHandler.instance
  end

  # Returns the options to connect with.
  #
  # While the fallback handler reports the master as down, the slave options
  # are returned and a master check is (re)started. Otherwise the master is
  # asked for INFO persistence: the slave options are returned while it is
  # still loading, the regular options once it is not.
  #
  # @param client [Object, nil] client used for the probe; a new
  #   Redis::Client is created when nil. It is disconnected afterwards.
  # @raise [Redis::ConnectionError, Redis::CannotConnectError, RuntimeError]
  #   re-raised after the handler has been switched to the slave (a
  #   RuntimeError with any other message than "Name or service not known"
  #   is re-raised without switching)
  def resolve(client = nil)
    if !@fallback_handler.master
      @fallback_handler.verify_master
      return @slave_options
    end

    begin
      # NOTE(review): @options is presumably set by the superclass
      # initializer; it is not assigned in this class.
      options = @options.dup
      options.delete(:connector)
      client ||= Redis::Client.new(options)

      loading = client.call([:info, :persistence]).include?(
        DiscourseRedis::FallbackHandler::MASTER_LOADING_STATUS
      )

      loading ? @slave_options : @options
    rescue Redis::ConnectionError, Redis::CannotConnectError, RuntimeError => ex
      raise ex if ex.class == RuntimeError && ex.message != "Name or service not known"
      @fallback_handler.master = false
      @fallback_handler.verify_master
      raise ex
    ensure
      # client is still nil when building it failed; do not let a
      # NoMethodError hide the original exception.
      client&.disconnect
    end
  end
end
|
|
|
|
# Builds a plain Redis connection.
#
# @param config [Hash, nil] connection options; falls back to self.config
def self.raw_connection(config = nil)
  Redis.new(config || self.config)
end
|
|
|
|
# Connection options, as provided by GlobalSetting.redis_config.
def self.config
  GlobalSetting.redis_config
end
|
|
|
|
# Returns a copy of +options+ whose :host and :port are replaced by the
# values stored under :slave_host and :slave_port. The argument itself is
# not modified.
#
# @param options [Hash] defaults to self.config
def self.slave_config(options = config)
  slave_endpoint = { host: options[:slave_host], port: options[:slave_port] }
  options.merge(slave_endpoint)
end
|
|
|
|
# @param config [Hash, nil] connection options; DiscourseRedis.config when nil
# @param namespace [Boolean] whether keys get prefixed with the namespace
def initialize(config = nil, namespace: true)
  @namespace = namespace
  @config = config || DiscourseRedis.config
  @redis = DiscourseRedis.raw_connection(@config)
end
|
|
|
|
# Memoized access to the FallbackHandler singleton.
def self.fallback_handler
  return @fallback_handler if @fallback_handler

  @fallback_handler = DiscourseRedis::FallbackHandler.instance
end
|
|
|
|
# Returns the underlying connection, which does not prefix keys.
#
# Only use this if you want to store and fetch data that's shared between sites
def without_namespace
  @redis
end
|
|
|
|
# Runs the block and returns its value. A Redis::CommandError whose message
# contains "READONLY" is turned into a no-op returning nil: a warning is
# printed (unless Discourse was recently readonly or Rails is in the test
# env), a master check is started when the handler is not on master, and
# Discourse is told a readonly error was received. Any other
# Redis::CommandError is re-raised.
def self.ignore_readonly
  yield
rescue Redis::CommandError => ex
  raise ex unless ex.message =~ /READONLY/

  quiet = Discourse.recently_readonly? || Rails.env.test?
  STDERR.puts "WARN: Redis is in a readonly state. Performed a noop" unless quiet

  fallback_handler.verify_master unless fallback_handler.master
  Discourse.received_readonly!
  nil
end
|
|
|
|
# Forwards any method this class does not define to the underlying
# connection, wrapped in DiscourseRedis.ignore_readonly. Arguments are passed
# through unchanged: no namespace prefix is applied on this path.
def method_missing(meth, *args, &block)
  if @redis.respond_to?(meth)
    DiscourseRedis.ignore_readonly { @redis.public_send(meth, *args, &block) }
  else
    super
  end
end

# Keeps respond_to? in agreement with method_missing: a method counts as
# available when the underlying connection answers it publicly.
def respond_to_missing?(meth, include_private = false)
  @redis.respond_to?(meth) || super
end
|
|
|
|
# Proxy key methods through, but prefix the keys with the namespace.
#
# Each generated method rewrites only its first argument to
# "<namespace>:<arg>" (when namespacing is enabled) and forwards the call to
# the underlying connection inside DiscourseRedis.ignore_readonly.
#
# NOTE(review): some commands listed here (e.g. mset, rename, sdiff, sunion)
# appear to accept more than one key; only the first argument gets the
# prefix. Confirm callers do not rely on the others being namespaced.
[:append, :blpop, :brpop, :brpoplpush, :decr, :decrby, :exists, :expire, :expireat, :get, :getbit, :getrange, :getset,
 :hdel, :hexists, :hget, :hgetall, :hincrby, :hincrbyfloat, :hkeys, :hlen, :hmget, :hmset, :hset, :hsetnx, :hvals, :incr,
 :incrby, :incrbyfloat, :lindex, :linsert, :llen, :lpop, :lpush, :lpushx, :lrange, :lrem, :lset, :ltrim,
 :mapped_hmset, :mapped_hmget, :mapped_mget, :mapped_mset, :mapped_msetnx, :move, :mset,
 :msetnx, :persist, :pexpire, :pexpireat, :psetex, :pttl, :rename, :renamenx, :rpop, :rpoplpush, :rpush, :rpushx, :sadd, :scard,
 :sdiff, :set, :setbit, :setex, :setnx, :setrange, :sinter, :sismember, :smembers, :sort, :spop, :srandmember, :srem, :strlen,
 :sunion, :ttl, :type, :watch, :zadd, :zcard, :zcount, :zincrby, :zrange, :zrangebyscore, :zrank, :zrem, :zremrangebyrank,
 :zremrangebyscore, :zrevrange, :zrevrangebyscore, :zrevrank].each do |m|
  # :zrangebyscore used to be listed twice, which defined the method twice.
  define_method m do |*args|
    args[0] = "#{namespace}:#{args[0]}" if @namespace
    DiscourseRedis.ignore_readonly { @redis.public_send(m, *args) }
  end
end
|
|
|
|
# Fetches several keys at once; every key is prefixed with the namespace
# when namespacing is enabled.
def mget(*args)
  lookup = @namespace ? args.map { |name| "#{namespace}:#{name}" } : args

  DiscourseRedis.ignore_readonly do
    @redis.mget(*lookup)
  end
end
|
|
|
|
# Deletes one or more keys, prefixing each with the namespace when
# namespacing is enabled.
#
# Accepts a single key (as before), several keys, or an array of keys.
# Previously an array argument was interpolated into one bogus key name when
# namespacing was on.
#
# @return whatever the underlying connection's del returns
def del(*keys)
  DiscourseRedis.ignore_readonly do
    targets = keys.flatten(1)
    targets.map! { |key| "#{namespace}:#{key}" } if @namespace
    @redis.del(*targets)
  end
end
|
|
|
|
# Iterates over keys matching options[:match] ('*' when blank), restricted to
# the namespace when namespacing is enabled. Keys are handed back with the
# namespace prefix removed.
#
# The caller's options hash is no longer modified (and may be frozen): the
# namespaced :match is written to a copy.
#
# @param options [Hash] options forwarded to the underlying scan_each
# @yield [key] each matching key, when a block is given
# @return [Array] the matching keys when no block is given
def scan_each(options = {}, &block)
  DiscourseRedis.ignore_readonly do
    match = options[:match].presence || '*'
    scoped_match = @namespace ? "#{namespace}:#{match}" : match
    scan_options = options.merge(match: scoped_match)

    if block
      @redis.scan_each(scan_options) do |key|
        key = remove_namespace(key) if @namespace
        block.call(key)
      end
    else
      @redis.scan_each(scan_options).map do |key|
        key = remove_namespace(key) if @namespace
        key
      end
    end
  end
end
|
|
|
|
# Lists keys matching +pattern+ ('*' when nil). With namespacing enabled the
# pattern is scoped to the namespace and the prefix is stripped from every
# returned key.
def keys(pattern = nil)
  DiscourseRedis.ignore_readonly do
    glob = pattern || '*'
    next @redis.keys(glob) unless @namespace

    found = @redis.keys("#{namespace}:#{glob}")
    prefix_length = namespace.length + 1
    found.map! { |name| name[prefix_length..-1] }
  end
end
|
|
|
|
# Deletes every key that starts with +prefix+.
#
# Deletion goes through this instance's own del, like flushdb does, rather
# than through the global $redis, so it acts on the same connection and
# namespace setting that keys was listed from.
def delete_prefixed(prefix)
  DiscourseRedis.ignore_readonly do
    keys("#{prefix}*").each { |k| del(k) }
  end
end
|
|
|
|
# Deletes, one by one, every key returned by keys (i.e. only this
# namespace's keys when namespacing is enabled).
def flushdb
  DiscourseRedis.ignore_readonly do
    keys.each do |name|
      del(name)
    end
  end
end
|
|
|
|
# Reconnects the client of the underlying connection.
def reconnect
  client = @redis._client
  client.reconnect
end
|
|
|
|
# Returns +key+ prefixed with "<namespace>:" when namespacing is enabled,
# otherwise +key+ itself.
def namespace_key(key)
  return key unless @namespace

  "#{namespace}:#{key}"
end
|
|
|
|
# The key prefix: RailsMultisite's current db.
def namespace
  RailsMultisite::ConnectionManagement.current_db
end
|
|
|
|
# Class-level variant of #namespace; logs a warning on every call.
def self.namespace
  deprecation_notice = "DiscourseRedis.namespace is going to be deprecated, do not use it!"
  Rails.logger.warn(deprecation_notice)

  RailsMultisite::ConnectionManagement.current_db
end
|
|
|
|
# Returns a new Cache instance.
def self.new_redis_store
  Cache.new
end
|
|
|
|
private
|
|
|
|
# Drops the leading "<namespace>:" (namespace length plus one character)
# from +key+.
def remove_namespace(key)
  prefix_length = namespace.length + 1
  key[prefix_length..-1]
end
|
|
|
|
end
|