mirror of
https://github.com/discourse/discourse.git
synced 2024-11-24 15:49:55 +08:00
bb217bbcc8
PresenceChannel configuration is cached using redis. That cache is used, and sometimes repopulated, during normal GET requests. When the primary redis server was readonly, that `redis.set` call would raise an error and cause the entire request to fail. Instead, we should ignore the failure and continue without populating the cache.
625 lines
20 KiB
Ruby
625 lines
20 KiB
Ruby
# frozen_string_literal: true
|
|
|
|
# The server-side implementation of PresenceChannels. See also {PresenceController}
|
|
# and +app/assets/javascripts/discourse/app/services/presence.js+
|
|
class PresenceChannel
|
|
class NotFound < StandardError
|
|
end
|
|
class InvalidAccess < StandardError
|
|
end
|
|
class ConfigNotLoaded < StandardError
|
|
end
|
|
class InvalidConfig < StandardError
|
|
end
|
|
|
|
class State
|
|
include ActiveModel::Serialization
|
|
|
|
attr_reader :message_bus_last_id
|
|
attr_reader :user_ids
|
|
attr_reader :count
|
|
|
|
def initialize(message_bus_last_id:, user_ids: nil, count: nil)
|
|
raise "user_ids or count required" if user_ids.nil? && count.nil?
|
|
@message_bus_last_id = message_bus_last_id
|
|
@user_ids = user_ids
|
|
@count = count || user_ids.count
|
|
end
|
|
|
|
def users
|
|
return nil if user_ids.nil?
|
|
User.where(id: user_ids)
|
|
end
|
|
end
|
|
|
|
# Class for managing config of PresenceChannel
|
|
# Three parameters can be provided on initialization:
|
|
# public: boolean value. If true, channel information is visible to all users (default false)
|
|
# allowed_user_ids: array of user_ids that can view, and become present in, the channel (default [])
|
|
# allowed_group_ids: array of group_ids that can view, and become present in, the channel (default [])
|
|
# count_only: boolean. If true, user identities are never revealed to clients. (default [])
|
|
class Config
|
|
NOT_FOUND ||= "notfound"
|
|
|
|
attr_accessor :public, :allowed_user_ids, :allowed_group_ids, :count_only, :timeout
|
|
|
|
def initialize(
|
|
public: false,
|
|
allowed_user_ids: nil,
|
|
allowed_group_ids: nil,
|
|
count_only: false,
|
|
timeout: nil
|
|
)
|
|
@public = public
|
|
@allowed_user_ids = allowed_user_ids
|
|
@allowed_group_ids = allowed_group_ids
|
|
@count_only = count_only
|
|
@timeout = timeout
|
|
end
|
|
|
|
def self.from_json(json)
|
|
data = JSON.parse(json, symbolize_names: true)
|
|
data = {} if !data.is_a? Hash
|
|
new(**data.slice(:public, :allowed_user_ids, :allowed_group_ids, :count_only, :timeout))
|
|
end
|
|
|
|
def to_json
|
|
data = { public: public }
|
|
data[:allowed_user_ids] = allowed_user_ids if allowed_user_ids
|
|
data[:allowed_group_ids] = allowed_group_ids if allowed_group_ids
|
|
data[:count_only] = count_only if count_only
|
|
data[:timeout] = timeout if timeout
|
|
data.to_json
|
|
end
|
|
end
|
|
|
|
DEFAULT_TIMEOUT ||= 60
|
|
CONFIG_CACHE_SECONDS ||= 10
|
|
GC_SECONDS ||= 24.hours.to_i
|
|
MUTEX_TIMEOUT_SECONDS ||= 10
|
|
MUTEX_LOCKED_ERROR ||= "PresenceChannel mutex is locked"
|
|
|
|
@@configuration_blocks ||= {}
|
|
|
|
attr_reader :name, :timeout, :message_bus_channel_name, :config
|
|
|
|
def initialize(name, raise_not_found: true, use_cache: true)
|
|
@name = name
|
|
@message_bus_channel_name = "/presence#{name}"
|
|
|
|
begin
|
|
@config = fetch_config(use_cache: use_cache)
|
|
rescue PresenceChannel::NotFound
|
|
raise if raise_not_found
|
|
@config = Config.new
|
|
end
|
|
|
|
@timeout = config.timeout || DEFAULT_TIMEOUT
|
|
end
|
|
|
|
# Is this user allowed to view this channel?
|
|
# Pass `nil` for anonymous viewers
|
|
def can_view?(user_id: nil, group_ids: nil)
|
|
return true if config.public
|
|
return true if user_id && config.allowed_user_ids&.include?(user_id)
|
|
|
|
if user_id && config.allowed_group_ids.present?
|
|
return true if config.allowed_group_ids.include?(Group::AUTO_GROUPS[:everyone])
|
|
group_ids ||= GroupUser.where(user_id: user_id).pluck("group_id")
|
|
return true if (group_ids & config.allowed_group_ids).present?
|
|
end
|
|
false
|
|
end
|
|
|
|
# Is a user allowed to enter this channel?
|
|
# Currently equal to the the can_view? permission
|
|
def can_enter?(user_id: nil, group_ids: nil)
|
|
return false if user_id.nil?
|
|
can_view?(user_id: user_id, group_ids: group_ids)
|
|
end
|
|
|
|
# Mark a user's client as present in this channel. The client_id should be unique per
|
|
# browser tab. This method should be called repeatedly (at least once every DEFAULT_TIMEOUT)
|
|
# while the user is present in the channel.
|
|
def present(user_id:, client_id:)
|
|
raise PresenceChannel::InvalidAccess if !can_enter?(user_id: user_id)
|
|
|
|
mutex_value = SecureRandom.hex
|
|
result =
|
|
retry_on_mutex_error do
|
|
PresenceChannel.redis_eval(
|
|
:present,
|
|
redis_keys,
|
|
[name, user_id, client_id, (Time.zone.now + timeout).to_i, mutex_value],
|
|
)
|
|
end
|
|
|
|
if result == 1
|
|
begin
|
|
publish_message(entering_user_ids: [user_id])
|
|
ensure
|
|
release_mutex(mutex_value)
|
|
end
|
|
end
|
|
end
|
|
|
|
# Immediately mark a user's client as leaving the channel
|
|
def leave(user_id:, client_id:)
|
|
mutex_value = SecureRandom.hex
|
|
result =
|
|
retry_on_mutex_error do
|
|
PresenceChannel.redis_eval(:leave, redis_keys, [name, user_id, client_id, nil, mutex_value])
|
|
end
|
|
|
|
if result == 1
|
|
begin
|
|
publish_message(leaving_user_ids: [user_id])
|
|
ensure
|
|
release_mutex(mutex_value)
|
|
end
|
|
end
|
|
end
|
|
|
|
# Fetch a {PresenceChannel::State} instance representing the current state of this
|
|
#
|
|
# @param [Boolean] count_only set true to skip fetching the list of user ids from redis
|
|
def state(count_only: config.count_only)
|
|
if count_only
|
|
last_id, count = retry_on_mutex_error { PresenceChannel.redis_eval(:count, redis_keys) }
|
|
else
|
|
last_id, ids = retry_on_mutex_error { PresenceChannel.redis_eval(:user_ids, redis_keys) }
|
|
end
|
|
count ||= ids&.count
|
|
last_id = nil if last_id == -1
|
|
|
|
if Rails.env.test? && MessageBus.backend == :memory
|
|
# Doing it this way is not atomic, but we have no other option when
|
|
# messagebus is not using the redis backend
|
|
last_id = MessageBus.last_id(message_bus_channel_name)
|
|
end
|
|
|
|
State.new(message_bus_last_id: last_id, user_ids: ids, count: count)
|
|
end
|
|
|
|
def user_ids
|
|
state.user_ids
|
|
end
|
|
|
|
def count
|
|
state(count_only: true).count
|
|
end
|
|
|
|
# Automatically expire all users which have not been 'present' for more than +DEFAULT_TIMEOUT+
|
|
def auto_leave
|
|
mutex_value = SecureRandom.hex
|
|
left_user_ids =
|
|
retry_on_mutex_error do
|
|
PresenceChannel.redis_eval(:auto_leave, redis_keys, [name, Time.zone.now.to_i, mutex_value])
|
|
end
|
|
|
|
if !left_user_ids.empty?
|
|
begin
|
|
publish_message(leaving_user_ids: left_user_ids)
|
|
ensure
|
|
release_mutex(mutex_value)
|
|
end
|
|
end
|
|
end
|
|
|
|
# Clear all members of the channel. This is intended for debugging/development only
|
|
def clear
|
|
PresenceChannel.redis.del(redis_key_zlist)
|
|
PresenceChannel.redis.del(redis_key_hash)
|
|
PresenceChannel.redis.del(redis_key_config)
|
|
PresenceChannel.redis.del(redis_key_mutex)
|
|
PresenceChannel.redis.zrem(self.class.redis_key_channel_list, name)
|
|
end
|
|
|
|
# Designed to be run periodically. Checks the channel list for channels with expired members,
|
|
# and runs auto_leave for each eligible channel
|
|
def self.auto_leave_all
|
|
channels_with_expiring_members =
|
|
PresenceChannel.redis.zrangebyscore(redis_key_channel_list, "-inf", Time.zone.now.to_i)
|
|
channels_with_expiring_members.each { |name| new(name, raise_not_found: false).auto_leave }
|
|
end
|
|
|
|
# Clear all known channels. This is intended for debugging/development only
|
|
def self.clear_all!
|
|
channels = PresenceChannel.redis.zrangebyscore(redis_key_channel_list, "-inf", "+inf")
|
|
channels.each { |name| new(name, raise_not_found: false).clear }
|
|
|
|
config_cache_keys =
|
|
PresenceChannel
|
|
.redis
|
|
.scan_each(match: Discourse.redis.namespace_key("_presence_*_config"))
|
|
.to_a
|
|
PresenceChannel.redis.del(*config_cache_keys) if config_cache_keys.present?
|
|
end
|
|
|
|
# Shortcut to access a redis client for all PresenceChannel activities.
|
|
# PresenceChannel must use the same Redis server as MessageBus, so that
|
|
# actions can be applied atomically. For the vast majority of Discourse
|
|
# installations, this is the same Redis server as `Discourse.redis`.
|
|
def self.redis
|
|
if MessageBus.backend == :redis
|
|
MessageBus.backend_instance.send(:pub_redis) # TODO: avoid a private API?
|
|
elsif Rails.env.test?
|
|
Discourse.redis.without_namespace
|
|
else
|
|
raise "PresenceChannel is unable to access MessageBus's Redis instance"
|
|
end
|
|
end
|
|
|
|
def self.redis_eval(key, *args)
|
|
LUA_SCRIPTS[key].eval(redis, *args)
|
|
end
|
|
|
|
# Register a callback to configure channels with a given prefix
|
|
# Prefix must match [a-zA-Z0-9_-]+
|
|
#
|
|
# For example, this registration will be used for
|
|
# all channels starting /topic-reply/...:
|
|
#
|
|
# register_prefix("topic-reply") do |channel_name|
|
|
# PresenceChannel::Config.new(public: true)
|
|
# end
|
|
#
|
|
# At runtime, the block will be passed a full channel name. If the channel
|
|
# should not exist, the block should return `nil`. If the channel should exist,
|
|
# the block should return a PresenceChannel::Config object.
|
|
#
|
|
# Return values may be cached for up to 10 seconds.
|
|
#
|
|
# Plugins should use the {Plugin::Instance.register_presence_channel_prefix} API instead
|
|
def self.register_prefix(prefix, &block)
|
|
unless prefix.match? /[a-zA-Z0-9_-]+/
|
|
raise "PresenceChannel prefix #{prefix} must match [a-zA-Z0-9_-]+"
|
|
end
|
|
if @@configuration_blocks&.[](prefix)
|
|
raise "PresenceChannel prefix #{prefix} already registered"
|
|
end
|
|
@@configuration_blocks[prefix] = block
|
|
end
|
|
|
|
# For use in a test environment only
|
|
def self.unregister_prefix(prefix)
|
|
raise "Only allowed in test environment" if !Rails.env.test?
|
|
@@configuration_blocks&.delete(prefix)
|
|
end
|
|
|
|
private
|
|
|
|
def fetch_config(use_cache: true)
|
|
cached_config = (PresenceChannel.redis.get(redis_key_config) if use_cache)
|
|
|
|
if cached_config == Config::NOT_FOUND
|
|
raise PresenceChannel::NotFound
|
|
elsif cached_config
|
|
Config.from_json(cached_config)
|
|
else
|
|
prefix = name[%r{/([a-zA-Z0-9_-]+)/.*}, 1]
|
|
raise PresenceChannel::NotFound if prefix.nil?
|
|
|
|
config_block = @@configuration_blocks[prefix]
|
|
config_block ||=
|
|
DiscoursePluginRegistry.presence_channel_prefixes.find { |t| t[0] == prefix }&.[](1)
|
|
raise PresenceChannel::NotFound if config_block.nil?
|
|
|
|
result = config_block.call(name)
|
|
to_cache =
|
|
if result.is_a? Config
|
|
result.to_json
|
|
elsif result.nil?
|
|
Config::NOT_FOUND
|
|
else
|
|
raise InvalidConfig.new "Expected PresenceChannel::Config or nil. Got a #{result.class.name}"
|
|
end
|
|
|
|
DiscourseRedis.ignore_readonly do
|
|
PresenceChannel.redis.set(redis_key_config, to_cache, ex: CONFIG_CACHE_SECONDS)
|
|
end
|
|
|
|
raise PresenceChannel::NotFound if result.nil?
|
|
result
|
|
end
|
|
end
|
|
|
|
def publish_message(entering_user_ids: nil, leaving_user_ids: nil)
|
|
message = {}
|
|
if config.count_only
|
|
message["count_delta"] = entering_user_ids&.count || 0
|
|
message["count_delta"] -= leaving_user_ids&.count || 0
|
|
return if message["count_delta"] == 0
|
|
else
|
|
message["leaving_user_ids"] = leaving_user_ids if leaving_user_ids.present?
|
|
if entering_user_ids.present?
|
|
users = User.where(id: entering_user_ids)
|
|
message["entering_users"] = ActiveModel::ArraySerializer.new(
|
|
users,
|
|
each_serializer: BasicUserSerializer,
|
|
)
|
|
end
|
|
end
|
|
|
|
params = {}
|
|
|
|
if config.public
|
|
# no params required
|
|
elsif config.allowed_user_ids || config.allowed_group_ids
|
|
params[:user_ids] = config.allowed_user_ids
|
|
params[:group_ids] = config.allowed_group_ids
|
|
else
|
|
# nobody is allowed... don't publish anything
|
|
return
|
|
end
|
|
|
|
MessageBus.publish(message_bus_channel_name, message.as_json, **params)
|
|
end
|
|
|
|
# Most atomic actions are achieved via lua scripts. However, when a lua action
|
|
# will result in publishing a messagebus message, the atomicity is broken.
|
|
#
|
|
# For example, if one process is handling a 'user enter' event, and another is
|
|
# handling a 'user leave' event, we need to make sure the messagebus messages
|
|
# are published in the same sequence that the PresenceChannel lua script are run.
|
|
#
|
|
# The present/leave/auto_leave lua scripts will automatically acquire this mutex
|
|
# if needed. If their return value indicates a change has occurred, the mutex
|
|
# should be released via #release_mutex after the messagebus message has been sent
|
|
#
|
|
# If they need a change, and the mutex is not available, they will raise an error
|
|
# and should be retried periodically
|
|
def redis_key_mutex
|
|
Discourse.redis.namespace_key("_presence_#{name}_mutex")
|
|
end
|
|
|
|
def release_mutex(mutex_value)
|
|
PresenceChannel.redis_eval(:release_mutex, [redis_key_mutex], [mutex_value])
|
|
end
|
|
|
|
def retry_on_mutex_error
|
|
attempts ||= 0
|
|
yield
|
|
rescue ::Redis::CommandError => e
|
|
if e.to_s =~ /#{MUTEX_LOCKED_ERROR}/ && attempts < 1000
|
|
attempts += 1
|
|
sleep 0.001
|
|
retry
|
|
else
|
|
raise
|
|
end
|
|
end
|
|
|
|
# The redis key which MessageBus uses to store the 'last_id' for the channel
|
|
# associated with this PresenceChannel.
|
|
def message_bus_last_id_key
|
|
return "" if Rails.env.test? && MessageBus.backend == :memory
|
|
|
|
# TODO: Avoid using private MessageBus methods here
|
|
encoded_channel_name = MessageBus.send(:encode_channel_name, message_bus_channel_name)
|
|
MessageBus.backend_instance.send(:backlog_id_key, encoded_channel_name)
|
|
end
|
|
|
|
def redis_keys
|
|
[
|
|
redis_key_zlist,
|
|
redis_key_hash,
|
|
self.class.redis_key_channel_list,
|
|
message_bus_last_id_key,
|
|
redis_key_mutex,
|
|
]
|
|
end
|
|
|
|
# The zlist is a list of client_ids, ranked by their expiration timestamp
|
|
# we periodically delete the 'lowest ranked' items in this list based on the `timeout` of the channel
|
|
def redis_key_zlist
|
|
Discourse.redis.namespace_key("_presence_#{name}_zlist")
|
|
end
|
|
|
|
# The hash contains a map of user_id => session_count
|
|
# when the count for a user reaches 0, the key is deleted
|
|
# We use this hash to efficiently count the number of present users
|
|
def redis_key_hash
|
|
Discourse.redis.namespace_key("_presence_#{name}_hash")
|
|
end
|
|
|
|
# The hash contains a map of user_id => session_count
|
|
# when the count for a user reaches 0, the key is deleted
|
|
# We use this hash to efficiently count the number of present users
|
|
def redis_key_config
|
|
Discourse.redis.namespace_key("_presence_#{name}_config")
|
|
end
|
|
|
|
# This list contains all active presence channels, ranked with the expiration timestamp of their least-recently-seen client_id
|
|
# We periodically check the 'lowest ranked' items in this list based on the `timeout` of the channel
|
|
def self.redis_key_channel_list
|
|
Discourse.redis.namespace_key("_presence_channels")
|
|
end
|
|
|
|
COMMON_PRESENT_LEAVE_LUA = <<~LUA
|
|
local channel = ARGV[1]
|
|
local user_id = ARGV[2]
|
|
local client_id = ARGV[3]
|
|
local expires = ARGV[4]
|
|
local mutex_value = ARGV[5]
|
|
|
|
local zlist_key = KEYS[1]
|
|
local hash_key = KEYS[2]
|
|
local channels_key = KEYS[3]
|
|
local message_bus_id_key = KEYS[4]
|
|
local mutex_key = KEYS[5]
|
|
|
|
local mutex_locked = redis.call('EXISTS', mutex_key) == 1
|
|
|
|
local zlist_elem = tostring(user_id) .. " " .. tostring(client_id)
|
|
LUA
|
|
|
|
UPDATE_GLOBAL_CHANNELS_LUA = <<~LUA
|
|
-- Update the global channels list with the timestamp of the oldest client
|
|
local oldest_client = redis.call('ZRANGE', zlist_key, 0, 0, 'WITHSCORES')
|
|
if table.getn(oldest_client) > 0 then
|
|
local oldest_client_expire_timestamp = oldest_client[2]
|
|
redis.call('ZADD', channels_key, tonumber(oldest_client_expire_timestamp), tostring(channel))
|
|
else
|
|
-- The channel is now empty, delete from global list
|
|
redis.call('ZREM', channels_key, tostring(channel))
|
|
end
|
|
LUA
|
|
|
|
LUA_SCRIPTS ||= {}
|
|
|
|
LUA_SCRIPTS[:present] = DiscourseRedis::EvalHelper.new <<~LUA
|
|
#{COMMON_PRESENT_LEAVE_LUA}
|
|
|
|
if mutex_locked then
|
|
local mutex_required = redis.call('HGET', hash_key, tostring(user_id)) == false
|
|
if mutex_required then
|
|
error("#{MUTEX_LOCKED_ERROR}")
|
|
end
|
|
end
|
|
|
|
local added_clients = redis.call('ZADD', zlist_key, expires, zlist_elem)
|
|
local added_users = 0
|
|
if tonumber(added_clients) > 0 then
|
|
local new_count = redis.call('HINCRBY', hash_key, tostring(user_id), 1)
|
|
if new_count == 1 then
|
|
added_users = 1
|
|
redis.call('SET', mutex_key, mutex_value, 'EX', #{MUTEX_TIMEOUT_SECONDS})
|
|
end
|
|
-- Add the channel to the global channel list. 'NX' means the value will
|
|
-- only be set if doesn't already exist
|
|
redis.call('ZADD', channels_key, "NX", expires, tostring(channel))
|
|
end
|
|
|
|
redis.call('EXPIREAT', hash_key, expires + #{GC_SECONDS})
|
|
redis.call('EXPIREAT', zlist_key, expires + #{GC_SECONDS})
|
|
|
|
return added_users
|
|
LUA
|
|
|
|
LUA_SCRIPTS[:leave] = DiscourseRedis::EvalHelper.new <<~LUA
|
|
#{COMMON_PRESENT_LEAVE_LUA}
|
|
|
|
if mutex_locked then
|
|
local user_session_count = redis.call('HGET', hash_key, tostring(user_id))
|
|
local mutex_required = user_session_count == 1 and redis.call('ZRANK', zlist_key, zlist_elem) ~= false
|
|
if mutex_required then
|
|
error("#{MUTEX_LOCKED_ERROR}")
|
|
end
|
|
end
|
|
|
|
-- Remove the user from the channel zlist
|
|
local removed_clients = redis.call('ZREM', zlist_key, zlist_elem)
|
|
|
|
local removed_users = 0
|
|
if tonumber(removed_clients) > 0 then
|
|
#{UPDATE_GLOBAL_CHANNELS_LUA}
|
|
|
|
-- Update the user session count in the channel hash
|
|
local val = redis.call('HINCRBY', hash_key, user_id, -1)
|
|
if val <= 0 then
|
|
redis.call('HDEL', hash_key, user_id)
|
|
removed_users = 1
|
|
redis.call('SET', mutex_key, mutex_value, 'EX', #{MUTEX_TIMEOUT_SECONDS})
|
|
end
|
|
end
|
|
|
|
return removed_users
|
|
LUA
|
|
|
|
LUA_SCRIPTS[:release_mutex] = DiscourseRedis::EvalHelper.new <<~LUA
|
|
local mutex_key = KEYS[1]
|
|
local expected_value = ARGV[1]
|
|
|
|
if redis.call("GET", mutex_key) == expected_value then
|
|
redis.call("DEL", mutex_key)
|
|
end
|
|
LUA
|
|
|
|
LUA_SCRIPTS[:user_ids] = DiscourseRedis::EvalHelper.new <<~LUA
|
|
local zlist_key = KEYS[1]
|
|
local hash_key = KEYS[2]
|
|
local message_bus_id_key = KEYS[4]
|
|
local mutex_key = KEYS[5]
|
|
|
|
if redis.call('EXISTS', mutex_key) > 0 then
|
|
error('#{MUTEX_LOCKED_ERROR}')
|
|
end
|
|
|
|
local user_ids = redis.call('HKEYS', hash_key)
|
|
table.foreach(user_ids, function(k,v) user_ids[k] = tonumber(v) end)
|
|
|
|
local message_bus_id = tonumber(redis.call('GET', message_bus_id_key))
|
|
if message_bus_id == nil then
|
|
message_bus_id = -1
|
|
end
|
|
|
|
return { message_bus_id, user_ids }
|
|
LUA
|
|
|
|
LUA_SCRIPTS[:count] = DiscourseRedis::EvalHelper.new <<~LUA
|
|
local zlist_key = KEYS[1]
|
|
local hash_key = KEYS[2]
|
|
local message_bus_id_key = KEYS[4]
|
|
local mutex_key = KEYS[5]
|
|
|
|
if redis.call('EXISTS', mutex_key) > 0 then
|
|
error('#{MUTEX_LOCKED_ERROR}')
|
|
end
|
|
|
|
local message_bus_id = tonumber(redis.call('GET', message_bus_id_key))
|
|
if message_bus_id == nil then
|
|
message_bus_id = -1
|
|
end
|
|
|
|
local count = redis.call('HLEN', hash_key)
|
|
|
|
return { message_bus_id, count }
|
|
LUA
|
|
|
|
LUA_SCRIPTS[:auto_leave] = DiscourseRedis::EvalHelper.new <<~LUA
|
|
local zlist_key = KEYS[1]
|
|
local hash_key = KEYS[2]
|
|
local channels_key = KEYS[3]
|
|
local mutex_key = KEYS[5]
|
|
local channel = ARGV[1]
|
|
local time = ARGV[2]
|
|
local mutex_value = ARGV[3]
|
|
|
|
local expire = redis.call('ZRANGEBYSCORE', zlist_key, '-inf', time)
|
|
|
|
local has_mutex = false
|
|
|
|
local get_mutex = function()
|
|
if redis.call('SETNX', mutex_key, mutex_value) == 0 then
|
|
error("#{MUTEX_LOCKED_ERROR}")
|
|
end
|
|
redis.call('EXPIRE', mutex_key, #{MUTEX_TIMEOUT_SECONDS})
|
|
has_mutex = true
|
|
end
|
|
|
|
local expired_user_ids = {}
|
|
|
|
local expireOld = function(k, v)
|
|
local user_id = v:match("[^ ]+")
|
|
|
|
if (not has_mutex) and (tonumber(redis.call('HGET', hash_key, user_id)) == 1) then
|
|
get_mutex()
|
|
end
|
|
|
|
local val = redis.call('HINCRBY', hash_key, user_id, -1)
|
|
if val <= 0 then
|
|
table.insert(expired_user_ids, tonumber(user_id))
|
|
redis.call('HDEL', hash_key, user_id)
|
|
end
|
|
redis.call('ZREM', zlist_key, v)
|
|
end
|
|
|
|
table.foreach(expire, expireOld)
|
|
|
|
#{UPDATE_GLOBAL_CHANNELS_LUA}
|
|
|
|
return expired_user_ids
|
|
LUA
|
|
end
|