diff --git a/Gemfile b/Gemfile
index 647aefea8ad..521ef33e822 100644
--- a/Gemfile
+++ b/Gemfile
@@ -259,6 +259,11 @@ if ENV["IMPORT"] == "1"
   gem "parallel", require: false
 end
 
+if ENV["GENERIC_IMPORT"] == "1"
+  gem "sqlite3"
+  gem "redcarpet"
+end
+
 gem "web-push"
 gem "colored2", require: false
 gem "maxminddb"
diff --git a/script/bulk_import/base.rb b/script/bulk_import/base.rb
index d92878b705b..1f78e54d258 100644
--- a/script/bulk_import/base.rb
+++ b/script/bulk_import/base.rb
@@ -130,7 +130,7 @@ class BulkImport::Base
   end
 
   def imported_ids(name)
-    map = []
+    map = {}
     ids = []
 
     @raw_connection.send_query(
@@ -202,8 +202,9 @@ class BulkImport::Base
     puts "Loading users indexes..."
     @last_user_id = last_id(User)
     @last_user_email_id = last_id(UserEmail)
-    @emails =
-      User.unscoped.joins(:user_emails).pluck(:"user_emails.email", :"user_emails.user_id").to_h
+    @last_sso_record_id = last_id(SingleSignOnRecord)
+    @emails = UserEmail.pluck(:email, :user_id).to_h
+    @external_ids = SingleSignOnRecord.pluck(:external_id, :user_id).to_h
     @usernames_lower = User.unscoped.pluck(:username_lower).to_set
     @mapped_usernames =
       UserCustomField
@@ -214,6 +215,7 @@ class BulkImport::Base
 
     puts "Loading categories indexes..."
     @last_category_id = last_id(Category)
+    @highest_category_position = Category.unscoped.maximum(:position) || 0
     @category_names =
       Category
         .unscoped
@@ -256,6 +258,11 @@ class BulkImport::Base
     if @last_user_email_id > 0
       @raw_connection.exec("SELECT setval('#{UserEmail.sequence_name}', #{@last_user_email_id})")
     end
+    if @last_sso_record_id > 0
+      @raw_connection.exec(
+        "SELECT setval('#{SingleSignOnRecord.sequence_name}', #{@last_sso_record_id})",
+      )
+    end
     if @last_category_id > 0
       @raw_connection.exec("SELECT setval('#{Category.sequence_name}', #{@last_category_id})")
     end
@@ -344,6 +351,21 @@ class BulkImport::Base
 
   USER_PROFILE_COLUMNS ||= %i[user_id location website bio_raw bio_cooked views]
 
+  USER_SSO_RECORD_COLUMNS ||= %i[
+    id
+    user_id
+    external_id
+    last_payload
+    created_at
+    updated_at
+    external_username
+    external_email
+    external_name
+    external_avatar_url
+    external_profile_background_url
+    external_card_background_url
+  ]
+
   GROUP_USER_COLUMNS ||= %i[group_id user_id created_at updated_at]
 
   CATEGORY_COLUMNS ||= %i[
@@ -442,6 +464,9 @@ class BulkImport::Base
   def create_user_profiles(rows, &block)
     create_records(rows, "user_profile", USER_PROFILE_COLUMNS, &block)
   end
+  def create_single_sign_on_records(rows, &block)
+    create_records(rows, "single_sign_on_record", USER_SSO_RECORD_COLUMNS, &block)
+  end
   def create_group_users(rows, &block)
     create_records(rows, "group_user", GROUP_USER_COLUMNS, &block)
   end
@@ -495,6 +520,15 @@ class BulkImport::Base
       end
     end
 
+    if user[:external_id].present?
+      if existing_user_id = @external_ids[user[:external_id]]
+        @pre_existing_user_ids << existing_user_id
+        @users[user[:imported_id].to_i] = existing_user_id
+        user[:skip] = true
+        return user
+      end
+    end
+
     @users[user[:imported_id].to_i] = user[:id] = @last_user_id += 1
 
     imported_username = user[:username].dup
@@ -521,6 +555,9 @@ class BulkImport::Base
     user[:last_emailed_at] ||= NOW
     user[:created_at] ||= NOW
     user[:updated_at] ||= user[:created_at]
+    user[:suspended_till] ||= 200.years.from_now if user[:suspended_at].present?
 
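+    # NOTE: 1904 is the placeholder year Discourse stores for dates of birth,
+    # so only the month and day are significant below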
     if (date_of_birth = user[:date_of_birth]).is_a?(Date) && date_of_birth.year != 1904
       user[:date_of_birth] = Date.new(1904, date_of_birth.month, date_of_birth.day)
@@ -539,8 +576,7 @@ class BulkImport::Base
     user_email[:created_at] ||= NOW
     user_email[:updated_at] ||= user_email[:created_at]
 
-    user_email[:email] ||= random_email
-    user_email[:email].downcase!
+    user_email[:email] = user_email[:email]&.downcase || random_email
 
     # unique email
     user_email[:email] = random_email until EmailAddressValidator.valid_value?(
       user_email[:email],
@@ -577,6 +613,18 @@ class BulkImport::Base
     user_profile
   end
 
+  def process_single_sign_on_record(sso_record)
+    user_id = @users[sso_record[:imported_user_id].to_i]
+    return { skip: true } if @pre_existing_user_ids.include?(user_id)
+
+    sso_record[:id] = @last_sso_record_id += 1
+    sso_record[:user_id] = user_id
+    sso_record[:last_payload] ||= ""
+    sso_record[:created_at] = NOW
+    sso_record[:updated_at] = NOW
+    sso_record
+  end
+
   def process_group_user(group_user)
     group_user[:created_at] = NOW
     group_user[:updated_at] = NOW
@@ -584,6 +632,12 @@ class BulkImport::Base
   end
 
   def process_category(category)
+    if category[:existing_id].present?
+      @categories[category[:imported_id].to_i] = category[:existing_id]
+      category[:skip] = true
+      return category
+    end
+
     category[:id] ||= @last_category_id += 1
     @categories[category[:imported_id].to_i] ||= category[:id]
     category[:name] = category[:name][0...50].scrub.strip
@@ -594,6 +648,14 @@ class BulkImport::Base
     category[:user_id] ||= Discourse::SYSTEM_USER_ID
     category[:created_at] ||= NOW
     category[:updated_at] ||= category[:created_at]
+
+    if category[:position]
+      @highest_category_position = category[:position] if category[:position] >
+        @highest_category_position
+    else
+      category[:position] = @highest_category_position += 1
+    end
+
     category
   end
 
@@ -633,6 +695,7 @@ class BulkImport::Base
         post[:raw]
       end
     end
+    post[:raw] = normalize_text(post[:raw])
     post[:like_count] ||= 0
     post[:cooked] = pre_cook post[:raw]
     post[:hidden] ||= false
@@ -875,7 +938,7 @@ class BulkImport::Base
   end
 
   def fix_name(name)
-    name.scrub! if name.valid_encoding? == false
+    name.scrub! if name && !name.valid_encoding?
     return if name.blank?
     name = ActiveSupport::Inflector.transliterate(name)
     name.gsub!(/[^\w.-]+/, "_")
diff --git a/script/bulk_import/generic_bulk.rb b/script/bulk_import/generic_bulk.rb
new file mode 100644
index 00000000000..08ef615e73c
--- /dev/null
+++ b/script/bulk_import/generic_bulk.rb
@@ -0,0 +1,367 @@
+# frozen_string_literal: true
+
+require_relative "base"
+require "sqlite3"
+require "json"
+
+class BulkImport::Generic < BulkImport::Base
+  AVATAR_DIRECTORY = ENV["AVATAR_DIRECTORY"]
+  UPLOAD_DIRECTORY = ENV["UPLOAD_DIRECTORY"]
+
+  def initialize(db_path)
+    super()
+    @db = create_connection(db_path)
+  end
+
+  def start
+    run # will call execute, and then "complete" the migration
+
+    # Now that the migration is complete, do some more work:
+
+    Discourse::Application.load_tasks
+
+    puts "running 'import:ensure_consistency' rake task."
+    Rake::Task["import:ensure_consistency"].invoke
+  end
+
+  def execute
+    import_categories
+    import_users
+    import_user_emails
+    import_single_sign_on_records
+    import_topics
+    import_posts
+    import_topic_allowed_users
+    import_likes
+    import_user_stats
+    import_tags
+  end
+
+  def import_categories
+    puts "Importing categories..."
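+    # The recursive CTE below walks the category tree level by level, so parent
+    # categories are always imported before their children.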
+
+    categories = query(<<~SQL)
+      WITH RECURSIVE tree(id, parent_category_id, name, description, color, text_color, read_restricted, slug,
+                          old_relative_url, existing_id, level, rowid) AS (
+        SELECT c.id, c.parent_category_id, c.name, c.description, c.color, c.text_color, c.read_restricted, c.slug,
+               c.old_relative_url, c.existing_id, 0 AS level, c.ROWID
+        FROM categories c
+        WHERE c.parent_category_id IS NULL
+        UNION
+        SELECT c.id, c.parent_category_id, c.name, c.description, c.color, c.text_color, c.read_restricted, c.slug,
+               c.old_relative_url, c.existing_id, tree.level + 1 AS level, c.ROWID
+        FROM categories c,
+             tree
+        WHERE c.parent_category_id = tree.id
+      )
+      SELECT *
+      FROM tree
+      ORDER BY level, rowid
+    SQL
+
+    create_categories(categories) do |row|
+      {
+        imported_id: row["id"],
+        existing_id: row["existing_id"],
+        name: row["name"],
+        description: row["description"],
+        parent_category_id:
+          row["parent_category_id"] ? category_id_from_imported_id(row["parent_category_id"]) : nil,
+        slug: row["slug"],
+      }
+    end
+  end
+
+  def import_users
+    puts "Importing users..."
+
+    users = query(<<~SQL)
+      SELECT ROWID, *
+      FROM users
+      ORDER BY ROWID
+    SQL
+
+    create_users(users) do |row|
+      sso_record = JSON.parse(row["sso_record"]) if row["sso_record"].present?
+
+      if row["suspension"].present?
+        suspension = JSON.parse(row["suspension"])
+        suspended_at = suspension["suspended_at"]
+        suspended_till = suspension["suspended_till"]
+      end
+
+      {
+        imported_id: row["id"],
+        username: row["username"],
+        name: row["name"],
+        email: row["email"],
+        external_id: sso_record&.fetch("external_id"),
+        created_at: to_datetime(row["created_at"]),
+        admin: row["admin"],
+        moderator: row["moderator"],
+        suspended_at: suspended_at,
+        suspended_till: suspended_till,
+      }
+    end
+  end
+
+  def import_user_emails
+    puts "", "Importing user emails..."
+
+    users = query(<<~SQL)
+      SELECT ROWID, id, email, created_at
+      FROM users
+      ORDER BY ROWID
+    SQL
+
+    create_user_emails(users) do |row|
+      {
+        # FIXME: both "imported_id" and "imported_user_id" are set here; they
+        # should be consolidated into a single "imported_id"
+        imported_id: row["id"],
+        imported_user_id: row["id"],
+        email: row["email"],
+        created_at: to_datetime(row["created_at"]),
+      }
+    end
+  end
+
+  def import_single_sign_on_records
+    puts "", "Importing SSO records..."
+
+    users = query(<<~SQL)
+      SELECT ROWID, id, sso_record
+      FROM users
+      WHERE sso_record IS NOT NULL
+      ORDER BY ROWID
+    SQL
+
+    create_single_sign_on_records(users) do |row|
+      sso_record = JSON.parse(row["sso_record"], symbolize_names: true)
+      # FIXME: both "imported_id" and "imported_user_id" are set here; they
+      # should be consolidated into a single "imported_id"
+      sso_record[:imported_id] = row["id"]
+      sso_record[:imported_user_id] = row["id"]
+      sso_record
+    end
+  end
+
+  def import_topics
+    puts "Importing topics..."
+
+    topics = query(<<~SQL)
+      SELECT ROWID, *
+      FROM topics
+      ORDER BY ROWID
+    SQL
+
+    create_topics(topics) do |row|
+      {
+        archetype: row["private_message"] ? Archetype.private_message : Archetype.default,
+        imported_id: row["id"],
+        title: row["title"],
+        user_id: user_id_from_imported_id(row["user_id"]),
+        created_at: to_datetime(row["created_at"]),
+        category_id: category_id_from_imported_id(row["category_id"]),
+        closed: to_boolean(row["closed"]),
+        views: row["views"],
+      }
+    end
+  end
+
+  def import_topic_allowed_users
+    puts "Importing topic_allowed_users..."
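+    # "private_message" holds a JSON object whose "user_ids" array lists the
+    # participants; only the first user is added to each topic here.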
+
+    topics = query(<<~SQL)
+      SELECT ROWID, *
+      FROM topics
+      WHERE private_message IS NOT NULL
+      ORDER BY ROWID
+    SQL
+
+    added = 0
+
+    create_topic_allowed_users(topics) do |row|
+      next unless topic_id = topic_id_from_imported_id(row["id"])
+      imported_user_id = JSON.parse(row["private_message"])["user_ids"].first
+      next unless user_id = user_id_from_imported_id(imported_user_id)
+      added += 1
+      {
+        # FIXME: missing imported_id
+        topic_id: topic_id,
+        user_id: user_id,
+      }
+    end
+
+    puts "", "Added #{added} topic_allowed_users records."
+  end
+
+  def import_posts
+    puts "Importing posts..."
+
+    posts = query(<<~SQL)
+      SELECT ROWID, *
+      FROM posts
+      ORDER BY topic_id, post_number
+    SQL
+
+    create_posts(posts) do |row|
+      next if row["raw"].blank?
+      next unless topic_id = topic_id_from_imported_id(row["topic_id"])
+
+      {
+        imported_id: row["id"],
+        topic_id: topic_id,
+        user_id: user_id_from_imported_id(row["user_id"]),
+        created_at: to_datetime(row["created_at"]),
+        raw: row["raw"],
+        like_count: row["like_count"],
+      }
+    end
+  end
+
+  def import_likes
+    puts "Importing likes..."
+
+    @imported_likes = Set.new
+
+    likes = query(<<~SQL)
+      SELECT ROWID, *
+      FROM likes
+      ORDER BY ROWID
+    SQL
+
+    create_post_actions(likes) do |row|
+      post_id = post_id_from_imported_id(row["post_id"])
+      user_id = user_id_from_imported_id(row["user_id"])
+
+      next if post_id.nil? || user_id.nil?
+      next if @imported_likes.add?([post_id, user_id]).nil?
+
+      {
+        # FIXME: missing imported_id
+        post_id: post_id,
+        user_id: user_id,
+        post_action_type_id: 2,
+        created_at: to_datetime(row["created_at"]),
+      }
+    end
+  end
+
+  def import_user_stats
+    puts "Importing user stats..."
+
+    users = query(<<~SQL)
+      WITH posts_counts AS (
+        SELECT COUNT(p.id) AS count, p.user_id
+        FROM posts p GROUP BY p.user_id
+      ),
+      topic_counts AS (
+        SELECT COUNT(t.id) AS count, t.user_id
+        FROM topics t GROUP BY t.user_id
+      ),
+      first_post AS (
+        SELECT MIN(p.created_at) AS created_at, p.user_id
+        FROM posts p GROUP BY p.user_id
+      )
+      SELECT u.id AS user_id, u.created_at, pc.count AS posts, tc.count AS topics, fp.created_at AS first_post
+      FROM users u
+           JOIN posts_counts pc ON u.id = pc.user_id
+           JOIN topic_counts tc ON u.id = tc.user_id
+           JOIN first_post fp ON u.id = fp.user_id
+    SQL
+
+    create_user_stats(users) do |row|
+      user = {
+        imported_id: row["user_id"],
+        imported_user_id: row["user_id"],
+        new_since: to_datetime(row["created_at"]),
+        post_count: row["posts"],
+        topic_count: row["topics"],
+        first_post_created_at: to_datetime(row["first_post"]),
+      }
+
+      likes_received = @db.execute(<<~SQL).first
+        SELECT COUNT(l.id) AS likes_received
+        FROM likes l JOIN posts p ON l.post_id = p.id
+        WHERE p.user_id = #{row["user_id"]}
+      SQL
+
+      user[:likes_received] = likes_received["likes_received"] if likes_received
+
+      likes_given = @db.execute(<<~SQL).first
+        SELECT COUNT(l.id) AS likes_given
+        FROM likes l
+        WHERE l.user_id = #{row["user_id"]}
+      SQL
+
+      user[:likes_given] = likes_given["likes_given"] if likes_given
+
+      user
+    end
+  end
+
+  def import_tags
+    puts "", "Importing tags..."
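+    # "tags" holds a JSON-encoded array of tag names per topic. Names are
+    # cleaned with DiscourseTagging.clean_tag and deduplicated before the
+    # topic/tag pairs are created.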
+
+    tags =
+      query("SELECT id AS topic_id, tags FROM topics")
+        .map do |r|
+          next unless r["tags"]
+          [r["topic_id"], JSON.parse(r["tags"]).uniq]
+        end
+        .compact
+
+    tag_mapping = {}
+
+    tags
+      .map(&:last)
+      .flatten
+      .compact
+      .uniq
+      .each do |tag_name|
+        cleaned_tag_name = DiscourseTagging.clean_tag(tag_name)
+        tag = Tag.find_by_name(cleaned_tag_name) || Tag.create!(name: cleaned_tag_name)
+        tag_mapping[tag_name] = tag.id
+      end
+
+    tags_disaggregated =
+      tags
+        .map do |topic_id, tags_of_topic|
+          tags_of_topic.map { |t| { topic_id: topic_id, tag_id: tag_mapping.fetch(t) } }
+        end
+        .flatten
+
+    create_topic_tags(tags_disaggregated) do |row|
+      next unless topic_id = topic_id_from_imported_id(row[:topic_id])
+
+      { topic_id: topic_id, tag_id: row[:tag_id] }
+    end
+  end
+
+  def create_connection(path)
+    sqlite = SQLite3::Database.new(path, results_as_hash: true)
+    sqlite.busy_timeout = 60_000 # 60 seconds
+    sqlite.auto_vacuum = "full"
+    sqlite.foreign_keys = true
+    sqlite.journal_mode = "wal"
+    sqlite.synchronous = "normal"
+    sqlite
+  end
+
+  def query(sql)
+    @db.prepare(sql).execute
+  end
+
+  def to_date(text)
+    text.present? ? Date.parse(text) : nil
+  end
+
+  def to_datetime(text)
+    text.present? ? DateTime.parse(text) : nil
+  end
+
+  def to_boolean(value)
+    value == 1
+  end
+end
+
+BulkImport::Generic.new(ARGV.first).start
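+
+# Usage (a sketch): assumes a configured Discourse environment and an
+# intermediate SQLite database containing the tables queried above
+# (categories, users, topics, posts, likes) with JSON-encoded "sso_record",
+# "suspension", "private_message" and "tags" columns.
+#
+#   GENERIC_IMPORT=1 bundle install
+#   ruby script/bulk_import/generic_bulk.rb /path/to/intermediate.db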