mirror of
https://github.com/discourse/discourse.git
synced 2024-11-25 09:42:07 +08:00
FIX: Redis fallback handler refactoring (#8771)
* DEV: Add a fake Mutex for concurrency testing with Fibers * DEV: Support running in sleep order in concurrency tests * FIX: A separate FallbackHandler should be used for each redis pair This commit refactors the FallbackHandler and Connector: * There were two different ways to determine whether the redis master was up. There is now one way and it is the responsibility of the new RedisStatus class. * A background thread would be created whenever `verify_master` was called unless the thread already existed. The thread would periodically check the status of the redis master. However, checking that a thread is `alive?` is an ineffective way of determining whether it will continue to check the redis master in the future since the thread may be in the process of winding down. Now, this thread is created when the recorded master status goes from up to down. Since this thread runs the only part of the code that is able to bring the recorded status up again, we ensure that only one thread is probing the redis master at a time and that there is always a thread probing the redis master when it is recorded as being down. * Each time the status of the redis master was checked periodically, it would spawn a new thread and immediately join on it. I assume this happened to isolate the check from the current execution, but since the join rethrows exceptions in the parent thread, this was not effective. * The logic for falling back was spread over the FallbackHandler and the Connector. The connector is now a dumb object that delegates responsibility for determining the status of redis to the FallbackHandler. * Previously, failing to connect to a master redis instance when it was not recorded as down would raise an exception. Now, this exception is passed to `Discourse.warn_exception` and the connection is made to the slave. This commit introduces the FallbackHandlers singleton: * It is responsible for holding the set of FallbackHandlers. 
* It adds callbacks to the fallback handlers for when a redis master comes up or goes down. Main redis and message bus redis may exist on different or the same redis hosts and so these callbacks may all exist on the same FallbackHandler or on separate ones. These objects are tested using fake concurrency provided by the Concurrency module: * An `around(:each)` hook is used to cause each test to run inside a Scenario so that the test body, mocking cleanup and `after(:each)` callbacks are run in a different Fiber. * Therefore, halting the execution of the Execution abruptly (so that the fibers aren't run to completion), prevents the mocking cleanup and `after(:each)` callbacks from running. I have tried to prevent this by recovering from all exceptions during an Execution. * FIX: Create frozen copies of passed in config where possible * FIX: extract start_reset method and remove method used by tests Co-authored-by: Daniel Waterworth <me@danielwaterworth.com>
This commit is contained in:
parent
1b3b0708c0
commit
4f677854d3
21
lib/concurrency.rb
Normal file
21
lib/concurrency.rb
Normal file
|
@ -0,0 +1,21 @@
|
|||
# frozen_string_literal: true
|
||||
|
||||
# This is the 'actually concurrent' counterpart to
|
||||
# Concurrency::Scenario::Execution from spec/support/concurrency.rb
|
||||
module Concurrency
|
||||
class ThreadedExecution
|
||||
def new_mutex
|
||||
Mutex.new
|
||||
end
|
||||
|
||||
def sleep(delay)
|
||||
super(delay)
|
||||
nil
|
||||
end
|
||||
|
||||
def spawn(&blk)
|
||||
Thread.new(&blk)
|
||||
nil
|
||||
end
|
||||
end
|
||||
end
|
|
@ -3,139 +3,248 @@
|
|||
#
|
||||
# A wrapper around redis that namespaces keys with the current site id
|
||||
#
|
||||
require_dependency 'cache'
|
||||
require_dependency 'concurrency'
|
||||
|
||||
class DiscourseRedis
|
||||
class FallbackHandler
|
||||
include Singleton
|
||||
|
||||
class RedisStatus
|
||||
MASTER_ROLE_STATUS = "role:master".freeze
|
||||
MASTER_LOADING_STATUS = "loading:1".freeze
|
||||
MASTER_LOADED_STATUS = "loading:0".freeze
|
||||
CONNECTION_TYPES = %w{normal pubsub}.each(&:freeze)
|
||||
|
||||
def initialize
|
||||
@master = true
|
||||
@running = false
|
||||
@mutex = Mutex.new
|
||||
@slave_config = DiscourseRedis.slave_config
|
||||
@message_bus_keepalive_interval = MessageBus.keepalive_interval
|
||||
def initialize(master_config, slave_config)
|
||||
master_config = master_config.dup.freeze unless master_config.frozen?
|
||||
slave_config = slave_config.dup.freeze unless slave_config.frozen?
|
||||
|
||||
@master_config = master_config
|
||||
@slave_config = slave_config
|
||||
end
|
||||
|
||||
def verify_master
|
||||
synchronize do
|
||||
return if @thread && @thread.alive?
|
||||
|
||||
@thread = Thread.new do
|
||||
loop do
|
||||
begin
|
||||
thread = Thread.new { initiate_fallback_to_master }
|
||||
thread.join
|
||||
break if synchronize { @master }
|
||||
sleep 5
|
||||
ensure
|
||||
thread.kill
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def initiate_fallback_to_master
|
||||
success = false
|
||||
def master_alive?
|
||||
master_client = connect(@master_config)
|
||||
|
||||
begin
|
||||
redis_config = DiscourseRedis.config.dup
|
||||
redis_config.delete(:connector)
|
||||
master_client = ::Redis::Client.new(redis_config)
|
||||
logger.warn "#{log_prefix}: Checking connection to master server..."
|
||||
info = master_client.call([:info])
|
||||
|
||||
if info.include?(MASTER_LOADED_STATUS) && info.include?(MASTER_ROLE_STATUS)
|
||||
begin
|
||||
logger.warn "#{log_prefix}: Master server is active, killing all connections to slave..."
|
||||
|
||||
self.master = true
|
||||
slave_client = ::Redis::Client.new(@slave_config)
|
||||
|
||||
CONNECTION_TYPES.each do |connection_type|
|
||||
slave_client.call([:client, [:kill, 'type', connection_type]])
|
||||
end
|
||||
|
||||
MessageBus.keepalive_interval = @message_bus_keepalive_interval
|
||||
Discourse.clear_readonly!
|
||||
Discourse.request_refresh!
|
||||
success = true
|
||||
ensure
|
||||
slave_client&.disconnect
|
||||
end
|
||||
end
|
||||
rescue => e
|
||||
logger.warn "#{log_prefix}: Connection to Master server failed with '#{e.message}'"
|
||||
rescue Redis::ConnectionError, Redis::CannotConnectError, RuntimeError => ex
|
||||
raise ex if ex.class == RuntimeError && ex.message != "Name or service not known"
|
||||
warn "Master not alive, error connecting"
|
||||
return false
|
||||
ensure
|
||||
master_client&.disconnect
|
||||
master_client.disconnect
|
||||
end
|
||||
|
||||
success
|
||||
unless info.include?(MASTER_LOADED_STATUS)
|
||||
warn "Master not alive, status is loading"
|
||||
return false
|
||||
end
|
||||
|
||||
unless info.include?(MASTER_ROLE_STATUS)
|
||||
warn "Master not alive, role != master"
|
||||
return false
|
||||
end
|
||||
|
||||
true
|
||||
end
|
||||
|
||||
def master
|
||||
synchronize { @master }
|
||||
end
|
||||
def fallback
|
||||
warn "Killing connections to slave..."
|
||||
|
||||
def master=(args)
|
||||
synchronize do
|
||||
@master = args
|
||||
slave_client = connect(@slave_config)
|
||||
|
||||
# Disables MessageBus keepalive when Redis is in readonly mode
|
||||
MessageBus.keepalive_interval = 0 if !@master
|
||||
begin
|
||||
CONNECTION_TYPES.each do |connection_type|
|
||||
slave_client.call([:client, [:kill, 'type', connection_type]])
|
||||
end
|
||||
rescue Redis::ConnectionError, Redis::CannotConnectError, RuntimeError => ex
|
||||
raise ex if ex.class == RuntimeError && ex.message != "Name or service not known"
|
||||
warn "Attempted a redis fallback, but connection to slave failed"
|
||||
ensure
|
||||
slave_client.disconnect
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def synchronize
|
||||
@mutex.synchronize { yield }
|
||||
end
|
||||
|
||||
def logger
|
||||
Rails.logger
|
||||
def connect(config)
|
||||
config = config.dup
|
||||
config.delete(:connector)
|
||||
::Redis::Client.new(config)
|
||||
end
|
||||
|
||||
def log_prefix
|
||||
"#{self.class}"
|
||||
@log_prefix ||= begin
|
||||
master_string = "#{@master_config[:host]}:#{@master_config[:port]}"
|
||||
slave_string = "#{@slave_config[:host]}:#{@slave_config[:port]}"
|
||||
"RedisStatus master=#{master_string} slave=#{slave_string}"
|
||||
end
|
||||
end
|
||||
|
||||
def warn(message)
|
||||
Rails.logger.warn "#{log_prefix}: #{message}"
|
||||
end
|
||||
end
|
||||
|
||||
class FallbackHandler
|
||||
def initialize(log_prefix, redis_status, execution)
|
||||
@log_prefix = log_prefix
|
||||
@redis_status = redis_status
|
||||
@mutex = execution.new_mutex
|
||||
@execution = execution
|
||||
@master = true
|
||||
@event_handlers = []
|
||||
end
|
||||
|
||||
def add_callbacks(handler)
|
||||
@mutex.synchronize do
|
||||
@event_handlers << handler
|
||||
end
|
||||
end
|
||||
|
||||
def start_reset
|
||||
@mutex.synchronize do
|
||||
if @master
|
||||
@master = false
|
||||
trigger(:down)
|
||||
true
|
||||
else
|
||||
false
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def use_master?
|
||||
master = @mutex.synchronize { @master }
|
||||
if !master
|
||||
false
|
||||
elsif safe_master_alive?
|
||||
true
|
||||
else
|
||||
if start_reset
|
||||
@execution.spawn do
|
||||
loop do
|
||||
@execution.sleep 5
|
||||
info "Checking connection to master"
|
||||
if safe_master_alive?
|
||||
@mutex.synchronize do
|
||||
@master = true
|
||||
@redis_status.fallback
|
||||
trigger(:up)
|
||||
end
|
||||
break
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
false
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
attr_reader :log_prefix
|
||||
|
||||
def trigger(event)
|
||||
@event_handlers.each do |handler|
|
||||
begin
|
||||
handler.public_send(event)
|
||||
rescue Exception => e
|
||||
Discourse.warn_exception(e, message: "Error running FallbackHandler callback")
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def info(message)
|
||||
Rails.logger.info "#{log_prefix}: #{message}"
|
||||
end
|
||||
|
||||
def safe_master_alive?
|
||||
begin
|
||||
@redis_status.master_alive?
|
||||
rescue Exception => e
|
||||
Discourse.warn_exception(e, message: "Error running master_alive?")
|
||||
false
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
class MessageBusFallbackCallbacks
|
||||
def down
|
||||
@keepalive_interval, MessageBus.keepalive_interval =
|
||||
MessageBus.keepalive_interval, 0
|
||||
end
|
||||
|
||||
def up
|
||||
MessageBus.keepalive_interval = @keepalive_interval
|
||||
end
|
||||
end
|
||||
|
||||
class MainRedisReadOnlyCallbacks
|
||||
def down
|
||||
end
|
||||
|
||||
def up
|
||||
Discourse.clear_readonly!
|
||||
Discourse.request_refresh!
|
||||
end
|
||||
end
|
||||
|
||||
class FallbackHandlers
|
||||
include Singleton
|
||||
|
||||
def initialize
|
||||
@mutex = Mutex.new
|
||||
@fallback_handlers = {}
|
||||
end
|
||||
|
||||
def handler_for(config)
|
||||
config = config.dup.freeze unless config.frozen?
|
||||
|
||||
@mutex.synchronize do
|
||||
@fallback_handlers[[config[:host], config[:port]]] ||= begin
|
||||
log_prefix = "FallbackHandler #{config[:host]}:#{config[:port]}"
|
||||
slave_config = DiscourseRedis.slave_config(config)
|
||||
redis_status = RedisStatus.new(config, slave_config)
|
||||
|
||||
handler =
|
||||
FallbackHandler.new(
|
||||
log_prefix,
|
||||
redis_status,
|
||||
Concurrency::ThreadedExecution.new
|
||||
)
|
||||
|
||||
if config == GlobalSetting.redis_config
|
||||
handler.add_callbacks(MainRedisReadOnlyCallbacks.new)
|
||||
end
|
||||
|
||||
if config == GlobalSetting.message_bus_redis_config
|
||||
handler.add_callbacks(MessageBusFallbackCallbacks.new)
|
||||
end
|
||||
|
||||
handler
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def self.handler_for(config)
|
||||
instance.handler_for(config)
|
||||
end
|
||||
end
|
||||
|
||||
class Connector < Redis::Client::Connector
|
||||
def initialize(options)
|
||||
options = options.dup.freeze unless options.frozen?
|
||||
|
||||
super(options)
|
||||
@slave_options = DiscourseRedis.slave_config(options)
|
||||
@fallback_handler = DiscourseRedis::FallbackHandler.instance
|
||||
@slave_options = DiscourseRedis.slave_config(options).freeze
|
||||
@fallback_handler = DiscourseRedis::FallbackHandlers.handler_for(options)
|
||||
end
|
||||
|
||||
def resolve(client = nil)
|
||||
if !@fallback_handler.master
|
||||
@fallback_handler.verify_master
|
||||
return @slave_options
|
||||
end
|
||||
|
||||
begin
|
||||
options = @options.dup
|
||||
options.delete(:connector)
|
||||
client ||= Redis::Client.new(options)
|
||||
|
||||
loading = client.call([:info, :persistence]).include?(
|
||||
DiscourseRedis::FallbackHandler::MASTER_LOADING_STATUS
|
||||
)
|
||||
|
||||
loading ? @slave_options : @options
|
||||
rescue Redis::ConnectionError, Redis::CannotConnectError, RuntimeError => ex
|
||||
raise ex if ex.class == RuntimeError && ex.message != "Name or service not known"
|
||||
@fallback_handler.master = false
|
||||
@fallback_handler.verify_master
|
||||
raise ex
|
||||
ensure
|
||||
client.disconnect
|
||||
def resolve
|
||||
if @fallback_handler.use_master?
|
||||
@options
|
||||
else
|
||||
@slave_options
|
||||
end
|
||||
end
|
||||
end
|
||||
|
@ -159,10 +268,6 @@ class DiscourseRedis
|
|||
@namespace = namespace
|
||||
end
|
||||
|
||||
def self.fallback_handler
|
||||
@fallback_handler ||= DiscourseRedis::FallbackHandler.instance
|
||||
end
|
||||
|
||||
def without_namespace
|
||||
# Only use this if you want to store and fetch data that's shared between sites
|
||||
@redis
|
||||
|
@ -176,7 +281,6 @@ class DiscourseRedis
|
|||
STDERR.puts "WARN: Redis is in a readonly state. Performed a noop"
|
||||
end
|
||||
|
||||
fallback_handler.verify_master if !fallback_handler.master
|
||||
Discourse.received_redis_readonly!
|
||||
nil
|
||||
else
|
||||
|
@ -302,5 +406,4 @@ class DiscourseRedis
|
|||
def remove_namespace(key)
|
||||
key[(namespace.length + 1)..-1]
|
||||
end
|
||||
|
||||
end
|
||||
|
|
|
@ -3,20 +3,84 @@
|
|||
require 'rails_helper'
|
||||
|
||||
describe DiscourseRedis do
|
||||
before do
|
||||
DiscourseRedis::FallbackHandlers.instance.instance_variable_set(:@fallback_handlers, {})
|
||||
end
|
||||
|
||||
let(:slave_host) { 'testhost' }
|
||||
let(:slave_port) { 1234 }
|
||||
|
||||
let(:config) do
|
||||
DiscourseRedis.config.dup.merge(slave_host: 'testhost', slave_port: 1234, connector: DiscourseRedis::Connector)
|
||||
GlobalSetting.redis_config.dup.merge(slave_host: 'testhost', slave_port: 1234, connector: DiscourseRedis::Connector)
|
||||
end
|
||||
|
||||
let(:fallback_handler) { DiscourseRedis::FallbackHandler.instance }
|
||||
let(:slave_config) { DiscourseRedis.slave_config(config) }
|
||||
|
||||
it "ignore_readonly returns nil from a pure exception" do
|
||||
result = DiscourseRedis.ignore_readonly { raise Redis::CommandError.new("READONLY") }
|
||||
expect(result).to eq(nil)
|
||||
end
|
||||
|
||||
let!(:master_conn) { mock('master') }
|
||||
|
||||
def self.use_fake_threads
|
||||
attr_reader :execution
|
||||
|
||||
around(:each) do |example|
|
||||
scenario =
|
||||
Concurrency::Scenario.new do |execution|
|
||||
@execution = execution
|
||||
example.run
|
||||
end
|
||||
|
||||
scenario.run(sleep_order: true, runs: 1)
|
||||
end
|
||||
|
||||
after(:each) do
|
||||
# Doing this here, as opposed to after example.run, ensures that it
|
||||
# happens before the mocha expectations are checked.
|
||||
execution.wait_done
|
||||
end
|
||||
end
|
||||
|
||||
def stop_after(time)
|
||||
execution.sleep(time)
|
||||
execution.stop_other_tasks
|
||||
end
|
||||
|
||||
def expect_master_info(conf = config)
|
||||
conf = conf.dup
|
||||
conf.delete(:connector)
|
||||
|
||||
Redis::Client.expects(:new)
|
||||
.with(conf)
|
||||
.returns(master_conn)
|
||||
|
||||
master_conn.expects(:disconnect)
|
||||
master_conn
|
||||
.expects(:call)
|
||||
.with([:info])
|
||||
end
|
||||
|
||||
def info_response(*values)
|
||||
values.map { |x| x.join(':') }.join("\r\n")
|
||||
end
|
||||
|
||||
def expect_fallback(config = slave_config)
|
||||
slave_conn = mock('slave')
|
||||
|
||||
config = config.dup
|
||||
config.delete(:connector)
|
||||
|
||||
Redis::Client.expects(:new)
|
||||
.with(config)
|
||||
.returns(slave_conn)
|
||||
|
||||
slave_conn.expects(:call).with([:client, [:kill, 'type', 'normal']])
|
||||
slave_conn.expects(:call).with([:client, [:kill, 'type', 'pubsub']])
|
||||
slave_conn.expects(:disconnect)
|
||||
end
|
||||
|
||||
describe 'redis commands' do
|
||||
let(:raw_redis) { Redis.new(DiscourseRedis.config) }
|
||||
|
||||
|
@ -97,150 +161,349 @@ describe DiscourseRedis do
|
|||
end
|
||||
end
|
||||
|
||||
context 'when redis connection is to a slave redis server' do
|
||||
it 'should check the status of the master server' do
|
||||
begin
|
||||
fallback_handler.master = false
|
||||
Discourse.redis.without_namespace.expects(:set).raises(Redis::CommandError.new("READONLY"))
|
||||
fallback_handler.expects(:verify_master).once
|
||||
Discourse.redis.set('test', '1')
|
||||
ensure
|
||||
fallback_handler.master = true
|
||||
Discourse.redis.del('test')
|
||||
describe DiscourseRedis::RedisStatus do
|
||||
let(:redis_status) { DiscourseRedis::RedisStatus.new(config, slave_config) }
|
||||
|
||||
context "#master_alive?" do
|
||||
it "returns false when the master's hostname cannot be resolved" do
|
||||
expect_master_info
|
||||
.raises(RuntimeError.new('Name or service not known'))
|
||||
|
||||
expect(redis_status.master_alive?).to eq(false)
|
||||
end
|
||||
|
||||
it "raises an error if a runtime error is raised" do
|
||||
error = RuntimeError.new('a random runtime error')
|
||||
expect_master_info.raises(error)
|
||||
|
||||
expect {
|
||||
redis_status.master_alive?
|
||||
}.to raise_error(error)
|
||||
end
|
||||
|
||||
it "returns false if the master is unavailable" do
|
||||
expect_master_info.raises(Redis::ConnectionError.new)
|
||||
|
||||
expect(redis_status.master_alive?).to eq(false)
|
||||
end
|
||||
|
||||
it "returns false if the master is loading" do
|
||||
expect_master_info
|
||||
.returns(info_response(['loading', '1'], ['role', 'master']))
|
||||
|
||||
expect(redis_status.master_alive?).to eq(false)
|
||||
end
|
||||
|
||||
it "returns false if the master is a slave" do
|
||||
expect_master_info
|
||||
.returns(info_response(['loading', '0'], ['role', 'slave']))
|
||||
|
||||
expect(redis_status.master_alive?).to eq(false)
|
||||
end
|
||||
|
||||
it "returns true when the master isn't loading and the role is master" do
|
||||
expect_master_info
|
||||
.returns(info_response(['loading', '0'], ['role', 'master']))
|
||||
|
||||
expect(redis_status.master_alive?).to eq(true)
|
||||
end
|
||||
end
|
||||
|
||||
context "#fallback" do
|
||||
it "instructs redis to kill client connections" do
|
||||
expect_fallback
|
||||
|
||||
redis_status.fallback
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
describe DiscourseRedis::Connector do
|
||||
let(:connector) { DiscourseRedis::Connector.new(config) }
|
||||
let(:fallback_handler) { mock('fallback_handler') }
|
||||
|
||||
after do
|
||||
fallback_handler.master = true
|
||||
before do
|
||||
DiscourseRedis::FallbackHandlers.stubs(:handler_for).returns(fallback_handler)
|
||||
end
|
||||
|
||||
it 'should return the master config when master is up' do
|
||||
fallback_handler.expects(:use_master?).returns(true)
|
||||
expect(connector.resolve).to eq(config)
|
||||
end
|
||||
|
||||
class BrokenRedis
|
||||
def initialize(error)
|
||||
@error = error
|
||||
end
|
||||
|
||||
def call(*args)
|
||||
raise @error
|
||||
end
|
||||
|
||||
def disconnect
|
||||
end
|
||||
end
|
||||
|
||||
it 'should return the slave config when master is down' do
|
||||
error = Redis::CannotConnectError
|
||||
|
||||
expect do
|
||||
connector.resolve(BrokenRedis.new(error))
|
||||
end.to raise_error(Redis::CannotConnectError)
|
||||
|
||||
config = connector.resolve
|
||||
|
||||
expect(config[:host]).to eq(slave_host)
|
||||
expect(config[:port]).to eq(slave_port)
|
||||
end
|
||||
|
||||
it "should return the slave config when master's hostname cannot be resolved" do
|
||||
error = RuntimeError.new('Name or service not known')
|
||||
|
||||
expect do
|
||||
connector.resolve(BrokenRedis.new(error))
|
||||
end.to raise_error(error)
|
||||
|
||||
expect(fallback_handler.master).to eq(false)
|
||||
|
||||
config = connector.resolve
|
||||
|
||||
expect(config[:host]).to eq(slave_host)
|
||||
expect(config[:port]).to eq(slave_port)
|
||||
expect(fallback_handler.master).to eq(false)
|
||||
end
|
||||
|
||||
it "should return the slave config when master is still loading data" do
|
||||
Redis::Client.any_instance
|
||||
.expects(:call)
|
||||
.with([:info, :persistence])
|
||||
.returns("
|
||||
someconfig:haha\r
|
||||
#{DiscourseRedis::FallbackHandler::MASTER_LOADING_STATUS}
|
||||
")
|
||||
|
||||
config = connector.resolve
|
||||
|
||||
expect(config[:host]).to eq(slave_host)
|
||||
expect(config[:port]).to eq(slave_port)
|
||||
end
|
||||
|
||||
it "should raise the right error" do
|
||||
error = RuntimeError.new('test')
|
||||
|
||||
2.times do
|
||||
expect { connector.resolve(BrokenRedis.new(error)) }
|
||||
.to raise_error(error)
|
||||
end
|
||||
fallback_handler.expects(:use_master?).returns(false)
|
||||
expect(connector.resolve).to eq(slave_config)
|
||||
end
|
||||
end
|
||||
|
||||
describe DiscourseRedis::FallbackHandler do
|
||||
before do
|
||||
@original_keepalive_interval = MessageBus.keepalive_interval
|
||||
end
|
||||
use_fake_threads
|
||||
|
||||
after do
|
||||
fallback_handler.master = true
|
||||
MessageBus.keepalive_interval = @original_keepalive_interval
|
||||
end
|
||||
let!(:redis_status) { mock }
|
||||
let!(:fallback_handler) { DiscourseRedis::FallbackHandler.new("", redis_status, execution) }
|
||||
|
||||
describe '#initiate_fallback_to_master' do
|
||||
it 'should return the right value if the master server is still down' do
|
||||
fallback_handler.master = false
|
||||
Redis::Client.any_instance.expects(:call).with([:info]).returns("Some other stuff")
|
||||
context "in the initial configuration" do
|
||||
it "tests that the master is alive and returns true if it is" do
|
||||
redis_status.expects(:master_alive?).returns(true)
|
||||
|
||||
expect(fallback_handler.initiate_fallback_to_master).to eq(false)
|
||||
expect(MessageBus.keepalive_interval).to eq(0)
|
||||
expect(fallback_handler.use_master?).to eq(true)
|
||||
end
|
||||
|
||||
it 'should fallback to the master server once it is up' do
|
||||
fallback_handler.master = false
|
||||
master_conn = mock('master')
|
||||
slave_conn = mock('slave')
|
||||
it "tests that the master is alive and returns false if it is not" do
|
||||
redis_status.expects(:master_alive?).returns(false)
|
||||
expect(fallback_handler.use_master?).to eq(false)
|
||||
|
||||
Redis::Client.expects(:new)
|
||||
.with(DiscourseRedis.config)
|
||||
.returns(master_conn)
|
||||
stop_after(1)
|
||||
end
|
||||
|
||||
Redis::Client.expects(:new)
|
||||
.with(DiscourseRedis.slave_config)
|
||||
.returns(slave_conn)
|
||||
it "tests that the master is alive and returns false if it raises an exception" do
|
||||
error = Exception.new
|
||||
redis_status.expects(:master_alive?).raises(error)
|
||||
|
||||
master_conn.expects(:call)
|
||||
.with([:info])
|
||||
.returns("
|
||||
#{DiscourseRedis::FallbackHandler::MASTER_ROLE_STATUS}\r\n
|
||||
#{DiscourseRedis::FallbackHandler::MASTER_LOADED_STATUS}
|
||||
")
|
||||
Discourse.expects(:warn_exception)
|
||||
.with(error, message: "Error running master_alive?")
|
||||
|
||||
DiscourseRedis::FallbackHandler::CONNECTION_TYPES.each do |connection_type|
|
||||
slave_conn.expects(:call).with(
|
||||
[:client, [:kill, 'type', connection_type]]
|
||||
)
|
||||
expect(fallback_handler.use_master?).to eq(false)
|
||||
|
||||
stop_after(1)
|
||||
end
|
||||
end
|
||||
|
||||
context "after master_alive? has returned false" do
|
||||
before do
|
||||
redis_status.expects(:master_alive?).returns(false)
|
||||
expect(fallback_handler.use_master?).to eq(false)
|
||||
end
|
||||
|
||||
it "responds with false to the next call to use_master? without consulting redis_status" do
|
||||
expect(fallback_handler.use_master?).to eq(false)
|
||||
|
||||
stop_after(1)
|
||||
end
|
||||
|
||||
it "checks that master is alive again after a timeout" do
|
||||
redis_status.expects(:master_alive?).returns(false)
|
||||
|
||||
stop_after(6)
|
||||
end
|
||||
|
||||
it "checks that master is alive again and checks again if an exception is raised" do
|
||||
error = Exception.new
|
||||
redis_status.expects(:master_alive?).raises(error)
|
||||
|
||||
Discourse.expects(:warn_exception)
|
||||
.with(error, message: "Error running master_alive?")
|
||||
|
||||
execution.sleep(6)
|
||||
|
||||
redis_status.expects(:master_alive?).returns(true)
|
||||
redis_status.expects(:fallback)
|
||||
|
||||
stop_after(5)
|
||||
end
|
||||
|
||||
it "triggers a fallback after master_alive? returns true" do
|
||||
redis_status.expects(:master_alive?).returns(true)
|
||||
redis_status.expects(:fallback)
|
||||
|
||||
stop_after(6)
|
||||
end
|
||||
|
||||
context "after falling back" do
|
||||
before do
|
||||
redis_status.expects(:master_alive?).returns(true)
|
||||
redis_status.expects(:fallback)
|
||||
|
||||
stop_after(6)
|
||||
end
|
||||
|
||||
master_conn.expects(:disconnect)
|
||||
slave_conn.expects(:disconnect)
|
||||
it "tests that the master is alive and returns true if it is" do
|
||||
redis_status.expects(:master_alive?).returns(true)
|
||||
|
||||
expect(fallback_handler.initiate_fallback_to_master).to eq(true)
|
||||
expect(fallback_handler.master).to eq(true)
|
||||
expect(Discourse.recently_readonly?).to eq(false)
|
||||
expect(MessageBus.keepalive_interval).to eq(-1)
|
||||
expect(fallback_handler.use_master?).to eq(true)
|
||||
end
|
||||
|
||||
it "tests that the master is alive and returns false if it is not" do
|
||||
redis_status.expects(:master_alive?).returns(false)
|
||||
expect(fallback_handler.use_master?).to eq(false)
|
||||
|
||||
stop_after(1)
|
||||
end
|
||||
|
||||
it "tests that the master is alive and returns false if it raises an exception" do
|
||||
error = Exception.new
|
||||
redis_status.expects(:master_alive?).raises(error)
|
||||
|
||||
Discourse.expects(:warn_exception)
|
||||
.with(error, message: "Error running master_alive?")
|
||||
|
||||
expect(fallback_handler.use_master?).to eq(false)
|
||||
|
||||
stop_after(1)
|
||||
end
|
||||
|
||||
it "doesn't do anything to redis_status for a really long time" do
|
||||
stop_after(1e9)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
context "when message bus and main are on the same host" do
|
||||
use_fake_threads
|
||||
|
||||
before do
|
||||
# Since config is based on GlobalSetting, we need to fetch it before
|
||||
# stubbing
|
||||
conf = config
|
||||
|
||||
GlobalSetting.stubs(:redis_config).returns(conf)
|
||||
GlobalSetting.stubs(:message_bus_redis_config).returns(conf)
|
||||
|
||||
Concurrency::ThreadedExecution.stubs(:new).returns(execution)
|
||||
end
|
||||
|
||||
context "when the redis master goes down" do
|
||||
it "sets the message bus keepalive interval to 0" do
|
||||
expect_master_info
|
||||
.raises(Redis::ConnectionError.new)
|
||||
|
||||
MessageBus.expects(:keepalive_interval=).with(0)
|
||||
|
||||
DiscourseRedis::Connector.new(config).resolve
|
||||
|
||||
execution.stop_other_tasks
|
||||
end
|
||||
end
|
||||
|
||||
context "when the redis master comes back up" do
|
||||
before do
|
||||
MessageBus.keepalive_interval = 60
|
||||
|
||||
expect_master_info
|
||||
.raises(Redis::ConnectionError.new)
|
||||
|
||||
DiscourseRedis::Connector.new(config).resolve
|
||||
|
||||
expect_master_info
|
||||
.returns(info_response(['loading', '0'], ['role', 'master']))
|
||||
|
||||
expect_fallback
|
||||
end
|
||||
|
||||
it "sets the message bus keepalive interval to its original value" do
|
||||
MessageBus.expects(:keepalive_interval=).with(60)
|
||||
end
|
||||
|
||||
it "calls clear_readonly! and request_refresh! on Discourse" do
|
||||
Discourse.expects(:clear_readonly!)
|
||||
Discourse.expects(:request_refresh!)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
context "when message bus and main are on different hosts" do
|
||||
use_fake_threads
|
||||
|
||||
before do
|
||||
# Since config is based on GlobalSetting, we need to fetch it before stubbing
|
||||
conf = config
|
||||
|
||||
GlobalSetting.stubs(:redis_config).returns(conf)
|
||||
|
||||
message_bus_config = conf.dup
|
||||
message_bus_config[:port] = message_bus_config[:port].to_i + 1
|
||||
message_bus_config[:slave_port] = message_bus_config[:slave_port].to_i + 1
|
||||
|
||||
GlobalSetting.stubs(:message_bus_redis_config).returns(message_bus_config)
|
||||
|
||||
Concurrency::ThreadedExecution.stubs(:new).returns(execution)
|
||||
end
|
||||
|
||||
let(:message_bus_master_config) {
|
||||
GlobalSetting.message_bus_redis_config
|
||||
}
|
||||
|
||||
context "when the message bus master goes down" do
|
||||
before do
|
||||
expect_master_info(message_bus_master_config)
|
||||
.raises(Redis::ConnectionError.new)
|
||||
end
|
||||
|
||||
it "sets the message bus keepalive interval to 0" do
|
||||
MessageBus.expects(:keepalive_interval=).with(0)
|
||||
|
||||
DiscourseRedis::Connector.new(message_bus_master_config).resolve
|
||||
|
||||
execution.stop_other_tasks
|
||||
end
|
||||
|
||||
it "does not call clear_readonly! or request_refresh! on Discourse" do
|
||||
Discourse.expects(:clear_readonly!).never
|
||||
Discourse.expects(:request_refresh!).never
|
||||
|
||||
DiscourseRedis::Connector.new(message_bus_master_config).resolve
|
||||
|
||||
execution.stop_other_tasks
|
||||
end
|
||||
end
|
||||
|
||||
context "when the message bus master comes back up" do
|
||||
before do
|
||||
MessageBus.keepalive_interval = 60
|
||||
|
||||
expect_master_info(message_bus_master_config)
|
||||
.raises(Redis::ConnectionError.new)
|
||||
|
||||
DiscourseRedis::Connector.new(message_bus_master_config).resolve
|
||||
|
||||
expect_master_info(message_bus_master_config)
|
||||
.returns(info_response(['loading', '0'], ['role', 'master']))
|
||||
|
||||
expect_fallback(DiscourseRedis.slave_config(message_bus_master_config))
|
||||
end
|
||||
|
||||
it "sets the message bus keepalive interval to its original value" do
|
||||
MessageBus.expects(:keepalive_interval=).with(60)
|
||||
end
|
||||
end
|
||||
|
||||
context "when the main master goes down" do
|
||||
before do
|
||||
expect_master_info
|
||||
.raises(Redis::ConnectionError.new)
|
||||
end
|
||||
|
||||
it "does not change the message bus keepalive interval" do
|
||||
MessageBus.expects(:keepalive_interval=).never
|
||||
|
||||
DiscourseRedis::Connector.new(config).resolve
|
||||
|
||||
execution.stop_other_tasks
|
||||
end
|
||||
end
|
||||
|
||||
context "when the main master comes back up" do
|
||||
before do
|
||||
expect_master_info
|
||||
.raises(Redis::ConnectionError.new)
|
||||
|
||||
DiscourseRedis::Connector.new(config).resolve
|
||||
|
||||
expect_master_info
|
||||
.returns(info_response(['loading', '0'], ['role', 'master']))
|
||||
|
||||
expect_fallback
|
||||
end
|
||||
|
||||
it "does not change the message bus keepalive interval" do
|
||||
MessageBus.expects(:keepalive_interval=).never
|
||||
end
|
||||
|
||||
it "calls clear_readonly! and request_refresh! on Discourse" do
|
||||
Discourse.expects(:clear_readonly!)
|
||||
Discourse.expects(:request_refresh!)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -30,7 +30,7 @@ module Concurrency
|
|||
end
|
||||
|
||||
def choose_with_weights(*options)
|
||||
choose(options.map(&:first))
|
||||
choose(*options.map(&:first))
|
||||
end
|
||||
|
||||
def dead_end
|
||||
|
@ -147,10 +147,11 @@ module Concurrency
|
|||
def initialize(path)
|
||||
@path = path
|
||||
@tasks = []
|
||||
@time = 0
|
||||
end
|
||||
|
||||
def yield
|
||||
Fiber.yield
|
||||
sleep(0)
|
||||
end
|
||||
|
||||
def choose(*options)
|
||||
|
@ -161,30 +162,86 @@ module Concurrency
|
|||
@path.choose_with_weights(*options)
|
||||
end
|
||||
|
||||
def spawn(&blk)
|
||||
@tasks << Fiber.new(&blk)
|
||||
def stop_other_tasks
|
||||
@tasts = @tasks.select! { |task| task[:fiber] == Fiber.current }
|
||||
end
|
||||
|
||||
def run
|
||||
def sleep(length)
|
||||
Fiber.yield(@time + length)
|
||||
end
|
||||
|
||||
def start_root(&blk)
|
||||
descriptor = {
|
||||
fiber: Fiber.new(&blk),
|
||||
run_at: 0
|
||||
}
|
||||
|
||||
@tasks << descriptor
|
||||
end
|
||||
|
||||
def spawn(&blk)
|
||||
descriptor = {
|
||||
fiber: Fiber.new(&blk),
|
||||
run_at: @time
|
||||
}
|
||||
|
||||
@tasks << descriptor
|
||||
|
||||
self.yield
|
||||
end
|
||||
|
||||
def run(sleep_order: false)
|
||||
until @tasks.empty?
|
||||
task = @path.choose(*@tasks)
|
||||
task.resume
|
||||
unless task.alive?
|
||||
@tasks.delete(task)
|
||||
descriptor =
|
||||
if sleep_order
|
||||
@tasks.sort_by! { |x| x[:run_at] }
|
||||
run_at = @tasks.first[:run_at]
|
||||
@path.choose(*@tasks.take_while { |x| x[:run_at] == run_at })
|
||||
else
|
||||
@path.choose(*@tasks)
|
||||
end
|
||||
|
||||
@time = [@time, descriptor[:run_at]].max
|
||||
fiber = descriptor[:fiber]
|
||||
|
||||
begin
|
||||
run_at = fiber.resume
|
||||
rescue Exception
|
||||
end
|
||||
|
||||
if fiber.alive?
|
||||
descriptor[:run_at] = run_at
|
||||
else
|
||||
@tasks.delete(descriptor)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def wait_done
|
||||
until @tasks.size == 1
|
||||
self.sleep(1e9)
|
||||
end
|
||||
end
|
||||
|
||||
def new_mutex
|
||||
Mutex.new(self)
|
||||
end
|
||||
end
|
||||
|
||||
def run_with_path(path)
|
||||
def run_with_path(path, sleep_order: false)
|
||||
execution = Execution.new(path)
|
||||
result = @blk.call(execution)
|
||||
execution.run
|
||||
result = {}
|
||||
execution.start_root {
|
||||
result[:value] = @blk.call(execution)
|
||||
}
|
||||
execution.run(sleep_order: sleep_order)
|
||||
result
|
||||
end
|
||||
|
||||
def run(**opts)
|
||||
Logic.run(**opts, &method(:run_with_path))
|
||||
def run(sleep_order: false, **opts)
|
||||
Logic.run(**opts) do |path|
|
||||
run_with_path(path, sleep_order: sleep_order)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -250,4 +307,45 @@ module Concurrency
|
|||
result
|
||||
end
|
||||
end
|
||||
|
||||
class Mutex
|
||||
def initialize(execution)
|
||||
@execution = execution
|
||||
@locked_by = nil
|
||||
end
|
||||
|
||||
def lock
|
||||
@execution.yield
|
||||
|
||||
fiber = Fiber.current
|
||||
while true
|
||||
if @locked_by.nil?
|
||||
@locked_by = fiber
|
||||
return
|
||||
elsif @locked_by == fiber
|
||||
raise ThreadError, "deadlock; recursive locking"
|
||||
else
|
||||
@execution.yield
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def unlock
|
||||
@execution.yield
|
||||
|
||||
if @locked_by != Fiber.current
|
||||
raise ThreadError, "Attempt to unlock a mutex which is locked by another thread"
|
||||
end
|
||||
@locked_by = nil
|
||||
end
|
||||
|
||||
def synchronize
|
||||
lock
|
||||
begin
|
||||
yield
|
||||
ensure
|
||||
unlock
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Reference in New Issue
Block a user