discourse/migrations/lib/uploader/tasks/uploader.rb
Selase Krakani 6c91148db8
DEV: Refactor uploads_importer script (#29292)
* DEV: Implement uploads command entrypoint

- Setup Thor UploadsCommand for CLI
- First pass at modularizing various parts of the exising `uploads_import` script

* DEV: First attempt at modularizing missing uploads fixer task

Move missing upload fix to a dedicated uploads task implementation unit

* DEV: First attempt at modularizing missing uploads uploader task

Move uploader to a dedicated uploads task implementation unit

* DEV: First attempt at modularizing missing uploads optimizer task

Move optimizer to a dedicated uploads task implementation unit

* DEV: Various follow up fixes to get optimization working

- Start threads early
- Improve "log" message formatting
- Add missing `copy_to_tempfile` method on "uploader" task

* DEV: Refactor a bit more

Deduplicate and move most of threading premitives to base task as-is

* DEV: Remove redundant condition in uploads db migration

* DEV: More deduplication

Move task retry logic to base class and tidy up other implementation
details carried over from the existing script
2024-10-31 13:31:12 +00:00

328 lines
9.5 KiB
Ruby

# frozen_string_literal: true
module Migrations::Uploader
module Tasks
class Uploader < Base
MAX_FILE_SIZE = 1.gigabyte
UploadMetadata = Struct.new(:original_filename, :origin_url, :description)
def run!
puts "", "Uploading uploads..."
process_existing_uploads
status_thread = start_status_thread
consumer_threads = start_consumer_threads
producer_thread = start_producer_thread
producer_thread.join
work_queue.close
consumer_threads.each(&:join)
status_queue.close
status_thread.join
end
private
def process_existing_uploads
delete_missing_uploads if settings[:delete_missing_uploads]
initialize_existing_ids_tracking_sets
handle_surplus_uploads if surplus_upload_ids.any?
@max_count = (@source_existing_ids - @output_existing_ids).size
@source_existing_ids = nil
puts "Found #{@output_existing_ids.size} existing uploads. #{@max_count} are missing."
end
def initialize_existing_ids_tracking_sets
@output_existing_ids = load_existing_ids(uploads_db.db, Set.new)
@source_existing_ids = load_existing_ids(intermediate_db.db, Set.new)
end
def load_existing_ids(db, set)
db.query("SELECT id FROM uploads") { |row| set << row[:id] }
set
end
def handle_surplus_uploads
if settings[:delete_surplus_uploads]
puts "Deleting #{surplus_upload_ids.size} uploads from output database..."
surplus_upload_ids.each_slice(TRANSACTION_SIZE) do |ids|
placeholders = (["?"] * ids.size).join(",")
uploads_db.db.execute(<<~SQL, ids)
DELETE FROM uploads
WHERE id IN (#{placeholders})
SQL
end
@output_existing_ids -= surplus_upload_ids
else
puts "Found #{surplus_upload_ids.size} surplus uploads in output database. " \
"Run with `delete_surplus_uploads: true` to delete them."
end
@surplus_upload_ids = nil
end
def surplus_upload_ids
@surplus_upload_ids ||= @output_existing_ids - @source_existing_ids
end
def handle_status_update(params)
@current_count += 1
begin
if params.delete(:skipped) == true
@skipped_count += 1
elsif (error_message = params.delete(:error)) || params[:upload].nil?
@error_count += 1
puts "", "Failed to create upload: #{params[:id]} (#{error_message})", ""
end
uploads_db.insert(<<~SQL, params)
INSERT INTO uploads (id, upload, markdown, skip_reason)
VALUES (:id, :upload, :markdown, :skip_reason)
SQL
rescue StandardError => e
puts "", "Failed to insert upload: #{params[:id]} (#{e.message}))", ""
@error_count += 1
end
end
def enqueue_jobs
intermediate_db
.db
.query("SELECT * FROM uploads ORDER BY id") do |row|
work_queue << row if @output_existing_ids.exclude?(row[:id])
end
end
def find_file_in_paths(row)
relative_path = row[:relative_path] || ""
settings[:root_paths].each do |root_path|
path = File.join(root_path, relative_path, row[:filename])
return path if File.exist?(path)
settings[:path_replacements].each do |from, to|
path = File.join(root_path, relative_path.sub(from, to), row[:filename])
return path if File.exist?(path)
end
end
nil
end
def handle_missing_file(row)
status_queue << { id: row[:id], upload: nil, skipped: true, skip_reason: "file not found" }
end
def process_upload(row, _)
metadata = build_metadata(row)
data_file = nil
path = nil
if row[:data].present?
data_file = Tempfile.new("discourse-upload", binmode: true)
data_file.write(row[:data])
data_file.rewind
path = data_file.path
elsif row[:url].present?
path, metadata.original_filename = download_file(url: row[:url], id: row[:id])
metadata.origin_url = row[:url]
return if !path
else
path = find_file_in_paths(row)
return handle_missing_file(row) if path.nil?
end
error_message = nil
result =
with_retries do
upload =
copy_to_tempfile(path) do |file|
begin
UploadCreator.new(
file,
metadata.original_filename,
type: row[:type],
origin: metadata.origin_url,
).create_for(Discourse::SYSTEM_USER_ID)
rescue StandardError => e
error_message = e.message
nil
end
end
if (upload_okay = upload.present? && upload.persisted? && upload.errors.blank?)
upload_path = add_multisite_prefix(discourse_store.get_path_for_upload(upload))
unless file_exists?(upload_path)
upload.destroy
upload = nil
upload_okay = false
end
end
if upload_okay
{
id: row[:id],
upload: upload.attributes.to_json,
markdown:
UploadMarkdown.new(upload).to_markdown(display_name: metadata.description),
skip_reason: nil,
}
else
error_message =
upload&.errors&.full_messages&.join(", ") || error_message || "unknown error"
nil
end
end
if result.nil?
status_queue << {
id: row[:id],
upload: nil,
markdown: nil,
error: "too many retries: #{error_message}",
skip_reason: "too many retries",
}
else
status_queue << result
end
rescue StandardError => e
status_queue << {
id: row[:id],
upload: nil,
markdown: nil,
error: e.message,
skip_reason: "error",
}
ensure
data_file&.close!
end
def build_metadata(row)
UploadMetadata.new(
original_filename: row[:display_filename] || row[:filename],
description: row[:description].presence,
)
end
def delete_missing_uploads
puts "Deleting missing uploads from uploads database..."
uploads_db.db.execute(<<~SQL)
DELETE FROM uploads
WHERE upload IS NULL
SQL
end
def download_file(url:, id:, retry_count: 0)
path = download_cache_path(id)
original_filename = nil
if File.exist?(path) && (original_filename = get_original_filename(id))
return path, original_filename
end
fd = FinalDestination.new(url)
file = nil
fd.get do |response, chunk, uri|
if file.nil?
check_response!(response, uri)
original_filename = extract_filename_from_response(response, uri)
file = File.open(path, "wb")
end
file.write(chunk)
if file.size > MAX_FILE_SIZE
file.close
file.unlink
file = nil
throw :done
end
end
if file
file.close
uploads_db.insert(
"INSERT INTO downloads (id, original_filename) VALUES (?, ?)",
[id, original_filename],
)
return path, original_filename
end
nil
end
def download_cache_path(id)
id = id.gsub("/", "_").gsub("=", "-")
File.join(settings[:download_cache_path], id)
end
def get_original_filename(id)
uploads_db.db.query_single_splat("SELECT original_filename FROM downloads WHERE id = ?", id)
end
def check_response!(response, uri)
if uri.blank?
code = response.code.to_i
if code >= 400
raise "#{code} Error"
else
throw :done
end
end
end
def extract_filename_from_response(response, uri)
filename =
if (header = response.header["Content-Disposition"].presence)
disposition_filename =
header[/filename\*=UTF-8''(\S+)\b/i, 1] || header[/filename=(?:"(.+)"|[^\s;]+)/i, 1]
if disposition_filename.present?
URI.decode_www_form_component(disposition_filename)
else
nil
end
end
filename = File.basename(uri.path).presence || "file" if filename.blank?
if File.extname(filename).blank? && response.content_type.present?
ext = MiniMime.lookup_by_content_type(response.content_type)&.extension
filename = "#{filename}.#{ext}" if ext.present?
end
filename
end
def copy_to_tempfile(source_path)
extension = File.extname(source_path)
Tempfile.open(["discourse-upload", extension]) do |tmpfile|
File.open(source_path, "rb") { |source_stream| IO.copy_stream(source_stream, tmpfile) }
tmpfile.rewind
yield(tmpfile)
end
end
def log_status
error_count_text = error_count > 0 ? "#{error_count} errors".red : "0 errors"
print "\r%7d / %7d (%s, %s skipped)" %
[current_count, @max_count, error_count_text, skipped_count]
end
end
end
end