discourse/script/bulk_import/uploads_importer.rb
Gerhard Schlager d725b3ca9e DEV: Add script for preprocessing uploads as part of a migration
This script preprocesses all uploads within a intermediate DB (output of converters) and uploads those files to S3. It does the same for optimized images. This speeds up migrations when you have to run them multiple times, because you only have to preprocess and upload the files once.

This script is very hacky and mostly undocumented for now. That will change in the future.
2023-12-11 16:23:07 +01:00

667 lines
20 KiB
Ruby

# frozen_string_literal: true
puts "Loading application..."
require_relative "../../config/environment"
require "etc"
require "sqlite3"
require "colored2"
# hack so that OptimizedImage.lock beliefs that it's running in a Sidekiq job
module Sidekiq
def self.server?
true
end
end
module BulkImport
class UploadsImporter
TRANSACTION_SIZE = 1000
QUEUE_SIZE = 1000
def initialize(settings_path)
@settings = YAML.load_file(settings_path, symbolize_names: true)
@settings[:path_replacements] ||= []
@root_paths = @settings[:root_paths]
@output_db = create_connection(@settings[:output_db_path])
initialize_output_db
configure_site_settings
end
def run
# disable logging for EXIFR which is used by ImageOptim
EXIFR.logger = Logger.new(nil)
if @settings[:fix_missing]
@source_db = create_connection(@settings[:output_db_path])
puts "Fixing missing uploads..."
fix_missing
else
@source_db = create_connection(@settings[:source_db_path])
puts "Uploading uploads..."
upload_files
puts "", "Creating optimized images..."
create_optimized_images if @settings[:create_optimized_images]
end
puts ""
ensure
close
end
def upload_files
queue = SizedQueue.new(QUEUE_SIZE)
consumer_threads = []
if @settings[:delete_missing_uploads]
puts "Deleting missing uploads from output database..."
@output_db.execute(<<~SQL)
DELETE FROM uploads
WHERE upload IS NULL
SQL
end
output_existing_ids = Set.new
query("SELECT id FROM uploads", @output_db).tap do |result_set|
result_set.each { |row| output_existing_ids << row["id"] }
result_set.close
end
source_existing_ids = Set.new
query("SELECT id FROM uploads", @source_db).tap do |result_set|
result_set.each { |row| source_existing_ids << row["id"] }
result_set.close
end
if (surplus_upload_ids = output_existing_ids - source_existing_ids).any?
if @settings[:delete_surplus_uploads]
puts "Deleting #{surplus_upload_ids.size} uploads from output database..."
surplus_upload_ids.each_slice(TRANSACTION_SIZE) do |ids|
placeholders = (["?"] * ids.size).join(",")
@output_db.execute(<<~SQL, ids)
DELETE FROM uploads
WHERE id IN (#{placeholders})
SQL
end
output_existing_ids -= surplus_upload_ids
else
puts "Found #{surplus_upload_ids.size} surplus uploads in output database. " \
"Run with `delete_surplus_uploads: true` to delete them."
end
surplus_upload_ids = nil
end
max_count = (source_existing_ids - output_existing_ids).size
source_existing_ids = nil
puts "Found #{output_existing_ids.size} existing uploads. #{max_count} are missing."
producer_thread =
Thread.new do
query("SELECT * FROM uploads", @source_db).tap do |result_set|
result_set.each { |row| queue << row unless output_existing_ids.include?(row["id"]) }
result_set.close
end
end
status_queue = SizedQueue.new(QUEUE_SIZE)
status_thread =
Thread.new do
error_count = 0
skipped_count = 0
current_count = 0
while !(params = status_queue.pop).nil?
begin
if params.delete(:skipped) == true
skipped_count += 1
elsif (error_message = params.delete(:error)) || params[:upload].nil?
error_count += 1
puts "", "Failed to create upload: #{params[:id]} (#{error_message})", ""
end
@output_db.execute(<<~SQL, params)
INSERT INTO uploads (id, upload, markdown, skip_reason)
VALUES (:id, :upload, :markdown, :skip_reason)
SQL
rescue StandardError => e
puts "", "Failed to insert upload: #{params[:id]} (#{e.message}))", ""
error_count += 1
end
current_count += 1
error_count_text = error_count > 0 ? "#{error_count} errors".red : "0 errors"
print "\r%7d / %7d (%s, %d skipped)" %
[current_count, max_count, error_count_text, skipped_count]
end
end
(Etc.nprocessors * @settings[:thread_count_factor]).to_i.times do |index|
consumer_threads << Thread.new do
Thread.current.name = "worker-#{index}"
store = Discourse.store
while (row = queue.pop)
begin
data_file = nil
path = nil
if row["data"].present?
data_file = Tempfile.new("discourse-upload", binmode: true)
data_file.write(row["data"])
data_file.rewind
path = data_file.path
else
relative_path = row["relative_path"]
file_exists = false
@root_paths.each do |root_path|
path = File.join(root_path, relative_path, row["filename"])
break if (file_exists = File.exist?(path))
@settings[:path_replacements].each do |from, to|
path = File.join(root_path, relative_path.sub(from, to), row["filename"])
break if (file_exists = File.exist?(path))
end
end
if !file_exists
status_queue << {
id: row["id"],
upload: nil,
skipped: true,
skip_reason: "file not found",
}
next
end
end
retry_count = 0
loop do
error_message = nil
upload =
copy_to_tempfile(path) do |file|
begin
UploadCreator.new(file, row["filename"], type: row["type"]).create_for(
Discourse::SYSTEM_USER_ID,
)
rescue StandardError => e
error_message = e.message
nil
end
end
if (upload_okay = upload.present? && upload.persisted? && upload.errors.blank?)
upload_path = store.get_path_for_upload(upload)
file_exists =
if store.external?
store.object_from_path(upload_path).exists?
else
File.exist?(File.join(store.public_dir, upload_path))
end
unless file_exists
upload.destroy
upload = nil
upload_okay = false
end
end
if upload_okay
status_queue << {
id: row["id"],
upload: upload.attributes.to_json,
markdown: UploadMarkdown.new(upload).to_markdown,
skip_reason: nil,
}
break
elsif retry_count >= 3
error_message ||= upload&.errors&.full_messages&.join(", ") || "unknown error"
status_queue << {
id: row["id"],
upload: nil,
markdown: nil,
error: "too many retries: #{error_message}",
skip_reason: "too many retries",
}
break
end
retry_count += 1
sleep 0.25 * retry_count
end
rescue StandardError => e
status_queue << {
id: row["id"],
upload: nil,
markdown: nil,
error: e.message,
skip_reason: "error",
}
ensure
data_file&.close!
end
end
end
end
producer_thread.join
queue.close
consumer_threads.each(&:join)
status_queue.close
status_thread.join
end
def fix_missing
queue = SizedQueue.new(QUEUE_SIZE)
consumer_threads = []
max_count =
@source_db.get_first_value("SELECT COUNT(*) FROM uploads WHERE upload IS NOT NULL")
producer_thread =
Thread.new do
query(
"SELECT id, upload FROM uploads WHERE upload IS NOT NULL ORDER BY rowid DESC",
@source_db,
).tap do |result_set|
result_set.each { |row| queue << row }
result_set.close
end
end
status_queue = SizedQueue.new(QUEUE_SIZE)
status_thread =
Thread.new do
error_count = 0
current_count = 0
missing_count = 0
while !(result = status_queue.pop).nil?
current_count += 1
case result[:status]
when :ok
# ignore
when :error
error_count += 1
puts "Error in #{result[:id]}"
when :missing
missing_count += 1
puts "Missing #{result[:id]}"
@output_db.execute("DELETE FROM uploads WHERE id = ?", result[:id])
Upload.delete_by(id: result[:upload_id])
end
error_count_text = error_count > 0 ? "#{error_count} errors".red : "0 errors"
print "\r%7d / %7d (%s, %s missing)" %
[current_count, max_count, error_count_text, missing_count]
end
end
store = Discourse.store
(Etc.nprocessors * @settings[:thread_count_factor] * 2).to_i.times do |index|
consumer_threads << Thread.new do
Thread.current.name = "worker-#{index}"
fake_upload = OpenStruct.new(url: "")
while (row = queue.pop)
begin
upload = JSON.parse(row["upload"])
fake_upload.url = upload["url"]
path = store.get_path_for_upload(fake_upload)
file_exists =
if store.external?
store.object_from_path(path).exists?
else
File.exist?(File.join(store.public_dir, path))
end
if file_exists
status_queue << { id: row["id"], upload_id: upload["id"], status: :ok }
else
status_queue << { id: row["id"], upload_id: upload["id"], status: :missing }
end
rescue StandardError => e
puts e.message
status_queue << { id: row["id"], upload_id: upload["id"], status: :error }
end
end
end
end
producer_thread.join
queue.close
consumer_threads.each(&:join)
status_queue.close
status_thread.join
end
def create_optimized_images
init_threads = []
optimized_upload_ids = Set.new
post_upload_ids = Set.new
avatar_upload_ids = Set.new
max_count = 0
init_threads << Thread.new do
query("SELECT id FROM optimized_images", @output_db).tap do |result_set|
result_set.each { |row| optimized_upload_ids << row["id"] }
result_set.close
end
end
init_threads << Thread.new do
sql = <<~SQL
SELECT upload_ids
FROM posts
WHERE upload_ids IS NOT NULL
SQL
query(sql, @source_db).tap do |result_set|
result_set.each do |row|
JSON.parse(row["upload_ids"]).each { |id| post_upload_ids << id }
end
result_set.close
end
end
init_threads << Thread.new do
sql = <<~SQL
SELECT avatar_upload_id
FROM users
WHERE avatar_upload_id IS NOT NULL
SQL
query(sql, @source_db).tap do |result_set|
result_set.each { |row| avatar_upload_ids << row["avatar_upload_id"] }
result_set.close
end
end
init_threads << Thread.new do
max_count =
@output_db.get_first_value("SELECT COUNT(*) FROM uploads WHERE upload IS NOT NULL")
end
init_threads.each(&:join)
status_queue = SizedQueue.new(QUEUE_SIZE)
status_thread =
Thread.new do
error_count = 0
current_count = 0
skipped_count = 0
while !(params = status_queue.pop).nil?
current_count += 1
case params.delete(:status)
when :ok
@output_db.execute(<<~SQL, params)
INSERT INTO optimized_images (id, optimized_images)
VALUES (:id, :optimized_images)
SQL
when :error
error_count += 1
when :skipped
skipped_count += 1
end
error_count_text = error_count > 0 ? "#{error_count} errors".red : "0 errors"
print "\r%7d / %7d (%s, %d skipped)" %
[current_count, max_count, error_count_text, skipped_count]
end
end
queue = SizedQueue.new(QUEUE_SIZE)
consumer_threads = []
producer_thread =
Thread.new do
sql = <<~SQL
SELECT id AS upload_id, upload ->> 'sha1' AS upload_sha1, markdown
FROM uploads
WHERE upload IS NOT NULL
ORDER BY rowid
SQL
query(sql, @output_db).tap do |result_set|
result_set.each do |row|
upload_id = row["upload_id"]
if optimized_upload_ids.include?(upload_id) || !row["markdown"].start_with?("![")
status_queue << { id: row["upload_id"], status: :skipped }
next
end
if post_upload_ids.include?(upload_id)
row["type"] = "post"
queue << row
elsif avatar_upload_ids.include?(upload_id)
row["type"] = "avatar"
queue << row
else
status_queue << { id: row["upload_id"], status: :skipped }
end
end
result_set.close
end
end
avatar_sizes = Discourse.avatar_sizes
store = Discourse.store
remote_factor = store.external? ? 2 : 1
Jobs.run_immediately!
(Etc.nprocessors * @settings[:thread_count_factor] * remote_factor).to_i.times do |index|
consumer_threads << Thread.new do
Thread.current.name = "worker-#{index}"
post =
PostCreator.new(
Discourse.system_user,
raw: "Topic created by uploads_importer",
acting_user: Discourse.system_user,
skip_validations: true,
title: "Topic created by uploads_importer - #{SecureRandom.hex}",
archetype: Archetype.default,
category: Category.last.id,
).create!
while (row = queue.pop)
retry_count = 0
loop do
upload = Upload.find_by(sha1: row["upload_sha1"])
optimized_images =
begin
case row["type"]
when "post"
post.update_columns(baked_at: nil, cooked: "", raw: row["markdown"])
post.reload
post.rebake!
OptimizedImage.where(upload_id: upload.id).to_a
when "avatar"
avatar_sizes.map { |size| OptimizedImage.create_for(upload, size, size) }
end
rescue StandardError => e
puts e.message
puts e.stacktrace
nil
end
begin
if optimized_images.present?
optimized_images.map! do |optimized_image|
next unless optimized_image.present?
optimized_image_path = store.get_path_for_optimized_image(optimized_image)
file_exists =
if store.external?
store.object_from_path(optimized_image_path).exists?
else
File.exist?(File.join(store.public_dir, optimized_image_path))
end
unless file_exists
optimized_image.destroy
optimized_image = nil
end
optimized_image
end
end
rescue StandardError
optimized_images = nil
end
optimized_images_okay =
!optimized_images.nil? && optimized_images.all?(&:present?) &&
optimized_images.all?(&:persisted?) &&
optimized_images.all? { |o| o.errors.blank? }
if optimized_images_okay
status_queue << {
id: row["upload_id"],
optimized_images: optimized_images.presence&.to_json,
status: :ok,
}
break
elsif retry_count >= 3
status_queue << { id: row["upload_id"], status: :error }
break
end
retry_count += 1
sleep 0.25 * retry_count
end
end
end
end
producer_thread.join
queue.close
consumer_threads.each(&:join)
status_queue.close
status_thread.join
end
private
def create_connection(path)
sqlite = SQLite3::Database.new(path, results_as_hash: true)
sqlite.busy_timeout = 60_000 # 60 seconds
sqlite.journal_mode = "WAL"
sqlite.synchronous = "off"
sqlite
end
def query(sql, db)
db.prepare(sql).execute
end
def initialize_output_db
@statement_counter = 0
@output_db.execute(<<~SQL)
CREATE TABLE IF NOT EXISTS uploads (
id TEXT PRIMARY KEY NOT NULL,
upload JSON_TEXT,
markdown TEXT,
skip_reason TEXT
)
SQL
@output_db.execute(<<~SQL)
CREATE TABLE IF NOT EXISTS optimized_images (
id TEXT PRIMARY KEY NOT NULL,
optimized_images JSON_TEXT
)
SQL
end
def insert(sql, bind_vars = [])
@output_db.transaction if @statement_counter == 0
@output_db.execute(sql, bind_vars)
if (@statement_counter += 1) > TRANSACTION_SIZE
@output_db.commit
@statement_counter = 0
end
end
def close
@source_db.close if @source_db
if @output_db
@output_db.commit if @output_db.transaction_active?
@output_db.close
end
end
def copy_to_tempfile(source_path)
extension = File.extname(source_path)
Tempfile.open(["discourse-upload", extension]) do |tmpfile|
File.open(source_path, "rb") { |source_stream| IO.copy_stream(source_stream, tmpfile) }
tmpfile.rewind
yield(tmpfile)
end
end
def configure_site_settings
settings = @settings[:site_settings]
SiteSetting.clean_up_uploads = false
SiteSetting.authorized_extensions = settings[:authorized_extensions]
SiteSetting.max_attachment_size_kb = settings[:max_attachment_size_kb]
SiteSetting.max_image_size_kb = settings[:max_image_size_kb]
if settings[:enable_s3_uploads]
SiteSetting.s3_access_key_id = settings[:s3_access_key_id]
SiteSetting.s3_secret_access_key = settings[:s3_secret_access_key]
SiteSetting.s3_upload_bucket = settings[:s3_upload_bucket]
SiteSetting.s3_region = settings[:s3_region]
SiteSetting.s3_cdn_url = settings[:s3_cdn_url]
SiteSetting.enable_s3_uploads = true
raise "Failed to enable S3 uploads" if SiteSetting.enable_s3_uploads != true
Tempfile.open("discourse-s3-test") do |tmpfile|
tmpfile.write("test")
tmpfile.rewind
upload =
UploadCreator.new(tmpfile, "discourse-s3-test.txt").create_for(
Discourse::SYSTEM_USER_ID,
)
unless upload.present? && upload.persisted? && upload.errors.blank? &&
upload.url.start_with?("//")
raise "Failed to upload to S3"
end
upload.destroy
end
end
end
end
end
# bundle exec ruby script/bulk_import/uploads_importer.rb /path/to/uploads_importer.yml
BulkImport::UploadsImporter.new(ARGV.first).run