mirror of
https://github.com/discourse/discourse.git
synced 2025-01-18 20:52:46 +08:00
241bf48497
Previously only Sidekiq was allowed to generate more than one optimized image at the same time per machine. This adds an easy mechanism to allow the same in rake tasks and other tools.
663 lines
20 KiB
Ruby
663 lines
20 KiB
Ruby
# frozen_string_literal: true
|
|
puts "Loading application..."
|
|
require_relative "../../config/environment"
|
|
|
|
require "etc"
|
|
require "sqlite3"
|
|
require "colored2"
|
|
|
|
module BulkImport
|
|
class UploadsImporter
|
|
TRANSACTION_SIZE = 1000
|
|
QUEUE_SIZE = 1000
|
|
|
|
# Loads the importer settings, opens the output database and applies the
# site settings needed for importing.
#
# @param settings_path [String] path to the YAML settings file
def initialize(settings_path)
  @settings = YAML.load_file(settings_path, symbolize_names: true)
  # default to "no replacements" so workers can always iterate the list
  @settings[:path_replacements] = [] unless @settings[:path_replacements]

  @output_db = create_connection(@settings[:output_db_path])
  @root_paths = @settings[:root_paths]

  initialize_output_db
  configure_site_settings
end
|
|
|
|
# Runs the importer in one of two modes, selected by the `fix_missing` setting:
#
# * `fix_missing` set: re-checks the uploads recorded in the output database
#   (the output database doubles as the source) and removes missing ones.
# * otherwise: uploads all files and, when `create_optimized_images` is set,
#   creates the optimized images afterwards.
#
# The database connections are always closed, even when an error is raised.
def run
  # disable logging for EXIFR which is used by ImageOptim
  EXIFR.logger = Logger.new(nil)

  if @settings[:fix_missing]
    @source_db = create_connection(@settings[:output_db_path])

    puts "Fixing missing uploads..."
    fix_missing
  else
    @source_db = create_connection(@settings[:source_db_path])

    puts "Uploading uploads..."
    upload_files

    # only announce the step when it actually runs
    if @settings[:create_optimized_images]
      puts "", "Creating optimized images..."
      create_optimized_images
    end
  end
  puts ""
ensure
  close
end
|
|
|
|
# Creates an upload for every row of the source database that is not yet
# recorded in the output database.
#
# Work is split across threads: a producer streams rows from the source
# database into a bounded queue, a pool of workers creates the uploads, and a
# single status thread records each result in the output database and prints
# a progress line.
def upload_files
  queue = SizedQueue.new(QUEUE_SIZE)
  consumer_threads = []

  if @settings[:delete_missing_uploads]
    puts "Deleting missing uploads from output database..."
    @output_db.execute(<<~SQL)
      DELETE FROM uploads
      WHERE upload IS NULL
    SQL
  end

  output_existing_ids = Set.new
  query("SELECT id FROM uploads", @output_db).tap do |result_set|
    result_set.each { |row| output_existing_ids << row["id"] }
    result_set.close
  end

  source_existing_ids = Set.new
  query("SELECT id FROM uploads", @source_db).tap do |result_set|
    result_set.each { |row| source_existing_ids << row["id"] }
    result_set.close
  end

  # rows in the output database that no longer exist in the source database
  if (surplus_upload_ids = output_existing_ids - source_existing_ids).any?
    if @settings[:delete_surplus_uploads]
      puts "Deleting #{surplus_upload_ids.size} uploads from output database..."

      surplus_upload_ids.each_slice(TRANSACTION_SIZE) do |ids|
        placeholders = (["?"] * ids.size).join(",")
        @output_db.execute(<<~SQL, ids)
          DELETE FROM uploads
          WHERE id IN (#{placeholders})
        SQL
      end

      output_existing_ids -= surplus_upload_ids
    else
      puts "Found #{surplus_upload_ids.size} surplus uploads in output database. " \
             "Run with `delete_surplus_uploads: true` to delete them."
    end

    surplus_upload_ids = nil
  end

  max_count = (source_existing_ids - output_existing_ids).size
  source_existing_ids = nil
  puts "Found #{output_existing_ids.size} existing uploads. #{max_count} are missing."

  # producer: enqueue only the rows that have no entry in the output database
  producer_thread =
    Thread.new do
      query("SELECT * FROM uploads", @source_db).tap do |result_set|
        result_set.each { |row| queue << row unless output_existing_ids.include?(row["id"]) }
        result_set.close
      end
    end

  # status thread: the only writer to the output database during the import
  status_queue = SizedQueue.new(QUEUE_SIZE)
  status_thread =
    Thread.new do
      error_count = 0
      skipped_count = 0
      current_count = 0

      while !(params = status_queue.pop).nil?
        begin
          # :skipped and :error are bookkeeping keys and must be removed
          # before the hash is used as bind parameters
          if params.delete(:skipped) == true
            skipped_count += 1
          elsif (error_message = params.delete(:error)) || params[:upload].nil?
            error_count += 1
            puts "", "Failed to create upload: #{params[:id]} (#{error_message})", ""
          end

          @output_db.execute(<<~SQL, params)
            INSERT INTO uploads (id, upload, markdown, skip_reason)
            VALUES (:id, :upload, :markdown, :skip_reason)
          SQL
        rescue StandardError => e
          puts "", "Failed to insert upload: #{params[:id]} (#{e.message})", ""
          error_count += 1
        end

        current_count += 1
        error_count_text = error_count > 0 ? "#{error_count} errors".red : "0 errors"

        print "\r%7d / %7d (%s, %d skipped)" %
                [current_count, max_count, error_count_text, skipped_count]
      end
    end

  (Etc.nprocessors * @settings[:thread_count_factor]).to_i.times do |index|
    consumer_threads << Thread.new do
      Thread.current.name = "worker-#{index}"

      store = Discourse.store

      while (row = queue.pop)
        begin
          data_file = nil
          path = nil

          if row["data"].present?
            # the file content is stored inline in the source database
            data_file = Tempfile.new("discourse-upload", binmode: true)
            data_file.write(row["data"])
            data_file.rewind
            path = data_file.path
          else
            relative_path = row["relative_path"]
            file_exists = false

            @root_paths.each do |root_path|
              path = File.join(root_path, relative_path, row["filename"])
              break if (file_exists = File.exist?(path))

              @settings[:path_replacements].each do |from, to|
                path = File.join(root_path, relative_path.sub(from, to), row["filename"])
                break if (file_exists = File.exist?(path))
              end

              # the inner `break` only leaves the replacements loop; stop here too,
              # otherwise the next root path overwrites `path` and `file_exists`
              break if file_exists
            end

            if !file_exists
              status_queue << {
                id: row["id"],
                upload: nil,
                skipped: true,
                skip_reason: "file not found",
              }
              next
            end
          end

          retry_count = 0

          loop do
            error_message = nil
            upload =
              copy_to_tempfile(path) do |file|
                begin
                  UploadCreator.new(file, row["filename"], type: row["type"]).create_for(
                    Discourse::SYSTEM_USER_ID,
                  )
                rescue StandardError => e
                  error_message = e.message
                  nil
                end
              end

            if (upload_okay = upload.present? && upload.persisted? && upload.errors.blank?)
              upload_path = store.get_path_for_upload(upload)

              # verify that the file really exists in the store
              file_exists =
                if store.external?
                  store.object_from_path(upload_path).exists?
                else
                  File.exist?(File.join(store.public_dir, upload_path))
                end

              unless file_exists
                upload.destroy
                upload = nil
                upload_okay = false
              end
            end

            if upload_okay
              status_queue << {
                id: row["id"],
                upload: upload.attributes.to_json,
                markdown: UploadMarkdown.new(upload).to_markdown,
                skip_reason: nil,
              }
              break
            elsif retry_count >= 3
              error_message ||= upload&.errors&.full_messages&.join(", ") || "unknown error"
              status_queue << {
                id: row["id"],
                upload: nil,
                markdown: nil,
                error: "too many retries: #{error_message}",
                skip_reason: "too many retries",
              }
              break
            end

            # back off a little longer after every failed attempt
            retry_count += 1
            sleep 0.25 * retry_count
          end
        rescue StandardError => e
          status_queue << {
            id: row["id"],
            upload: nil,
            markdown: nil,
            error: e.message,
            skip_reason: "error",
          }
        ensure
          data_file&.close!
        end
      end
    end
  end

  # closing a queue makes `pop` return nil once it is drained, which ends the loops
  producer_thread.join
  queue.close
  consumer_threads.each(&:join)
  status_queue.close
  status_thread.join
end
|
|
|
|
# Checks, for every upload recorded in `@source_db`, whether its file still
# exists in the store. Rows whose file is missing are deleted from the output
# database and the corresponding `Upload` record is deleted as well.
#
# Uses the same producer / workers / status-thread layout as the other steps.
def fix_missing
  queue = SizedQueue.new(QUEUE_SIZE)
  consumer_threads = []

  max_count =
    @source_db.get_first_value("SELECT COUNT(*) FROM uploads WHERE upload IS NOT NULL")

  producer_thread =
    Thread.new do
      query(
        "SELECT id, upload FROM uploads WHERE upload IS NOT NULL ORDER BY rowid DESC",
        @source_db,
      ).tap do |result_set|
        result_set.each { |row| queue << row }
        result_set.close
      end
    end

  status_queue = SizedQueue.new(QUEUE_SIZE)
  status_thread =
    Thread.new do
      error_count = 0
      current_count = 0
      missing_count = 0

      while !(result = status_queue.pop).nil?
        current_count += 1

        case result[:status]
        when :ok
          # ignore
        when :error
          error_count += 1
          puts "Error in #{result[:id]}"
        when :missing
          missing_count += 1
          puts "Missing #{result[:id]}"

          @output_db.execute("DELETE FROM uploads WHERE id = ?", result[:id])
          Upload.delete_by(id: result[:upload_id])
        end

        error_count_text = error_count > 0 ? "#{error_count} errors".red : "0 errors"

        print "\r%7d / %7d (%s, %s missing)" %
                [current_count, max_count, error_count_text, missing_count]
      end
    end

  store = Discourse.store

  (Etc.nprocessors * @settings[:thread_count_factor] * 2).to_i.times do |index|
    consumer_threads << Thread.new do
      Thread.current.name = "worker-#{index}"
      # stand-in object that only carries the URL needed to compute the path
      fake_upload = OpenStruct.new(url: "")
      while (row = queue.pop)
        # reset for every row: the rescue below must never see nil.[] or the
        # id of the previous row when parsing the JSON fails
        upload_id = nil

        begin
          upload = JSON.parse(row["upload"])
          upload_id = upload["id"]
          fake_upload.url = upload["url"]
          path = store.get_path_for_upload(fake_upload)

          file_exists =
            if store.external?
              store.object_from_path(path).exists?
            else
              File.exist?(File.join(store.public_dir, path))
            end

          status = file_exists ? :ok : :missing
          status_queue << { id: row["id"], upload_id: upload_id, status: status }
        rescue StandardError => e
          puts e.message
          status_queue << { id: row["id"], upload_id: upload_id, status: :error }
        end
      end
    end
  end

  # closing a queue makes `pop` return nil once it is drained, which ends the loops
  producer_thread.join
  queue.close
  consumer_threads.each(&:join)
  status_queue.close
  status_thread.join
end
|
|
|
|
# Creates optimized images for uploads that are referenced by posts or used
# as avatars and records the result in the `optimized_images` table of the
# output database.
#
# Uploads are skipped when they already have an entry in that table, when
# their markdown does not start with "![", or when they are neither
# referenced by a post nor used as an avatar.
def create_optimized_images
  init_threads = []
  optimized_upload_ids = Set.new
  post_upload_ids = Set.new
  avatar_upload_ids = Set.new
  max_count = 0

  # allow more than one thread to optimize images at the same time
  OptimizedImage.lock_per_machine = false

  # the lookup sets are loaded concurrently before any work is queued
  init_threads << Thread.new do
    query("SELECT id FROM optimized_images", @output_db).tap do |result_set|
      result_set.each { |row| optimized_upload_ids << row["id"] }
      result_set.close
    end
  end

  init_threads << Thread.new do
    sql = <<~SQL
      SELECT upload_ids
      FROM posts
      WHERE upload_ids IS NOT NULL
    SQL
    query(sql, @source_db).tap do |result_set|
      result_set.each do |row|
        JSON.parse(row["upload_ids"]).each { |id| post_upload_ids << id }
      end
      result_set.close
    end
  end

  init_threads << Thread.new do
    sql = <<~SQL
      SELECT avatar_upload_id
      FROM users
      WHERE avatar_upload_id IS NOT NULL
    SQL
    query(sql, @source_db).tap do |result_set|
      result_set.each { |row| avatar_upload_ids << row["avatar_upload_id"] }
      result_set.close
    end
  end

  init_threads << Thread.new do
    max_count =
      @output_db.get_first_value("SELECT COUNT(*) FROM uploads WHERE upload IS NOT NULL")
  end

  init_threads.each(&:join)

  status_queue = SizedQueue.new(QUEUE_SIZE)
  status_thread =
    Thread.new do
      error_count = 0
      current_count = 0
      skipped_count = 0

      while !(params = status_queue.pop).nil?
        current_count += 1

        # :status is removed so that the rest of the hash can be used as bind parameters
        case params.delete(:status)
        when :ok
          @output_db.execute(<<~SQL, params)
            INSERT INTO optimized_images (id, optimized_images)
            VALUES (:id, :optimized_images)
          SQL
        when :error
          error_count += 1
        when :skipped
          skipped_count += 1
        end

        error_count_text = error_count > 0 ? "#{error_count} errors".red : "0 errors"

        print "\r%7d / %7d (%s, %d skipped)" %
                [current_count, max_count, error_count_text, skipped_count]
      end
    end

  queue = SizedQueue.new(QUEUE_SIZE)
  consumer_threads = []

  producer_thread =
    Thread.new do
      sql = <<~SQL
        SELECT id AS upload_id, upload ->> 'sha1' AS upload_sha1, markdown
        FROM uploads
        WHERE upload IS NOT NULL
        ORDER BY rowid
      SQL

      query(sql, @output_db).tap do |result_set|
        result_set.each do |row|
          upload_id = row["upload_id"]

          if optimized_upload_ids.include?(upload_id) || !row["markdown"].start_with?("![")
            status_queue << { id: row["upload_id"], status: :skipped }
            next
          end

          if post_upload_ids.include?(upload_id)
            row["type"] = "post"
            queue << row
          elsif avatar_upload_ids.include?(upload_id)
            row["type"] = "avatar"
            queue << row
          else
            status_queue << { id: row["upload_id"], status: :skipped }
          end
        end
        result_set.close
      end
    end

  avatar_sizes = Discourse.avatar_sizes
  store = Discourse.store
  # twice as many workers are started when the store is external
  remote_factor = store.external? ? 2 : 1

  Jobs.run_immediately!

  (Etc.nprocessors * @settings[:thread_count_factor] * remote_factor).to_i.times do |index|
    consumer_threads << Thread.new do
      Thread.current.name = "worker-#{index}"

      # every worker owns one post; its raw is replaced with the markdown of
      # each upload and the post is rebaked
      post =
        PostCreator.new(
          Discourse.system_user,
          raw: "Topic created by uploads_importer",
          acting_user: Discourse.system_user,
          skip_validations: true,
          title: "Topic created by uploads_importer - #{SecureRandom.hex}",
          archetype: Archetype.default,
          category: Category.last.id,
        ).create!

      while (row = queue.pop)
        retry_count = 0

        loop do
          upload = Upload.find_by(sha1: row["upload_sha1"])

          optimized_images =
            begin
              case row["type"]
              when "post"
                post.update_columns(baked_at: nil, cooked: "", raw: row["markdown"])
                post.reload
                post.rebake!
                OptimizedImage.where(upload_id: upload.id).to_a
              when "avatar"
                avatar_sizes.map { |size| OptimizedImage.create_for(upload, size, size) }
              end
            rescue StandardError => e
              puts e.message
              # Exception has no `stacktrace` method; calling it here would raise
              # inside the rescue and terminate the worker thread
              puts e.backtrace
              nil
            end

          begin
            if optimized_images.present?
              # replace every optimized image whose file is missing with nil
              optimized_images.map! do |optimized_image|
                next unless optimized_image.present?
                optimized_image_path = store.get_path_for_optimized_image(optimized_image)

                file_exists =
                  if store.external?
                    store.object_from_path(optimized_image_path).exists?
                  else
                    File.exist?(File.join(store.public_dir, optimized_image_path))
                  end

                unless file_exists
                  optimized_image.destroy
                  optimized_image = nil
                end

                optimized_image
              end
            end
          rescue StandardError
            optimized_images = nil
          end

          optimized_images_okay =
            !optimized_images.nil? && optimized_images.all?(&:present?) &&
              optimized_images.all?(&:persisted?) &&
              optimized_images.all? { |o| o.errors.blank? }

          if optimized_images_okay
            status_queue << {
              id: row["upload_id"],
              optimized_images: optimized_images.presence&.to_json,
              status: :ok,
            }
            break
          elsif retry_count >= 3
            status_queue << { id: row["upload_id"], status: :error }
            break
          end

          # back off a little longer after every failed attempt
          retry_count += 1
          sleep 0.25 * retry_count
        end
      end
    end
  end

  # closing a queue makes `pop` return nil once it is drained, which ends the loops
  producer_thread.join
  queue.close
  consumer_threads.each(&:join)
  status_queue.close
  status_thread.join
end
|
|
|
|
private
|
|
|
|
# Opens the SQLite database at `path` with rows returned as hashes and
# applies the connection settings used by the importer.
#
# @param path [String] path of the SQLite database file
# @return [SQLite3::Database] the configured connection
def create_connection(path)
  SQLite3::Database
    .new(path, results_as_hash: true)
    .tap do |db|
      db.busy_timeout = 60_000 # 60 seconds
      db.journal_mode = "WAL"
      db.synchronous = "off"
    end
end
|
|
|
|
# Prepares `sql` on `db` and executes it without bind parameters.
#
# @param sql [String] the statement to run
# @param db [SQLite3::Database] the connection to run it on
# @return the value returned by executing the prepared statement; the callers
#   in this file iterate over it and then close it
def query(sql, db)
  statement = db.prepare(sql)
  statement.execute
end
|
|
|
|
# Resets the statement counter used by `insert` and creates the `uploads`
# and `optimized_images` tables in the output database if they do not exist.
def initialize_output_db
  @statement_counter = 0

  create_uploads_table = <<~SQL
    CREATE TABLE IF NOT EXISTS uploads (
    id TEXT PRIMARY KEY NOT NULL,
    upload JSON_TEXT,
    markdown TEXT,
    skip_reason TEXT
    )
  SQL

  create_optimized_images_table = <<~SQL
    CREATE TABLE IF NOT EXISTS optimized_images (
    id TEXT PRIMARY KEY NOT NULL,
    optimized_images JSON_TEXT
    )
  SQL

  @output_db.execute(create_uploads_table)
  @output_db.execute(create_optimized_images_table)
end
|
|
|
|
# Executes `sql` on the output database inside a batched transaction: a
# transaction is opened before the first statement of a batch and committed
# once the counter exceeds TRANSACTION_SIZE.
#
# @param sql [String] the statement to execute
# @param bind_vars [Array] bind variables for the statement
def insert(sql, bind_vars = [])
  @output_db.transaction if @statement_counter.zero?
  @output_db.execute(sql, bind_vars)

  @statement_counter += 1
  return unless @statement_counter > TRANSACTION_SIZE

  @output_db.commit
  @statement_counter = 0
end
|
|
|
|
# Closes both database connections. A transaction that is still active on
# the output database is committed before that connection is closed.
def close
  @source_db && @source_db.close

  return unless @output_db

  @output_db.commit if @output_db.transaction_active?
  @output_db.close
end
|
|
|
|
# Copies the file at `source_path` into a temporary file that keeps the
# original extension and yields that temporary file, rewound to the start.
#
# @param source_path [String] path of the file to copy
# @yieldparam tmpfile [Tempfile] the rewound copy
# @return the value returned by the block
def copy_to_tempfile(source_path)
  suffix = File.extname(source_path)

  Tempfile.open(["discourse-upload", suffix]) do |copy|
    # a path given as source is opened and read by copy_stream itself
    IO.copy_stream(source_path, copy)
    copy.rewind
    yield(copy)
  end
end
|
|
|
|
# Applies the `site_settings` section of the importer settings to
# SiteSetting. When `enable_s3_uploads` is set, the S3 settings are applied
# as well and verified by creating and then destroying a small test upload.
#
# @raise [RuntimeError] when S3 uploads cannot be enabled or the test upload fails
def configure_site_settings
  site_settings = @settings[:site_settings]

  SiteSetting.clean_up_uploads = false
  SiteSetting.authorized_extensions = site_settings[:authorized_extensions]
  SiteSetting.max_attachment_size_kb = site_settings[:max_attachment_size_kb]
  SiteSetting.max_image_size_kb = site_settings[:max_image_size_kb]

  return unless site_settings[:enable_s3_uploads]

  SiteSetting.s3_access_key_id = site_settings[:s3_access_key_id]
  SiteSetting.s3_secret_access_key = site_settings[:s3_secret_access_key]
  SiteSetting.s3_upload_bucket = site_settings[:s3_upload_bucket]
  SiteSetting.s3_region = site_settings[:s3_region]
  SiteSetting.s3_cdn_url = site_settings[:s3_cdn_url]
  SiteSetting.enable_s3_uploads = true

  raise "Failed to enable S3 uploads" unless SiteSetting.enable_s3_uploads == true

  Tempfile.open("discourse-s3-test") do |probe|
    probe.write("test")
    probe.rewind

    creator = UploadCreator.new(probe, "discourse-s3-test.txt")
    upload = creator.create_for(Discourse::SYSTEM_USER_ID)

    # the check requires a protocol-relative URL ("//...") on the created upload
    uploaded_to_s3 =
      upload.present? && upload.persisted? && upload.errors.blank? &&
        upload.url.start_with?("//")
    raise "Failed to upload to S3" unless uploaded_to_s3

    upload.destroy
  end
end
|
|
end
|
|
end
|
|
|
|
# bundle exec ruby script/bulk_import/uploads_importer.rb /path/to/uploads_importer.yml
|
|
# Entry point: the first command-line argument is used as the path of the YAML settings file.
BulkImport::UploadsImporter.new(ARGV.first).run
|