mirror of
https://github.com/discourse/discourse.git
synced 2024-11-22 15:16:08 +08:00
FEATURE: Better thread reply counter cache (#21108)
This commit introduces a redis cache over the top of the thread replies_count DB cache, so that we can quickly and accurately increment/decrement the reply count for all users and not have to constantly update the database-level count. This is done so the UI can have a count that is displayed to the users on each thread indicator, that appears to live update on each chat message create/trash/recover inside the thread. This commit also introduces the `Chat::RestoreMessage` service and moves the restore endpoint into the `Api::ChannelMessages` controller as part of incremental migrations to move things out of ChatController. Finally, this commit refactors `Chat::Publisher` to be less repetitive with its `MessageBus` sending code.
This commit is contained in:
parent
b8583f274d
commit
180e3e11d1
|
@ -4,4 +4,10 @@ class Chat::Api::ChannelMessagesController < Chat::ApiController
|
|||
def destroy
|
||||
with_service(Chat::TrashMessage) { on_model_not_found(:message) { raise Discourse::NotFound } }
|
||||
end
|
||||
|
||||
def restore
|
||||
with_service(Chat::RestoreMessage) do
|
||||
on_model_not_found(:message) { raise Discourse::NotFound }
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -12,8 +12,7 @@ module Chat
|
|||
# these endpoints require a standalone find because they need to be
|
||||
# able to get deleted channels and recover them.
|
||||
before_action :find_chatable, only: %i[enable_chat disable_chat]
|
||||
before_action :find_chat_message,
|
||||
only: %i[delete restore lookup_message edit_message rebake message_link]
|
||||
before_action :find_chat_message, only: %i[lookup_message edit_message rebake message_link]
|
||||
before_action :set_channel_and_chatable_with_access_check,
|
||||
except: %i[
|
||||
respond
|
||||
|
@ -225,18 +224,6 @@ module Chat
|
|||
render json: success_json
|
||||
end
|
||||
|
||||
def restore
|
||||
chat_channel = @message.chat_channel
|
||||
guardian.ensure_can_restore_chat!(@message, chat_channel.chatable)
|
||||
updated = @message.recover!
|
||||
if updated
|
||||
Chat::Publisher.publish_restore!(chat_channel, @message)
|
||||
render json: success_json
|
||||
else
|
||||
render_json_error(@message)
|
||||
end
|
||||
end
|
||||
|
||||
def rebake
|
||||
guardian.ensure_can_rebake_chat_message!(@message)
|
||||
@message.rebake!(invalidate_oneboxes: true)
|
||||
|
|
|
@ -0,0 +1,22 @@
|
|||
# frozen_string_literal: true
|
||||
|
||||
module Jobs
|
||||
module Chat
|
||||
class UpdateThreadReplyCount < Jobs::Base
|
||||
def execute(args = {})
|
||||
thread = ::Chat::Thread.find_by(id: args[:thread_id])
|
||||
return if thread.blank?
|
||||
return if thread.replies_count_cache_recently_updated?
|
||||
|
||||
Discourse.redis.setex(
|
||||
::Chat::Thread.replies_count_cache_updated_at_redis_key(thread.id),
|
||||
5.minutes.from_now.to_i,
|
||||
Time.zone.now.to_i,
|
||||
)
|
||||
thread.set_replies_count_cache(thread.replies.count, update_db: true)
|
||||
|
||||
::Chat::Publisher.publish_thread_original_message_metadata!(thread)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
|
@ -4,6 +4,8 @@ module Chat
|
|||
class Thread < ActiveRecord::Base
|
||||
EXCERPT_LENGTH = 150
|
||||
|
||||
include Chat::ThreadCache
|
||||
|
||||
self.table_name = "chat_threads"
|
||||
|
||||
belongs_to :channel, foreign_key: "channel_id", class_name: "Chat::Channel"
|
||||
|
@ -11,7 +13,11 @@ module Chat
|
|||
belongs_to :original_message, foreign_key: "original_message_id", class_name: "Chat::Message"
|
||||
|
||||
has_many :chat_messages,
|
||||
-> { order("chat_messages.created_at ASC, chat_messages.id ASC") },
|
||||
-> {
|
||||
where("deleted_at IS NULL").order(
|
||||
"chat_messages.created_at ASC, chat_messages.id ASC",
|
||||
)
|
||||
},
|
||||
foreign_key: :thread_id,
|
||||
primary_key: :id,
|
||||
class_name: "Chat::Message"
|
||||
|
@ -61,7 +67,7 @@ module Chat
|
|||
#
|
||||
# It is updated eventually via Jobs::Chat::PeriodicalUpdates. In
|
||||
# future we may want to update this more frequently.
|
||||
DB.exec <<~SQL
|
||||
updated_thread_ids = DB.query_single <<~SQL
|
||||
UPDATE chat_threads threads
|
||||
SET replies_count = subquery.replies_count
|
||||
FROM (
|
||||
|
@ -72,7 +78,9 @@ module Chat
|
|||
) subquery
|
||||
WHERE threads.id = subquery.thread_id
|
||||
AND subquery.replies_count != threads.replies_count
|
||||
RETURNING threads.id AS thread_id;
|
||||
SQL
|
||||
self.clear_caches!(updated_thread_ids)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
69
plugins/chat/app/models/concerns/chat/thread_cache.rb
Normal file
69
plugins/chat/app/models/concerns/chat/thread_cache.rb
Normal file
|
@ -0,0 +1,69 @@
|
|||
# frozen_string_literal: true
|
||||
|
||||
module Chat
|
||||
module ThreadCache
|
||||
extend ActiveSupport::Concern
|
||||
|
||||
class_methods do
|
||||
def replies_count_cache_updated_at_redis_key(id)
|
||||
"chat_thread:replies_count_cache_updated_at:#{id}"
|
||||
end
|
||||
|
||||
def replies_count_cache_redis_key(id)
|
||||
"chat_thread:replies_count_cache:#{id}"
|
||||
end
|
||||
|
||||
def clear_caches!(ids = nil)
|
||||
return Discourse.redis.delete_prefixed("chat_thread:") if ids.blank?
|
||||
ids = Array.wrap(ids)
|
||||
|
||||
keys_to_delete =
|
||||
ids
|
||||
.map do |id|
|
||||
[replies_count_cache_redis_key(id), replies_count_cache_updated_at_redis_key(id)]
|
||||
end
|
||||
.flatten
|
||||
Discourse.redis.del(keys_to_delete)
|
||||
end
|
||||
end
|
||||
|
||||
def replies_count_cache_recently_updated?
|
||||
replies_count_cache_updated_at.after?(5.minutes.ago)
|
||||
end
|
||||
|
||||
def replies_count_cache_updated_at
|
||||
Time.at(
|
||||
Discourse.redis.get(Chat::Thread.replies_count_cache_updated_at_redis_key(self.id)).to_i,
|
||||
in: Time.zone,
|
||||
)
|
||||
end
|
||||
|
||||
def replies_count_cache
|
||||
redis_cache = Discourse.redis.get(Chat::Thread.replies_count_cache_redis_key(self.id))&.to_i
|
||||
if redis_cache.present? && redis_cache != self.replies_count
|
||||
redis_cache
|
||||
else
|
||||
self.replies_count
|
||||
end
|
||||
end
|
||||
|
||||
def set_replies_count_cache(value, update_db: false)
|
||||
self.update!(replies_count: value) if update_db
|
||||
Discourse.redis.setex(
|
||||
Chat::Thread.replies_count_cache_redis_key(self.id),
|
||||
5.minutes.from_now.to_i,
|
||||
value,
|
||||
)
|
||||
Jobs.enqueue_in(5.seconds, Jobs::Chat::UpdateThreadReplyCount, thread_id: self.id)
|
||||
::Chat::Publisher.publish_thread_original_message_metadata!(self)
|
||||
end
|
||||
|
||||
def increment_replies_count_cache
|
||||
self.set_replies_count_cache(self.replies_count_cache + 1)
|
||||
end
|
||||
|
||||
def decrement_replies_count_cache
|
||||
self.set_replies_count_cache(self.replies_count_cache - 1)
|
||||
end
|
||||
end
|
||||
end
|
|
@ -166,7 +166,7 @@ module Chat
|
|||
end
|
||||
|
||||
def thread_reply_count
|
||||
object.thread&.replies_count || 0
|
||||
object.thread&.replies_count_cache || 0
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
module Chat
|
||||
module Publisher
|
||||
def self.new_messages_message_bus_channel(chat_channel_id)
|
||||
"/chat/#{chat_channel_id}/new-messages"
|
||||
"#{root_message_bus_channel(chat_channel_id)}/new-messages"
|
||||
end
|
||||
|
||||
def self.root_message_bus_channel(chat_channel_id)
|
||||
|
@ -31,31 +31,11 @@ module Chat
|
|||
|
||||
def self.publish_new!(chat_channel, chat_message, staged_id)
|
||||
message_bus_targets = calculate_publish_targets(chat_channel, chat_message)
|
||||
|
||||
content =
|
||||
Chat::MessageSerializer.new(
|
||||
chat_message,
|
||||
{ scope: anonymous_guardian, root: :chat_message },
|
||||
).as_json
|
||||
content[:type] = :sent
|
||||
content[:staged_id] = staged_id
|
||||
permissions = permissions(chat_channel)
|
||||
|
||||
message_bus_targets.each do |message_bus_channel|
|
||||
MessageBus.publish(message_bus_channel, content.as_json, permissions)
|
||||
end
|
||||
|
||||
if chat_message.thread_reply?
|
||||
MessageBus.publish(
|
||||
root_message_bus_channel(chat_channel.id),
|
||||
{
|
||||
type: :update_thread_original_message,
|
||||
original_message_id: chat_message.thread.original_message_id,
|
||||
action: :increment_reply_count,
|
||||
}.as_json,
|
||||
permissions,
|
||||
)
|
||||
end
|
||||
publish_to_targets!(
|
||||
message_bus_targets,
|
||||
chat_channel,
|
||||
serialize_message_with_type(chat_message, :sent).merge(staged_id: staged_id),
|
||||
)
|
||||
|
||||
# NOTE: This means that the read count is only updated in the client
|
||||
# for new messages in the main channel stream, maybe in future we want to
|
||||
|
@ -70,84 +50,67 @@ module Chat
|
|||
username: chat_message.user.username,
|
||||
thread_id: chat_message.thread_id,
|
||||
},
|
||||
permissions,
|
||||
permissions(chat_channel),
|
||||
)
|
||||
end
|
||||
end
|
||||
|
||||
def self.publish_thread_created!(chat_channel, chat_message)
|
||||
content =
|
||||
Chat::MessageSerializer.new(
|
||||
chat_message,
|
||||
{ scope: anonymous_guardian, root: :chat_message },
|
||||
).as_json
|
||||
content[:type] = :thread_created
|
||||
permissions = permissions(chat_channel)
|
||||
def self.publish_thread_original_message_metadata!(thread)
|
||||
publish_to_channel!(
|
||||
thread.channel,
|
||||
{
|
||||
type: :update_thread_original_message,
|
||||
original_message_id: thread.original_message_id,
|
||||
replies_count: thread.replies_count_cache,
|
||||
},
|
||||
)
|
||||
end
|
||||
|
||||
MessageBus.publish(root_message_bus_channel(chat_channel.id), content.as_json, permissions)
|
||||
def self.publish_thread_created!(chat_channel, chat_message)
|
||||
publish_to_channel!(chat_channel, serialize_message_with_type(chat_message, :thread_created))
|
||||
end
|
||||
|
||||
def self.publish_processed!(chat_message)
|
||||
chat_channel = chat_message.chat_channel
|
||||
message_bus_targets = calculate_publish_targets(chat_channel, chat_message)
|
||||
|
||||
content = {
|
||||
type: :processed,
|
||||
chat_message: {
|
||||
id: chat_message.id,
|
||||
cooked: chat_message.cooked,
|
||||
},
|
||||
}
|
||||
|
||||
message_bus_targets.each do |message_bus_channel|
|
||||
MessageBus.publish(message_bus_channel, content.as_json, permissions(chat_channel))
|
||||
end
|
||||
publish_to_targets!(
|
||||
message_bus_targets,
|
||||
chat_channel,
|
||||
{ type: :processed, chat_message: { id: chat_message.id, cooked: chat_message.cooked } },
|
||||
)
|
||||
end
|
||||
|
||||
def self.publish_edit!(chat_channel, chat_message)
|
||||
message_bus_targets = calculate_publish_targets(chat_channel, chat_message)
|
||||
|
||||
content =
|
||||
Chat::MessageSerializer.new(
|
||||
chat_message,
|
||||
{ scope: anonymous_guardian, root: :chat_message },
|
||||
).as_json
|
||||
content[:type] = :edit
|
||||
|
||||
message_bus_targets.each do |message_bus_channel|
|
||||
MessageBus.publish(message_bus_channel, content.as_json, permissions(chat_channel))
|
||||
end
|
||||
publish_to_targets!(
|
||||
message_bus_targets,
|
||||
chat_channel,
|
||||
serialize_message_with_type(chat_message, :edit),
|
||||
)
|
||||
end
|
||||
|
||||
def self.publish_refresh!(chat_channel, chat_message)
|
||||
message_bus_targets = calculate_publish_targets(chat_channel, chat_message)
|
||||
|
||||
content =
|
||||
Chat::MessageSerializer.new(
|
||||
chat_message,
|
||||
{ scope: anonymous_guardian, root: :chat_message },
|
||||
).as_json
|
||||
content[:type] = :refresh
|
||||
|
||||
message_bus_targets.each do |message_bus_channel|
|
||||
MessageBus.publish(message_bus_channel, content.as_json, permissions(chat_channel))
|
||||
end
|
||||
publish_to_targets!(
|
||||
message_bus_targets,
|
||||
chat_channel,
|
||||
serialize_message_with_type(chat_message, :refresh),
|
||||
)
|
||||
end
|
||||
|
||||
def self.publish_reaction!(chat_channel, chat_message, action, user, emoji)
|
||||
message_bus_targets = calculate_publish_targets(chat_channel, chat_message)
|
||||
|
||||
content = {
|
||||
action: action,
|
||||
user: BasicUserSerializer.new(user, root: false).as_json,
|
||||
emoji: emoji,
|
||||
type: :reaction,
|
||||
chat_message_id: chat_message.id,
|
||||
}
|
||||
|
||||
message_bus_targets.each do |message_bus_channel|
|
||||
MessageBus.publish(message_bus_channel, content.as_json, permissions(chat_channel))
|
||||
end
|
||||
publish_to_targets!(
|
||||
message_bus_targets,
|
||||
chat_channel,
|
||||
{
|
||||
action: action,
|
||||
user: BasicUserSerializer.new(user, root: false).as_json,
|
||||
emoji: emoji,
|
||||
type: :reaction,
|
||||
chat_message_id: chat_message.id,
|
||||
},
|
||||
)
|
||||
end
|
||||
|
||||
def self.publish_presence!(chat_channel, user, typ)
|
||||
|
@ -156,28 +119,26 @@ module Chat
|
|||
|
||||
def self.publish_delete!(chat_channel, chat_message)
|
||||
message_bus_targets = calculate_publish_targets(chat_channel, chat_message)
|
||||
|
||||
message_bus_targets.each do |message_bus_channel|
|
||||
MessageBus.publish(
|
||||
message_bus_channel,
|
||||
{ type: "delete", deleted_id: chat_message.id, deleted_at: chat_message.deleted_at },
|
||||
permissions(chat_channel),
|
||||
)
|
||||
end
|
||||
publish_to_targets!(
|
||||
message_bus_targets,
|
||||
chat_channel,
|
||||
{ type: "delete", deleted_id: chat_message.id, deleted_at: chat_message.deleted_at },
|
||||
)
|
||||
end
|
||||
|
||||
def self.publish_bulk_delete!(chat_channel, deleted_message_ids)
|
||||
channel_permissions = permissions(chat_channel)
|
||||
Chat::Thread
|
||||
.grouped_messages(message_ids: deleted_message_ids)
|
||||
.each do |group|
|
||||
MessageBus.publish(
|
||||
thread_message_bus_channel(chat_channel.id, group.thread_id),
|
||||
{
|
||||
type: "bulk_delete",
|
||||
type: :bulk_delete,
|
||||
deleted_ids: group.thread_message_ids,
|
||||
deleted_at: Time.zone.now,
|
||||
},
|
||||
permissions(chat_channel),
|
||||
channel_permissions,
|
||||
)
|
||||
|
||||
# Don't need to publish to the main channel if the messages deleted
|
||||
|
@ -189,54 +150,74 @@ module Chat
|
|||
|
||||
return if deleted_message_ids.empty?
|
||||
|
||||
MessageBus.publish(
|
||||
root_message_bus_channel(chat_channel.id),
|
||||
{ type: "bulk_delete", deleted_ids: deleted_message_ids, deleted_at: Time.zone.now },
|
||||
permissions(chat_channel),
|
||||
publish_to_channel!(
|
||||
chat_channel,
|
||||
{ type: :bulk_delete, deleted_ids: deleted_message_ids, deleted_at: Time.zone.now },
|
||||
)
|
||||
end
|
||||
|
||||
def self.publish_restore!(chat_channel, chat_message)
|
||||
message_bus_targets = calculate_publish_targets(chat_channel, chat_message)
|
||||
|
||||
content =
|
||||
Chat::MessageSerializer.new(
|
||||
chat_message,
|
||||
{ scope: anonymous_guardian, root: :chat_message },
|
||||
).as_json
|
||||
content[:type] = :restore
|
||||
|
||||
message_bus_targets.each do |message_bus_channel|
|
||||
MessageBus.publish(message_bus_channel, content.as_json, permissions(chat_channel))
|
||||
end
|
||||
publish_to_targets!(
|
||||
message_bus_targets,
|
||||
chat_channel,
|
||||
serialize_message_with_type(chat_message, :restore),
|
||||
)
|
||||
end
|
||||
|
||||
def self.publish_flag!(chat_message, user, reviewable, score)
|
||||
message_bus_targets = calculate_publish_targets(chat_message.chat_channel, chat_message)
|
||||
|
||||
message_bus_targets.each do |message_bus_channel|
|
||||
# Publish to user who created flag
|
||||
MessageBus.publish(
|
||||
message_bus_channel,
|
||||
{
|
||||
type: "self_flagged",
|
||||
user_flag_status: score.status_for_database,
|
||||
chat_message_id: chat_message.id,
|
||||
}.as_json,
|
||||
# Publish to user who created flag
|
||||
publish_to_targets!(
|
||||
message_bus_targets,
|
||||
chat_message.chat_channel,
|
||||
{
|
||||
type: :self_flagged,
|
||||
user_flag_status: score.status_for_database,
|
||||
chat_message_id: chat_message.id,
|
||||
},
|
||||
permissions: {
|
||||
user_ids: [user.id],
|
||||
)
|
||||
end
|
||||
},
|
||||
)
|
||||
|
||||
message_bus_targets.each do |message_bus_channel|
|
||||
# Publish flag with link to reviewable to staff
|
||||
# Publish flag with link to reviewable to staff
|
||||
publish_to_targets!(
|
||||
message_bus_targets,
|
||||
chat_message.chat_channel,
|
||||
{ type: :flag, chat_message_id: chat_message.id, reviewable_id: reviewable.id },
|
||||
permissions: {
|
||||
group_ids: [Group::AUTO_GROUPS[:staff]],
|
||||
},
|
||||
)
|
||||
end
|
||||
|
||||
def self.publish_to_channel!(channel, payload)
|
||||
MessageBus.publish(
|
||||
root_message_bus_channel(channel.id),
|
||||
payload.as_json,
|
||||
permissions(channel),
|
||||
)
|
||||
end
|
||||
|
||||
def self.publish_to_targets!(targets, channel, payload, permissions: nil)
|
||||
targets.each do |message_bus_channel|
|
||||
MessageBus.publish(
|
||||
message_bus_channel,
|
||||
{ type: "flag", chat_message_id: chat_message.id, reviewable_id: reviewable.id }.as_json,
|
||||
group_ids: [Group::AUTO_GROUPS[:staff]],
|
||||
payload.as_json,
|
||||
permissions || permissions(channel),
|
||||
)
|
||||
end
|
||||
end
|
||||
|
||||
def self.serialize_message_with_type(chat_message, type)
|
||||
Chat::MessageSerializer
|
||||
.new(chat_message, { scope: anonymous_guardian, root: :chat_message })
|
||||
.as_json
|
||||
.merge(type: type)
|
||||
end
|
||||
|
||||
def self.user_tracking_state_message_bus_channel(user_id)
|
||||
"/chat/user-tracking-state/#{user_id}"
|
||||
end
|
||||
|
|
63
plugins/chat/app/services/chat/restore_message.rb
Normal file
63
plugins/chat/app/services/chat/restore_message.rb
Normal file
|
@ -0,0 +1,63 @@
|
|||
# frozen_string_literal: true
|
||||
|
||||
module Chat
|
||||
# Service responsible for restoring a trashed chat message
|
||||
# for a channel and ensuring that the client and read state is
|
||||
# updated.
|
||||
#
|
||||
# @example
|
||||
# Chat::RestoreMessage.call(message_id: 2, channel_id: 1, guardian: guardian)
|
||||
#
|
||||
class RestoreMessage
|
||||
include Service::Base
|
||||
|
||||
# @!method call(message_id:, channel_id:, guardian:)
|
||||
# @param [Integer] message_id
|
||||
# @param [Integer] channel_id
|
||||
# @param [Guardian] guardian
|
||||
# @return [Service::Base::Context]
|
||||
|
||||
contract
|
||||
model :message
|
||||
policy :invalid_access
|
||||
transaction do
|
||||
step :restore_message
|
||||
step :update_thread_reply_cache
|
||||
end
|
||||
step :publish_events
|
||||
|
||||
# @!visibility private
|
||||
class Contract
|
||||
attribute :message_id, :integer
|
||||
attribute :channel_id, :integer
|
||||
validates :message_id, presence: true
|
||||
validates :channel_id, presence: true
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def fetch_message(contract:, **)
|
||||
Chat::Message
|
||||
.with_deleted
|
||||
.includes(chat_channel: :chatable)
|
||||
.find_by(id: contract.message_id, chat_channel_id: contract.channel_id)
|
||||
end
|
||||
|
||||
def invalid_access(guardian:, message:, **)
|
||||
guardian.can_restore_chat?(message, message.chat_channel.chatable)
|
||||
end
|
||||
|
||||
def restore_message(message:, **)
|
||||
message.recover!
|
||||
end
|
||||
|
||||
def update_thread_reply_cache(message:, **)
|
||||
message.thread&.increment_replies_count_cache
|
||||
end
|
||||
|
||||
def publish_events(guardian:, message:, **)
|
||||
DiscourseEvent.trigger(:chat_message_restored, message, message.chat_channel, guardian.user)
|
||||
Chat::Publisher.publish_restore!(message.chat_channel, message)
|
||||
end
|
||||
end
|
||||
end
|
|
@ -24,6 +24,7 @@ module Chat
|
|||
step :trash_message
|
||||
step :destroy_mentions
|
||||
step :update_tracking_state
|
||||
step :update_thread_reply_cache
|
||||
end
|
||||
step :publish_events
|
||||
|
||||
|
@ -62,6 +63,10 @@ module Chat
|
|||
)
|
||||
end
|
||||
|
||||
def update_thread_reply_cache(message:, **)
|
||||
message.thread&.decrement_replies_count_cache
|
||||
end
|
||||
|
||||
def publish_events(guardian:, message:, **)
|
||||
DiscourseEvent.trigger(:chat_message_trashed, message, message.chat_channel, guardian.user)
|
||||
Chat::Publisher.publish_delete!(message.chat_channel, message)
|
||||
|
|
|
@ -350,10 +350,9 @@ export default class ChatApi extends Service {
|
|||
* @param {number} messageId - The ID of the message being restored.
|
||||
*/
|
||||
restoreMessage(channelId, messageId) {
|
||||
// TODO (martin) Not ideal, this should have a chat API controller endpoint.
|
||||
return ajax(`/chat/${channelId}/restore/${messageId}`, {
|
||||
type: "PUT",
|
||||
});
|
||||
return this.#putRequest(
|
||||
`/channels/${channelId}/messages/${messageId}/restore`
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -37,11 +37,8 @@ export default class ChatChannelPaneSubscriptionsManager extends ChatPaneBaseSub
|
|||
handleThreadOriginalMessageUpdate(data) {
|
||||
const message = this.messagesManager.findMessage(data.original_message_id);
|
||||
if (message) {
|
||||
if (data.action === "increment_reply_count") {
|
||||
// TODO (martin) In future we should use a replies_count delivered
|
||||
// from the server and simply update the message accordingly, for
|
||||
// now we don't have an accurate enough count for this.
|
||||
message.threadReplyCount += 1;
|
||||
if (data.replies_count) {
|
||||
message.threadReplyCount = data.replies_count;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,6 +30,7 @@ Chat::Engine.routes.draw do
|
|||
|
||||
get "/channels/:channel_id/threads/:thread_id" => "channel_threads#show"
|
||||
|
||||
put "/channels/:channel_id/messages/:message_id/restore" => "channel_messages#restore"
|
||||
delete "/channels/:channel_id/messages/:message_id" => "channel_messages#destroy"
|
||||
end
|
||||
|
||||
|
@ -61,7 +62,6 @@ Chat::Engine.routes.draw do
|
|||
put "/:chat_channel_id/:message_id/rebake" => "chat#rebake"
|
||||
post "/:chat_channel_id/:message_id/flag" => "chat#flag"
|
||||
post "/:chat_channel_id/quote" => "chat#quote_messages"
|
||||
put "/:chat_channel_id/restore/:message_id" => "chat#restore"
|
||||
get "/lookup/:message_id" => "chat#lookup_message"
|
||||
put "/:chat_channel_id/read/:message_id" => "chat#update_user_last_read"
|
||||
put "/user_chat_enabled/:user_id" => "chat#set_user_chat_status"
|
||||
|
|
|
@ -58,6 +58,7 @@ module Chat
|
|||
@chat_message.attach_uploads(uploads)
|
||||
Chat::Draft.where(user_id: @user.id, chat_channel_id: @chat_channel.id).destroy_all
|
||||
Chat::Publisher.publish_new!(@chat_channel, @chat_message, @staged_id)
|
||||
resolved_thread&.increment_replies_count_cache
|
||||
Jobs.enqueue(Jobs::Chat::ProcessMessage, { chat_message_id: @chat_message.id })
|
||||
Chat::Notifier.notify_new(chat_message: @chat_message, timestamp: @chat_message.created_at)
|
||||
@chat_channel.touch(:last_message_sent_at)
|
||||
|
@ -206,5 +207,9 @@ module Chat
|
|||
WHERE thread_id IS NULL AND chat_messages.id = thread_updater.id
|
||||
SQL
|
||||
end
|
||||
|
||||
def resolved_thread
|
||||
@existing_thread || @chat_message.thread
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -0,0 +1,63 @@
|
|||
# frozen_string_literal: true
|
||||
|
||||
RSpec.describe "Chat::Thread replies_count cache accuracy" do
|
||||
include ActiveSupport::Testing::TimeHelpers
|
||||
|
||||
fab!(:user) { Fabricate(:user) }
|
||||
fab!(:thread) { Fabricate(:chat_thread) }
|
||||
|
||||
it "keeps an accurate replies_count cache" do
|
||||
freeze_time
|
||||
Jobs.run_immediately!
|
||||
|
||||
expect(thread.replies_count).to eq(0)
|
||||
expect(thread.replies_count_cache).to eq(0)
|
||||
|
||||
# Create 5 replies
|
||||
5.times do |i|
|
||||
Chat::MessageCreator.create(
|
||||
chat_channel: thread.channel,
|
||||
user: user,
|
||||
thread_id: thread.id,
|
||||
content: "Hello world #{i}",
|
||||
)
|
||||
end
|
||||
|
||||
# The job only runs to completion if the cache has not been recently
|
||||
# updated, so the DB count will only be 1.
|
||||
expect(thread.replies_count_cache).to eq(5)
|
||||
expect(thread.reload.replies_count).to eq(1)
|
||||
|
||||
# Travel to the future so the cache expires.
|
||||
travel_to 6.minutes.from_now
|
||||
Chat::MessageCreator.create(
|
||||
chat_channel: thread.channel,
|
||||
user: user,
|
||||
thread_id: thread.id,
|
||||
content: "Hello world now that time has passed",
|
||||
)
|
||||
expect(thread.replies_count_cache).to eq(6)
|
||||
expect(thread.reload.replies_count).to eq(6)
|
||||
|
||||
# Lose the cache intentionally.
|
||||
Chat::Thread.clear_caches!
|
||||
message_to_destroy = thread.replies.last
|
||||
Chat::TrashMessage.call(
|
||||
message_id: message_to_destroy.id,
|
||||
channel_id: thread.channel_id,
|
||||
guardian: Guardian.new(user),
|
||||
)
|
||||
expect(thread.replies_count_cache).to eq(5)
|
||||
expect(thread.reload.replies_count).to eq(5)
|
||||
|
||||
# Lose the cache intentionally.
|
||||
Chat::Thread.clear_caches!
|
||||
Chat::RestoreMessage.call(
|
||||
message_id: message_to_destroy.id,
|
||||
channel_id: thread.channel_id,
|
||||
guardian: Guardian.new(user),
|
||||
)
|
||||
expect(thread.replies_count_cache).to eq(6)
|
||||
expect(thread.reload.replies_count).to eq(6)
|
||||
end
|
||||
end
|
|
@ -0,0 +1,46 @@
|
|||
# frozen_string_literal: true
|
||||
|
||||
RSpec.describe Jobs::Chat::UpdateThreadReplyCount do
|
||||
fab!(:thread) { Fabricate(:chat_thread) }
|
||||
fab!(:message_1) { Fabricate(:chat_message, thread: thread) }
|
||||
fab!(:message_2) { Fabricate(:chat_message, thread: thread) }
|
||||
|
||||
before { Chat::Thread.clear_caches! }
|
||||
|
||||
it "does not error if the thread is deleted" do
|
||||
id = thread.id
|
||||
thread.destroy!
|
||||
expect { described_class.new.execute(thread_id: id) }.not_to raise_error
|
||||
end
|
||||
|
||||
it "does not set the reply count in the DB if it has been changed recently" do
|
||||
described_class.new.execute(thread_id: thread.id)
|
||||
expect(thread.reload.replies_count).to eq(2)
|
||||
Fabricate(:chat_message, thread: thread)
|
||||
described_class.new.execute(thread_id: thread.id)
|
||||
expect(thread.reload.replies_count).to eq(2)
|
||||
end
|
||||
|
||||
it "sets the updated_at cache to the current time" do
|
||||
freeze_time
|
||||
described_class.new.execute(thread_id: thread.id)
|
||||
expect(thread.replies_count_cache_updated_at).to eq_time(
|
||||
Time.at(Time.zone.now.to_i, in: Time.zone),
|
||||
)
|
||||
end
|
||||
|
||||
it "publishes the thread original message metadata" do
|
||||
messages =
|
||||
MessageBus.track_publish("/chat/#{thread.channel_id}") do
|
||||
described_class.new.execute(thread_id: thread.id)
|
||||
end
|
||||
|
||||
expect(messages.first.data).to eq(
|
||||
{
|
||||
"original_message_id" => thread.original_message_id,
|
||||
"replies_count" => 2,
|
||||
"type" => "update_thread_original_message",
|
||||
},
|
||||
)
|
||||
end
|
||||
end
|
|
@ -76,9 +76,9 @@ describe Chat::MessageMover do
|
|||
deleted_messages = Chat::Message.with_deleted.where(id: move_message_ids).order(:id)
|
||||
expect(deleted_messages.count).to eq(3)
|
||||
expect(messages.first.channel).to eq("/chat/#{source_channel.id}")
|
||||
expect(messages.first.data[:type]).to eq("bulk_delete")
|
||||
expect(messages.first.data[:deleted_ids]).to eq(deleted_messages.map(&:id))
|
||||
expect(messages.first.data[:deleted_at]).not_to eq(nil)
|
||||
expect(messages.first.data["type"]).to eq("bulk_delete")
|
||||
expect(messages.first.data["deleted_ids"]).to eq(deleted_messages.map(&:id))
|
||||
expect(messages.first.data["deleted_at"]).not_to eq(nil)
|
||||
end
|
||||
|
||||
it "creates a message in the source channel to indicate that the messages have been moved" do
|
||||
|
|
|
@ -280,9 +280,9 @@ describe Chat::ReviewQueue do
|
|||
end
|
||||
.map(&:data)
|
||||
|
||||
delete_msg = messages.detect { |m| m[:type] == "delete" }
|
||||
delete_msg = messages.detect { |m| m["type"] == "delete" }
|
||||
|
||||
expect(delete_msg[:deleted_id]).to eq(message.id)
|
||||
expect(delete_msg["deleted_id"]).to eq(message.id)
|
||||
end
|
||||
|
||||
it "agrees with other flags on the same message" do
|
||||
|
|
|
@ -40,6 +40,20 @@ RSpec.describe Chat::Thread do
|
|||
described_class.ensure_consistency!
|
||||
expect(thread_1.reload.replies_count).to eq(0)
|
||||
end
|
||||
|
||||
it "clears the affected replies_count caches" do
|
||||
thread_1.set_replies_count_cache(100)
|
||||
expect(thread_1.replies_count_cache).to eq(100)
|
||||
expect(thread_1.replies_count_cache_updated_at).not_to eq(nil)
|
||||
|
||||
described_class.ensure_consistency!
|
||||
expect(Discourse.redis.get(Chat::Thread.replies_count_cache_redis_key(thread_1.id))).to eq(
|
||||
nil,
|
||||
)
|
||||
expect(
|
||||
Discourse.redis.get(Chat::Thread.replies_count_cache_updated_at_redis_key(thread_1.id)),
|
||||
).to eq(nil)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
|
|
@ -106,4 +106,95 @@ RSpec.describe Chat::Api::ChannelMessagesController do
|
|||
end
|
||||
end
|
||||
end
|
||||
|
||||
describe "#restore" do
|
||||
RSpec.shared_examples "chat_message_restoration" do
|
||||
it "doesn't allow a user to restore another user's message" do
|
||||
sign_in(other_user)
|
||||
|
||||
put "/chat/api/channels/#{chat_channel.id}/messages/#{deleted_message.id}/restore.json"
|
||||
expect(response.status).to eq(403)
|
||||
end
|
||||
|
||||
it "allows a user to restore their own messages" do
|
||||
sign_in(current_user)
|
||||
|
||||
put "/chat/api/channels/#{chat_channel.id}/messages/#{deleted_message.id}/restore.json"
|
||||
expect(response.status).to eq(200)
|
||||
expect(deleted_message.reload.deleted_at).to be_nil
|
||||
end
|
||||
|
||||
it "allows admin to restore others' messages" do
|
||||
sign_in(admin)
|
||||
|
||||
put "/chat/api/channels/#{chat_channel.id}/messages/#{deleted_message.id}/restore.json"
|
||||
expect(response.status).to eq(200)
|
||||
expect(deleted_message.reload.deleted_at).to be_nil
|
||||
end
|
||||
|
||||
it "does not allow message restore when channel is read_only" do
|
||||
sign_in(current_user)
|
||||
|
||||
chat_channel.update!(status: :read_only)
|
||||
|
||||
put "/chat/api/channels/#{chat_channel.id}/messages/#{deleted_message.id}/restore.json"
|
||||
expect(response.status).to eq(403)
|
||||
expect(deleted_message.reload.deleted_at).not_to be_nil
|
||||
|
||||
sign_in(admin)
|
||||
put "/chat/api/channels/#{chat_channel.id}/messages/#{deleted_message.id}/restore.json"
|
||||
expect(response.status).to eq(403)
|
||||
end
|
||||
|
||||
it "only allows admin to restore when channel is closed" do
|
||||
sign_in(admin)
|
||||
|
||||
chat_channel.update!(status: :read_only)
|
||||
|
||||
put "/chat/api/channels/#{chat_channel.id}/messages/#{deleted_message.id}/restore.json"
|
||||
expect(response.status).to eq(403)
|
||||
expect(deleted_message.reload.deleted_at).not_to be_nil
|
||||
|
||||
chat_channel.update!(status: :closed)
|
||||
put "/chat/api/channels/#{chat_channel.id}/messages/#{deleted_message.id}/restore.json"
|
||||
expect(response.status).to eq(200)
|
||||
expect(deleted_message.reload.deleted_at).to be_nil
|
||||
end
|
||||
end
|
||||
|
||||
fab!(:admin) { Fabricate(:admin) }
|
||||
fab!(:second_user) { Fabricate(:user) }
|
||||
|
||||
before do
|
||||
message =
|
||||
Chat::Message.create(
|
||||
user: current_user,
|
||||
message: "this is a message",
|
||||
chat_channel: chat_channel,
|
||||
)
|
||||
message.trash!
|
||||
end
|
||||
|
||||
let(:deleted_message) do
|
||||
Chat::Message.unscoped.where(user: current_user, chat_channel: chat_channel).last
|
||||
end
|
||||
|
||||
describe "for category" do
|
||||
fab!(:category) { Fabricate(:category) }
|
||||
fab!(:chat_channel) { Fabricate(:category_channel, chatable: category) }
|
||||
|
||||
it_behaves_like "chat_message_restoration" do
|
||||
let(:other_user) { second_user }
|
||||
end
|
||||
end
|
||||
|
||||
describe "for dm channel" do
|
||||
fab!(:user_2) { Fabricate(:user) }
|
||||
fab!(:chat_channel) { Fabricate(:direct_message_channel, users: [current_user, user_2]) }
|
||||
|
||||
it_behaves_like "chat_message_restoration" do
|
||||
let(:other_user) { user_2 }
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -559,82 +559,6 @@ RSpec.describe Chat::ChatController do
|
|||
end
|
||||
end
|
||||
|
||||
RSpec.shared_examples "chat_message_restoration" do
|
||||
it "doesn't allow a user to restore another user's message" do
|
||||
sign_in(other_user)
|
||||
|
||||
put "/chat/#{chat_channel.id}/restore/#{Chat::Message.unscoped.last.id}.json"
|
||||
expect(response.status).to eq(403)
|
||||
end
|
||||
|
||||
it "allows a user to restore their own posts" do
|
||||
sign_in(user)
|
||||
|
||||
deleted_message = Chat::Message.unscoped.last
|
||||
put "/chat/#{chat_channel.id}/restore/#{deleted_message.id}.json"
|
||||
expect(response.status).to eq(200)
|
||||
expect(deleted_message.reload.deleted_at).to be_nil
|
||||
end
|
||||
|
||||
it "allows admin to restore others' posts" do
|
||||
sign_in(admin)
|
||||
|
||||
deleted_message = Chat::Message.unscoped.last
|
||||
put "/chat/#{chat_channel.id}/restore/#{deleted_message.id}.json"
|
||||
expect(response.status).to eq(200)
|
||||
expect(deleted_message.reload.deleted_at).to be_nil
|
||||
end
|
||||
|
||||
it "does not allow message restore when chat channel is read_only" do
|
||||
sign_in(Chat::Message.last.user)
|
||||
|
||||
chat_channel.update!(status: :read_only)
|
||||
|
||||
deleted_message = Chat::Message.unscoped.last
|
||||
put "/chat/#{chat_channel.id}/restore/#{deleted_message.id}.json"
|
||||
expect(response.status).to eq(403)
|
||||
expect(deleted_message.reload.deleted_at).not_to be_nil
|
||||
|
||||
sign_in(admin)
|
||||
put "/chat/#{chat_channel.id}/restore/#{deleted_message.id}.json"
|
||||
expect(response.status).to eq(403)
|
||||
end
|
||||
|
||||
it "only allows admin to restore when chat channel is closed" do
|
||||
sign_in(admin)
|
||||
|
||||
chat_channel.update!(status: :read_only)
|
||||
|
||||
deleted_message = Chat::Message.unscoped.last
|
||||
put "/chat/#{chat_channel.id}/restore/#{deleted_message.id}.json"
|
||||
expect(response.status).to eq(403)
|
||||
expect(deleted_message.reload.deleted_at).not_to be_nil
|
||||
|
||||
chat_channel.update!(status: :closed)
|
||||
put "/chat/#{chat_channel.id}/restore/#{deleted_message.id}.json"
|
||||
expect(response.status).to eq(200)
|
||||
expect(deleted_message.reload.deleted_at).to be_nil
|
||||
end
|
||||
end
|
||||
|
||||
describe "#restore" do
|
||||
fab!(:second_user) { Fabricate(:user) }
|
||||
|
||||
before do
|
||||
message =
|
||||
Chat::Message.create(user: user, message: "this is a message", chat_channel: chat_channel)
|
||||
message.trash!
|
||||
end
|
||||
|
||||
describe "for category" do
|
||||
fab!(:chat_channel) { Fabricate(:category_channel, chatable: category) }
|
||||
|
||||
it_behaves_like "chat_message_restoration" do
|
||||
let(:other_user) { second_user }
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
describe "react" do
|
||||
fab!(:chat_channel) { Fabricate(:category_channel) }
|
||||
fab!(:chat_message) { Fabricate(:chat_message, chat_channel: chat_channel, user: user) }
|
||||
|
|
74
plugins/chat/spec/services/chat/restore_message_spec.rb
Normal file
74
plugins/chat/spec/services/chat/restore_message_spec.rb
Normal file
|
@ -0,0 +1,74 @@
|
|||
# frozen_string_literal: true
|
||||
|
||||
RSpec.describe Chat::RestoreMessage do
|
||||
fab!(:current_user) { Fabricate(:user) }
|
||||
let!(:guardian) { Guardian.new(current_user) }
|
||||
fab!(:message) { Fabricate(:chat_message, user: current_user) }
|
||||
|
||||
before { message.trash! }
|
||||
|
||||
describe ".call" do
|
||||
subject(:result) { described_class.call(params) }
|
||||
|
||||
context "when params are not valid" do
|
||||
let(:params) { { guardian: guardian } }
|
||||
|
||||
it { is_expected.to fail_a_contract }
|
||||
end
|
||||
|
||||
context "when params are valid" do
|
||||
let(:params) do
|
||||
{ guardian: guardian, message_id: message.id, channel_id: message.chat_channel_id }
|
||||
end
|
||||
|
||||
context "when the user does not have permission to restore" do
|
||||
before { message.update!(user: Fabricate(:admin)) }
|
||||
|
||||
it { is_expected.to fail_a_policy(:invalid_access) }
|
||||
end
|
||||
|
||||
context "when the channel does not match the message" do
|
||||
let(:params) do
|
||||
{ guardian: guardian, message_id: message.id, channel_id: Fabricate(:chat_channel).id }
|
||||
end
|
||||
|
||||
it { is_expected.to fail_to_find_a_model(:message) }
|
||||
end
|
||||
|
||||
context "when the user has permission to restore" do
|
||||
it "sets the service result as successful" do
|
||||
expect(result).to be_a_success
|
||||
end
|
||||
|
||||
it "restores the message" do
|
||||
result
|
||||
expect(Chat::Message.find_by(id: message.id)).not_to be_nil
|
||||
end
|
||||
|
||||
it "publishes associated Discourse and MessageBus events" do
|
||||
freeze_time
|
||||
messages = nil
|
||||
event =
|
||||
DiscourseEvent.track_events { messages = MessageBus.track_publish { result } }.first
|
||||
expect(event[:event_name]).to eq(:chat_message_restored)
|
||||
expect(event[:params]).to eq([message, message.chat_channel, current_user])
|
||||
expect(
|
||||
messages.find { |m| m.channel == "/chat/#{message.chat_channel_id}" }.data,
|
||||
).to include({ "type" => "restore" })
|
||||
end
|
||||
|
||||
context "when the message has a thread" do
|
||||
fab!(:thread) { Fabricate(:chat_thread, channel: message.chat_channel) }
|
||||
|
||||
before { message.update!(thread: thread) }
|
||||
|
||||
it "increments the thread reply count" do
|
||||
thread.set_replies_count_cache(1)
|
||||
result
|
||||
expect(thread.replies_count_cache).to eq(2)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
|
@ -57,7 +57,11 @@ RSpec.describe Chat::TrashMessage do
|
|||
expect(event[:event_name]).to eq(:chat_message_trashed)
|
||||
expect(event[:params]).to eq([message, message.chat_channel, current_user])
|
||||
expect(messages.find { |m| m.channel == "/chat/#{message.chat_channel_id}" }.data).to eq(
|
||||
{ type: "delete", deleted_id: message.id, deleted_at: Time.zone.now },
|
||||
{
|
||||
"type" => "delete",
|
||||
"deleted_id" => message.id,
|
||||
"deleted_at" => message.reload.deleted_at.iso8601(3),
|
||||
},
|
||||
)
|
||||
end
|
||||
|
||||
|
@ -86,6 +90,18 @@ RSpec.describe Chat::TrashMessage do
|
|||
expect(membership_3.reload.last_read_message_id).not_to be_nil
|
||||
end
|
||||
|
||||
context "when the message has a thread" do
|
||||
fab!(:thread) { Fabricate(:chat_thread, channel: message.chat_channel) }
|
||||
|
||||
before { message.update!(thread: thread) }
|
||||
|
||||
it "decrements the thread reply count" do
|
||||
thread.set_replies_count_cache(5)
|
||||
result
|
||||
expect(thread.replies_count_cache).to eq(4)
|
||||
end
|
||||
end
|
||||
|
||||
context "when message is already deleted" do
|
||||
before { message.trash! }
|
||||
|
||||
|
|
Loading…
Reference in New Issue
Block a user