From 24ec06ff85c7acbad9621092b5e50eec2ede7b83 Mon Sep 17 00:00:00 2001 From: Martin Brennan Date: Mon, 24 Apr 2023 09:32:04 +1000 Subject: [PATCH] FEATURE: Reintroduce better thread reply counter cache (#21197) This was reverted in 38cebd3ed509524ad635adb107163d0496d0c550. The issue was that I was using Discourse.redis.delete_prefixed which does a slow redis KEYS lookup, which is not advised in production. This commit removes that, and also ensures the periodical thread count update only happens if threading is enabled. I changed to use a redis INCR/DECR for reply count cache. This avoids a round trip to redis to GET the current count, and also avoids multi-process issues, where if there's two processes trying to increment at the same time, they may both receive the same value, add one to it, then both write the same value back. Then, it's only n+1 instead of n+2. This also prevents almost all chat scheduled jobs from running if chat is disabled, the only one remaining is the message retention job. --- .../chat/api/channel_messages_controller.rb | 6 + .../app/controllers/chat/chat_controller.rb | 15 +- .../regular/chat/update_thread_reply_count.rb | 24 ++ .../jobs/scheduled/chat/auto_join_users.rb | 2 + .../scheduled/chat/email_notifications.rb | 2 +- .../jobs/scheduled/chat/periodical_updates.rb | 2 + .../chat/update_user_counts_for_channels.rb | 2 + plugins/chat/app/models/chat/thread.rb | 14 +- .../app/models/concerns/chat/thread_cache.rb | 82 +++++++ .../serializers/chat/message_serializer.rb | 2 +- plugins/chat/app/services/chat/publisher.rb | 225 ++++++++---------- .../chat/app/services/chat/restore_message.rb | 63 +++++ .../chat/app/services/chat/trash_message.rb | 5 + .../discourse/services/chat-api.js | 7 +- ...chat-channel-pane-subscriptions-manager.js | 7 +- plugins/chat/config/routes.rb | 2 +- plugins/chat/lib/chat/message_creator.rb | 5 + ...hread_replies_count_cache_accuracy_spec.rb | 68 ++++++ .../regular/update_thread_reply_count_spec.rb | 55 +++++ .../chat/spec/lib/chat/message_mover_spec.rb | 6 +- .../chat/spec/lib/chat/review_queue_spec.rb | 4 +- plugins/chat/spec/models/chat/thread_spec.rb | 68 ++++++ plugins/chat/spec/plugin_helper.rb | 2 +- .../chat/api/messages_controller_spec.rb | 91 +++++++ .../spec/requests/chat_controller_spec.rb | 76 ------ .../services/chat/restore_message_spec.rb | 74 ++++++ .../spec/services/chat/trash_message_spec.rb | 18 +- 27 files changed, 694 insertions(+), 233 deletions(-) create mode 100644 plugins/chat/app/jobs/regular/chat/update_thread_reply_count.rb create mode 100644 plugins/chat/app/models/concerns/chat/thread_cache.rb create mode 100644 plugins/chat/app/services/chat/restore_message.rb create mode 100644 plugins/chat/spec/integration/thread_replies_count_cache_accuracy_spec.rb create mode 100644 plugins/chat/spec/jobs/regular/update_thread_reply_count_spec.rb create mode 100644 plugins/chat/spec/services/chat/restore_message_spec.rb diff --git a/plugins/chat/app/controllers/chat/api/channel_messages_controller.rb b/plugins/chat/app/controllers/chat/api/channel_messages_controller.rb index 067d4a9213d..5496aae6e9c 100644 --- a/plugins/chat/app/controllers/chat/api/channel_messages_controller.rb +++ b/plugins/chat/app/controllers/chat/api/channel_messages_controller.rb @@ -4,4 +4,10 @@ class Chat::Api::ChannelMessagesController < Chat::ApiController def destroy with_service(Chat::TrashMessage) { on_model_not_found(:message) { raise Discourse::NotFound } } end + + def restore + with_service(Chat::RestoreMessage) do + on_model_not_found(:message) { raise Discourse::NotFound } + end + end end diff --git a/plugins/chat/app/controllers/chat/chat_controller.rb b/plugins/chat/app/controllers/chat/chat_controller.rb index db3ae4bcf47..0a5ec268eb8 100644 --- a/plugins/chat/app/controllers/chat/chat_controller.rb +++ b/plugins/chat/app/controllers/chat/chat_controller.rb @@ -12,8 +12,7 @@ module Chat # these endpoints require a standalone find because they need to be # able to get deleted channels and recover them. before_action :find_chatable, only: %i[enable_chat disable_chat] - before_action :find_chat_message, - only: %i[delete restore lookup_message edit_message rebake message_link] + before_action :find_chat_message, only: %i[lookup_message edit_message rebake message_link] before_action :set_channel_and_chatable_with_access_check, except: %i[ respond @@ -233,18 +232,6 @@ module Chat render json: success_json end - def restore - chat_channel = @message.chat_channel - guardian.ensure_can_restore_chat!(@message, chat_channel.chatable) - updated = @message.recover! - if updated - Chat::Publisher.publish_restore!(chat_channel, @message) - render json: success_json - else - render_json_error(@message) - end - end - def rebake guardian.ensure_can_rebake_chat_message!(@message) @message.rebake!(invalidate_oneboxes: true) diff --git a/plugins/chat/app/jobs/regular/chat/update_thread_reply_count.rb b/plugins/chat/app/jobs/regular/chat/update_thread_reply_count.rb new file mode 100644 index 00000000000..d367e489846 --- /dev/null +++ b/plugins/chat/app/jobs/regular/chat/update_thread_reply_count.rb @@ -0,0 +1,24 @@ +# frozen_string_literal: true + +module Jobs + module Chat + class UpdateThreadReplyCount < Jobs::Base + def execute(args = {}) + return if !SiteSetting.enable_experimental_chat_threaded_discussions + + thread = ::Chat::Thread.find_by(id: args[:thread_id]) + return if thread.blank? + return if thread.replies_count_cache_recently_updated? + + Discourse.redis.setex( + ::Chat::Thread.replies_count_cache_updated_at_redis_key(thread.id), + 5.minutes.from_now.to_i, + Time.zone.now.to_i, + ) + thread.set_replies_count_cache(thread.replies.count, update_db: true) + + ::Chat::Publisher.publish_thread_original_message_metadata!(thread) + end + end + end +end diff --git a/plugins/chat/app/jobs/scheduled/chat/auto_join_users.rb b/plugins/chat/app/jobs/scheduled/chat/auto_join_users.rb index c22ee543fec..5d2c50938bf 100644 --- a/plugins/chat/app/jobs/scheduled/chat/auto_join_users.rb +++ b/plugins/chat/app/jobs/scheduled/chat/auto_join_users.rb @@ -6,6 +6,8 @@ module Jobs every 1.hour def execute(_args) + return if !SiteSetting.chat_enabled + ::Chat::Channel .where(auto_join_users: true) .each do |channel| diff --git a/plugins/chat/app/jobs/scheduled/chat/email_notifications.rb b/plugins/chat/app/jobs/scheduled/chat/email_notifications.rb index a5adac40c0c..4b88c7122d3 100644 --- a/plugins/chat/app/jobs/scheduled/chat/email_notifications.rb +++ b/plugins/chat/app/jobs/scheduled/chat/email_notifications.rb @@ -6,7 +6,7 @@ module Jobs every 5.minutes def execute(args = {}) - return unless ::SiteSetting.chat_enabled + return if !SiteSetting.chat_enabled ::Chat::Mailer.send_unread_mentions_summary end diff --git a/plugins/chat/app/jobs/scheduled/chat/periodical_updates.rb b/plugins/chat/app/jobs/scheduled/chat/periodical_updates.rb index e2faabe637c..5879ebec095 100644 --- a/plugins/chat/app/jobs/scheduled/chat/periodical_updates.rb +++ b/plugins/chat/app/jobs/scheduled/chat/periodical_updates.rb @@ -6,6 +6,8 @@ module Jobs every 15.minutes def execute(args = nil) + return if !SiteSetting.chat_enabled + # TODO: Add rebaking of old messages (baked_version < # Chat::Message::BAKED_VERSION or baked_version IS NULL) ::Chat::Channel.ensure_consistency! diff --git a/plugins/chat/app/jobs/scheduled/chat/update_user_counts_for_channels.rb b/plugins/chat/app/jobs/scheduled/chat/update_user_counts_for_channels.rb index 4478bd4f910..5dff311bd63 100644 --- a/plugins/chat/app/jobs/scheduled/chat/update_user_counts_for_channels.rb +++ b/plugins/chat/app/jobs/scheduled/chat/update_user_counts_for_channels.rb @@ -12,6 +12,8 @@ module Jobs # channels updated or with new messages in the past N days? Perhaps # we could update all the counts in a single query as well? def execute(args = {}) + return if !SiteSetting.chat_enabled + ::Chat::Channel .where(status: %i[open closed]) .find_each { |chat_channel| set_user_count(chat_channel) } diff --git a/plugins/chat/app/models/chat/thread.rb b/plugins/chat/app/models/chat/thread.rb index 6b1eafedaea..60c08b61525 100644 --- a/plugins/chat/app/models/chat/thread.rb +++ b/plugins/chat/app/models/chat/thread.rb @@ -4,6 +4,8 @@ module Chat class Thread < ActiveRecord::Base EXCERPT_LENGTH = 150 + include Chat::ThreadCache + self.table_name = "chat_threads" belongs_to :channel, foreign_key: "channel_id", class_name: "Chat::Channel" @@ -11,7 +13,11 @@ module Chat belongs_to :original_message, foreign_key: "original_message_id", class_name: "Chat::Message" has_many :chat_messages, - -> { order("chat_messages.created_at ASC, chat_messages.id ASC") }, + -> { + where("deleted_at IS NULL").order( + "chat_messages.created_at ASC, chat_messages.id ASC", + ) + }, foreign_key: :thread_id, primary_key: :id, class_name: "Chat::Message" @@ -50,6 +56,7 @@ module Chat end def self.ensure_consistency! + return if !SiteSetting.enable_experimental_chat_threaded_discussions update_counts end @@ -61,7 +68,7 @@ module Chat # # It is updated eventually via Jobs::Chat::PeriodicalUpdates. In # future we may want to update this more frequently. - DB.exec <<~SQL + updated_thread_ids = DB.query_single <<~SQL UPDATE chat_threads threads SET replies_count = subquery.replies_count FROM ( @@ -72,7 +79,10 @@ module Chat ) subquery WHERE threads.id = subquery.thread_id AND subquery.replies_count != threads.replies_count + RETURNING threads.id AS thread_id; SQL + return if updated_thread_ids.empty? + self.clear_caches!(updated_thread_ids) end end end diff --git a/plugins/chat/app/models/concerns/chat/thread_cache.rb b/plugins/chat/app/models/concerns/chat/thread_cache.rb new file mode 100644 index 00000000000..a87a1c7666d --- /dev/null +++ b/plugins/chat/app/models/concerns/chat/thread_cache.rb @@ -0,0 +1,82 @@ +# frozen_string_literal: true + +module Chat + module ThreadCache + extend ActiveSupport::Concern + + class_methods do + def replies_count_cache_updated_at_redis_key(id) + "chat_thread:replies_count_cache_updated_at:#{id}" + end + + def replies_count_cache_redis_key(id) + "chat_thread:replies_count_cache:#{id}" + end + + def clear_caches!(ids) + ids = Array.wrap(ids) + keys_to_delete = + ids + .map do |id| + [replies_count_cache_redis_key(id), replies_count_cache_updated_at_redis_key(id)] + end + .flatten + Discourse.redis.del(keys_to_delete) + end + end + + def replies_count_cache_recently_updated? + replies_count_cache_updated_at.after?(5.minutes.ago) + end + + def replies_count_cache_updated_at + Time.at( + Discourse.redis.get(Chat::Thread.replies_count_cache_updated_at_redis_key(self.id)).to_i, + in: Time.zone, + ) + end + + def replies_count_cache + redis_cache = Discourse.redis.get(Chat::Thread.replies_count_cache_redis_key(self.id))&.to_i + + # If the cache is not present for whatever reason, set it to the current value, + # otherwise INCR/DECR will be way off. No need to enqueue the job or publish, + # since this is likely fetched by a serializer. + if !redis_cache.present? + set_replies_count_redis_cache(self.replies_count) + self.replies_count + else + redis_cache != self.replies_count ? redis_cache : self.replies_count + end + end + + def set_replies_count_cache(value, update_db: false) + self.update!(replies_count: value) if update_db + set_replies_count_redis_cache(value) + thread_reply_count_cache_changed + end + + def set_replies_count_redis_cache(value) + Discourse.redis.setex( + Chat::Thread.replies_count_cache_redis_key(self.id), + 5.minutes.from_now.to_i, + value, + ) + end + + def increment_replies_count_cache + Discourse.redis.incr(Chat::Thread.replies_count_cache_redis_key(self.id)) + thread_reply_count_cache_changed + end + + def decrement_replies_count_cache + Discourse.redis.decr(Chat::Thread.replies_count_cache_redis_key(self.id)) + thread_reply_count_cache_changed + end + + def thread_reply_count_cache_changed + Jobs.enqueue_in(5.seconds, Jobs::Chat::UpdateThreadReplyCount, thread_id: self.id) + ::Chat::Publisher.publish_thread_original_message_metadata!(self) + end + end +end diff --git a/plugins/chat/app/serializers/chat/message_serializer.rb b/plugins/chat/app/serializers/chat/message_serializer.rb index 58d32b85edb..6c91a6f3f61 100644 --- a/plugins/chat/app/serializers/chat/message_serializer.rb +++ b/plugins/chat/app/serializers/chat/message_serializer.rb @@ -166,7 +166,7 @@ module Chat end def thread_reply_count - object.thread&.replies_count || 0 + object.thread&.replies_count_cache || 0 end end end diff --git a/plugins/chat/app/services/chat/publisher.rb b/plugins/chat/app/services/chat/publisher.rb index 6d63f395b73..a14bc288dcd 100644 --- a/plugins/chat/app/services/chat/publisher.rb +++ b/plugins/chat/app/services/chat/publisher.rb @@ -3,7 +3,7 @@ module Chat module Publisher def self.new_messages_message_bus_channel(chat_channel_id) - "/chat/#{chat_channel_id}/new-messages" + "#{root_message_bus_channel(chat_channel_id)}/new-messages" end def self.root_message_bus_channel(chat_channel_id) @@ -35,31 +35,11 @@ module Chat def self.publish_new!(chat_channel, chat_message, staged_id) message_bus_targets = calculate_publish_targets(chat_channel, chat_message) - - content = - Chat::MessageSerializer.new( - chat_message, - { scope: anonymous_guardian, root: :chat_message }, - ).as_json - content[:type] = :sent - content[:staged_id] = staged_id - permissions = permissions(chat_channel) - - message_bus_targets.each do |message_bus_channel| - MessageBus.publish(message_bus_channel, content.as_json, permissions) - end - - if chat_message.thread_reply? - MessageBus.publish( - root_message_bus_channel(chat_channel.id), - { - type: :update_thread_original_message, - original_message_id: chat_message.thread.original_message_id, - action: :increment_reply_count, - }.as_json, - permissions, - ) - end + publish_to_targets!( + message_bus_targets, + chat_channel, + serialize_message_with_type(chat_message, :sent).merge(staged_id: staged_id), + ) # NOTE: This means that the read count is only updated in the client # for new messages in the main channel stream, maybe in future we want to @@ -74,84 +54,67 @@ module Chat username: chat_message.user.username, thread_id: chat_message.thread_id, }, - permissions, + permissions(chat_channel), ) end end - def self.publish_thread_created!(chat_channel, chat_message) - content = - Chat::MessageSerializer.new( - chat_message, - { scope: anonymous_guardian, root: :chat_message }, - ).as_json - content[:type] = :thread_created - permissions = permissions(chat_channel) + def self.publish_thread_original_message_metadata!(thread) + publish_to_channel!( + thread.channel, + { + type: :update_thread_original_message, + original_message_id: thread.original_message_id, + replies_count: thread.replies_count_cache, + }, + ) + end - MessageBus.publish(root_message_bus_channel(chat_channel.id), content.as_json, permissions) + def self.publish_thread_created!(chat_channel, chat_message) + publish_to_channel!(chat_channel, serialize_message_with_type(chat_message, :thread_created)) end def self.publish_processed!(chat_message) chat_channel = chat_message.chat_channel message_bus_targets = calculate_publish_targets(chat_channel, chat_message) - - content = { - type: :processed, - chat_message: { - id: chat_message.id, - cooked: chat_message.cooked, - }, - } - - message_bus_targets.each do |message_bus_channel| - MessageBus.publish(message_bus_channel, content.as_json, permissions(chat_channel)) - end + publish_to_targets!( + message_bus_targets, + chat_channel, + { type: :processed, chat_message: { id: chat_message.id, cooked: chat_message.cooked } }, + ) end def self.publish_edit!(chat_channel, chat_message) message_bus_targets = calculate_publish_targets(chat_channel, chat_message) - - content = - Chat::MessageSerializer.new( - chat_message, - { scope: anonymous_guardian, root: :chat_message }, - ).as_json - content[:type] = :edit - - message_bus_targets.each do |message_bus_channel| - MessageBus.publish(message_bus_channel, content.as_json, permissions(chat_channel)) - end + publish_to_targets!( + message_bus_targets, + chat_channel, + serialize_message_with_type(chat_message, :edit), + ) end def self.publish_refresh!(chat_channel, chat_message) message_bus_targets = calculate_publish_targets(chat_channel, chat_message) - - content = - Chat::MessageSerializer.new( - chat_message, - { scope: anonymous_guardian, root: :chat_message }, - ).as_json - content[:type] = :refresh - - message_bus_targets.each do |message_bus_channel| - MessageBus.publish(message_bus_channel, content.as_json, permissions(chat_channel)) - end + publish_to_targets!( + message_bus_targets, + chat_channel, + serialize_message_with_type(chat_message, :refresh), + ) end def self.publish_reaction!(chat_channel, chat_message, action, user, emoji) message_bus_targets = calculate_publish_targets(chat_channel, chat_message) - - content = { - action: action, - user: BasicUserSerializer.new(user, root: false).as_json, - emoji: emoji, - type: :reaction, - chat_message_id: chat_message.id, - } - - message_bus_targets.each do |message_bus_channel| - MessageBus.publish(message_bus_channel, content.as_json, permissions(chat_channel)) - end + publish_to_targets!( + message_bus_targets, + chat_channel, + { + action: action, + user: BasicUserSerializer.new(user, root: false).as_json, + emoji: emoji, + type: :reaction, + chat_message_id: chat_message.id, + }, + ) end def self.publish_presence!(chat_channel, user, typ) @@ -160,28 +123,26 @@ module Chat def self.publish_delete!(chat_channel, chat_message) message_bus_targets = calculate_publish_targets(chat_channel, chat_message) - - message_bus_targets.each do |message_bus_channel| - MessageBus.publish( - message_bus_channel, - { type: "delete", deleted_id: chat_message.id, deleted_at: chat_message.deleted_at }, - permissions(chat_channel), - ) - end + publish_to_targets!( + message_bus_targets, + chat_channel, + { type: "delete", deleted_id: chat_message.id, deleted_at: chat_message.deleted_at }, + ) end def self.publish_bulk_delete!(chat_channel, deleted_message_ids) + channel_permissions = permissions(chat_channel) Chat::Thread .grouped_messages(message_ids: deleted_message_ids) .each do |group| MessageBus.publish( thread_message_bus_channel(chat_channel.id, group.thread_id), { - type: "bulk_delete", + type: :bulk_delete, deleted_ids: group.thread_message_ids, deleted_at: Time.zone.now, }, - permissions(chat_channel), + channel_permissions, ) # Don't need to publish to the main channel if the messages deleted @@ -193,54 +154,74 @@ module Chat return if deleted_message_ids.empty? - MessageBus.publish( - root_message_bus_channel(chat_channel.id), - { type: "bulk_delete", deleted_ids: deleted_message_ids, deleted_at: Time.zone.now }, - permissions(chat_channel), + publish_to_channel!( + chat_channel, + { type: :bulk_delete, deleted_ids: deleted_message_ids, deleted_at: Time.zone.now }, ) end def self.publish_restore!(chat_channel, chat_message) message_bus_targets = calculate_publish_targets(chat_channel, chat_message) - - content = - Chat::MessageSerializer.new( - chat_message, - { scope: anonymous_guardian, root: :chat_message }, - ).as_json - content[:type] = :restore - - message_bus_targets.each do |message_bus_channel| - MessageBus.publish(message_bus_channel, content.as_json, permissions(chat_channel)) - end + publish_to_targets!( + message_bus_targets, + chat_channel, + serialize_message_with_type(chat_message, :restore), + ) end def self.publish_flag!(chat_message, user, reviewable, score) message_bus_targets = calculate_publish_targets(chat_message.chat_channel, chat_message) - message_bus_targets.each do |message_bus_channel| - # Publish to user who created flag - MessageBus.publish( - message_bus_channel, - { - type: "self_flagged", - user_flag_status: score.status_for_database, - chat_message_id: chat_message.id, - }.as_json, + # Publish to user who created flag + publish_to_targets!( + message_bus_targets, + chat_message.chat_channel, + { + type: :self_flagged, + user_flag_status: score.status_for_database, + chat_message_id: chat_message.id, + }, + permissions: { user_ids: [user.id], - ) - end + }, + ) - message_bus_targets.each do |message_bus_channel| - # Publish flag with link to reviewable to staff + # Publish flag with link to reviewable to staff + publish_to_targets!( + message_bus_targets, + chat_message.chat_channel, + { type: :flag, chat_message_id: chat_message.id, reviewable_id: reviewable.id }, + permissions: { + group_ids: [Group::AUTO_GROUPS[:staff]], + }, + ) + end + + def self.publish_to_channel!(channel, payload) + MessageBus.publish( + root_message_bus_channel(channel.id), + payload.as_json, + permissions(channel), + ) + end + + def self.publish_to_targets!(targets, channel, payload, permissions: nil) + targets.each do |message_bus_channel| MessageBus.publish( message_bus_channel, - { type: "flag", chat_message_id: chat_message.id, reviewable_id: reviewable.id }.as_json, - group_ids: [Group::AUTO_GROUPS[:staff]], + payload.as_json, + permissions || permissions(channel), ) end end + def self.serialize_message_with_type(chat_message, type) + Chat::MessageSerializer + .new(chat_message, { scope: anonymous_guardian, root: :chat_message }) + .as_json + .merge(type: type) + end + def self.user_tracking_state_message_bus_channel(user_id) "/chat/user-tracking-state/#{user_id}" end diff --git a/plugins/chat/app/services/chat/restore_message.rb b/plugins/chat/app/services/chat/restore_message.rb new file mode 100644 index 00000000000..fa1a6b173b8 --- /dev/null +++ b/plugins/chat/app/services/chat/restore_message.rb @@ -0,0 +1,63 @@ +# frozen_string_literal: true + +module Chat + # Service responsible for restoring a trashed chat message + # for a channel and ensuring that the client and read state is + # updated. + # + # @example + # Chat::RestoreMessage.call(message_id: 2, channel_id: 1, guardian: guardian) + # + class RestoreMessage + include Service::Base + + # @!method call(message_id:, channel_id:, guardian:) + # @param [Integer] message_id + # @param [Integer] channel_id + # @param [Guardian] guardian + # @return [Service::Base::Context] + + contract + model :message + policy :invalid_access + transaction do + step :restore_message + step :update_thread_reply_cache + end + step :publish_events + + # @!visibility private + class Contract + attribute :message_id, :integer + attribute :channel_id, :integer + validates :message_id, presence: true + validates :channel_id, presence: true + end + + private + + def fetch_message(contract:, **) + Chat::Message + .with_deleted + .includes(chat_channel: :chatable) + .find_by(id: contract.message_id, chat_channel_id: contract.channel_id) + end + + def invalid_access(guardian:, message:, **) + guardian.can_restore_chat?(message, message.chat_channel.chatable) + end + + def restore_message(message:, **) + message.recover! + end + + def update_thread_reply_cache(message:, **) + message.thread&.increment_replies_count_cache + end + + def publish_events(guardian:, message:, **) + DiscourseEvent.trigger(:chat_message_restored, message, message.chat_channel, guardian.user) + Chat::Publisher.publish_restore!(message.chat_channel, message) + end + end +end diff --git a/plugins/chat/app/services/chat/trash_message.rb b/plugins/chat/app/services/chat/trash_message.rb index 2139a8a0f77..71bf33a7b63 100644 --- a/plugins/chat/app/services/chat/trash_message.rb +++ b/plugins/chat/app/services/chat/trash_message.rb @@ -24,6 +24,7 @@ module Chat step :trash_message step :destroy_mentions step :update_tracking_state + step :update_thread_reply_cache end step :publish_events @@ -62,6 +63,10 @@ module Chat ) end + def update_thread_reply_cache(message:, **) + message.thread&.decrement_replies_count_cache + end + def publish_events(guardian:, message:, **) DiscourseEvent.trigger(:chat_message_trashed, message, message.chat_channel, guardian.user) Chat::Publisher.publish_delete!(message.chat_channel, message) diff --git a/plugins/chat/assets/javascripts/discourse/services/chat-api.js b/plugins/chat/assets/javascripts/discourse/services/chat-api.js index f459a5faab5..7be02b5ed3c 100644 --- a/plugins/chat/assets/javascripts/discourse/services/chat-api.js +++ b/plugins/chat/assets/javascripts/discourse/services/chat-api.js @@ -350,10 +350,9 @@ export default class ChatApi extends Service { * @param {number} messageId - The ID of the message being restored. */ restoreMessage(channelId, messageId) { - // TODO (martin) Not ideal, this should have a chat API controller endpoint. - return ajax(`/chat/${channelId}/restore/${messageId}`, { - type: "PUT", - }); + return this.#putRequest( + `/channels/${channelId}/messages/${messageId}/restore` + ); } /** diff --git a/plugins/chat/assets/javascripts/discourse/services/chat-channel-pane-subscriptions-manager.js b/plugins/chat/assets/javascripts/discourse/services/chat-channel-pane-subscriptions-manager.js index e8d1080baa5..93a218eb2af 100644 --- a/plugins/chat/assets/javascripts/discourse/services/chat-channel-pane-subscriptions-manager.js +++ b/plugins/chat/assets/javascripts/discourse/services/chat-channel-pane-subscriptions-manager.js @@ -37,11 +37,8 @@ export default class ChatChannelPaneSubscriptionsManager extends ChatPaneBaseSub handleThreadOriginalMessageUpdate(data) { const message = this.messagesManager.findMessage(data.original_message_id); if (message) { - if (data.action === "increment_reply_count") { - // TODO (martin) In future we should use a replies_count delivered - // from the server and simply update the message accordingly, for - // now we don't have an accurate enough count for this. - message.threadReplyCount += 1; + if (data.replies_count) { + message.threadReplyCount = data.replies_count; } } } diff --git a/plugins/chat/config/routes.rb b/plugins/chat/config/routes.rb index 4fed17209d4..53d1a723fb6 100644 --- a/plugins/chat/config/routes.rb +++ b/plugins/chat/config/routes.rb @@ -30,6 +30,7 @@ Chat::Engine.routes.draw do get "/channels/:channel_id/threads/:thread_id" => "channel_threads#show" + put "/channels/:channel_id/messages/:message_id/restore" => "channel_messages#restore" delete "/channels/:channel_id/messages/:message_id" => "channel_messages#destroy" end @@ -61,7 +62,6 @@ Chat::Engine.routes.draw do put "/:chat_channel_id/:message_id/rebake" => "chat#rebake" post "/:chat_channel_id/:message_id/flag" => "chat#flag" post "/:chat_channel_id/quote" => "chat#quote_messages" - put "/:chat_channel_id/restore/:message_id" => "chat#restore" get "/lookup/:message_id" => "chat#lookup_message" put "/:chat_channel_id/read/:message_id" => "chat#update_user_last_read" put "/user_chat_enabled/:user_id" => "chat#set_user_chat_status" diff --git a/plugins/chat/lib/chat/message_creator.rb b/plugins/chat/lib/chat/message_creator.rb index d5fc84a941c..9a608bd51e4 100644 --- a/plugins/chat/lib/chat/message_creator.rb +++ b/plugins/chat/lib/chat/message_creator.rb @@ -58,6 +58,7 @@ module Chat @chat_message.attach_uploads(uploads) Chat::Draft.where(user_id: @user.id, chat_channel_id: @chat_channel.id).destroy_all Chat::Publisher.publish_new!(@chat_channel, @chat_message, @staged_id) + resolved_thread&.increment_replies_count_cache Jobs.enqueue(Jobs::Chat::ProcessMessage, { chat_message_id: @chat_message.id }) Chat::Notifier.notify_new(chat_message: @chat_message, timestamp: @chat_message.created_at) @chat_channel.touch(:last_message_sent_at) @@ -206,5 +207,9 @@ module Chat WHERE thread_id IS NULL AND chat_messages.id = thread_updater.id SQL end + + def resolved_thread + @existing_thread || @chat_message.thread + end end end diff --git a/plugins/chat/spec/integration/thread_replies_count_cache_accuracy_spec.rb b/plugins/chat/spec/integration/thread_replies_count_cache_accuracy_spec.rb new file mode 100644 index 00000000000..4f01fe2db00 --- /dev/null +++ b/plugins/chat/spec/integration/thread_replies_count_cache_accuracy_spec.rb @@ -0,0 +1,68 @@ +# frozen_string_literal: true + +RSpec.describe "Chat::Thread replies_count cache accuracy" do + include ActiveSupport::Testing::TimeHelpers + + fab!(:user) { Fabricate(:user) } + fab!(:thread) { Fabricate(:chat_thread) } + + before do + SiteSetting.chat_enabled = true + SiteSetting.enable_experimental_chat_threaded_discussions = true + end + + it "keeps an accurate replies_count cache" do + freeze_time + Jobs.run_immediately! + + expect(thread.replies_count).to eq(0) + expect(thread.replies_count_cache).to eq(0) + + # Create 5 replies + 5.times do |i| + Chat::MessageCreator.create( + chat_channel: thread.channel, + user: user, + thread_id: thread.id, + content: "Hello world #{i}", + ) + end + + # The job only runs to completion if the cache has not been recently + # updated, so the DB count will only be 1. + expect(thread.replies_count_cache).to eq(5) + expect(thread.reload.replies_count).to eq(1) + + # Travel to the future so the cache expires. + travel_to 6.minutes.from_now + Chat::MessageCreator.create( + chat_channel: thread.channel, + user: user, + thread_id: thread.id, + content: "Hello world now that time has passed", + ) + expect(thread.replies_count_cache).to eq(6) + expect(thread.reload.replies_count).to eq(6) + + # Lose the cache intentionally. + Chat::Thread.clear_caches!(thread.id) + message_to_destroy = thread.replies.last + Chat::TrashMessage.call( + message_id: message_to_destroy.id, + channel_id: thread.channel_id, + guardian: Guardian.new(user), + ) + expect(thread.replies_count_cache).to eq(5) + expect(thread.reload.replies_count).to eq(5) + + # Lose the cache intentionally. + Chat::Thread.clear_caches!(thread.id) + Chat::RestoreMessage.call( + message_id: message_to_destroy.id, + channel_id: thread.channel_id, + guardian: Guardian.new(user), + ) + expect(thread.replies_count_cache).to eq(6) + expect(thread.reload.replies_count).to eq(6) + end +end diff --git a/plugins/chat/spec/jobs/regular/update_thread_reply_count_spec.rb b/plugins/chat/spec/jobs/regular/update_thread_reply_count_spec.rb new file mode 100644 index 00000000000..d67afa54e52 --- /dev/null +++ b/plugins/chat/spec/jobs/regular/update_thread_reply_count_spec.rb @@ -0,0 +1,55 @@ +# frozen_string_literal: true + +RSpec.describe Jobs::Chat::UpdateThreadReplyCount do + fab!(:thread) { Fabricate(:chat_thread) } + fab!(:message_1) { Fabricate(:chat_message, thread: thread) } + fab!(:message_2) { Fabricate(:chat_message, thread: thread) } + + before do + Chat::Thread.clear_caches!(thread.id) + SiteSetting.enable_experimental_chat_threaded_discussions = true + end + + it "does nothing if enable_experimental_chat_threaded_discussions is false" do + SiteSetting.enable_experimental_chat_threaded_discussions = false + Chat::Thread.any_instance.expects(:set_replies_count_cache).never + described_class.new.execute(thread_id: thread.id) + end + + it "does not error if the thread is deleted" do + id = thread.id + thread.destroy! + expect { described_class.new.execute(thread_id: id) }.not_to raise_error + end + + it "does not set the reply count in the DB if it has been changed recently" do + described_class.new.execute(thread_id: thread.id) + expect(thread.reload.replies_count).to eq(2) + Fabricate(:chat_message, thread: thread) + described_class.new.execute(thread_id: thread.id) + expect(thread.reload.replies_count).to eq(2) + end + + it "sets the updated_at cache to the current time" do + freeze_time + described_class.new.execute(thread_id: thread.id) + expect(thread.replies_count_cache_updated_at).to eq_time( + Time.at(Time.zone.now.to_i, in: Time.zone), + ) + end + + it "publishes the thread original message metadata" do + messages = + MessageBus.track_publish("/chat/#{thread.channel_id}") do + described_class.new.execute(thread_id: thread.id) + end + + expect(messages.first.data).to eq( + { + "original_message_id" => thread.original_message_id, + "replies_count" => 2, + "type" => "update_thread_original_message", + }, + ) + end +end diff --git a/plugins/chat/spec/lib/chat/message_mover_spec.rb b/plugins/chat/spec/lib/chat/message_mover_spec.rb index c694b246d9d..aef48ddb241 100644 --- a/plugins/chat/spec/lib/chat/message_mover_spec.rb +++ b/plugins/chat/spec/lib/chat/message_mover_spec.rb @@ -76,9 +76,9 @@ describe Chat::MessageMover do deleted_messages = Chat::Message.with_deleted.where(id: move_message_ids).order(:id) expect(deleted_messages.count).to eq(3) expect(messages.first.channel).to eq("/chat/#{source_channel.id}") - expect(messages.first.data[:type]).to eq("bulk_delete") - expect(messages.first.data[:deleted_ids]).to eq(deleted_messages.map(&:id)) - expect(messages.first.data[:deleted_at]).not_to eq(nil) + expect(messages.first.data["type"]).to eq("bulk_delete") + expect(messages.first.data["deleted_ids"]).to eq(deleted_messages.map(&:id)) + expect(messages.first.data["deleted_at"]).not_to eq(nil) end it "creates a message in the source channel to indicate that the messages have been moved" do diff --git a/plugins/chat/spec/lib/chat/review_queue_spec.rb b/plugins/chat/spec/lib/chat/review_queue_spec.rb index 672b7dff64d..beed58049cf 100644 --- a/plugins/chat/spec/lib/chat/review_queue_spec.rb +++ b/plugins/chat/spec/lib/chat/review_queue_spec.rb @@ -280,9 +280,9 @@ describe Chat::ReviewQueue do end .map(&:data) - delete_msg = messages.detect { |m| m[:type] == "delete" } + delete_msg = messages.detect { |m| m["type"] == "delete" } - expect(delete_msg[:deleted_id]).to eq(message.id) + expect(delete_msg["deleted_id"]).to eq(message.id) end it "agrees with other flags on the same message" do diff --git a/plugins/chat/spec/models/chat/thread_spec.rb b/plugins/chat/spec/models/chat/thread_spec.rb index f48a4f04f94..55fa93bfc3b 100644 --- a/plugins/chat/spec/models/chat/thread_spec.rb +++ b/plugins/chat/spec/models/chat/thread_spec.rb @@ -1,6 +1,11 @@ # frozen_string_literal: true RSpec.describe Chat::Thread do + before do + SiteSetting.chat_enabled = true + SiteSetting.enable_experimental_chat_threaded_discussions = true + end + describe ".ensure_consistency!" do fab!(:channel) { Fabricate(:category_channel) } fab!(:thread_1) { Fabricate(:chat_thread, channel: channel) } @@ -40,6 +45,69 @@ RSpec.describe Chat::Thread do described_class.ensure_consistency! expect(thread_1.reload.replies_count).to eq(0) end + + it "clears the affected replies_count caches" do + thread_1.set_replies_count_cache(100) + expect(thread_1.replies_count_cache).to eq(100) + expect(thread_1.replies_count_cache_updated_at).not_to eq(nil) + + described_class.ensure_consistency! + expect(Discourse.redis.get(Chat::Thread.replies_count_cache_redis_key(thread_1.id))).to eq( + nil, + ) + expect( + Discourse.redis.get(Chat::Thread.replies_count_cache_updated_at_redis_key(thread_1.id)), + ).to eq(nil) + end + + it "does not attempt to clear caches if no replies_count caches are updated" do + described_class.ensure_consistency! + Chat::Thread.expects(:clear_caches!).never + described_class.ensure_consistency! + end + + it "does nothing if threads are disabled" do + SiteSetting.enable_experimental_chat_threaded_discussions = false + Chat::Thread.expects(:update_counts).never + described_class.ensure_consistency! + end + end + end + + describe ".clear_caches" do + fab!(:channel) { Fabricate(:category_channel) } + fab!(:thread_1) { Fabricate(:chat_thread, channel: channel) } + fab!(:thread_2) { Fabricate(:chat_thread, channel: channel) } + + before do + thread_1.set_replies_count_cache(100) + thread_2.set_replies_count_cache(100) + end + + it "clears multiple keys" do + Chat::Thread.clear_caches!([thread_1.id, thread_2.id]) + expect(Discourse.redis.get(Chat::Thread.replies_count_cache_redis_key(thread_1.id))).to eq( + nil, + ) + expect( + Discourse.redis.get(Chat::Thread.replies_count_cache_updated_at_redis_key(thread_1.id)), + ).to eq(nil) + expect(Discourse.redis.get(Chat::Thread.replies_count_cache_redis_key(thread_2.id))).to eq( + nil, + ) + expect( + Discourse.redis.get(Chat::Thread.replies_count_cache_updated_at_redis_key(thread_2.id)), + ).to eq(nil) + end + + it "wraps the ids into an array if only an integer is provided" do + Chat::Thread.clear_caches!(thread_1.id) + expect(Discourse.redis.get(Chat::Thread.replies_count_cache_redis_key(thread_1.id))).to eq( + nil, + ) + expect( + Discourse.redis.get(Chat::Thread.replies_count_cache_updated_at_redis_key(thread_1.id)), + ).to eq(nil) end end diff --git a/plugins/chat/spec/plugin_helper.rb b/plugins/chat/spec/plugin_helper.rb index 6dfd5c66c3b..22616ee9a24 100644 --- a/plugins/chat/spec/plugin_helper.rb +++ b/plugins/chat/spec/plugin_helper.rb @@ -49,7 +49,7 @@ module ChatSystemHelpers last_message = creator.chat_message end - last_message.thread.update!(replies_count: messages_count - 1) + last_message.thread.set_replies_count_cache(messages_count - 1, update_db: true) last_message.thread end end diff --git a/plugins/chat/spec/requests/chat/api/messages_controller_spec.rb b/plugins/chat/spec/requests/chat/api/messages_controller_spec.rb index c29c25ff596..749c760f631 100644 --- a/plugins/chat/spec/requests/chat/api/messages_controller_spec.rb +++ b/plugins/chat/spec/requests/chat/api/messages_controller_spec.rb @@ -106,4 +106,95 @@ RSpec.describe Chat::Api::ChannelMessagesController do end end end + + describe "#restore" do + RSpec.shared_examples "chat_message_restoration" do + it "doesn't allow a user to restore another user's message" do + sign_in(other_user) + + put "/chat/api/channels/#{chat_channel.id}/messages/#{deleted_message.id}/restore.json" + expect(response.status).to eq(403) + end + + it "allows a user to restore their own messages" do + sign_in(current_user) + + put "/chat/api/channels/#{chat_channel.id}/messages/#{deleted_message.id}/restore.json" + expect(response.status).to eq(200) + expect(deleted_message.reload.deleted_at).to be_nil + end + + it "allows admin to restore others' messages" do + sign_in(admin) + + put "/chat/api/channels/#{chat_channel.id}/messages/#{deleted_message.id}/restore.json" + expect(response.status).to eq(200) + expect(deleted_message.reload.deleted_at).to be_nil + end + + it "does not allow message restore when channel is read_only" do + sign_in(current_user) + + chat_channel.update!(status: :read_only) + + put "/chat/api/channels/#{chat_channel.id}/messages/#{deleted_message.id}/restore.json" + expect(response.status).to eq(403) + expect(deleted_message.reload.deleted_at).not_to be_nil + + sign_in(admin) + put "/chat/api/channels/#{chat_channel.id}/messages/#{deleted_message.id}/restore.json" + expect(response.status).to eq(403) + end + + it "only allows admin to restore when channel is closed" do + sign_in(admin) + + chat_channel.update!(status: :read_only) + + put "/chat/api/channels/#{chat_channel.id}/messages/#{deleted_message.id}/restore.json" + expect(response.status).to eq(403) + expect(deleted_message.reload.deleted_at).not_to be_nil + + chat_channel.update!(status: :closed) + put "/chat/api/channels/#{chat_channel.id}/messages/#{deleted_message.id}/restore.json" + expect(response.status).to eq(200) + expect(deleted_message.reload.deleted_at).to be_nil + end + end + + fab!(:admin) { Fabricate(:admin) } + fab!(:second_user) { Fabricate(:user) } + + before do + message = + Chat::Message.create( + user: current_user, + message: "this is a message", + chat_channel: chat_channel, + ) + message.trash! + end + + let(:deleted_message) do + Chat::Message.unscoped.where(user: current_user, chat_channel: chat_channel).last + end + + describe "for category" do + fab!(:category) { Fabricate(:category) } + fab!(:chat_channel) { Fabricate(:category_channel, chatable: category) } + + it_behaves_like "chat_message_restoration" do + let(:other_user) { second_user } + end + end + + describe "for dm channel" do + fab!(:user_2) { Fabricate(:user) } + fab!(:chat_channel) { Fabricate(:direct_message_channel, users: [current_user, user_2]) } + + it_behaves_like "chat_message_restoration" do + let(:other_user) { user_2 } + end + end + end end diff --git a/plugins/chat/spec/requests/chat_controller_spec.rb b/plugins/chat/spec/requests/chat_controller_spec.rb index abaa63a34ce..573474f8a5d 100644 --- a/plugins/chat/spec/requests/chat_controller_spec.rb +++ b/plugins/chat/spec/requests/chat_controller_spec.rb @@ -612,82 +612,6 @@ RSpec.describe Chat::ChatController do end end - RSpec.shared_examples "chat_message_restoration" do - it "doesn't allow a user to restore another user's message" do - sign_in(other_user) - - put "/chat/#{chat_channel.id}/restore/#{Chat::Message.unscoped.last.id}.json" - expect(response.status).to eq(403) - end - - it "allows a user to restore their own posts" do - sign_in(user) - - deleted_message = Chat::Message.unscoped.last - put "/chat/#{chat_channel.id}/restore/#{deleted_message.id}.json" - expect(response.status).to eq(200) - expect(deleted_message.reload.deleted_at).to be_nil - end - - it "allows admin to restore others' posts" do - sign_in(admin) - - deleted_message = Chat::Message.unscoped.last - put "/chat/#{chat_channel.id}/restore/#{deleted_message.id}.json" - expect(response.status).to eq(200) - expect(deleted_message.reload.deleted_at).to be_nil - end - - it "does not allow message restore when chat channel is read_only" do - sign_in(Chat::Message.last.user) - - chat_channel.update!(status: :read_only) - - deleted_message = Chat::Message.unscoped.last - put "/chat/#{chat_channel.id}/restore/#{deleted_message.id}.json" - expect(response.status).to eq(403) - expect(deleted_message.reload.deleted_at).not_to be_nil - - sign_in(admin) - put "/chat/#{chat_channel.id}/restore/#{deleted_message.id}.json" - expect(response.status).to eq(403) - end - - it "only allows admin to restore when chat channel is closed" do - sign_in(admin) - - chat_channel.update!(status: :read_only) - - deleted_message = Chat::Message.unscoped.last - put "/chat/#{chat_channel.id}/restore/#{deleted_message.id}.json" - expect(response.status).to eq(403) - expect(deleted_message.reload.deleted_at).not_to be_nil - - chat_channel.update!(status: :closed) - put "/chat/#{chat_channel.id}/restore/#{deleted_message.id}.json" - expect(response.status).to eq(200) - expect(deleted_message.reload.deleted_at).to be_nil - end - end - - describe "#restore" do - fab!(:second_user) { Fabricate(:user) } - - before do - message = - Chat::Message.create(user: user, message: "this is a message", chat_channel: chat_channel) - message.trash! - end - - describe "for category" do - fab!(:chat_channel) { Fabricate(:category_channel, chatable: category) } - - it_behaves_like "chat_message_restoration" do - let(:other_user) { second_user } - end - end - end - describe "react" do fab!(:chat_channel) { Fabricate(:category_channel) } fab!(:chat_message) { Fabricate(:chat_message, chat_channel: chat_channel, user: user) } diff --git a/plugins/chat/spec/services/chat/restore_message_spec.rb b/plugins/chat/spec/services/chat/restore_message_spec.rb new file mode 100644 index 00000000000..80a1c077abd --- /dev/null +++ b/plugins/chat/spec/services/chat/restore_message_spec.rb @@ -0,0 +1,74 @@ +# frozen_string_literal: true + +RSpec.describe Chat::RestoreMessage do + fab!(:current_user) { Fabricate(:user) } + let!(:guardian) { Guardian.new(current_user) } + fab!(:message) { Fabricate(:chat_message, user: current_user) } + + before { message.trash! } + + describe ".call" do + subject(:result) { described_class.call(params) } + + context "when params are not valid" do + let(:params) { { guardian: guardian } } + + it { is_expected.to fail_a_contract } + end + + context "when params are valid" do + let(:params) do + { guardian: guardian, message_id: message.id, channel_id: message.chat_channel_id } + end + + context "when the user does not have permission to restore" do + before { message.update!(user: Fabricate(:admin)) } + + it { is_expected.to fail_a_policy(:invalid_access) } + end + + context "when the channel does not match the message" do + let(:params) do + { guardian: guardian, message_id: message.id, channel_id: Fabricate(:chat_channel).id } + end + + it { is_expected.to fail_to_find_a_model(:message) } + end + + context "when the user has permission to restore" do + it "sets the service result as successful" do + expect(result).to be_a_success + end + + it "restores the message" do + result + expect(Chat::Message.find_by(id: message.id)).not_to be_nil + end + + it "publishes associated Discourse and MessageBus events" do + freeze_time + messages = nil + event = + DiscourseEvent.track_events { messages = MessageBus.track_publish { result } }.first + expect(event[:event_name]).to eq(:chat_message_restored) + expect(event[:params]).to eq([message, message.chat_channel, current_user]) + expect( + messages.find { |m| m.channel == "/chat/#{message.chat_channel_id}" }.data, + ).to include({ "type" => "restore" }) + end + + context "when the message has a thread" do + fab!(:thread) { Fabricate(:chat_thread, channel: message.chat_channel) } + + before { message.update!(thread: thread) } + + it "increments the thread reply count" do + thread.set_replies_count_cache(1) + result + expect(thread.replies_count_cache).to eq(2) + end + end + end + end + end +end diff --git a/plugins/chat/spec/services/chat/trash_message_spec.rb b/plugins/chat/spec/services/chat/trash_message_spec.rb index 68902d36843..b8d045b600b 100644 --- a/plugins/chat/spec/services/chat/trash_message_spec.rb +++ b/plugins/chat/spec/services/chat/trash_message_spec.rb @@ -57,7 +57,11 @@ RSpec.describe Chat::TrashMessage do expect(event[:event_name]).to eq(:chat_message_trashed) expect(event[:params]).to eq([message, message.chat_channel, current_user]) expect(messages.find { |m| m.channel == "/chat/#{message.chat_channel_id}" }.data).to eq( - { type: "delete", deleted_id: message.id, deleted_at: Time.zone.now }, + { + "type" => "delete", + "deleted_id" => message.id, + "deleted_at" => message.reload.deleted_at.iso8601(3), + }, ) end @@ -86,6 +90,18 @@ RSpec.describe Chat::TrashMessage do expect(membership_3.reload.last_read_message_id).not_to be_nil end + context "when the message has a thread" do + fab!(:thread) { Fabricate(:chat_thread, channel: message.chat_channel) } + + before { message.update!(thread: thread) } + + it "decrements the thread reply count" do + thread.set_replies_count_cache(5) + result + expect(thread.replies_count_cache).to eq(4) + end + end + context "when message is already deleted" do before { message.trash! }