DEV: Generic bulk chat import support (#28697)

* DEV: WIP generic bulk chat import support

This first iteration implements bulk import for:

* direct_messages
* chat_channels
* user_chat_channel_memberships
* chat_threads
* user_chat_thread_memberships
* chat_messages
* chat_reactions
* chat_mentions

* DEV: Refactor raw placeholder interpolation to support chat messages

This change adds support for chat message placeholder interpolation
and switches to using `Chat::Message.cook` for cooking in the interim
instead of hand-cooking chat messages like we currently do for posts

* DEV: Extend upload references import to support chat message uploads

* DEV: Explicity set chat retention

- Set both channel and dm chats to 0
- Add temporary workaround for testing only chat imports

* DEV: Compute channel and thread membership metadata

Compute and set various user channel/thread membership stats and
remove hardcoded test index seed data

* FIX: Fix chat reactions import

Allow multiple reactions on a message by a user
This commit is contained in:
Selase Krakani 2024-10-08 11:55:30 +00:00 committed by GitHub
parent 0ba7a7ecab
commit 9825bde811
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 856 additions and 35 deletions

View File

@ -110,7 +110,17 @@ class BulkImport::Base
ActiveSupport::Inflector.transliterate("test")
end
MAPPING_TYPES = Enum.new(upload: 1, badge: 2, poll: 3, poll_option: 4)
MAPPING_TYPES =
Enum.new(
upload: 1,
badge: 2,
poll: 3,
poll_option: 4,
direct_message_channel: 5,
chat_channel: 6,
chat_thread: 7,
chat_message: 8,
)
def create_migration_mappings_table
puts "Creating migration mappings table..."
@ -286,6 +296,19 @@ class BulkImport::Base
@poll_option_mapping = load_index(MAPPING_TYPES[:poll_option])
@last_poll_id = last_id(Poll)
@last_poll_option_id = last_id(PollOption)
puts "Loading chat indexes..."
@chat_direct_message_channel_mapping = load_index(MAPPING_TYPES[:direct_message_channel])
@last_chat_direct_message_channel_id = last_id(Chat::DirectMessage)
@chat_channel_mapping = load_index(MAPPING_TYPES[:chat_channel])
@last_chat_channel_id = last_id(Chat::Channel)
@chat_thread_mapping = load_index(MAPPING_TYPES[:chat_thread])
@last_chat_thread_id = last_id(Chat::Thread)
@chat_message_mapping = load_index(MAPPING_TYPES[:chat_message])
@last_chat_message_id = last_id(Chat::Message)
end
def use_bbcode_to_md?
@ -347,6 +370,26 @@ class BulkImport::Base
if @last_poll_option_id > 0
@raw_connection.exec("SELECT setval('#{PollOption.sequence_name}', #{@last_poll_option_id})")
end
if @last_chat_direct_message_channel_id > 0
@raw_connection.exec(
"SELECT setval('#{Chat::DirectMessage.sequence_name}', #{@last_chat_direct_message_channel_id})",
)
end
if @last_chat_channel_id > 0
@raw_connection.exec(
"SELECT setval('#{Chat::Channel.sequence_name}', #{@last_chat_channel_id})",
)
end
if @last_chat_thread_id > 0
@raw_connection.exec(
"SELECT setval('#{Chat::Thread.sequence_name}', #{@last_chat_thread_id})",
)
end
if @last_chat_message_id > 0
@raw_connection.exec(
"SELECT setval('#{Chat::Message.sequence_name}', #{@last_chat_message_id})",
)
end
end
def group_id_from_imported_id(id)
@ -416,6 +459,22 @@ class BulkImport::Base
@poll_option_mapping[id.to_s]&.to_i
end
def chat_channel_id_from_original_id(id)
@chat_channel_mapping[id.to_s]&.to_i
end
def chat_direct_message_channel_id_from_original_id(id)
@chat_direct_message_channel_mapping[id.to_s]&.to_i
end
def chat_thread_id_from_original_id(id)
@chat_thread_mapping[id.to_s]&.to_i
end
def chat_message_id_from_original_id(id)
@chat_message_mapping[id.to_s]&.to_i
end
GROUP_COLUMNS ||= %i[
id
name
@ -785,6 +844,82 @@ class BulkImport::Base
updated_at
]
CHAT_DIRECT_MESSAGE_CHANNEL_COLUMNS ||= %i[id group created_at updated_at]
CHAT_CHANNEL_COLUMNS ||= %i[
id
name
description
slug
status
chatable_id
chatable_type
user_count
messages_count
type
created_at
updated_at
allow_channel_wide_mentions
auto_join_users
threading_enabled
]
USER_CHAT_CHANNEL_MEMBERSHIP_COLUMNS ||= %i[
chat_channel_id
user_id
created_at
updated_at
following
muted
desktop_notification_level
mobile_notification_level
last_read_message_id
join_mode
last_viewed_at
]
DIRECT_MESSAGE_USER_COLUMNS ||= %i[direct_message_channel_id user_id created_at updated_at]
CHAT_THREAD_COLUMNS ||= %i[
id
channel_id
original_message_id
original_message_user_id
status
title
created_at
updated_at
replies_count
]
USER_CHAT_THREAD_MEMBERSHIP_COLUMNS ||= %i[
user_id
thread_id
notification_level
created_at
updated_at
]
CHAT_MESSAGE_COLUMNS ||= %i[
id
chat_channel_id
user_id
created_at
updated_at
deleted_at
deleted_by_id
in_reply_to_id
message
cooked
cooked_version
last_editor_id
thread_id
]
CHAT_MESSAGE_REACTION_COLUMNS ||= %i[chat_message_id user_id emoji created_at updated_at]
CHAT_MENTION_COLUMNS ||= %i[chat_message_id target_id type created_at updated_at]
def create_groups(rows, &block)
create_records(rows, "group", GROUP_COLUMNS, &block)
end
@ -967,6 +1102,52 @@ class BulkImport::Base
create_records(rows, "permalink", PERMALINK_COLUMNS, &block)
end
def create_chat_channels(rows, &block)
create_records_with_mapping(rows, "chat_channel", CHAT_CHANNEL_COLUMNS, &block)
end
def create_chat_direct_message(rows, &block)
create_records_with_mapping(
rows,
"direct_message_channel",
CHAT_DIRECT_MESSAGE_CHANNEL_COLUMNS,
&block
)
end
def create_user_chat_channel_memberships(rows, &block)
create_records(
rows,
"user_chat_channel_membership",
USER_CHAT_CHANNEL_MEMBERSHIP_COLUMNS,
&block
)
end
def create_direct_message_users(rows, &block)
create_records(rows, "direct_message_user", DIRECT_MESSAGE_USER_COLUMNS, &block)
end
def create_chat_threads(rows, &block)
create_records_with_mapping(rows, "chat_thread", CHAT_THREAD_COLUMNS, &block)
end
def create_thread_users(rows, &block)
create_records(rows, "user_chat_thread_membership", USER_CHAT_THREAD_MEMBERSHIP_COLUMNS, &block)
end
def create_chat_messages(rows, &block)
create_records_with_mapping(rows, "chat_message", CHAT_MESSAGE_COLUMNS, &block)
end
def create_chat_message_reactions(rows, &block)
create_records(rows, "chat_message_reaction", CHAT_MESSAGE_REACTION_COLUMNS, &block)
end
def create_chat_mentions(rows, &block)
create_records(rows, "chat_mention", CHAT_MENTION_COLUMNS, &block)
end
def process_group(group)
@groups[group[:imported_id].to_i] = group[:id] = @last_group_id += 1
@ -1634,6 +1815,136 @@ class BulkImport::Base
permalink
end
def process_direct_message_channel(chat_channel)
chat_channel[:id] = @last_chat_direct_message_channel_id += 1
chat_channel[:group] = false if chat_channel[:group].nil?
chat_channel[:created_at] ||= NOW
chat_channel[:updated_at] ||= NOW
@imported_records[chat_channel[:original_id].to_s] = chat_channel[:id]
@chat_direct_message_channel_mapping[chat_channel[:original_id].to_s] = chat_channel[:id]
chat_channel
end
def process_chat_channel(chat_channel)
chat_channel[:id] = @last_chat_channel_id += 1
if chat_channel[:name].present?
chat_channel[:name] = chat_channel[:name][0..SiteSetting.max_topic_title_length]
.scrub
.strip
.presence
chat_channel[:slug] ||= Slug.ascii_generator(chat_channel[:name])
end
chat_channel[:description] = chat_channel[:description][0..500].scrub.strip if chat_channel[
:description
].present?
chat_channel[:slug] = chat_channel[:slug][0..100] if chat_channel[:slug].present?
chat_channel[:allow_channel_wide_mentions] ||= true if chat_channel[
:allow_channel_wide_mentions
].nil?
chat_channel[:auto_join_users] ||= false if chat_channel[:auto_join_users].nil?
chat_channel[:threading_enabled] ||= false if chat_channel[:threading_enabled].nil?
chat_channel[:user_count] ||= 0
chat_channel[:messages_count] ||= 0
chat_channel[:status] ||= 0
chat_channel[:created_at] ||= NOW
chat_channel[:updated_at] ||= NOW
@imported_records[chat_channel[:original_id].to_s] = chat_channel[:id]
@chat_channel_mapping[chat_channel[:original_id].to_s] = chat_channel[:id]
chat_channel
end
def process_user_chat_channel_membership(membership)
membership[:created_at] ||= NOW
membership[:updated_at] ||= NOW
membership[:following] = false if membership[:following].nil?
membership[:muted] = false if membership[:muted].nil?
membership[
:desktop_notification_level
] ||= Chat::UserChatChannelMembership.desktop_notification_levels[:mention]
membership[
:mobile_notification_level
] ||= Chat::UserChatChannelMembership.mobile_notification_levels[:mention]
membership[:join_mode] ||= Chat::UserChatChannelMembership.join_modes[:manual]
membership
end
def process_direct_message_user(user)
user[:created_at] ||= NOW
user[:updated_at] ||= NOW
user
end
def process_chat_thread(thread)
thread[:id] = @last_chat_thread_id += 1
thread[:created_at] ||= NOW
thread[:updated_at] ||= NOW
@imported_records[thread[:original_id].to_s] = thread[:id]
@chat_thread_mapping[thread[:original_id].to_s] = thread[:id]
thread
end
def process_user_chat_thread_membership(membership)
membership[:created_at] ||= NOW
membership[:updated_at] ||= NOW
membership[:notification_level] ||= Chat::UserChatThreadMembership.notification_levels[
:tracking
]
membership[:thread_title_prompt_seen] = false if membership[:thread_title_prompt_seen].nil?
membership
end
def process_chat_message(message)
message[:id] = @last_chat_message_id += 1
message[:user_id] ||= Discourse::SYSTEM_USER_ID
message[:last_editor_id] ||= message[:user_id]
message[:message] = (message[:message] || "").scrub.strip.presence || "<Empty imported message>"
message[:message] = normalize_text(message[:message])
message[:cooked] = ::Chat::Message.cook(message[:message], user_id: message[:last_editor_id])
message[:cooked_version] = ::Chat::Message::BAKED_VERSION
message[:created_at] ||= NOW
message[:updated_at] ||= NOW
@imported_records[message[:original_id].to_s] = message[:id]
@chat_message_mapping[message[:original_id].to_s] = message[:id]
if message[:message].bytes.include?(0)
STDERR.puts "Skipping chat message with original ID #{message[:original_id]} because `message` contains null bytes"
message[:skip] = true
end
if message[:cooked].bytes.include?(0)
STDERR.puts "Skipping chat message with original ID #{message[:original_id]} because `cooked` contains null bytes"
message[:skip] = true
end
message
end
def process_chat_message_reaction(reaction)
reaction[:created_at] ||= NOW
reaction[:updated_at] ||= NOW
reaction
end
def process_chat_mention(mention)
mention[:created_at] ||= NOW
mention[:updated_at] ||= NOW
mention
end
def create_records(all_rows, name, columns, &block)
start = Time.now
imported_ids = []

View File

@ -19,6 +19,8 @@ end
class BulkImport::Generic < BulkImport::Base
AVATAR_DIRECTORY = ENV["AVATAR_DIRECTORY"]
UPLOAD_DIRECTORY = ENV["UPLOAD_DIRECTORY"]
CONTENT_UPLOAD_REFERENCE_TYPES = %w[posts chat_messages]
LAST_VIEWED_AT_PLACEHOLDER = "1970-01-01 00:00:00"
def initialize(db_path, uploads_db_path = nil)
super()
@ -98,7 +100,6 @@ class BulkImport::Generic < BulkImport::Base
import_badges
import_user_badges
import_upload_references
import_optimized_images
import_topic_users
@ -108,6 +109,23 @@ class BulkImport::Generic < BulkImport::Base
import_permalink_normalizations
import_permalinks
import_chat_direct_messages
import_chat_channels
import_chat_threads
import_chat_messages
import_user_chat_channel_memberships
import_chat_thread_users
import_chat_reactions
import_chat_mentions
update_chat_threads
update_chat_membership_metadata
import_upload_references
end
def execute_after
@ -758,8 +776,6 @@ class BulkImport::Generic < BulkImport::Base
ORDER BY topic_id, post_number, id
SQL
group_names = Group.pluck(:id, :name).to_h
create_posts(posts) do |row|
next if row["raw"].blank?
next unless (topic_id = topic_id_from_imported_id(row["topic_id"]))
@ -773,7 +789,7 @@ class BulkImport::Generic < BulkImport::Base
topic_id: topic_id,
user_id: user_id_from_imported_id(row["user_id"]),
created_at: to_datetime(row["created_at"]),
raw: post_raw(row, group_names),
raw: raw_with_placeholders_interpolated(row["raw"], row),
like_count: row["like_count"],
reply_to_post_number:
row["reply_to_post_id"] ? post_number_from_imported_id(row["reply_to_post_id"]) : nil,
@ -783,8 +799,12 @@ class BulkImport::Generic < BulkImport::Base
posts.close
end
def post_raw(row, group_names)
raw = row["raw"].dup
def group_id_name_map
@group_id_name_map ||= Group.pluck(:id, :name).to_h
end
def raw_with_placeholders_interpolated(raw, row)
raw = raw.dup
placeholders = row["placeholders"]&.then { |json| JSON.parse(json) }
if (polls = placeholders&.fetch("polls", nil))
@ -810,22 +830,7 @@ class BulkImport::Generic < BulkImport::Base
if (mentions = placeholders&.fetch("mentions", nil))
mentions.each do |mention|
name =
if mention["type"] == "user"
if mention["id"]
username_from_id(user_id_from_imported_id(mention["id"]))
elsif mention["name"]
user_id = user_id_from_original_username(mention["name"])
user_id ? username_from_id(user_id) : mention["name"]
end
elsif mention["type"] == "group"
if mention["id"]
group_id = group_id_from_imported_id(mention["id"])
group_id ? group_names[group_id] : mention["name"]
else
mention["name"]
end
end
name = resolve_mentioned_name(mention)
puts "#{mention["type"]} not found -- #{mention["placeholder"]}" unless name
raw.gsub!(mention["placeholder"], " @#{name} ")
@ -934,6 +939,40 @@ class BulkImport::Generic < BulkImport::Base
raw
end
def resolve_mentioned_name(mention)
# NOTE: original_id lookup order is important until post and chat mentions are unified
original_id = mention["target_id"] || mention["id"]
name = mention["name"]
case mention["type"]
when "user", "Chat::UserMention"
resolved_user_name(original_id, name)
when "group", "Chat::GroupMention"
resolved_group_name(original_id, name)
when "Chat::HereMention"
"here"
when "Chat::AllMention"
"all"
end
end
def resolved_user_name(original_id, name)
user_id =
if original_id
user_id_from_imported_id(original_id)
elsif name
user_id_from_original_username(name)
end
user_id ? username_from_id(user_id) : name
end
def resolved_group_name(original_id, name)
group_id = group_id_from_imported_id(original_id) if original_id
group_id ? group_id_name_map[group_id] : name
end
def process_raw(original_raw)
original_raw
end
@ -1753,28 +1792,50 @@ class BulkImport::Generic < BulkImport::Base
SQL
puts " Import took #{(Time.now - start_time).to_i} seconds."
puts "", "Importing upload references for posts..."
post_uploads = query(<<~SQL)
SELECT p.id AS post_id, u.value AS upload_id
FROM posts p,
JSON_EACH(p.upload_ids) u
import_content_upload_references("posts")
import_content_upload_references("chat_messages")
end
def import_content_upload_references(type)
if CONTENT_UPLOAD_REFERENCE_TYPES.exclude?(type)
puts " Skipping upload references import for #{type} because it's unsupported"
return
end
puts "", "Importing upload references for #{type}..."
content_uploads = query(<<~SQL)
SELECT t.id AS target_id, u.value AS upload_id
FROM #{type} t,
JSON_EACH(t.upload_ids) u
WHERE upload_ids IS NOT NULL
SQL
target_type = type.classify
existing_upload_references =
UploadReference.where(target_type: "Post").pluck(:upload_id, :target_id).to_set
UploadReference.where(target_type: target_type).pluck(:upload_id, :target_id).to_set
create_upload_references(post_uploads) do |row|
create_upload_references(content_uploads) do |row|
upload_id = upload_id_from_original_id(row["upload_id"])
post_id = post_id_from_imported_id(row["post_id"])
target_id = content_id_from_original_id(type, row["target_id"])
next unless upload_id && post_id
next unless existing_upload_references.add?([upload_id, post_id])
next unless upload_id && target_id
next unless existing_upload_references.add?([upload_id, target_id])
{ upload_id: upload_id, target_type: "Post", target_id: post_id }
{ upload_id: upload_id, target_type: target_type, target_id: target_id }
end
post_uploads.close
content_uploads.close
end
def content_id_from_original_id(type, original_id)
case type
when "posts"
post_id_from_imported_id(original_id)
when "chat_messages"
chat_message_id_from_original_id(original_id)
end
end
def update_uploaded_avatar_id
@ -2359,6 +2420,455 @@ class BulkImport::Generic < BulkImport::Base
rows.close
end
def import_chat_direct_messages
unless defined?(::Chat)
puts "", "Skipping chat direct messages, because the chat plugin is not installed."
return
end
puts "", "Importing chat direct messages..."
direct_messages = query(<<~SQL)
SELECT *
FROM chat_channels
WHERE chatable_type = 'DirectMessage'
ORDER BY id
SQL
create_chat_direct_message(direct_messages) do |row|
next if chat_direct_message_channel_id_from_original_id(row["chatable_id"]).present?
{
original_id: row["chatable_id"],
created_at: to_datetime(row["created_at"]),
group: to_boolean(row["is_group"]),
}
end
direct_messages.close
end
def import_chat_channels
unless defined?(::Chat)
puts "", "Skipping chat channels, because the chat plugin is not installed."
return
end
puts "", "Importing chat channels..."
# Ideally, wed like these to be set in `import_site_settings`,
# but since theres no way to enforce that, we're defaulting to keeping all chats
# indefinitely for now
SiteSetting.chat_channel_retention_days = 0
SiteSetting.chat_dm_retention_days = 0
channels = query(<<~SQL)
SELECT *
FROM chat_channels
ORDER BY id
SQL
create_chat_channels(channels) do |row|
next if chat_channel_id_from_original_id(row["id"]).present?
case row["chatable_type"]
when "Category"
type = "CategoryChannel"
chatable_id = category_id_from_imported_id(row["chatable_id"])
when "DirectMessage"
chatable_id = chat_direct_message_channel_id_from_original_id(row["chatable_id"])
type = "DirectMessageChannel"
end
next if !chatable_id
# TODO: Add more uniqueness checks
# Ensure no channel with same name and category exists?
{
original_id: row["id"],
name: row["name"],
description: row["description"],
slug: row["slug"],
status: row["status"],
chatable_id: chatable_id,
chatable_type: row["chatable_type"],
user_count: row["user_count"],
messages_count: row["messages_count"],
type: type,
created_at: to_datetime(row["created_at"]),
allow_channel_wide_mentions: to_boolean(row["allow_channel_wide_mentions"]),
auto_join_users: to_boolean(row["auto_join_users"]),
threading_enabled: to_boolean(row["threading_enabled"]),
}
end
channels.close
end
def import_user_chat_channel_memberships
unless defined?(::Chat)
puts "", "Skipping user chat channel memberships, because the chat plugin is not installed."
return
end
puts "", "Importing user chat channel memberships..."
channel_users = query(<<~SQL)
SELECT chat_channels.chatable_type, chat_channels.chatable_id, chat_channel_users.*
FROM chat_channel_users
JOIN chat_channels ON chat_channels.id = chat_channel_users.chat_channel_id
ORDER BY chat_channel_users.chat_channel_id
SQL
existing_members =
Chat::UserChatChannelMembership.distinct.pluck(:user_id, :chat_channel_id).to_set
create_user_chat_channel_memberships(channel_users) do |row|
user_id = user_id_from_imported_id(row["user_id"])
channel_id = chat_channel_id_from_original_id(row["chat_channel_id"])
last_read_message_id = chat_message_id_from_original_id(row["last_read_message_id"])
next if user_id.blank? || channel_id.blank?
next unless existing_members.add?([user_id, channel_id])
# `last_viewed_at` is required, if not provided, set a placeholder,
# it'll be updated in the `update_chat_membership_metadata` step
last_viewed_at = to_datetime(row["last_viewed_at"].presence || LAST_VIEWED_AT_PLACEHOLDER)
{
user_id: user_id,
chat_channel_id: channel_id,
created_at: to_datetime(row["created_at"]),
following: to_boolean(row["following"]),
muted: to_boolean(row["muted"]),
desktop_notification_level: row["desktop_notification_level"],
mobile_notification_level: row["mobile_notification_level"],
last_read_message_id: last_read_message_id,
join_mode: row["join_mode"],
last_viewed_at: last_viewed_at,
}
end
puts "", "Importing chat direct message users..."
channel_users.reset
existing_direct_message_users =
Chat::DirectMessageUser.distinct.pluck(:direct_message_channel_id, :user_id).to_set
create_direct_message_users(channel_users) do |row|
next if row["chatable_type"] != "DirectMessage"
user_id = user_id_from_imported_id(row["user_id"])
direct_message_channel_id =
chat_direct_message_channel_id_from_original_id(row["chatable_id"])
next if user_id.blank? || direct_message_channel_id.blank?
next unless existing_direct_message_users.add?([direct_message_channel_id, user_id])
{
direct_message_channel_id: direct_message_channel_id,
user_id: user_id,
created_at: to_datetime(row["created_at"]),
}
end
channel_users.close
end
def import_chat_threads
unless defined?(::Chat)
puts "", "Skipping chat threads, because the chat plugin is not installed."
return
end
puts "", "Importing chat threads..."
threads = query(<<~SQL)
SELECT *
FROM chat_threads
ORDER BY chat_channel_id, id
SQL
create_chat_threads(threads) do |row|
channel_id = chat_channel_id_from_original_id(row["chat_channel_id"])
original_message_user_id = user_id_from_imported_id(row["original_message_user_id"])
next if channel_id.blank? || original_message_user_id.blank?
# Messages aren't imported yet. Use a placeholder `original_message_id` for now.
# Actual original_message_ids will be set later after messages have been imported
placeholder_original_message_id = -1
{
original_id: row["id"],
channel_id: channel_id,
original_message_id: placeholder_original_message_id,
original_message_user_id: original_message_user_id,
status: row["status"],
title: row["title"],
created_at: to_datetime(row["created_at"]),
replies_count: row["replies_count"],
}
end
threads.close
end
def import_chat_thread_users
unless defined?(::Chat)
puts "", "Skipping chat thread users, because the chat plugin is not installed."
return
end
thread_users = query(<<~SQL)
SELECT *
FROM chat_thread_users
ORDER BY chat_thread_id, user_id
SQL
puts "", "Importing chat thread users..."
existing_members = Chat::UserChatThreadMembership.distinct.pluck(:user_id, :thread_id).to_set
create_thread_users(thread_users) do |row|
user_id = user_id_from_imported_id(row["user_id"])
thread_id = chat_thread_id_from_original_id(row["chat_thread_id"])
last_read_message_id = chat_message_id_from_original_id(row["last_read_message_id"])
next if user_id.blank? || thread_id.blank?
next unless existing_members.add?([user_id, thread_id])
{
user_id: user_id,
thread_id: thread_id,
notification_level: row["notification_level"],
created_at: to_datetime(row["created_at"]),
last_read_message_id: last_read_message_id,
}
end
thread_users.close
end
def import_chat_messages
unless defined?(::Chat)
puts "", "Skipping chat messages, because the chat plugin is not installed."
return
end
puts "", "Importing chat messages..."
messages = query(<<~SQL)
SELECT *
FROM chat_messages
ORDER BY chat_channel_id, created_at, id
SQL
create_chat_messages(messages) do |row|
channel_id = chat_channel_id_from_original_id(row["chat_channel_id"])
user_id = user_id_from_imported_id(row["user_id"])
next if channel_id.blank? || user_id.blank?
last_editor_id = user_id_from_imported_id(row["last_editor_id"])
thread_id = chat_thread_id_from_original_id(row["thread_id"])
deleted_by_id = user_id_from_imported_id(row["deleted_by_id"])
in_reply_to_id = chat_message_id_from_original_id(row["in_reply_to_id"]) # TODO: this will only work if serial ids are used
{
original_id: row["id"],
chat_channel_id: channel_id,
user_id: user_id,
thread_id: thread_id,
last_editor_id: last_editor_id,
created_at: to_datetime(row["created_at"]),
deleted_at: to_datetime(row["deleted_at"]),
deleted_by_id: deleted_by_id,
in_reply_to_id: in_reply_to_id,
message: raw_with_placeholders_interpolated(row["message"], row),
}
end
messages.close
end
def import_chat_reactions
unless defined?(::Chat)
puts "", "Skipping chat message reactions, because the chat plugin is not installed."
return
end
puts "", "Importing chat message reactions..."
reactions = query(<<~SQL)
SELECT *
FROM chat_reactions
ORDER BY chat_message_id
SQL
existing_reactions =
Chat::MessageReaction.distinct.pluck(:chat_message_id, :user_id, :emoji).to_set
create_chat_message_reactions(reactions) do |row|
next if row["emoji"].blank?
message_id = chat_message_id_from_original_id(row["chat_message_id"])
user_id = user_id_from_imported_id(row["user_id"])
next if message_id.blank? || user_id.blank?
next unless existing_reactions.add?([message_id, user_id, row["emoji"]])
# TODO: Validate emoji
{
chat_message_id: message_id,
user_id: user_id,
emoji: row["emoji"],
created_at: to_datetime(row["created_at"]),
}
end
reactions.close
end
def import_chat_mentions
unless defined?(::Chat)
puts "", "Skipping chat mentions, because the chat plugin is not installed."
return
end
puts "", "Importing chat mentions..."
mentions = query(<<~SQL)
SELECT *
FROM chat_mentions
ORDER BY chat_message_id
SQL
create_chat_mentions(mentions) do |row|
# TODO: Maybe standardize mention types, instead of requiring converter
# to set namespaced ruby classes
chat_message_id = chat_message_id_from_original_id(row["chat_message_id"])
target_id =
case row["type"]
when "Chat::AllMention", "Chat::HereMention"
nil
when "Chat::UserMention"
user_id_from_imported_id(row["target_id"])
when "Chat::GroupMention"
group_id_from_imported_id(row["target_id"])
end
next if target_id.nil? && %w[Chat::AllMention Chat::HereMention].exclude?(row["type"])
{
chat_message_id: chat_message_id,
target_id: target_id,
type: row["type"],
created_at: to_datetime(row["created_at"]),
}
end
mentions.close
end
def update_chat_threads
unless defined?(::Chat)
puts "", "Skipping chat thread updates, because the chat plugin is not installed."
return
end
puts "", "Updating chat threads..."
start_time = Time.now
DB.exec(<<~SQL)
WITH thread_info AS (
SELECT
thread_id,
MIN(id) AS original_message_id,
COUNT(id) - 1 AS replies_count,
MAX(id) AS last_message_id
FROM
chat_messages
WHERE
thread_id IS NOT NULL
GROUP BY
thread_id
)
UPDATE chat_threads
SET
original_message_id = thread_info.original_message_id,
replies_count = thread_info.replies_count,
last_message_id = thread_info.last_message_id
FROM
thread_info
WHERE
chat_threads.id = thread_info.thread_id;
SQL
puts " Update took #{(Time.now - start_time).to_i} seconds."
end
def update_chat_membership_metadata
unless defined?(::Chat)
puts "",
"Skipping chat membership metadata updates, because the chat plugin is not installed."
return
end
puts "", "Updating chat membership metadata..."
start_time = Time.now
# Ensure the user is caught up on all messages in the channel. The primary aim is to prevent
# new message indicators from showing up for imported messages. We do this by updating
# the `last_viewed_at` and `last_read_message_id` columns in `user_chat_channel_memberships`
# if they were not imported.
DB.exec(<<~SQL)
WITH latest_messages AS (
SELECT
chat_channel_id,
MAX(id) AS last_message_id,
MAX(created_at) AS last_message_created_at
FROM chat_messages
WHERE thread_id IS NULL
GROUP BY chat_channel_id
)
UPDATE user_chat_channel_memberships uccm
SET
last_read_message_id = COALESCE(uccm.last_read_message_id, lm.last_message_id),
last_viewed_at = CASE
WHEN uccm.last_viewed_at = '#{LAST_VIEWED_AT_PLACEHOLDER}'
THEN lm.last_message_created_at + INTERVAL '1 second'
ELSE uccm.last_viewed_at
END
FROM latest_messages lm
WHERE uccm.chat_channel_id = lm.chat_channel_id
SQL
# Set `last_read_message_id` in `user_chat_thread_memberships` if none is provided.
# Similar to the chat channel membership update above, this ensures the user is caught up on messages in the thread.
DB.exec(<<~SQL)
WITH latest_thread_messages AS (
SELECT
thread_id,
MAX(id) AS last_message_id
FROM chat_messages
WHERE thread_id IS NOT NULL
GROUP BY thread_id
)
UPDATE user_chat_thread_memberships utm
SET
last_read_message_id = ltm.last_message_id
FROM latest_thread_messages ltm
WHERE utm.thread_id = ltm.thread_id
AND utm.last_read_message_id IS NULL
SQL
puts " Update took #{(Time.now - start_time).to_i} seconds."
end
def calculate_external_url(row)
external_url = row["external_url"].dup
placeholders = row["external_url_placeholders"]&.then { |json| JSON.parse(json) }