discourse/app/models/topic_tracking_state.rb
Alan Guo Xiang Tan 48c8ed49d6
FIX: Dismissing unread posts did not publish changes to other clients (#22584)
Why this change?

Prior to this change, dismissing unreads posts did not publish the
changes across clients for the same user. As a result, users can end up
seeing an unread count being present but saw no topics being loaded when
visiting the `/unread` route.
2023-07-13 18:05:56 +08:00

600 lines
18 KiB
Ruby

# frozen_string_literal: true
# This class is used to mirror unread and new status back to end users
# in JavaScript there is a mirror class that is kept in-sync using MessageBus
# the allows end users to always know which topics have unread posts in them
# and which topics are new. This is used in various places in the UI, such as
# counters, indicators, and messages at the top of topic lists, so the user
# knows there is something worth reading at a glance.
#
# The TopicTrackingState.report data is preloaded in ApplicationController
# for the current user under the topicTrackingStates key, and the existing
# state is loaded into memory on page load. From there the MessageBus is
# used to keep topic state up to date, as well as syncing with topics from
# corresponding lists fetched from the server (e.g. the /new, /latest,
# /unread topic lists).
#
# See discourse/app/models/topic-tracking-state.js
class TopicTrackingState
include ActiveModel::SerializerSupport
include TopicTrackingStatePublishable
UNREAD_MESSAGE_TYPE = "unread"
LATEST_MESSAGE_TYPE = "latest"
MUTED_MESSAGE_TYPE = "muted"
UNMUTED_MESSAGE_TYPE = "unmuted"
NEW_TOPIC_MESSAGE_TYPE = "new_topic"
RECOVER_MESSAGE_TYPE = "recover"
DELETE_MESSAGE_TYPE = "delete"
DESTROY_MESSAGE_TYPE = "destroy"
READ_MESSAGE_TYPE = "read"
DISMISS_NEW_MESSAGE_TYPE = "dismiss_new"
DISMISS_NEW_POSTS_MESSAGE_TYPE = "dismiss_new_posts"
MAX_TOPICS = 5000
NEW_MESSAGE_BUS_CHANNEL = "/new"
LATEST_MESSAGE_BUS_CHANNEL = "/latest"
UNREAD_MESSAGE_BUS_CHANNEL = "/unread"
RECOVER_MESSAGE_BUS_CHANNEL = "/recover"
DELETE_MESSAGE_BUS_CHANNEL = "/delete"
DESTROY_MESSAGE_BUS_CHANNEL = "/destroy"
def self.publish_new(topic)
return unless topic.regular?
tag_ids, tags = nil
tag_ids, tags = topic.tags.pluck(:id, :name).transpose if SiteSetting.tagging_enabled
payload = {
last_read_post_number: nil,
highest_post_number: 1,
created_at: topic.created_at,
category_id: topic.category_id,
archetype: topic.archetype,
created_in_new_period: true,
}
if tags
payload[:tags] = tags
payload[:topic_tag_ids] = tag_ids
end
message = { topic_id: topic.id, message_type: NEW_TOPIC_MESSAGE_TYPE, payload: payload }
group_ids = secure_category_group_ids(topic)
MessageBus.publish(NEW_MESSAGE_BUS_CHANNEL, message.as_json, group_ids: group_ids)
publish_read(topic.id, 1, topic.user)
end
def self.publish_latest(topic, whisper = false)
return unless topic.regular?
tag_ids, tags = nil
tag_ids, tags = topic.tags.pluck(:id, :name).transpose if SiteSetting.tagging_enabled
message = {
topic_id: topic.id,
message_type: LATEST_MESSAGE_TYPE,
payload: {
bumped_at: topic.bumped_at,
category_id: topic.category_id,
archetype: topic.archetype,
},
}
if tags
message[:payload][:tags] = tags
message[:payload][:topic_tag_ids] = tag_ids
end
group_ids =
if whisper
[Group::AUTO_GROUPS[:staff], *SiteSetting.whispers_allowed_group_ids]
else
secure_category_group_ids(topic)
end
MessageBus.publish(LATEST_MESSAGE_BUS_CHANNEL, message.as_json, group_ids: group_ids)
end
def self.unread_channel_key(user_id)
"/unread/#{user_id}"
end
def self.publish_muted(topic)
return unless topic.regular?
user_ids =
topic
.topic_users
.where(notification_level: NotificationLevels.all[:muted])
.joins(:user)
.where("users.last_seen_at > ?", 7.days.ago)
.order("users.last_seen_at DESC")
.limit(100)
.pluck(:user_id)
return if user_ids.blank?
message = { topic_id: topic.id, message_type: MUTED_MESSAGE_TYPE }
MessageBus.publish(LATEST_MESSAGE_BUS_CHANNEL, message.as_json, user_ids: user_ids)
end
def self.publish_unmuted(topic)
return unless topic.regular?
user_ids =
User
.watching_topic(topic)
.where("users.last_seen_at > ?", 7.days.ago)
.order("users.last_seen_at DESC")
.limit(100)
.pluck(:id)
return if user_ids.blank?
message = { topic_id: topic.id, message_type: UNMUTED_MESSAGE_TYPE }
MessageBus.publish(LATEST_MESSAGE_BUS_CHANNEL, message.as_json, user_ids: user_ids)
end
def self.publish_unread(post)
return unless post.topic.regular?
# TODO at high scale we are going to have to defer this,
# perhaps cut down to users that are around in the last 7 days as well
tags = nil
tag_ids = nil
tag_ids, tags = post.topic.tags.pluck(:id, :name).transpose if include_tags_in_report?
# We don't need to publish unread to the person who just made the post,
# this is why they are excluded from the initial scope.
scope =
TopicUser.tracking(post.topic_id).includes(user: :user_stat).where.not(user_id: post.user_id)
group_ids =
if post.post_type == Post.types[:whisper]
[Group::AUTO_GROUPS[:staff], *SiteSetting.whispers_allowed_group_ids]
else
post.topic.category && post.topic.category.secure_group_ids
end
if group_ids.present?
scope =
scope.joins("INNER JOIN group_users gu ON gu.user_id = topic_users.user_id").where(
"gu.group_id IN (?)",
group_ids,
)
end
user_ids = scope.pluck(:user_id)
return if user_ids.empty?
payload = {
highest_post_number: post.post_number,
updated_at: post.topic.updated_at,
created_at: post.created_at,
category_id: post.topic.category_id,
archetype: post.topic.archetype,
}
if tags
payload[:tags] = tags
payload[:topic_tag_ids] = tag_ids
end
message = { topic_id: post.topic_id, message_type: UNREAD_MESSAGE_TYPE, payload: payload }
MessageBus.publish(UNREAD_MESSAGE_BUS_CHANNEL, message.as_json, user_ids: user_ids)
end
def self.publish_recover(topic)
return unless topic.regular?
group_ids = secure_category_group_ids(topic)
message = { topic_id: topic.id, message_type: RECOVER_MESSAGE_TYPE }
MessageBus.publish(RECOVER_MESSAGE_BUS_CHANNEL, message.as_json, group_ids: group_ids)
end
def self.publish_delete(topic)
return unless topic.regular?
group_ids = secure_category_group_ids(topic)
message = { topic_id: topic.id, message_type: DELETE_MESSAGE_TYPE }
MessageBus.publish("/delete", message.as_json, group_ids: group_ids)
end
def self.publish_destroy(topic)
return unless topic.regular?
group_ids = secure_category_group_ids(topic)
message = { topic_id: topic.id, message_type: DESTROY_MESSAGE_TYPE }
MessageBus.publish(DESTROY_MESSAGE_BUS_CHANNEL, message.as_json, group_ids: group_ids)
end
def self.publish_read(topic_id, last_read_post_number, user, notification_level = nil)
self.publish_read_message(
message_type: READ_MESSAGE_TYPE,
channel_name: self.unread_channel_key(user.id),
topic_id: topic_id,
user: user,
last_read_post_number: last_read_post_number,
notification_level: notification_level,
)
end
def self.publish_dismiss_new(user_id, topic_ids: [])
message = { message_type: DISMISS_NEW_MESSAGE_TYPE, payload: { topic_ids: topic_ids } }
MessageBus.publish(self.unread_channel_key(user_id), message.as_json, user_ids: [user_id])
end
def self.publish_dismiss_new_posts(user_id, topic_ids: [])
message = { message_type: DISMISS_NEW_POSTS_MESSAGE_TYPE, payload: { topic_ids: topic_ids } }
MessageBus.publish(self.unread_channel_key(user_id), message.as_json, user_ids: [user_id])
end
def self.new_filter_sql
TopicQuery
.new_filter(Topic, treat_as_new_topic_clause_sql: treat_as_new_topic_clause)
.where_clause
.ast
.to_sql + " AND topics.created_at > :min_new_topic_date" +
" AND dismissed_topic_users.id IS NULL"
end
def self.unread_filter_sql(whisperer: false)
TopicQuery.unread_filter(Topic, whisperer: whisperer).where_clause.ast.to_sql
end
def self.treat_as_new_topic_clause
User
.where(
"GREATEST(CASE
WHEN COALESCE(uo.new_topic_duration_minutes, :default_duration) = :always THEN u.created_at
WHEN COALESCE(uo.new_topic_duration_minutes, :default_duration) = :last_visit THEN COALESCE(u.previous_visit_at,u.created_at)
ELSE (:now::timestamp - INTERVAL '1 MINUTE' * COALESCE(uo.new_topic_duration_minutes, :default_duration))
END, u.created_at, :min_date)",
treat_as_new_topic_params,
)
.where_clause
.ast
.to_sql
end
def self.treat_as_new_topic_params
{
now: DateTime.now,
last_visit: User::NewTopicDuration::LAST_VISIT,
always: User::NewTopicDuration::ALWAYS,
default_duration: SiteSetting.default_other_new_topic_duration_minutes,
min_date: Time.at(SiteSetting.min_new_topics_time).to_datetime,
}
end
def self.include_tags_in_report?
SiteSetting.tagging_enabled && (@include_tags_in_report || !SiteSetting.legacy_navigation_menu?)
end
def self.include_tags_in_report=(v)
@include_tags_in_report = v
end
# Sam: this is a hairy report, in particular I need custom joins and fancy conditions
# Dropping to sql_builder so I can make sense of it.
#
# Keep in mind, we need to be able to filter on a GROUP of users, and zero in on topic
# all our existing scope work does not do this
#
# This code needs to be VERY efficient as it is triggered via the message bus and may steal
# cycles from usual requests
def self.report(user, topic_id = nil)
tag_ids = muted_tag_ids(user)
sql = new_and_unread_sql(topic_id, user, tag_ids)
sql = tags_included_wrapped_sql(sql)
report =
DB.query(
sql + "\n\n LIMIT :max_topics",
{
user_id: user.id,
topic_id: topic_id,
min_new_topic_date: Time.at(SiteSetting.min_new_topics_time).to_datetime,
max_topics: TopicTrackingState::MAX_TOPICS,
}.merge(treat_as_new_topic_params),
)
report
end
def self.new_and_unread_sql(topic_id, user, tag_ids)
sql =
report_raw_sql(
topic_id: topic_id,
skip_unread: true,
skip_order: true,
staff: user.staff?,
admin: user.admin?,
whisperer: user.whisperer?,
user: user,
muted_tag_ids: tag_ids,
)
sql << "\nUNION ALL\n\n"
sql << report_raw_sql(
topic_id: topic_id,
skip_new: true,
skip_order: true,
staff: user.staff?,
filter_old_unread: true,
admin: user.admin?,
whisperer: user.whisperer?,
user: user,
muted_tag_ids: tag_ids,
)
end
def self.tags_included_wrapped_sql(sql)
return <<~SQL if SiteSetting.tagging_enabled && TopicTrackingState.include_tags_in_report?
WITH tags_included_cte AS (
#{sql}
)
SELECT *, (
SELECT ARRAY_AGG(name) from topic_tags
JOIN tags on tags.id = topic_tags.tag_id
WHERE topic_id = tags_included_cte.topic_id
) tags
FROM tags_included_cte
SQL
sql
end
def self.muted_tag_ids(user)
TagUser.lookup(user, :muted).pluck(:tag_id)
end
def self.report_raw_sql(
user:,
muted_tag_ids:,
topic_id: nil,
filter_old_unread: false,
skip_new: false,
skip_unread: false,
skip_order: false,
staff: false,
admin: false,
whisperer: false,
select: nil,
custom_state_filter: nil,
additional_join_sql: nil
)
unread =
if skip_unread
"1=0"
else
unread_filter_sql(whisperer: whisperer)
end
filter_old_unread_sql =
if filter_old_unread
" topics.updated_at >= us.first_unread_at AND "
else
""
end
new =
if skip_new
"1=0"
else
new_filter_sql
end
category_topic_id_column_select =
if SiteSetting.show_category_definitions_in_topic_lists
""
else
"c.topic_id AS category_topic_id,"
end
select_sql =
select ||
"
DISTINCT topics.id as topic_id,
u.id as user_id,
topics.created_at,
topics.updated_at,
#{highest_post_number_column_select(whisperer)},
last_read_post_number,
c.id as category_id,
#{category_topic_id_column_select}
tu.notification_level,
us.first_unread_at,
GREATEST(
CASE
WHEN COALESCE(uo.new_topic_duration_minutes, :default_duration) = :always THEN u.created_at
WHEN COALESCE(uo.new_topic_duration_minutes, :default_duration) = :last_visit THEN COALESCE(
u.previous_visit_at,u.created_at
)
ELSE (:now::timestamp - INTERVAL '1 MINUTE' * COALESCE(uo.new_topic_duration_minutes, :default_duration))
END, u.created_at, :min_date
) AS treat_as_new_topic_start_date"
category_filter =
if admin
""
else
append = "OR u.admin" if !admin
<<~SQL
(
NOT c.read_restricted #{append} OR c.id IN (
SELECT c2.id FROM categories c2
JOIN category_groups cg ON cg.category_id = c2.id
JOIN group_users gu ON gu.user_id = :user_id AND cg.group_id = gu.group_id
WHERE c2.read_restricted )
) AND
SQL
end
visibility_filter =
if staff
""
else
append = "OR u.admin OR u.moderator" if !staff
"(topics.visible #{append}) AND"
end
tags_filter = ""
if muted_tag_ids.present? &&
%w[always only_muted].include?(SiteSetting.remove_muted_tags_from_latest)
existing_tags_sql =
"(select array_agg(tag_id) from topic_tags where topic_tags.topic_id = topics.id)"
muted_tags_array_sql = "ARRAY[#{muted_tag_ids.join(",")}]"
if SiteSetting.remove_muted_tags_from_latest == "always"
tags_filter = <<~SQL
NOT (
COALESCE(#{existing_tags_sql}, ARRAY[]::int[]) && #{muted_tags_array_sql}
) AND
SQL
else # only muted
tags_filter = <<~SQL
NOT (
COALESCE(#{existing_tags_sql}, ARRAY[-999]) <@ #{muted_tags_array_sql}
) AND
SQL
end
end
sql = +<<~SQL
SELECT #{select_sql}
FROM topics
JOIN users u on u.id = :user_id
JOIN user_stats AS us ON us.user_id = u.id
JOIN user_options AS uo ON uo.user_id = u.id
JOIN categories c ON c.id = topics.category_id
LEFT JOIN topic_users tu ON tu.topic_id = topics.id AND tu.user_id = u.id
#{skip_new ? "" : "LEFT JOIN dismissed_topic_users ON dismissed_topic_users.topic_id = topics.id AND dismissed_topic_users.user_id = :user_id"}
#{additional_join_sql}
WHERE u.id = :user_id AND
#{filter_old_unread_sql}
topics.archetype <> 'private_message' AND
#{custom_state_filter ? custom_state_filter : "((#{unread}) OR (#{new})) AND"}
#{visibility_filter}
#{tags_filter}
topics.deleted_at IS NULL AND
#{category_filter}
NOT (
#{(skip_new && skip_unread) ? "" : "last_read_post_number IS NULL AND"}
(
topics.category_id IN (#{CategoryUser.muted_category_ids_query(user, include_direct: true).select("categories.id").to_sql})
AND tu.notification_level <= #{TopicUser.notification_levels[:regular]}
)
)
SQL
sql << " AND topics.id = :topic_id" if topic_id
sql << " ORDER BY topics.bumped_at DESC" unless skip_order
sql
end
def self.highest_post_number_column_select(whisperer)
"#{whisperer ? "topics.highest_staff_post_number AS highest_post_number" : "topics.highest_post_number"}"
end
def self.publish_read_indicator_on_write(topic_id, last_read_post_number, user_id)
topic =
Topic
.includes(:allowed_groups)
.select(:highest_post_number, :archetype, :id)
.find_by(id: topic_id)
if topic&.private_message?
groups = read_allowed_groups_of(topic)
update_topic_list_read_indicator(topic, groups, topic.highest_post_number, user_id, true)
end
end
def self.publish_read_indicator_on_read(topic_id, last_read_post_number, user_id)
topic =
Topic
.includes(:allowed_groups)
.select(:highest_post_number, :archetype, :id)
.find_by(id: topic_id)
if topic&.private_message?
groups = read_allowed_groups_of(topic)
post = Post.find_by(topic_id: topic.id, post_number: last_read_post_number)
trigger_post_read_count_update(post, groups, last_read_post_number, user_id)
update_topic_list_read_indicator(topic, groups, last_read_post_number, user_id, false)
end
end
def self.read_allowed_groups_of(topic)
topic
.allowed_groups
.joins(:group_users)
.where(publish_read_state: true)
.select("ARRAY_AGG(group_users.user_id) AS members", :name, :id)
.group("groups.id")
end
def self.update_topic_list_read_indicator(
topic,
groups,
last_read_post_number,
user_id,
write_event
)
return unless last_read_post_number == topic.highest_post_number
message = { topic_id: topic.id, show_indicator: write_event }.as_json
groups_to_update = []
groups.each do |group|
member = group.members.include?(user_id)
member_writing = (write_event && member)
non_member_reading = (!write_event && !member)
next if non_member_reading || member_writing
groups_to_update << group
end
return if groups_to_update.empty?
MessageBus.publish(
"/private-messages/unread-indicator/#{topic.id}",
message,
user_ids: groups_to_update.flat_map(&:members),
)
end
def self.trigger_post_read_count_update(post, groups, last_read_post_number, user_id)
return if !post
return if groups.empty?
opts = { readers_count: post.readers_count, reader_id: user_id }
post.publish_change_to_clients!(:read, opts)
end
def self.secure_category_group_ids(topic)
category = topic.category
return [Group::AUTO_GROUPS[:admins]] if category.nil?
if category.read_restricted
ids = [Group::AUTO_GROUPS[:admins]]
ids.push(*category.secure_group_ids)
ids.uniq
else
nil
end
end
private_class_method :secure_category_group_ids
end