diff --git a/lib/concurrency.rb b/lib/concurrency.rb new file mode 100644 index 00000000000..3bca0031055 --- /dev/null +++ b/lib/concurrency.rb @@ -0,0 +1,21 @@ +# frozen_string_literal: true + +# This is the 'actually concurrent' counterpart to +# Concurrency::Scenario::Execution from spec/support/concurrency.rb +module Concurrency + class ThreadedExecution + def new_mutex + Mutex.new + end + + def sleep(delay) + super(delay) + nil + end + + def spawn(&blk) + Thread.new(&blk) + nil + end + end +end diff --git a/lib/discourse_redis.rb b/lib/discourse_redis.rb index 728e244db74..912aea25843 100644 --- a/lib/discourse_redis.rb +++ b/lib/discourse_redis.rb @@ -3,139 +3,248 @@ # # A wrapper around redis that namespaces keys with the current site id # +require_dependency 'cache' +require_dependency 'concurrency' class DiscourseRedis - class FallbackHandler - include Singleton - + class RedisStatus MASTER_ROLE_STATUS = "role:master".freeze - MASTER_LOADING_STATUS = "loading:1".freeze MASTER_LOADED_STATUS = "loading:0".freeze CONNECTION_TYPES = %w{normal pubsub}.each(&:freeze) - def initialize - @master = true - @running = false - @mutex = Mutex.new - @slave_config = DiscourseRedis.slave_config - @message_bus_keepalive_interval = MessageBus.keepalive_interval + def initialize(master_config, slave_config) + master_config = master_config.dup.freeze unless master_config.frozen? + slave_config = slave_config.dup.freeze unless slave_config.frozen? + + @master_config = master_config + @slave_config = slave_config end - def verify_master - synchronize do - return if @thread && @thread.alive? - - @thread = Thread.new do - loop do - begin - thread = Thread.new { initiate_fallback_to_master } - thread.join - break if synchronize { @master } - sleep 5 - ensure - thread.kill - end - end - end - end - end - - def initiate_fallback_to_master - success = false + def master_alive? + master_client = connect(@master_config) begin - redis_config = DiscourseRedis.config.dup - redis_config.delete(:connector) - master_client = ::Redis::Client.new(redis_config) - logger.warn "#{log_prefix}: Checking connection to master server..." info = master_client.call([:info]) - - if info.include?(MASTER_LOADED_STATUS) && info.include?(MASTER_ROLE_STATUS) - begin - logger.warn "#{log_prefix}: Master server is active, killing all connections to slave..." - - self.master = true - slave_client = ::Redis::Client.new(@slave_config) - - CONNECTION_TYPES.each do |connection_type| - slave_client.call([:client, [:kill, 'type', connection_type]]) - end - - MessageBus.keepalive_interval = @message_bus_keepalive_interval - Discourse.clear_readonly! - Discourse.request_refresh! - success = true - ensure - slave_client&.disconnect - end - end - rescue => e - logger.warn "#{log_prefix}: Connection to Master server failed with '#{e.message}'" + rescue Redis::ConnectionError, Redis::CannotConnectError, RuntimeError => ex + raise ex if ex.class == RuntimeError && ex.message != "Name or service not known" + warn "Master not alive, error connecting" + return false ensure - master_client&.disconnect + master_client.disconnect end - success + unless info.include?(MASTER_LOADED_STATUS) + warn "Master not alive, status is loading" + return false + end + + unless info.include?(MASTER_ROLE_STATUS) + warn "Master not alive, role != master" + return false + end + + true end - def master - synchronize { @master } - end + def fallback + warn "Killing connections to slave..." - def master=(args) - synchronize do - @master = args + slave_client = connect(@slave_config) - # Disables MessageBus keepalive when Redis is in readonly mode - MessageBus.keepalive_interval = 0 if !@master + begin + CONNECTION_TYPES.each do |connection_type| + slave_client.call([:client, [:kill, 'type', connection_type]]) + end + rescue Redis::ConnectionError, Redis::CannotConnectError, RuntimeError => ex + raise ex if ex.class == RuntimeError && ex.message != "Name or service not known" + warn "Attempted a redis fallback, but connection to slave failed" + ensure + slave_client.disconnect end end private - def synchronize - @mutex.synchronize { yield } - end - - def logger - Rails.logger + def connect(config) + config = config.dup + config.delete(:connector) + ::Redis::Client.new(config) end def log_prefix - "#{self.class}" + @log_prefix ||= begin + master_string = "#{@master_config[:host]}:#{@master_config[:port]}" + slave_string = "#{@slave_config[:host]}:#{@slave_config[:port]}" + "RedisStatus master=#{master_string} slave=#{slave_string}" + end + end + + def warn(message) + Rails.logger.warn "#{log_prefix}: #{message}" + end + end + + class FallbackHandler + def initialize(log_prefix, redis_status, execution) + @log_prefix = log_prefix + @redis_status = redis_status + @mutex = execution.new_mutex + @execution = execution + @master = true + @event_handlers = [] + end + + def add_callbacks(handler) + @mutex.synchronize do + @event_handlers << handler + end + end + + def start_reset + @mutex.synchronize do + if @master + @master = false + trigger(:down) + true + else + false + end + end + end + + def use_master? + master = @mutex.synchronize { @master } + if !master + false + elsif safe_master_alive? + true + else + if start_reset + @execution.spawn do + loop do + @execution.sleep 5 + info "Checking connection to master" + if safe_master_alive? + @mutex.synchronize do + @master = true + @redis_status.fallback + trigger(:up) + end + break + end + end + end + end + + false + end + end + + private + + attr_reader :log_prefix + + def trigger(event) + @event_handlers.each do |handler| + begin + handler.public_send(event) + rescue Exception => e + Discourse.warn_exception(e, message: "Error running FallbackHandler callback") + end + end + end + + def info(message) + Rails.logger.info "#{log_prefix}: #{message}" + end + + def safe_master_alive? + begin + @redis_status.master_alive? + rescue Exception => e + Discourse.warn_exception(e, message: "Error running master_alive?") + false + end + end + end + + class MessageBusFallbackCallbacks + def down + @keepalive_interval, MessageBus.keepalive_interval = + MessageBus.keepalive_interval, 0 + end + + def up + MessageBus.keepalive_interval = @keepalive_interval + end + end + + class MainRedisReadOnlyCallbacks + def down + end + + def up + Discourse.clear_readonly! + Discourse.request_refresh! + end + end + + class FallbackHandlers + include Singleton + + def initialize + @mutex = Mutex.new + @fallback_handlers = {} + end + + def handler_for(config) + config = config.dup.freeze unless config.frozen? + + @mutex.synchronize do + @fallback_handlers[[config[:host], config[:port]]] ||= begin + log_prefix = "FallbackHandler #{config[:host]}:#{config[:port]}" + slave_config = DiscourseRedis.slave_config(config) + redis_status = RedisStatus.new(config, slave_config) + + handler = + FallbackHandler.new( + log_prefix, + redis_status, + Concurrency::ThreadedExecution.new + ) + + if config == GlobalSetting.redis_config + handler.add_callbacks(MainRedisReadOnlyCallbacks.new) + end + + if config == GlobalSetting.message_bus_redis_config + handler.add_callbacks(MessageBusFallbackCallbacks.new) + end + + handler + end + end + end + + def self.handler_for(config) + instance.handler_for(config) end end class Connector < Redis::Client::Connector def initialize(options) + options = options.dup.freeze unless options.frozen? + super(options) - @slave_options = DiscourseRedis.slave_config(options) - @fallback_handler = DiscourseRedis::FallbackHandler.instance + @slave_options = DiscourseRedis.slave_config(options).freeze + @fallback_handler = DiscourseRedis::FallbackHandlers.handler_for(options) end - def resolve(client = nil) - if !@fallback_handler.master - @fallback_handler.verify_master - return @slave_options - end - - begin - options = @options.dup - options.delete(:connector) - client ||= Redis::Client.new(options) - - loading = client.call([:info, :persistence]).include?( - DiscourseRedis::FallbackHandler::MASTER_LOADING_STATUS - ) - - loading ? @slave_options : @options - rescue Redis::ConnectionError, Redis::CannotConnectError, RuntimeError => ex - raise ex if ex.class == RuntimeError && ex.message != "Name or service not known" - @fallback_handler.master = false - @fallback_handler.verify_master - raise ex - ensure - client.disconnect + def resolve + if @fallback_handler.use_master? + @options + else + @slave_options end end end @@ -159,10 +268,6 @@ class DiscourseRedis @namespace = namespace end - def self.fallback_handler - @fallback_handler ||= DiscourseRedis::FallbackHandler.instance - end - def without_namespace # Only use this if you want to store and fetch data that's shared between sites @redis @@ -176,7 +281,6 @@ class DiscourseRedis STDERR.puts "WARN: Redis is in a readonly state. Performed a noop" end - fallback_handler.verify_master if !fallback_handler.master Discourse.received_redis_readonly! nil else @@ -302,5 +406,4 @@ class DiscourseRedis def remove_namespace(key) key[(namespace.length + 1)..-1] end - end diff --git a/spec/components/discourse_redis_spec.rb b/spec/components/discourse_redis_spec.rb index 6d4af1d8174..d769afdebfa 100644 --- a/spec/components/discourse_redis_spec.rb +++ b/spec/components/discourse_redis_spec.rb @@ -3,20 +3,84 @@ require 'rails_helper' describe DiscourseRedis do + before do + DiscourseRedis::FallbackHandlers.instance.instance_variable_set(:@fallback_handlers, {}) + end + let(:slave_host) { 'testhost' } let(:slave_port) { 1234 } let(:config) do - DiscourseRedis.config.dup.merge(slave_host: 'testhost', slave_port: 1234, connector: DiscourseRedis::Connector) + GlobalSetting.redis_config.dup.merge(slave_host: 'testhost', slave_port: 1234, connector: DiscourseRedis::Connector) end - let(:fallback_handler) { DiscourseRedis::FallbackHandler.instance } + let(:slave_config) { DiscourseRedis.slave_config(config) } it "ignore_readonly returns nil from a pure exception" do result = DiscourseRedis.ignore_readonly { raise Redis::CommandError.new("READONLY") } expect(result).to eq(nil) end + let!(:master_conn) { mock('master') } + + def self.use_fake_threads + attr_reader :execution + + around(:each) do |example| + scenario = + Concurrency::Scenario.new do |execution| + @execution = execution + example.run + end + + scenario.run(sleep_order: true, runs: 1) + end + + after(:each) do + # Doing this here, as opposed to after example.run, ensures that it + # happens before the mocha expectations are checked. + execution.wait_done + end + end + + def stop_after(time) + execution.sleep(time) + execution.stop_other_tasks + end + + def expect_master_info(conf = config) + conf = conf.dup + conf.delete(:connector) + + Redis::Client.expects(:new) + .with(conf) + .returns(master_conn) + + master_conn.expects(:disconnect) + master_conn + .expects(:call) + .with([:info]) + end + + def info_response(*values) + values.map { |x| x.join(':') }.join("\r\n") + end + + def expect_fallback(config = slave_config) + slave_conn = mock('slave') + + config = config.dup + config.delete(:connector) + + Redis::Client.expects(:new) + .with(config) + .returns(slave_conn) + + slave_conn.expects(:call).with([:client, [:kill, 'type', 'normal']]) + slave_conn.expects(:call).with([:client, [:kill, 'type', 'pubsub']]) + slave_conn.expects(:disconnect) + end + describe 'redis commands' do let(:raw_redis) { Redis.new(DiscourseRedis.config) } @@ -97,150 +161,349 @@ describe DiscourseRedis do end end - context 'when redis connection is to a slave redis server' do - it 'should check the status of the master server' do - begin - fallback_handler.master = false - Discourse.redis.without_namespace.expects(:set).raises(Redis::CommandError.new("READONLY")) - fallback_handler.expects(:verify_master).once - Discourse.redis.set('test', '1') - ensure - fallback_handler.master = true - Discourse.redis.del('test') + describe DiscourseRedis::RedisStatus do + let(:redis_status) { DiscourseRedis::RedisStatus.new(config, slave_config) } + + context "#master_alive?" do + it "returns false when the master's hostname cannot be resolved" do + expect_master_info + .raises(RuntimeError.new('Name or service not known')) + + expect(redis_status.master_alive?).to eq(false) + end + + it "raises an error if a runtime error is raised" do + error = RuntimeError.new('a random runtime error') + expect_master_info.raises(error) + + expect { + redis_status.master_alive? + }.to raise_error(error) + end + + it "returns false if the master is unavailable" do + expect_master_info.raises(Redis::ConnectionError.new) + + expect(redis_status.master_alive?).to eq(false) + end + + it "returns false if the master is loading" do + expect_master_info + .returns(info_response(['loading', '1'], ['role', 'master'])) + + expect(redis_status.master_alive?).to eq(false) + end + + it "returns false if the master is a slave" do + expect_master_info + .returns(info_response(['loading', '0'], ['role', 'slave'])) + + expect(redis_status.master_alive?).to eq(false) + end + + it "returns true when the master isn't loading and the role is master" do + expect_master_info + .returns(info_response(['loading', '0'], ['role', 'master'])) + + expect(redis_status.master_alive?).to eq(true) + end + end + + context "#fallback" do + it "instructs redis to kill client connections" do + expect_fallback + + redis_status.fallback end end end describe DiscourseRedis::Connector do let(:connector) { DiscourseRedis::Connector.new(config) } + let(:fallback_handler) { mock('fallback_handler') } - after do - fallback_handler.master = true + before do + DiscourseRedis::FallbackHandlers.stubs(:handler_for).returns(fallback_handler) end it 'should return the master config when master is up' do + fallback_handler.expects(:use_master?).returns(true) expect(connector.resolve).to eq(config) end - class BrokenRedis - def initialize(error) - @error = error - end - - def call(*args) - raise @error - end - - def disconnect - end - end - it 'should return the slave config when master is down' do - error = Redis::CannotConnectError - - expect do - connector.resolve(BrokenRedis.new(error)) - end.to raise_error(Redis::CannotConnectError) - - config = connector.resolve - - expect(config[:host]).to eq(slave_host) - expect(config[:port]).to eq(slave_port) - end - - it "should return the slave config when master's hostname cannot be resolved" do - error = RuntimeError.new('Name or service not known') - - expect do - connector.resolve(BrokenRedis.new(error)) - end.to raise_error(error) - - expect(fallback_handler.master).to eq(false) - - config = connector.resolve - - expect(config[:host]).to eq(slave_host) - expect(config[:port]).to eq(slave_port) - expect(fallback_handler.master).to eq(false) - end - - it "should return the slave config when master is still loading data" do - Redis::Client.any_instance - .expects(:call) - .with([:info, :persistence]) - .returns(" - someconfig:haha\r - #{DiscourseRedis::FallbackHandler::MASTER_LOADING_STATUS} - ") - - config = connector.resolve - - expect(config[:host]).to eq(slave_host) - expect(config[:port]).to eq(slave_port) - end - - it "should raise the right error" do - error = RuntimeError.new('test') - - 2.times do - expect { connector.resolve(BrokenRedis.new(error)) } - .to raise_error(error) - end + fallback_handler.expects(:use_master?).returns(false) + expect(connector.resolve).to eq(slave_config) end end describe DiscourseRedis::FallbackHandler do - before do - @original_keepalive_interval = MessageBus.keepalive_interval - end + use_fake_threads - after do - fallback_handler.master = true - MessageBus.keepalive_interval = @original_keepalive_interval - end + let!(:redis_status) { mock } + let!(:fallback_handler) { DiscourseRedis::FallbackHandler.new("", redis_status, execution) } - describe '#initiate_fallback_to_master' do - it 'should return the right value if the master server is still down' do - fallback_handler.master = false - Redis::Client.any_instance.expects(:call).with([:info]).returns("Some other stuff") + context "in the initial configuration" do + it "tests that the master is alive and returns true if it is" do + redis_status.expects(:master_alive?).returns(true) - expect(fallback_handler.initiate_fallback_to_master).to eq(false) - expect(MessageBus.keepalive_interval).to eq(0) + expect(fallback_handler.use_master?).to eq(true) end - it 'should fallback to the master server once it is up' do - fallback_handler.master = false - master_conn = mock('master') - slave_conn = mock('slave') + it "tests that the master is alive and returns false if it is not" do + redis_status.expects(:master_alive?).returns(false) + expect(fallback_handler.use_master?).to eq(false) - Redis::Client.expects(:new) - .with(DiscourseRedis.config) - .returns(master_conn) + stop_after(1) + end - Redis::Client.expects(:new) - .with(DiscourseRedis.slave_config) - .returns(slave_conn) + it "tests that the master is alive and returns false if it raises an exception" do + error = Exception.new + redis_status.expects(:master_alive?).raises(error) - master_conn.expects(:call) - .with([:info]) - .returns(" - #{DiscourseRedis::FallbackHandler::MASTER_ROLE_STATUS}\r\n - #{DiscourseRedis::FallbackHandler::MASTER_LOADED_STATUS} - ") + Discourse.expects(:warn_exception) + .with(error, message: "Error running master_alive?") - DiscourseRedis::FallbackHandler::CONNECTION_TYPES.each do |connection_type| - slave_conn.expects(:call).with( - [:client, [:kill, 'type', connection_type]] - ) + expect(fallback_handler.use_master?).to eq(false) + + stop_after(1) + end + end + + context "after master_alive? has returned false" do + before do + redis_status.expects(:master_alive?).returns(false) + expect(fallback_handler.use_master?).to eq(false) + end + + it "responds with false to the next call to use_master? without consulting redis_status" do + expect(fallback_handler.use_master?).to eq(false) + + stop_after(1) + end + + it "checks that master is alive again after a timeout" do + redis_status.expects(:master_alive?).returns(false) + + stop_after(6) + end + + it "checks that master is alive again and checks again if an exception is raised" do + error = Exception.new + redis_status.expects(:master_alive?).raises(error) + + Discourse.expects(:warn_exception) + .with(error, message: "Error running master_alive?") + + execution.sleep(6) + + redis_status.expects(:master_alive?).returns(true) + redis_status.expects(:fallback) + + stop_after(5) + end + + it "triggers a fallback after master_alive? returns true" do + redis_status.expects(:master_alive?).returns(true) + redis_status.expects(:fallback) + + stop_after(6) + end + + context "after falling back" do + before do + redis_status.expects(:master_alive?).returns(true) + redis_status.expects(:fallback) + + stop_after(6) end - master_conn.expects(:disconnect) - slave_conn.expects(:disconnect) + it "tests that the master is alive and returns true if it is" do + redis_status.expects(:master_alive?).returns(true) - expect(fallback_handler.initiate_fallback_to_master).to eq(true) - expect(fallback_handler.master).to eq(true) - expect(Discourse.recently_readonly?).to eq(false) - expect(MessageBus.keepalive_interval).to eq(-1) + expect(fallback_handler.use_master?).to eq(true) + end + + it "tests that the master is alive and returns false if it is not" do + redis_status.expects(:master_alive?).returns(false) + expect(fallback_handler.use_master?).to eq(false) + + stop_after(1) + end + + it "tests that the master is alive and returns false if it raises an exception" do + error = Exception.new + redis_status.expects(:master_alive?).raises(error) + + Discourse.expects(:warn_exception) + .with(error, message: "Error running master_alive?") + + expect(fallback_handler.use_master?).to eq(false) + + stop_after(1) + end + + it "doesn't do anything to redis_status for a really long time" do + stop_after(1e9) + end + end + end + end + + context "when message bus and main are on the same host" do + use_fake_threads + + before do + # Since config is based on GlobalSetting, we need to fetch it before + # stubbing + conf = config + + GlobalSetting.stubs(:redis_config).returns(conf) + GlobalSetting.stubs(:message_bus_redis_config).returns(conf) + + Concurrency::ThreadedExecution.stubs(:new).returns(execution) + end + + context "when the redis master goes down" do + it "sets the message bus keepalive interval to 0" do + expect_master_info + .raises(Redis::ConnectionError.new) + + MessageBus.expects(:keepalive_interval=).with(0) + + DiscourseRedis::Connector.new(config).resolve + + execution.stop_other_tasks + end + end + + context "when the redis master comes back up" do + before do + MessageBus.keepalive_interval = 60 + + expect_master_info + .raises(Redis::ConnectionError.new) + + DiscourseRedis::Connector.new(config).resolve + + expect_master_info + .returns(info_response(['loading', '0'], ['role', 'master'])) + + expect_fallback + end + + it "sets the message bus keepalive interval to its original value" do + MessageBus.expects(:keepalive_interval=).with(60) + end + + it "calls clear_readonly! and request_refresh! on Discourse" do + Discourse.expects(:clear_readonly!) + Discourse.expects(:request_refresh!) + end + end + end + + context "when message bus and main are on different hosts" do + use_fake_threads + + before do + # Since config is based on GlobalSetting, we need to fetch it before stubbing + conf = config + + GlobalSetting.stubs(:redis_config).returns(conf) + + message_bus_config = conf.dup + message_bus_config[:port] = message_bus_config[:port].to_i + 1 + message_bus_config[:slave_port] = message_bus_config[:slave_port].to_i + 1 + + GlobalSetting.stubs(:message_bus_redis_config).returns(message_bus_config) + + Concurrency::ThreadedExecution.stubs(:new).returns(execution) + end + + let(:message_bus_master_config) { + GlobalSetting.message_bus_redis_config + } + + context "when the message bus master goes down" do + before do + expect_master_info(message_bus_master_config) + .raises(Redis::ConnectionError.new) + end + + it "sets the message bus keepalive interval to 0" do + MessageBus.expects(:keepalive_interval=).with(0) + + DiscourseRedis::Connector.new(message_bus_master_config).resolve + + execution.stop_other_tasks + end + + it "does not call clear_readonly! or request_refresh! on Discourse" do + Discourse.expects(:clear_readonly!).never + Discourse.expects(:request_refresh!).never + + DiscourseRedis::Connector.new(message_bus_master_config).resolve + + execution.stop_other_tasks + end + end + + context "when the message bus master comes back up" do + before do + MessageBus.keepalive_interval = 60 + + expect_master_info(message_bus_master_config) + .raises(Redis::ConnectionError.new) + + DiscourseRedis::Connector.new(message_bus_master_config).resolve + + expect_master_info(message_bus_master_config) + .returns(info_response(['loading', '0'], ['role', 'master'])) + + expect_fallback(DiscourseRedis.slave_config(message_bus_master_config)) + end + + it "sets the message bus keepalive interval to its original value" do + MessageBus.expects(:keepalive_interval=).with(60) + end + end + + context "when the main master goes down" do + before do + expect_master_info + .raises(Redis::ConnectionError.new) + end + + it "does not change the message bus keepalive interval" do + MessageBus.expects(:keepalive_interval=).never + + DiscourseRedis::Connector.new(config).resolve + + execution.stop_other_tasks + end + end + + context "when the main master comes back up" do + before do + expect_master_info + .raises(Redis::ConnectionError.new) + + DiscourseRedis::Connector.new(config).resolve + + expect_master_info + .returns(info_response(['loading', '0'], ['role', 'master'])) + + expect_fallback + end + + it "does not change the message bus keepalive interval" do + MessageBus.expects(:keepalive_interval=).never + end + + it "calls clear_readonly! and request_refresh! on Discourse" do + Discourse.expects(:clear_readonly!) + Discourse.expects(:request_refresh!) end end end diff --git a/spec/support/concurrency.rb b/spec/support/concurrency.rb index 5d3cd0c87c4..062bcab56b5 100644 --- a/spec/support/concurrency.rb +++ b/spec/support/concurrency.rb @@ -30,7 +30,7 @@ module Concurrency end def choose_with_weights(*options) - choose(options.map(&:first)) + choose(*options.map(&:first)) end def dead_end @@ -147,10 +147,11 @@ module Concurrency def initialize(path) @path = path @tasks = [] + @time = 0 end def yield - Fiber.yield + sleep(0) end def choose(*options) @@ -161,30 +162,86 @@ module Concurrency @path.choose_with_weights(*options) end - def spawn(&blk) - @tasks << Fiber.new(&blk) + def stop_other_tasks + @tasts = @tasks.select! { |task| task[:fiber] == Fiber.current } end - def run + def sleep(length) + Fiber.yield(@time + length) + end + + def start_root(&blk) + descriptor = { + fiber: Fiber.new(&blk), + run_at: 0 + } + + @tasks << descriptor + end + + def spawn(&blk) + descriptor = { + fiber: Fiber.new(&blk), + run_at: @time + } + + @tasks << descriptor + + self.yield + end + + def run(sleep_order: false) until @tasks.empty? - task = @path.choose(*@tasks) - task.resume - unless task.alive? - @tasks.delete(task) + descriptor = + if sleep_order + @tasks.sort_by! { |x| x[:run_at] } + run_at = @tasks.first[:run_at] + @path.choose(*@tasks.take_while { |x| x[:run_at] == run_at }) + else + @path.choose(*@tasks) + end + + @time = [@time, descriptor[:run_at]].max + fiber = descriptor[:fiber] + + begin + run_at = fiber.resume + rescue Exception + end + + if fiber.alive? + descriptor[:run_at] = run_at + else + @tasks.delete(descriptor) end end end + + def wait_done + until @tasks.size == 1 + self.sleep(1e9) + end + end + + def new_mutex + Mutex.new(self) + end end - def run_with_path(path) + def run_with_path(path, sleep_order: false) execution = Execution.new(path) - result = @blk.call(execution) - execution.run + result = {} + execution.start_root { + result[:value] = @blk.call(execution) + } + execution.run(sleep_order: sleep_order) result end - def run(**opts) - Logic.run(**opts, &method(:run_with_path)) + def run(sleep_order: false, **opts) + Logic.run(**opts) do |path| + run_with_path(path, sleep_order: sleep_order) + end end end @@ -250,4 +307,45 @@ module Concurrency result end end + + class Mutex + def initialize(execution) + @execution = execution + @locked_by = nil + end + + def lock + @execution.yield + + fiber = Fiber.current + while true + if @locked_by.nil? + @locked_by = fiber + return + elsif @locked_by == fiber + raise ThreadError, "deadlock; recursive locking" + else + @execution.yield + end + end + end + + def unlock + @execution.yield + + if @locked_by != Fiber.current + raise ThreadError, "Attempt to unlock a mutex which is locked by another thread" + end + @locked_by = nil + end + + def synchronize + lock + begin + yield + ensure + unlock + end + end + end end