mirror of
https://github.com/discourse/discourse.git
synced 2025-01-04 21:16:19 +08:00
6c91148db8
* DEV: Implement uploads command entrypoint - Setup Thor UploadsCommand for CLI - First pass at modularizing various parts of the exising `uploads_import` script * DEV: First attempt at modularizing missing uploads fixer task Move missing upload fix to a dedicated uploads task implementation unit * DEV: First attempt at modularizing missing uploads uploader task Move uploader to a dedicated uploads task implementation unit * DEV: First attempt at modularizing missing uploads optimizer task Move optimizer to a dedicated uploads task implementation unit * DEV: Various follow up fixes to get optimization working - Start threads early - Improve "log" message formatting - Add missing `copy_to_tempfile` method on "uploader" task * DEV: Refactor a bit more Deduplicate and move most of threading premitives to base task as-is * DEV: Remove redundant condition in uploads db migration * DEV: More deduplication Move task retry logic to base class and tidy up other implementation details carried over from the existing script
328 lines
9.5 KiB
Ruby
328 lines
9.5 KiB
Ruby
# frozen_string_literal: true
|
|
|
|
module Migrations::Uploader
|
|
module Tasks
|
|
class Uploader < Base
|
|
MAX_FILE_SIZE = 1.gigabyte
|
|
|
|
UploadMetadata = Struct.new(:original_filename, :origin_url, :description)
|
|
|
|
def run!
|
|
puts "", "Uploading uploads..."
|
|
|
|
process_existing_uploads
|
|
|
|
status_thread = start_status_thread
|
|
consumer_threads = start_consumer_threads
|
|
producer_thread = start_producer_thread
|
|
|
|
producer_thread.join
|
|
work_queue.close
|
|
consumer_threads.each(&:join)
|
|
status_queue.close
|
|
status_thread.join
|
|
end
|
|
|
|
private
|
|
|
|
def process_existing_uploads
|
|
delete_missing_uploads if settings[:delete_missing_uploads]
|
|
initialize_existing_ids_tracking_sets
|
|
handle_surplus_uploads if surplus_upload_ids.any?
|
|
|
|
@max_count = (@source_existing_ids - @output_existing_ids).size
|
|
@source_existing_ids = nil
|
|
|
|
puts "Found #{@output_existing_ids.size} existing uploads. #{@max_count} are missing."
|
|
end
|
|
|
|
def initialize_existing_ids_tracking_sets
|
|
@output_existing_ids = load_existing_ids(uploads_db.db, Set.new)
|
|
@source_existing_ids = load_existing_ids(intermediate_db.db, Set.new)
|
|
end
|
|
|
|
def load_existing_ids(db, set)
|
|
db.query("SELECT id FROM uploads") { |row| set << row[:id] }
|
|
|
|
set
|
|
end
|
|
|
|
def handle_surplus_uploads
|
|
if settings[:delete_surplus_uploads]
|
|
puts "Deleting #{surplus_upload_ids.size} uploads from output database..."
|
|
|
|
surplus_upload_ids.each_slice(TRANSACTION_SIZE) do |ids|
|
|
placeholders = (["?"] * ids.size).join(",")
|
|
uploads_db.db.execute(<<~SQL, ids)
|
|
DELETE FROM uploads
|
|
WHERE id IN (#{placeholders})
|
|
SQL
|
|
end
|
|
|
|
@output_existing_ids -= surplus_upload_ids
|
|
else
|
|
puts "Found #{surplus_upload_ids.size} surplus uploads in output database. " \
|
|
"Run with `delete_surplus_uploads: true` to delete them."
|
|
end
|
|
|
|
@surplus_upload_ids = nil
|
|
end
|
|
|
|
def surplus_upload_ids
|
|
@surplus_upload_ids ||= @output_existing_ids - @source_existing_ids
|
|
end
|
|
|
|
def handle_status_update(params)
|
|
@current_count += 1
|
|
|
|
begin
|
|
if params.delete(:skipped) == true
|
|
@skipped_count += 1
|
|
elsif (error_message = params.delete(:error)) || params[:upload].nil?
|
|
@error_count += 1
|
|
puts "", "Failed to create upload: #{params[:id]} (#{error_message})", ""
|
|
end
|
|
|
|
uploads_db.insert(<<~SQL, params)
|
|
INSERT INTO uploads (id, upload, markdown, skip_reason)
|
|
VALUES (:id, :upload, :markdown, :skip_reason)
|
|
SQL
|
|
rescue StandardError => e
|
|
puts "", "Failed to insert upload: #{params[:id]} (#{e.message}))", ""
|
|
@error_count += 1
|
|
end
|
|
end
|
|
|
|
def enqueue_jobs
|
|
intermediate_db
|
|
.db
|
|
.query("SELECT * FROM uploads ORDER BY id") do |row|
|
|
work_queue << row if @output_existing_ids.exclude?(row[:id])
|
|
end
|
|
end
|
|
|
|
def find_file_in_paths(row)
|
|
relative_path = row[:relative_path] || ""
|
|
|
|
settings[:root_paths].each do |root_path|
|
|
path = File.join(root_path, relative_path, row[:filename])
|
|
|
|
return path if File.exist?(path)
|
|
|
|
settings[:path_replacements].each do |from, to|
|
|
path = File.join(root_path, relative_path.sub(from, to), row[:filename])
|
|
|
|
return path if File.exist?(path)
|
|
end
|
|
end
|
|
|
|
nil
|
|
end
|
|
|
|
def handle_missing_file(row)
|
|
status_queue << { id: row[:id], upload: nil, skipped: true, skip_reason: "file not found" }
|
|
end
|
|
|
|
def process_upload(row, _)
|
|
metadata = build_metadata(row)
|
|
data_file = nil
|
|
path = nil
|
|
|
|
if row[:data].present?
|
|
data_file = Tempfile.new("discourse-upload", binmode: true)
|
|
data_file.write(row[:data])
|
|
data_file.rewind
|
|
path = data_file.path
|
|
elsif row[:url].present?
|
|
path, metadata.original_filename = download_file(url: row[:url], id: row[:id])
|
|
metadata.origin_url = row[:url]
|
|
return if !path
|
|
else
|
|
path = find_file_in_paths(row)
|
|
return handle_missing_file(row) if path.nil?
|
|
end
|
|
|
|
error_message = nil
|
|
result =
|
|
with_retries do
|
|
upload =
|
|
copy_to_tempfile(path) do |file|
|
|
begin
|
|
UploadCreator.new(
|
|
file,
|
|
metadata.original_filename,
|
|
type: row[:type],
|
|
origin: metadata.origin_url,
|
|
).create_for(Discourse::SYSTEM_USER_ID)
|
|
rescue StandardError => e
|
|
error_message = e.message
|
|
nil
|
|
end
|
|
end
|
|
|
|
if (upload_okay = upload.present? && upload.persisted? && upload.errors.blank?)
|
|
upload_path = add_multisite_prefix(discourse_store.get_path_for_upload(upload))
|
|
|
|
unless file_exists?(upload_path)
|
|
upload.destroy
|
|
upload = nil
|
|
upload_okay = false
|
|
end
|
|
end
|
|
|
|
if upload_okay
|
|
{
|
|
id: row[:id],
|
|
upload: upload.attributes.to_json,
|
|
markdown:
|
|
UploadMarkdown.new(upload).to_markdown(display_name: metadata.description),
|
|
skip_reason: nil,
|
|
}
|
|
else
|
|
error_message =
|
|
upload&.errors&.full_messages&.join(", ") || error_message || "unknown error"
|
|
nil
|
|
end
|
|
end
|
|
|
|
if result.nil?
|
|
status_queue << {
|
|
id: row[:id],
|
|
upload: nil,
|
|
markdown: nil,
|
|
error: "too many retries: #{error_message}",
|
|
skip_reason: "too many retries",
|
|
}
|
|
else
|
|
status_queue << result
|
|
end
|
|
rescue StandardError => e
|
|
status_queue << {
|
|
id: row[:id],
|
|
upload: nil,
|
|
markdown: nil,
|
|
error: e.message,
|
|
skip_reason: "error",
|
|
}
|
|
ensure
|
|
data_file&.close!
|
|
end
|
|
|
|
def build_metadata(row)
|
|
UploadMetadata.new(
|
|
original_filename: row[:display_filename] || row[:filename],
|
|
description: row[:description].presence,
|
|
)
|
|
end
|
|
|
|
def delete_missing_uploads
|
|
puts "Deleting missing uploads from uploads database..."
|
|
|
|
uploads_db.db.execute(<<~SQL)
|
|
DELETE FROM uploads
|
|
WHERE upload IS NULL
|
|
SQL
|
|
end
|
|
|
|
def download_file(url:, id:, retry_count: 0)
|
|
path = download_cache_path(id)
|
|
original_filename = nil
|
|
|
|
if File.exist?(path) && (original_filename = get_original_filename(id))
|
|
return path, original_filename
|
|
end
|
|
|
|
fd = FinalDestination.new(url)
|
|
file = nil
|
|
|
|
fd.get do |response, chunk, uri|
|
|
if file.nil?
|
|
check_response!(response, uri)
|
|
original_filename = extract_filename_from_response(response, uri)
|
|
file = File.open(path, "wb")
|
|
end
|
|
|
|
file.write(chunk)
|
|
|
|
if file.size > MAX_FILE_SIZE
|
|
file.close
|
|
file.unlink
|
|
file = nil
|
|
throw :done
|
|
end
|
|
end
|
|
|
|
if file
|
|
file.close
|
|
uploads_db.insert(
|
|
"INSERT INTO downloads (id, original_filename) VALUES (?, ?)",
|
|
[id, original_filename],
|
|
)
|
|
return path, original_filename
|
|
end
|
|
|
|
nil
|
|
end
|
|
|
|
def download_cache_path(id)
|
|
id = id.gsub("/", "_").gsub("=", "-")
|
|
File.join(settings[:download_cache_path], id)
|
|
end
|
|
|
|
def get_original_filename(id)
|
|
uploads_db.db.query_single_splat("SELECT original_filename FROM downloads WHERE id = ?", id)
|
|
end
|
|
|
|
def check_response!(response, uri)
|
|
if uri.blank?
|
|
code = response.code.to_i
|
|
|
|
if code >= 400
|
|
raise "#{code} Error"
|
|
else
|
|
throw :done
|
|
end
|
|
end
|
|
end
|
|
|
|
def extract_filename_from_response(response, uri)
|
|
filename =
|
|
if (header = response.header["Content-Disposition"].presence)
|
|
disposition_filename =
|
|
header[/filename\*=UTF-8''(\S+)\b/i, 1] || header[/filename=(?:"(.+)"|[^\s;]+)/i, 1]
|
|
if disposition_filename.present?
|
|
URI.decode_www_form_component(disposition_filename)
|
|
else
|
|
nil
|
|
end
|
|
end
|
|
|
|
filename = File.basename(uri.path).presence || "file" if filename.blank?
|
|
|
|
if File.extname(filename).blank? && response.content_type.present?
|
|
ext = MiniMime.lookup_by_content_type(response.content_type)&.extension
|
|
filename = "#{filename}.#{ext}" if ext.present?
|
|
end
|
|
|
|
filename
|
|
end
|
|
|
|
def copy_to_tempfile(source_path)
|
|
extension = File.extname(source_path)
|
|
|
|
Tempfile.open(["discourse-upload", extension]) do |tmpfile|
|
|
File.open(source_path, "rb") { |source_stream| IO.copy_stream(source_stream, tmpfile) }
|
|
tmpfile.rewind
|
|
yield(tmpfile)
|
|
end
|
|
end
|
|
|
|
def log_status
|
|
error_count_text = error_count > 0 ? "#{error_count} errors".red : "0 errors"
|
|
print "\r%7d / %7d (%s, %s skipped)" %
|
|
[current_count, @max_count, error_count_text, skipped_count]
|
|
end
|
|
end
|
|
end
|
|
end
|