FEATURE: Reintroduce better thread reply counter cache (#21197)

This was reverted in 38cebd3ed5.
The issue was that I was using Discourse.redis.delete_prefixed
which does a slow redis KEYS lookup, which is not advised in
production. This commit removes that, and also ensures the periodical
thread count update only happens if threading is enabled.

I changed to use a redis INCR/DECR for reply count
cache. This avoids a round trip to redis to GET the current
count, and also avoids multi-process issues, where
if there's two processes trying to increment at the
same time, they may both receive the same value, add one
to it, then both write the same value back.
Then, it's only n+1 instead of n+2.

This also prevents almost all chat scheduled jobs from
running if chat is disabled, the only one remaining is
the message retention job.
This commit is contained in:
Martin Brennan 2023-04-24 09:32:04 +10:00 committed by GitHub
parent 21f93731a3
commit 24ec06ff85
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 694 additions and 233 deletions

View File

@ -4,4 +4,10 @@ class Chat::Api::ChannelMessagesController < Chat::ApiController
def destroy def destroy
with_service(Chat::TrashMessage) { on_model_not_found(:message) { raise Discourse::NotFound } } with_service(Chat::TrashMessage) { on_model_not_found(:message) { raise Discourse::NotFound } }
end end
def restore
with_service(Chat::RestoreMessage) do
on_model_not_found(:message) { raise Discourse::NotFound }
end
end
end end

View File

@ -12,8 +12,7 @@ module Chat
# these endpoints require a standalone find because they need to be # these endpoints require a standalone find because they need to be
# able to get deleted channels and recover them. # able to get deleted channels and recover them.
before_action :find_chatable, only: %i[enable_chat disable_chat] before_action :find_chatable, only: %i[enable_chat disable_chat]
before_action :find_chat_message, before_action :find_chat_message, only: %i[lookup_message edit_message rebake message_link]
only: %i[delete restore lookup_message edit_message rebake message_link]
before_action :set_channel_and_chatable_with_access_check, before_action :set_channel_and_chatable_with_access_check,
except: %i[ except: %i[
respond respond
@ -233,18 +232,6 @@ module Chat
render json: success_json render json: success_json
end end
def restore
chat_channel = @message.chat_channel
guardian.ensure_can_restore_chat!(@message, chat_channel.chatable)
updated = @message.recover!
if updated
Chat::Publisher.publish_restore!(chat_channel, @message)
render json: success_json
else
render_json_error(@message)
end
end
def rebake def rebake
guardian.ensure_can_rebake_chat_message!(@message) guardian.ensure_can_rebake_chat_message!(@message)
@message.rebake!(invalidate_oneboxes: true) @message.rebake!(invalidate_oneboxes: true)

View File

@ -0,0 +1,24 @@
# frozen_string_literal: true
module Jobs
module Chat
class UpdateThreadReplyCount < Jobs::Base
def execute(args = {})
return if !SiteSetting.enable_experimental_chat_threaded_discussions
thread = ::Chat::Thread.find_by(id: args[:thread_id])
return if thread.blank?
return if thread.replies_count_cache_recently_updated?
Discourse.redis.setex(
::Chat::Thread.replies_count_cache_updated_at_redis_key(thread.id),
5.minutes.from_now.to_i,
Time.zone.now.to_i,
)
thread.set_replies_count_cache(thread.replies.count, update_db: true)
::Chat::Publisher.publish_thread_original_message_metadata!(thread)
end
end
end
end

View File

@ -6,6 +6,8 @@ module Jobs
every 1.hour every 1.hour
def execute(_args) def execute(_args)
return if !SiteSetting.chat_enabled
::Chat::Channel ::Chat::Channel
.where(auto_join_users: true) .where(auto_join_users: true)
.each do |channel| .each do |channel|

View File

@ -6,7 +6,7 @@ module Jobs
every 5.minutes every 5.minutes
def execute(args = {}) def execute(args = {})
return unless ::SiteSetting.chat_enabled return if !SiteSetting.chat_enabled
::Chat::Mailer.send_unread_mentions_summary ::Chat::Mailer.send_unread_mentions_summary
end end

View File

@ -6,6 +6,8 @@ module Jobs
every 15.minutes every 15.minutes
def execute(args = nil) def execute(args = nil)
return if !SiteSetting.chat_enabled
# TODO: Add rebaking of old messages (baked_version < # TODO: Add rebaking of old messages (baked_version <
# Chat::Message::BAKED_VERSION or baked_version IS NULL) # Chat::Message::BAKED_VERSION or baked_version IS NULL)
::Chat::Channel.ensure_consistency! ::Chat::Channel.ensure_consistency!

View File

@ -12,6 +12,8 @@ module Jobs
# channels updated or with new messages in the past N days? Perhaps # channels updated or with new messages in the past N days? Perhaps
# we could update all the counts in a single query as well? # we could update all the counts in a single query as well?
def execute(args = {}) def execute(args = {})
return if !SiteSetting.chat_enabled
::Chat::Channel ::Chat::Channel
.where(status: %i[open closed]) .where(status: %i[open closed])
.find_each { |chat_channel| set_user_count(chat_channel) } .find_each { |chat_channel| set_user_count(chat_channel) }

View File

@ -4,6 +4,8 @@ module Chat
class Thread < ActiveRecord::Base class Thread < ActiveRecord::Base
EXCERPT_LENGTH = 150 EXCERPT_LENGTH = 150
include Chat::ThreadCache
self.table_name = "chat_threads" self.table_name = "chat_threads"
belongs_to :channel, foreign_key: "channel_id", class_name: "Chat::Channel" belongs_to :channel, foreign_key: "channel_id", class_name: "Chat::Channel"
@ -11,7 +13,11 @@ module Chat
belongs_to :original_message, foreign_key: "original_message_id", class_name: "Chat::Message" belongs_to :original_message, foreign_key: "original_message_id", class_name: "Chat::Message"
has_many :chat_messages, has_many :chat_messages,
-> { order("chat_messages.created_at ASC, chat_messages.id ASC") }, -> {
where("deleted_at IS NULL").order(
"chat_messages.created_at ASC, chat_messages.id ASC",
)
},
foreign_key: :thread_id, foreign_key: :thread_id,
primary_key: :id, primary_key: :id,
class_name: "Chat::Message" class_name: "Chat::Message"
@ -50,6 +56,7 @@ module Chat
end end
def self.ensure_consistency! def self.ensure_consistency!
return if !SiteSetting.enable_experimental_chat_threaded_discussions
update_counts update_counts
end end
@ -61,7 +68,7 @@ module Chat
# #
# It is updated eventually via Jobs::Chat::PeriodicalUpdates. In # It is updated eventually via Jobs::Chat::PeriodicalUpdates. In
# future we may want to update this more frequently. # future we may want to update this more frequently.
DB.exec <<~SQL updated_thread_ids = DB.query_single <<~SQL
UPDATE chat_threads threads UPDATE chat_threads threads
SET replies_count = subquery.replies_count SET replies_count = subquery.replies_count
FROM ( FROM (
@ -72,7 +79,10 @@ module Chat
) subquery ) subquery
WHERE threads.id = subquery.thread_id WHERE threads.id = subquery.thread_id
AND subquery.replies_count != threads.replies_count AND subquery.replies_count != threads.replies_count
RETURNING threads.id AS thread_id;
SQL SQL
return if updated_thread_ids.empty?
self.clear_caches!(updated_thread_ids)
end end
end end
end end

View File

@ -0,0 +1,82 @@
# frozen_string_literal: true
module Chat
module ThreadCache
extend ActiveSupport::Concern
class_methods do
def replies_count_cache_updated_at_redis_key(id)
"chat_thread:replies_count_cache_updated_at:#{id}"
end
def replies_count_cache_redis_key(id)
"chat_thread:replies_count_cache:#{id}"
end
def clear_caches!(ids)
ids = Array.wrap(ids)
keys_to_delete =
ids
.map do |id|
[replies_count_cache_redis_key(id), replies_count_cache_updated_at_redis_key(id)]
end
.flatten
Discourse.redis.del(keys_to_delete)
end
end
def replies_count_cache_recently_updated?
replies_count_cache_updated_at.after?(5.minutes.ago)
end
def replies_count_cache_updated_at
Time.at(
Discourse.redis.get(Chat::Thread.replies_count_cache_updated_at_redis_key(self.id)).to_i,
in: Time.zone,
)
end
def replies_count_cache
redis_cache = Discourse.redis.get(Chat::Thread.replies_count_cache_redis_key(self.id))&.to_i
# If the cache is not present for whatever reason, set it to the current value,
# otherwise INCR/DECR will be way off. No need to enqueue the job or publish,
# since this is likely fetched by a serializer.
if !redis_cache.present?
set_replies_count_redis_cache(self.replies_count)
self.replies_count
else
redis_cache != self.replies_count ? redis_cache : self.replies_count
end
end
def set_replies_count_cache(value, update_db: false)
self.update!(replies_count: value) if update_db
set_replies_count_redis_cache(value)
thread_reply_count_cache_changed
end
def set_replies_count_redis_cache(value)
Discourse.redis.setex(
Chat::Thread.replies_count_cache_redis_key(self.id),
5.minutes.from_now.to_i,
value,
)
end
def increment_replies_count_cache
Discourse.redis.incr(Chat::Thread.replies_count_cache_redis_key(self.id))
thread_reply_count_cache_changed
end
def decrement_replies_count_cache
Discourse.redis.decr(Chat::Thread.replies_count_cache_redis_key(self.id))
thread_reply_count_cache_changed
end
def thread_reply_count_cache_changed
Jobs.enqueue_in(5.seconds, Jobs::Chat::UpdateThreadReplyCount, thread_id: self.id)
::Chat::Publisher.publish_thread_original_message_metadata!(self)
end
end
end

View File

@ -166,7 +166,7 @@ module Chat
end end
def thread_reply_count def thread_reply_count
object.thread&.replies_count || 0 object.thread&.replies_count_cache || 0
end end
end end
end end

View File

@ -3,7 +3,7 @@
module Chat module Chat
module Publisher module Publisher
def self.new_messages_message_bus_channel(chat_channel_id) def self.new_messages_message_bus_channel(chat_channel_id)
"/chat/#{chat_channel_id}/new-messages" "#{root_message_bus_channel(chat_channel_id)}/new-messages"
end end
def self.root_message_bus_channel(chat_channel_id) def self.root_message_bus_channel(chat_channel_id)
@ -35,31 +35,11 @@ module Chat
def self.publish_new!(chat_channel, chat_message, staged_id) def self.publish_new!(chat_channel, chat_message, staged_id)
message_bus_targets = calculate_publish_targets(chat_channel, chat_message) message_bus_targets = calculate_publish_targets(chat_channel, chat_message)
publish_to_targets!(
content = message_bus_targets,
Chat::MessageSerializer.new( chat_channel,
chat_message, serialize_message_with_type(chat_message, :sent).merge(staged_id: staged_id),
{ scope: anonymous_guardian, root: :chat_message }, )
).as_json
content[:type] = :sent
content[:staged_id] = staged_id
permissions = permissions(chat_channel)
message_bus_targets.each do |message_bus_channel|
MessageBus.publish(message_bus_channel, content.as_json, permissions)
end
if chat_message.thread_reply?
MessageBus.publish(
root_message_bus_channel(chat_channel.id),
{
type: :update_thread_original_message,
original_message_id: chat_message.thread.original_message_id,
action: :increment_reply_count,
}.as_json,
permissions,
)
end
# NOTE: This means that the read count is only updated in the client # NOTE: This means that the read count is only updated in the client
# for new messages in the main channel stream, maybe in future we want to # for new messages in the main channel stream, maybe in future we want to
@ -74,84 +54,67 @@ module Chat
username: chat_message.user.username, username: chat_message.user.username,
thread_id: chat_message.thread_id, thread_id: chat_message.thread_id,
}, },
permissions, permissions(chat_channel),
) )
end end
end end
def self.publish_thread_created!(chat_channel, chat_message) def self.publish_thread_original_message_metadata!(thread)
content = publish_to_channel!(
Chat::MessageSerializer.new( thread.channel,
chat_message, {
{ scope: anonymous_guardian, root: :chat_message }, type: :update_thread_original_message,
).as_json original_message_id: thread.original_message_id,
content[:type] = :thread_created replies_count: thread.replies_count_cache,
permissions = permissions(chat_channel) },
)
end
MessageBus.publish(root_message_bus_channel(chat_channel.id), content.as_json, permissions) def self.publish_thread_created!(chat_channel, chat_message)
publish_to_channel!(chat_channel, serialize_message_with_type(chat_message, :thread_created))
end end
def self.publish_processed!(chat_message) def self.publish_processed!(chat_message)
chat_channel = chat_message.chat_channel chat_channel = chat_message.chat_channel
message_bus_targets = calculate_publish_targets(chat_channel, chat_message) message_bus_targets = calculate_publish_targets(chat_channel, chat_message)
publish_to_targets!(
content = { message_bus_targets,
type: :processed, chat_channel,
chat_message: { { type: :processed, chat_message: { id: chat_message.id, cooked: chat_message.cooked } },
id: chat_message.id, )
cooked: chat_message.cooked,
},
}
message_bus_targets.each do |message_bus_channel|
MessageBus.publish(message_bus_channel, content.as_json, permissions(chat_channel))
end
end end
def self.publish_edit!(chat_channel, chat_message) def self.publish_edit!(chat_channel, chat_message)
message_bus_targets = calculate_publish_targets(chat_channel, chat_message) message_bus_targets = calculate_publish_targets(chat_channel, chat_message)
publish_to_targets!(
content = message_bus_targets,
Chat::MessageSerializer.new( chat_channel,
chat_message, serialize_message_with_type(chat_message, :edit),
{ scope: anonymous_guardian, root: :chat_message }, )
).as_json
content[:type] = :edit
message_bus_targets.each do |message_bus_channel|
MessageBus.publish(message_bus_channel, content.as_json, permissions(chat_channel))
end
end end
def self.publish_refresh!(chat_channel, chat_message) def self.publish_refresh!(chat_channel, chat_message)
message_bus_targets = calculate_publish_targets(chat_channel, chat_message) message_bus_targets = calculate_publish_targets(chat_channel, chat_message)
publish_to_targets!(
content = message_bus_targets,
Chat::MessageSerializer.new( chat_channel,
chat_message, serialize_message_with_type(chat_message, :refresh),
{ scope: anonymous_guardian, root: :chat_message }, )
).as_json
content[:type] = :refresh
message_bus_targets.each do |message_bus_channel|
MessageBus.publish(message_bus_channel, content.as_json, permissions(chat_channel))
end
end end
def self.publish_reaction!(chat_channel, chat_message, action, user, emoji) def self.publish_reaction!(chat_channel, chat_message, action, user, emoji)
message_bus_targets = calculate_publish_targets(chat_channel, chat_message) message_bus_targets = calculate_publish_targets(chat_channel, chat_message)
publish_to_targets!(
content = { message_bus_targets,
action: action, chat_channel,
user: BasicUserSerializer.new(user, root: false).as_json, {
emoji: emoji, action: action,
type: :reaction, user: BasicUserSerializer.new(user, root: false).as_json,
chat_message_id: chat_message.id, emoji: emoji,
} type: :reaction,
chat_message_id: chat_message.id,
message_bus_targets.each do |message_bus_channel| },
MessageBus.publish(message_bus_channel, content.as_json, permissions(chat_channel)) )
end
end end
def self.publish_presence!(chat_channel, user, typ) def self.publish_presence!(chat_channel, user, typ)
@ -160,28 +123,26 @@ module Chat
def self.publish_delete!(chat_channel, chat_message) def self.publish_delete!(chat_channel, chat_message)
message_bus_targets = calculate_publish_targets(chat_channel, chat_message) message_bus_targets = calculate_publish_targets(chat_channel, chat_message)
publish_to_targets!(
message_bus_targets.each do |message_bus_channel| message_bus_targets,
MessageBus.publish( chat_channel,
message_bus_channel, { type: "delete", deleted_id: chat_message.id, deleted_at: chat_message.deleted_at },
{ type: "delete", deleted_id: chat_message.id, deleted_at: chat_message.deleted_at }, )
permissions(chat_channel),
)
end
end end
def self.publish_bulk_delete!(chat_channel, deleted_message_ids) def self.publish_bulk_delete!(chat_channel, deleted_message_ids)
channel_permissions = permissions(chat_channel)
Chat::Thread Chat::Thread
.grouped_messages(message_ids: deleted_message_ids) .grouped_messages(message_ids: deleted_message_ids)
.each do |group| .each do |group|
MessageBus.publish( MessageBus.publish(
thread_message_bus_channel(chat_channel.id, group.thread_id), thread_message_bus_channel(chat_channel.id, group.thread_id),
{ {
type: "bulk_delete", type: :bulk_delete,
deleted_ids: group.thread_message_ids, deleted_ids: group.thread_message_ids,
deleted_at: Time.zone.now, deleted_at: Time.zone.now,
}, },
permissions(chat_channel), channel_permissions,
) )
# Don't need to publish to the main channel if the messages deleted # Don't need to publish to the main channel if the messages deleted
@ -193,54 +154,74 @@ module Chat
return if deleted_message_ids.empty? return if deleted_message_ids.empty?
MessageBus.publish( publish_to_channel!(
root_message_bus_channel(chat_channel.id), chat_channel,
{ type: "bulk_delete", deleted_ids: deleted_message_ids, deleted_at: Time.zone.now }, { type: :bulk_delete, deleted_ids: deleted_message_ids, deleted_at: Time.zone.now },
permissions(chat_channel),
) )
end end
def self.publish_restore!(chat_channel, chat_message) def self.publish_restore!(chat_channel, chat_message)
message_bus_targets = calculate_publish_targets(chat_channel, chat_message) message_bus_targets = calculate_publish_targets(chat_channel, chat_message)
publish_to_targets!(
content = message_bus_targets,
Chat::MessageSerializer.new( chat_channel,
chat_message, serialize_message_with_type(chat_message, :restore),
{ scope: anonymous_guardian, root: :chat_message }, )
).as_json
content[:type] = :restore
message_bus_targets.each do |message_bus_channel|
MessageBus.publish(message_bus_channel, content.as_json, permissions(chat_channel))
end
end end
def self.publish_flag!(chat_message, user, reviewable, score) def self.publish_flag!(chat_message, user, reviewable, score)
message_bus_targets = calculate_publish_targets(chat_message.chat_channel, chat_message) message_bus_targets = calculate_publish_targets(chat_message.chat_channel, chat_message)
message_bus_targets.each do |message_bus_channel| # Publish to user who created flag
# Publish to user who created flag publish_to_targets!(
MessageBus.publish( message_bus_targets,
message_bus_channel, chat_message.chat_channel,
{ {
type: "self_flagged", type: :self_flagged,
user_flag_status: score.status_for_database, user_flag_status: score.status_for_database,
chat_message_id: chat_message.id, chat_message_id: chat_message.id,
}.as_json, },
permissions: {
user_ids: [user.id], user_ids: [user.id],
) },
end )
message_bus_targets.each do |message_bus_channel| # Publish flag with link to reviewable to staff
# Publish flag with link to reviewable to staff publish_to_targets!(
message_bus_targets,
chat_message.chat_channel,
{ type: :flag, chat_message_id: chat_message.id, reviewable_id: reviewable.id },
permissions: {
group_ids: [Group::AUTO_GROUPS[:staff]],
},
)
end
def self.publish_to_channel!(channel, payload)
MessageBus.publish(
root_message_bus_channel(channel.id),
payload.as_json,
permissions(channel),
)
end
def self.publish_to_targets!(targets, channel, payload, permissions: nil)
targets.each do |message_bus_channel|
MessageBus.publish( MessageBus.publish(
message_bus_channel, message_bus_channel,
{ type: "flag", chat_message_id: chat_message.id, reviewable_id: reviewable.id }.as_json, payload.as_json,
group_ids: [Group::AUTO_GROUPS[:staff]], permissions || permissions(channel),
) )
end end
end end
def self.serialize_message_with_type(chat_message, type)
Chat::MessageSerializer
.new(chat_message, { scope: anonymous_guardian, root: :chat_message })
.as_json
.merge(type: type)
end
def self.user_tracking_state_message_bus_channel(user_id) def self.user_tracking_state_message_bus_channel(user_id)
"/chat/user-tracking-state/#{user_id}" "/chat/user-tracking-state/#{user_id}"
end end

View File

@ -0,0 +1,63 @@
# frozen_string_literal: true
module Chat
# Service responsible for restoring a trashed chat message
# for a channel and ensuring that the client and read state is
# updated.
#
# @example
# Chat::RestoreMessage.call(message_id: 2, channel_id: 1, guardian: guardian)
#
class RestoreMessage
include Service::Base
# @!method call(message_id:, channel_id:, guardian:)
# @param [Integer] message_id
# @param [Integer] channel_id
# @param [Guardian] guardian
# @return [Service::Base::Context]
contract
model :message
policy :invalid_access
transaction do
step :restore_message
step :update_thread_reply_cache
end
step :publish_events
# @!visibility private
class Contract
attribute :message_id, :integer
attribute :channel_id, :integer
validates :message_id, presence: true
validates :channel_id, presence: true
end
private
def fetch_message(contract:, **)
Chat::Message
.with_deleted
.includes(chat_channel: :chatable)
.find_by(id: contract.message_id, chat_channel_id: contract.channel_id)
end
def invalid_access(guardian:, message:, **)
guardian.can_restore_chat?(message, message.chat_channel.chatable)
end
def restore_message(message:, **)
message.recover!
end
def update_thread_reply_cache(message:, **)
message.thread&.increment_replies_count_cache
end
def publish_events(guardian:, message:, **)
DiscourseEvent.trigger(:chat_message_restored, message, message.chat_channel, guardian.user)
Chat::Publisher.publish_restore!(message.chat_channel, message)
end
end
end

View File

@ -24,6 +24,7 @@ module Chat
step :trash_message step :trash_message
step :destroy_mentions step :destroy_mentions
step :update_tracking_state step :update_tracking_state
step :update_thread_reply_cache
end end
step :publish_events step :publish_events
@ -62,6 +63,10 @@ module Chat
) )
end end
def update_thread_reply_cache(message:, **)
message.thread&.decrement_replies_count_cache
end
def publish_events(guardian:, message:, **) def publish_events(guardian:, message:, **)
DiscourseEvent.trigger(:chat_message_trashed, message, message.chat_channel, guardian.user) DiscourseEvent.trigger(:chat_message_trashed, message, message.chat_channel, guardian.user)
Chat::Publisher.publish_delete!(message.chat_channel, message) Chat::Publisher.publish_delete!(message.chat_channel, message)

View File

@ -350,10 +350,9 @@ export default class ChatApi extends Service {
* @param {number} messageId - The ID of the message being restored. * @param {number} messageId - The ID of the message being restored.
*/ */
restoreMessage(channelId, messageId) { restoreMessage(channelId, messageId) {
// TODO (martin) Not ideal, this should have a chat API controller endpoint. return this.#putRequest(
return ajax(`/chat/${channelId}/restore/${messageId}`, { `/channels/${channelId}/messages/${messageId}/restore`
type: "PUT", );
});
} }
/** /**

View File

@ -37,11 +37,8 @@ export default class ChatChannelPaneSubscriptionsManager extends ChatPaneBaseSub
handleThreadOriginalMessageUpdate(data) { handleThreadOriginalMessageUpdate(data) {
const message = this.messagesManager.findMessage(data.original_message_id); const message = this.messagesManager.findMessage(data.original_message_id);
if (message) { if (message) {
if (data.action === "increment_reply_count") { if (data.replies_count) {
// TODO (martin) In future we should use a replies_count delivered message.threadReplyCount = data.replies_count;
// from the server and simply update the message accordingly, for
// now we don't have an accurate enough count for this.
message.threadReplyCount += 1;
} }
} }
} }

View File

@ -30,6 +30,7 @@ Chat::Engine.routes.draw do
get "/channels/:channel_id/threads/:thread_id" => "channel_threads#show" get "/channels/:channel_id/threads/:thread_id" => "channel_threads#show"
put "/channels/:channel_id/messages/:message_id/restore" => "channel_messages#restore"
delete "/channels/:channel_id/messages/:message_id" => "channel_messages#destroy" delete "/channels/:channel_id/messages/:message_id" => "channel_messages#destroy"
end end
@ -61,7 +62,6 @@ Chat::Engine.routes.draw do
put "/:chat_channel_id/:message_id/rebake" => "chat#rebake" put "/:chat_channel_id/:message_id/rebake" => "chat#rebake"
post "/:chat_channel_id/:message_id/flag" => "chat#flag" post "/:chat_channel_id/:message_id/flag" => "chat#flag"
post "/:chat_channel_id/quote" => "chat#quote_messages" post "/:chat_channel_id/quote" => "chat#quote_messages"
put "/:chat_channel_id/restore/:message_id" => "chat#restore"
get "/lookup/:message_id" => "chat#lookup_message" get "/lookup/:message_id" => "chat#lookup_message"
put "/:chat_channel_id/read/:message_id" => "chat#update_user_last_read" put "/:chat_channel_id/read/:message_id" => "chat#update_user_last_read"
put "/user_chat_enabled/:user_id" => "chat#set_user_chat_status" put "/user_chat_enabled/:user_id" => "chat#set_user_chat_status"

View File

@ -58,6 +58,7 @@ module Chat
@chat_message.attach_uploads(uploads) @chat_message.attach_uploads(uploads)
Chat::Draft.where(user_id: @user.id, chat_channel_id: @chat_channel.id).destroy_all Chat::Draft.where(user_id: @user.id, chat_channel_id: @chat_channel.id).destroy_all
Chat::Publisher.publish_new!(@chat_channel, @chat_message, @staged_id) Chat::Publisher.publish_new!(@chat_channel, @chat_message, @staged_id)
resolved_thread&.increment_replies_count_cache
Jobs.enqueue(Jobs::Chat::ProcessMessage, { chat_message_id: @chat_message.id }) Jobs.enqueue(Jobs::Chat::ProcessMessage, { chat_message_id: @chat_message.id })
Chat::Notifier.notify_new(chat_message: @chat_message, timestamp: @chat_message.created_at) Chat::Notifier.notify_new(chat_message: @chat_message, timestamp: @chat_message.created_at)
@chat_channel.touch(:last_message_sent_at) @chat_channel.touch(:last_message_sent_at)
@ -206,5 +207,9 @@ module Chat
WHERE thread_id IS NULL AND chat_messages.id = thread_updater.id WHERE thread_id IS NULL AND chat_messages.id = thread_updater.id
SQL SQL
end end
def resolved_thread
@existing_thread || @chat_message.thread
end
end end
end end

View File

@ -0,0 +1,68 @@
# frozen_string_literal: true
RSpec.describe "Chat::Thread replies_count cache accuracy" do
include ActiveSupport::Testing::TimeHelpers
fab!(:user) { Fabricate(:user) }
fab!(:thread) { Fabricate(:chat_thread) }
before do
SiteSetting.chat_enabled = true
SiteSetting.enable_experimental_chat_threaded_discussions = true
end
it "keeps an accurate replies_count cache" do
freeze_time
Jobs.run_immediately!
expect(thread.replies_count).to eq(0)
expect(thread.replies_count_cache).to eq(0)
# Create 5 replies
5.times do |i|
Chat::MessageCreator.create(
chat_channel: thread.channel,
user: user,
thread_id: thread.id,
content: "Hello world #{i}",
)
end
# The job only runs to completion if the cache has not been recently
# updated, so the DB count will only be 1.
expect(thread.replies_count_cache).to eq(5)
expect(thread.reload.replies_count).to eq(1)
# Travel to the future so the cache expires.
travel_to 6.minutes.from_now
Chat::MessageCreator.create(
chat_channel: thread.channel,
user: user,
thread_id: thread.id,
content: "Hello world now that time has passed",
)
expect(thread.replies_count_cache).to eq(6)
expect(thread.reload.replies_count).to eq(6)
# Lose the cache intentionally.
Chat::Thread.clear_caches!(thread.id)
message_to_destroy = thread.replies.last
Chat::TrashMessage.call(
message_id: message_to_destroy.id,
channel_id: thread.channel_id,
guardian: Guardian.new(user),
)
expect(thread.replies_count_cache).to eq(5)
expect(thread.reload.replies_count).to eq(5)
# Lose the cache intentionally.
Chat::Thread.clear_caches!(thread.id)
Chat::RestoreMessage.call(
message_id: message_to_destroy.id,
channel_id: thread.channel_id,
guardian: Guardian.new(user),
)
expect(thread.replies_count_cache).to eq(6)
expect(thread.reload.replies_count).to eq(6)
end
end

View File

@ -0,0 +1,55 @@
# frozen_string_literal: true
RSpec.describe Jobs::Chat::UpdateThreadReplyCount do
fab!(:thread) { Fabricate(:chat_thread) }
fab!(:message_1) { Fabricate(:chat_message, thread: thread) }
fab!(:message_2) { Fabricate(:chat_message, thread: thread) }
before do
Chat::Thread.clear_caches!(thread.id)
SiteSetting.enable_experimental_chat_threaded_discussions = true
end
it "does nothing if enable_experimental_chat_threaded_discussions is false" do
SiteSetting.enable_experimental_chat_threaded_discussions = false
Chat::Thread.any_instance.expects(:set_replies_count_cache).never
described_class.new.execute(thread_id: thread.id)
end
it "does not error if the thread is deleted" do
id = thread.id
thread.destroy!
expect { described_class.new.execute(thread_id: id) }.not_to raise_error
end
it "does not set the reply count in the DB if it has been changed recently" do
described_class.new.execute(thread_id: thread.id)
expect(thread.reload.replies_count).to eq(2)
Fabricate(:chat_message, thread: thread)
described_class.new.execute(thread_id: thread.id)
expect(thread.reload.replies_count).to eq(2)
end
it "sets the updated_at cache to the current time" do
freeze_time
described_class.new.execute(thread_id: thread.id)
expect(thread.replies_count_cache_updated_at).to eq_time(
Time.at(Time.zone.now.to_i, in: Time.zone),
)
end
it "publishes the thread original message metadata" do
messages =
MessageBus.track_publish("/chat/#{thread.channel_id}") do
described_class.new.execute(thread_id: thread.id)
end
expect(messages.first.data).to eq(
{
"original_message_id" => thread.original_message_id,
"replies_count" => 2,
"type" => "update_thread_original_message",
},
)
end
end

View File

@ -76,9 +76,9 @@ describe Chat::MessageMover do
deleted_messages = Chat::Message.with_deleted.where(id: move_message_ids).order(:id) deleted_messages = Chat::Message.with_deleted.where(id: move_message_ids).order(:id)
expect(deleted_messages.count).to eq(3) expect(deleted_messages.count).to eq(3)
expect(messages.first.channel).to eq("/chat/#{source_channel.id}") expect(messages.first.channel).to eq("/chat/#{source_channel.id}")
expect(messages.first.data[:type]).to eq("bulk_delete") expect(messages.first.data["type"]).to eq("bulk_delete")
expect(messages.first.data[:deleted_ids]).to eq(deleted_messages.map(&:id)) expect(messages.first.data["deleted_ids"]).to eq(deleted_messages.map(&:id))
expect(messages.first.data[:deleted_at]).not_to eq(nil) expect(messages.first.data["deleted_at"]).not_to eq(nil)
end end
it "creates a message in the source channel to indicate that the messages have been moved" do it "creates a message in the source channel to indicate that the messages have been moved" do

View File

@ -280,9 +280,9 @@ describe Chat::ReviewQueue do
end end
.map(&:data) .map(&:data)
delete_msg = messages.detect { |m| m[:type] == "delete" } delete_msg = messages.detect { |m| m["type"] == "delete" }
expect(delete_msg[:deleted_id]).to eq(message.id) expect(delete_msg["deleted_id"]).to eq(message.id)
end end
it "agrees with other flags on the same message" do it "agrees with other flags on the same message" do

View File

@ -1,6 +1,11 @@
# frozen_string_literal: true # frozen_string_literal: true
RSpec.describe Chat::Thread do RSpec.describe Chat::Thread do
before do
SiteSetting.chat_enabled = true
SiteSetting.enable_experimental_chat_threaded_discussions = true
end
describe ".ensure_consistency!" do describe ".ensure_consistency!" do
fab!(:channel) { Fabricate(:category_channel) } fab!(:channel) { Fabricate(:category_channel) }
fab!(:thread_1) { Fabricate(:chat_thread, channel: channel) } fab!(:thread_1) { Fabricate(:chat_thread, channel: channel) }
@ -40,6 +45,69 @@ RSpec.describe Chat::Thread do
described_class.ensure_consistency! described_class.ensure_consistency!
expect(thread_1.reload.replies_count).to eq(0) expect(thread_1.reload.replies_count).to eq(0)
end end
it "clears the affected replies_count caches" do
thread_1.set_replies_count_cache(100)
expect(thread_1.replies_count_cache).to eq(100)
expect(thread_1.replies_count_cache_updated_at).not_to eq(nil)
described_class.ensure_consistency!
expect(Discourse.redis.get(Chat::Thread.replies_count_cache_redis_key(thread_1.id))).to eq(
nil,
)
expect(
Discourse.redis.get(Chat::Thread.replies_count_cache_updated_at_redis_key(thread_1.id)),
).to eq(nil)
end
it "does not attempt to clear caches if no replies_count caches are updated" do
described_class.ensure_consistency!
Chat::Thread.expects(:clear_caches!).never
described_class.ensure_consistency!
end
it "does nothing if threads are disabled" do
SiteSetting.enable_experimental_chat_threaded_discussions = false
Chat::Thread.expects(:update_counts).never
described_class.ensure_consistency!
end
end
end
describe ".clear_caches" do
fab!(:channel) { Fabricate(:category_channel) }
fab!(:thread_1) { Fabricate(:chat_thread, channel: channel) }
fab!(:thread_2) { Fabricate(:chat_thread, channel: channel) }
before do
thread_1.set_replies_count_cache(100)
thread_2.set_replies_count_cache(100)
end
it "clears multiple keys" do
Chat::Thread.clear_caches!([thread_1.id, thread_2.id])
expect(Discourse.redis.get(Chat::Thread.replies_count_cache_redis_key(thread_1.id))).to eq(
nil,
)
expect(
Discourse.redis.get(Chat::Thread.replies_count_cache_updated_at_redis_key(thread_1.id)),
).to eq(nil)
expect(Discourse.redis.get(Chat::Thread.replies_count_cache_redis_key(thread_2.id))).to eq(
nil,
)
expect(
Discourse.redis.get(Chat::Thread.replies_count_cache_updated_at_redis_key(thread_2.id)),
).to eq(nil)
end
it "wraps the ids into an array if only an integer is provided" do
Chat::Thread.clear_caches!(thread_1.id)
expect(Discourse.redis.get(Chat::Thread.replies_count_cache_redis_key(thread_1.id))).to eq(
nil,
)
expect(
Discourse.redis.get(Chat::Thread.replies_count_cache_updated_at_redis_key(thread_1.id)),
).to eq(nil)
end end
end end

View File

@ -49,7 +49,7 @@ module ChatSystemHelpers
last_message = creator.chat_message last_message = creator.chat_message
end end
last_message.thread.update!(replies_count: messages_count - 1) last_message.thread.set_replies_count_cache(messages_count - 1, update_db: true)
last_message.thread last_message.thread
end end
end end

View File

@ -106,4 +106,95 @@ RSpec.describe Chat::Api::ChannelMessagesController do
end end
end end
end end
describe "#restore" do
RSpec.shared_examples "chat_message_restoration" do
it "doesn't allow a user to restore another user's message" do
sign_in(other_user)
put "/chat/api/channels/#{chat_channel.id}/messages/#{deleted_message.id}/restore.json"
expect(response.status).to eq(403)
end
it "allows a user to restore their own messages" do
sign_in(current_user)
put "/chat/api/channels/#{chat_channel.id}/messages/#{deleted_message.id}/restore.json"
expect(response.status).to eq(200)
expect(deleted_message.reload.deleted_at).to be_nil
end
it "allows admin to restore others' messages" do
sign_in(admin)
put "/chat/api/channels/#{chat_channel.id}/messages/#{deleted_message.id}/restore.json"
expect(response.status).to eq(200)
expect(deleted_message.reload.deleted_at).to be_nil
end
it "does not allow message restore when channel is read_only" do
sign_in(current_user)
chat_channel.update!(status: :read_only)
put "/chat/api/channels/#{chat_channel.id}/messages/#{deleted_message.id}/restore.json"
expect(response.status).to eq(403)
expect(deleted_message.reload.deleted_at).not_to be_nil
sign_in(admin)
put "/chat/api/channels/#{chat_channel.id}/messages/#{deleted_message.id}/restore.json"
expect(response.status).to eq(403)
end
it "only allows admin to restore when channel is closed" do
sign_in(admin)
chat_channel.update!(status: :read_only)
put "/chat/api/channels/#{chat_channel.id}/messages/#{deleted_message.id}/restore.json"
expect(response.status).to eq(403)
expect(deleted_message.reload.deleted_at).not_to be_nil
chat_channel.update!(status: :closed)
put "/chat/api/channels/#{chat_channel.id}/messages/#{deleted_message.id}/restore.json"
expect(response.status).to eq(200)
expect(deleted_message.reload.deleted_at).to be_nil
end
end
fab!(:admin) { Fabricate(:admin) }
fab!(:second_user) { Fabricate(:user) }
before do
message =
Chat::Message.create(
user: current_user,
message: "this is a message",
chat_channel: chat_channel,
)
message.trash!
end
let(:deleted_message) do
Chat::Message.unscoped.where(user: current_user, chat_channel: chat_channel).last
end
describe "for category" do
fab!(:category) { Fabricate(:category) }
fab!(:chat_channel) { Fabricate(:category_channel, chatable: category) }
it_behaves_like "chat_message_restoration" do
let(:other_user) { second_user }
end
end
describe "for dm channel" do
fab!(:user_2) { Fabricate(:user) }
fab!(:chat_channel) { Fabricate(:direct_message_channel, users: [current_user, user_2]) }
it_behaves_like "chat_message_restoration" do
let(:other_user) { user_2 }
end
end
end
end end

View File

@ -612,82 +612,6 @@ RSpec.describe Chat::ChatController do
end end
end end
RSpec.shared_examples "chat_message_restoration" do
it "doesn't allow a user to restore another user's message" do
sign_in(other_user)
put "/chat/#{chat_channel.id}/restore/#{Chat::Message.unscoped.last.id}.json"
expect(response.status).to eq(403)
end
it "allows a user to restore their own posts" do
sign_in(user)
deleted_message = Chat::Message.unscoped.last
put "/chat/#{chat_channel.id}/restore/#{deleted_message.id}.json"
expect(response.status).to eq(200)
expect(deleted_message.reload.deleted_at).to be_nil
end
it "allows admin to restore others' posts" do
sign_in(admin)
deleted_message = Chat::Message.unscoped.last
put "/chat/#{chat_channel.id}/restore/#{deleted_message.id}.json"
expect(response.status).to eq(200)
expect(deleted_message.reload.deleted_at).to be_nil
end
it "does not allow message restore when chat channel is read_only" do
sign_in(Chat::Message.last.user)
chat_channel.update!(status: :read_only)
deleted_message = Chat::Message.unscoped.last
put "/chat/#{chat_channel.id}/restore/#{deleted_message.id}.json"
expect(response.status).to eq(403)
expect(deleted_message.reload.deleted_at).not_to be_nil
sign_in(admin)
put "/chat/#{chat_channel.id}/restore/#{deleted_message.id}.json"
expect(response.status).to eq(403)
end
it "only allows admin to restore when chat channel is closed" do
sign_in(admin)
chat_channel.update!(status: :read_only)
deleted_message = Chat::Message.unscoped.last
put "/chat/#{chat_channel.id}/restore/#{deleted_message.id}.json"
expect(response.status).to eq(403)
expect(deleted_message.reload.deleted_at).not_to be_nil
chat_channel.update!(status: :closed)
put "/chat/#{chat_channel.id}/restore/#{deleted_message.id}.json"
expect(response.status).to eq(200)
expect(deleted_message.reload.deleted_at).to be_nil
end
end
describe "#restore" do
fab!(:second_user) { Fabricate(:user) }
before do
message =
Chat::Message.create(user: user, message: "this is a message", chat_channel: chat_channel)
message.trash!
end
describe "for category" do
fab!(:chat_channel) { Fabricate(:category_channel, chatable: category) }
it_behaves_like "chat_message_restoration" do
let(:other_user) { second_user }
end
end
end
describe "react" do describe "react" do
fab!(:chat_channel) { Fabricate(:category_channel) } fab!(:chat_channel) { Fabricate(:category_channel) }
fab!(:chat_message) { Fabricate(:chat_message, chat_channel: chat_channel, user: user) } fab!(:chat_message) { Fabricate(:chat_message, chat_channel: chat_channel, user: user) }

View File

@ -0,0 +1,74 @@
# frozen_string_literal: true
RSpec.describe Chat::RestoreMessage do
fab!(:current_user) { Fabricate(:user) }
let!(:guardian) { Guardian.new(current_user) }
fab!(:message) { Fabricate(:chat_message, user: current_user) }
before { message.trash! }
describe ".call" do
subject(:result) { described_class.call(params) }
context "when params are not valid" do
let(:params) { { guardian: guardian } }
it { is_expected.to fail_a_contract }
end
context "when params are valid" do
let(:params) do
{ guardian: guardian, message_id: message.id, channel_id: message.chat_channel_id }
end
context "when the user does not have permission to restore" do
before { message.update!(user: Fabricate(:admin)) }
it { is_expected.to fail_a_policy(:invalid_access) }
end
context "when the channel does not match the message" do
let(:params) do
{ guardian: guardian, message_id: message.id, channel_id: Fabricate(:chat_channel).id }
end
it { is_expected.to fail_to_find_a_model(:message) }
end
context "when the user has permission to restore" do
it "sets the service result as successful" do
expect(result).to be_a_success
end
it "restores the message" do
result
expect(Chat::Message.find_by(id: message.id)).not_to be_nil
end
it "publishes associated Discourse and MessageBus events" do
freeze_time
messages = nil
event =
DiscourseEvent.track_events { messages = MessageBus.track_publish { result } }.first
expect(event[:event_name]).to eq(:chat_message_restored)
expect(event[:params]).to eq([message, message.chat_channel, current_user])
expect(
messages.find { |m| m.channel == "/chat/#{message.chat_channel_id}" }.data,
).to include({ "type" => "restore" })
end
context "when the message has a thread" do
fab!(:thread) { Fabricate(:chat_thread, channel: message.chat_channel) }
before { message.update!(thread: thread) }
it "increments the thread reply count" do
thread.set_replies_count_cache(1)
result
expect(thread.replies_count_cache).to eq(2)
end
end
end
end
end
end

View File

@ -57,7 +57,11 @@ RSpec.describe Chat::TrashMessage do
expect(event[:event_name]).to eq(:chat_message_trashed) expect(event[:event_name]).to eq(:chat_message_trashed)
expect(event[:params]).to eq([message, message.chat_channel, current_user]) expect(event[:params]).to eq([message, message.chat_channel, current_user])
expect(messages.find { |m| m.channel == "/chat/#{message.chat_channel_id}" }.data).to eq( expect(messages.find { |m| m.channel == "/chat/#{message.chat_channel_id}" }.data).to eq(
{ type: "delete", deleted_id: message.id, deleted_at: Time.zone.now }, {
"type" => "delete",
"deleted_id" => message.id,
"deleted_at" => message.reload.deleted_at.iso8601(3),
},
) )
end end
@ -86,6 +90,18 @@ RSpec.describe Chat::TrashMessage do
expect(membership_3.reload.last_read_message_id).not_to be_nil expect(membership_3.reload.last_read_message_id).not_to be_nil
end end
context "when the message has a thread" do
fab!(:thread) { Fabricate(:chat_thread, channel: message.chat_channel) }
before { message.update!(thread: thread) }
it "decrements the thread reply count" do
thread.set_replies_count_cache(5)
result
expect(thread.replies_count_cache).to eq(4)
end
end
context "when message is already deleted" do context "when message is already deleted" do
before { message.trash! } before { message.trash! }