From 2df3c65ba90a3a7c6ca27e1733502521c5f6243a Mon Sep 17 00:00:00 2001 From: Sam Date: Tue, 10 May 2022 08:19:02 +1000 Subject: [PATCH] FIX: add support for pipelined and multi redis commands (#16682) Latest redis interoduces a block form of multi / pipelined, this was incorrectly passed through and not namespaced. Fix also updates logster, we held off on upgrading it due to missing functions --- Gemfile | 2 +- Gemfile.lock | 6 ++--- app/models/site.rb | 8 +++--- app/services/random_topic_selector.rb | 12 ++++----- lib/discourse_redis.rb | 24 +++++++++++++++-- lib/distributed_mutex.rb | 12 +++++---- lib/redis_snapshot.rb | 10 +++---- lib/tasks/redis.rake | 4 +-- spec/lib/discourse_redis_spec.rb | 39 +++++++++++++++++++++++++++ 9 files changed, 89 insertions(+), 28 deletions(-) diff --git a/Gemfile b/Gemfile index f9286c4ef50..bb5d41e03b3 100644 --- a/Gemfile +++ b/Gemfile @@ -230,7 +230,7 @@ gem 'cppjieba_rb', require: false gem 'lograge', require: false gem 'logstash-event', require: false gem 'logstash-logger', require: false -gem 'logster', '2.11.0' +gem 'logster' # NOTE: later versions of sassc are causing a segfault, possibly dependent on processer architecture # and until resolved should be locked at 2.0.1 diff --git a/Gemfile.lock b/Gemfile.lock index f88aa43dfb9..5c806d6dc71 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -215,7 +215,7 @@ GEM logstash-event (1.2.02) logstash-logger (0.26.1) logstash-event (~> 1.2) - logster (2.11.0) + logster (2.11.2) loofah (2.17.0) crass (~> 1.0.2) nokogiri (>= 1.5.9) @@ -560,7 +560,7 @@ DEPENDENCIES lograge logstash-event logstash-logger - logster (= 2.11.0) + logster loofah lru_redux lz4-ruby @@ -638,4 +638,4 @@ DEPENDENCIES yaml-lint BUNDLED WITH - 2.3.5 + 2.3.13 diff --git a/app/models/site.rb b/app/models/site.rb index 12353dbd5f6..8b6df669974 100644 --- a/app/models/site.rb +++ b/app/models/site.rb @@ -181,10 +181,10 @@ class Site json = MultiJson.dump(SiteSerializer.new(site, root: false, scope: guardian)) if guardian.anonymous? - Discourse.redis.multi do - Discourse.redis.setex 'site_json', 1800, json - Discourse.redis.set 'site_json_seq', seq - Discourse.redis.set 'site_json_version', Discourse.git_version + Discourse.redis.multi do |transaction| + transaction.setex 'site_json', 1800, json + transaction.set 'site_json_seq', seq + transaction.set 'site_json_version', Discourse.git_version end end diff --git a/app/services/random_topic_selector.rb b/app/services/random_topic_selector.rb index 467b4dba5f3..8b8d121befe 100644 --- a/app/services/random_topic_selector.rb +++ b/app/services/random_topic_selector.rb @@ -40,9 +40,9 @@ class RandomTopicSelector key = cache_key(category) if results.present? - Discourse.redis.multi do - Discourse.redis.rpush(key, results) - Discourse.redis.expire(key, 2.days) + Discourse.redis.multi do |transaction| + transaction.rpush(key, results) + transaction.expire(key, 2.days) end end @@ -56,9 +56,9 @@ class RandomTopicSelector return results if count < 1 - results = Discourse.redis.multi do - Discourse.redis.lrange(key, 0, count - 1) - Discourse.redis.ltrim(key, count, -1) + results = Discourse.redis.multi do |transaction| + transaction.lrange(key, 0, count - 1) + transaction.ltrim(key, count, -1) end if !results.is_a?(Array) # Redis is in readonly mode diff --git a/lib/discourse_redis.rb b/lib/discourse_redis.rb index e5a26aeb31f..0634cb36e31 100644 --- a/lib/discourse_redis.rb +++ b/lib/discourse_redis.rb @@ -14,9 +14,9 @@ class DiscourseRedis GlobalSetting.redis_config end - def initialize(config = nil, namespace: true) + def initialize(config = nil, namespace: true, raw_redis: nil) @config = config || DiscourseRedis.config - @redis = DiscourseRedis.raw_connection(@config.dup) + @redis = raw_redis || DiscourseRedis.raw_connection(@config.dup) @namespace = namespace end @@ -150,6 +150,26 @@ class DiscourseRedis Cache.new end + def multi + if block_given? + @redis.multi do |transaction| + yield DiscourseRedis.new(@config, namespace: @namespace, raw_redis: transaction) + end + else + @redis.multi + end + end + + def pipelined + if block_given? + @redis.pipelined do |transaction| + yield DiscourseRedis.new(@config, namespace: @namespace, raw_redis: transaction) + end + else + @redis.pipelined + end + end + private def remove_namespace(key) diff --git a/lib/distributed_mutex.rb b/lib/distributed_mutex.rb index fed5d91f7de..824a81599d6 100644 --- a/lib/distributed_mutex.rb +++ b/lib/distributed_mutex.rb @@ -93,9 +93,9 @@ class DistributedMutex got_lock = false else result = - redis.multi do - redis.set key, expire_time.to_s - redis.expireat key, expire_time + 1 + redis.multi do |transaction| + transaction.set key, expire_time.to_s + transaction.expireat key, expire_time + 1 end got_lock = !result.nil? @@ -112,9 +112,11 @@ class DistributedMutex current_expire_time = redis.get key if current_expire_time == expire_time.to_s + # MULTI is the way redis ensures the watched key + # has not changed by the time it is deleted result = - redis.multi do - redis.del key + redis.multi do |transaction| + transaction.del key end return !result.nil? else diff --git a/lib/redis_snapshot.rb b/lib/redis_snapshot.rb index 3eb78fb3a16..1ef40792eec 100644 --- a/lib/redis_snapshot.rb +++ b/lib/redis_snapshot.rb @@ -14,9 +14,9 @@ class RedisSnapshot keys = redis.keys values = - redis.pipelined do + redis.pipelined do |batch| keys.each do |key| - redis.dump(key) + batch.dump(key) end end @@ -28,11 +28,11 @@ class RedisSnapshot end def restore(redis = Discourse.redis) - redis.pipelined do - redis.flushdb + redis.pipelined do |batch| + batch.flushdb @dump.each do |key, value| - redis.restore(key, 0, value) + batch.restore(key, 0, value) end end diff --git a/lib/tasks/redis.rake b/lib/tasks/redis.rake index ad824bb503c..ee45cb0e19f 100644 --- a/lib/tasks/redis.rake +++ b/lib/tasks/redis.rake @@ -15,13 +15,13 @@ task 'redis:clean_up' => ['environment'] do cursor, keys = redis.scan(cursor) cursor = cursor.to_i - redis.multi do + redis.multi do |transaction| keys.each do |key| if match = key.match(regexp) db_name = match[:message_bus] || match[:namespace] if !dbs.include?(db_name) - redis.del(key) + transaction.del(key) end end end diff --git a/spec/lib/discourse_redis_spec.rb b/spec/lib/discourse_redis_spec.rb index a898f3f7f58..3a82beed8e4 100644 --- a/spec/lib/discourse_redis_spec.rb +++ b/spec/lib/discourse_redis_spec.rb @@ -17,6 +17,45 @@ describe DiscourseRedis do raw_redis.flushdb end + describe 'pipelined / multi' do + let(:redis) { DiscourseRedis.new } + + it 'should support multi commands' do + val = redis.multi do |transaction| + transaction.set 'foo', 'bar' + transaction.set 'bar', 'foo' + transaction.get 'bar' + end + + expect(raw_redis.get('foo')).to eq(nil) + expect(raw_redis.get('bar')).to eq(nil) + expect(redis.get('foo')).to eq('bar') + expect(redis.get('bar')).to eq('foo') + + expect(val).to eq(["OK", "OK", "foo"]) + end + + it 'should support pipelined commands' do + set, incr = nil + val = redis.pipelined do |pipeline| + set = pipeline.set "foo", "baz" + incr = pipeline.incr "baz" + end + + expect(val).to eq(["OK", 1]) + + expect(set.value).to eq("OK") + expect(incr.value).to eq(1) + + expect(raw_redis.get('foo')).to eq(nil) + expect(raw_redis.get('baz')).to eq(nil) + + expect(redis.get('foo')).to eq("baz") + expect(redis.get('baz')).to eq("1") + + end + end + describe 'when namespace is enabled' do let(:redis) { DiscourseRedis.new }