mirror of
https://github.com/discourse/discourse.git
synced 2025-01-04 23:45:58 +08:00
328 lines
9.5 KiB
Ruby
328 lines
9.5 KiB
Ruby
|
# frozen_string_literal: true
|
||
|
|
||
|
module Migrations::Uploader
|
||
|
module Tasks
|
||
|
class Uploader < Base
|
||
|
MAX_FILE_SIZE = 1.gigabyte
|
||
|
|
||
|
UploadMetadata = Struct.new(:original_filename, :origin_url, :description)
|
||
|
|
||
|
def run!
|
||
|
puts "", "Uploading uploads..."
|
||
|
|
||
|
process_existing_uploads
|
||
|
|
||
|
status_thread = start_status_thread
|
||
|
consumer_threads = start_consumer_threads
|
||
|
producer_thread = start_producer_thread
|
||
|
|
||
|
producer_thread.join
|
||
|
work_queue.close
|
||
|
consumer_threads.each(&:join)
|
||
|
status_queue.close
|
||
|
status_thread.join
|
||
|
end
|
||
|
|
||
|
private
|
||
|
|
||
|
def process_existing_uploads
|
||
|
delete_missing_uploads if settings[:delete_missing_uploads]
|
||
|
initialize_existing_ids_tracking_sets
|
||
|
handle_surplus_uploads if surplus_upload_ids.any?
|
||
|
|
||
|
@max_count = (@source_existing_ids - @output_existing_ids).size
|
||
|
@source_existing_ids = nil
|
||
|
|
||
|
puts "Found #{@output_existing_ids.size} existing uploads. #{@max_count} are missing."
|
||
|
end
|
||
|
|
||
|
def initialize_existing_ids_tracking_sets
|
||
|
@output_existing_ids = load_existing_ids(uploads_db.db, Set.new)
|
||
|
@source_existing_ids = load_existing_ids(intermediate_db.db, Set.new)
|
||
|
end
|
||
|
|
||
|
def load_existing_ids(db, set)
|
||
|
db.query("SELECT id FROM uploads") { |row| set << row[:id] }
|
||
|
|
||
|
set
|
||
|
end
|
||
|
|
||
|
def handle_surplus_uploads
|
||
|
if settings[:delete_surplus_uploads]
|
||
|
puts "Deleting #{surplus_upload_ids.size} uploads from output database..."
|
||
|
|
||
|
surplus_upload_ids.each_slice(TRANSACTION_SIZE) do |ids|
|
||
|
placeholders = (["?"] * ids.size).join(",")
|
||
|
uploads_db.db.execute(<<~SQL, ids)
|
||
|
DELETE FROM uploads
|
||
|
WHERE id IN (#{placeholders})
|
||
|
SQL
|
||
|
end
|
||
|
|
||
|
@output_existing_ids -= surplus_upload_ids
|
||
|
else
|
||
|
puts "Found #{surplus_upload_ids.size} surplus uploads in output database. " \
|
||
|
"Run with `delete_surplus_uploads: true` to delete them."
|
||
|
end
|
||
|
|
||
|
@surplus_upload_ids = nil
|
||
|
end
|
||
|
|
||
|
def surplus_upload_ids
|
||
|
@surplus_upload_ids ||= @output_existing_ids - @source_existing_ids
|
||
|
end
|
||
|
|
||
|
def handle_status_update(params)
|
||
|
@current_count += 1
|
||
|
|
||
|
begin
|
||
|
if params.delete(:skipped) == true
|
||
|
@skipped_count += 1
|
||
|
elsif (error_message = params.delete(:error)) || params[:upload].nil?
|
||
|
@error_count += 1
|
||
|
puts "", "Failed to create upload: #{params[:id]} (#{error_message})", ""
|
||
|
end
|
||
|
|
||
|
uploads_db.insert(<<~SQL, params)
|
||
|
INSERT INTO uploads (id, upload, markdown, skip_reason)
|
||
|
VALUES (:id, :upload, :markdown, :skip_reason)
|
||
|
SQL
|
||
|
rescue StandardError => e
|
||
|
puts "", "Failed to insert upload: #{params[:id]} (#{e.message}))", ""
|
||
|
@error_count += 1
|
||
|
end
|
||
|
end
|
||
|
|
||
|
def enqueue_jobs
|
||
|
intermediate_db
|
||
|
.db
|
||
|
.query("SELECT * FROM uploads ORDER BY id") do |row|
|
||
|
work_queue << row if @output_existing_ids.exclude?(row[:id])
|
||
|
end
|
||
|
end
|
||
|
|
||
|
def find_file_in_paths(row)
|
||
|
relative_path = row[:relative_path] || ""
|
||
|
|
||
|
settings[:root_paths].each do |root_path|
|
||
|
path = File.join(root_path, relative_path, row[:filename])
|
||
|
|
||
|
return path if File.exist?(path)
|
||
|
|
||
|
settings[:path_replacements].each do |from, to|
|
||
|
path = File.join(root_path, relative_path.sub(from, to), row[:filename])
|
||
|
|
||
|
return path if File.exist?(path)
|
||
|
end
|
||
|
end
|
||
|
|
||
|
nil
|
||
|
end
|
||
|
|
||
|
def handle_missing_file(row)
|
||
|
status_queue << { id: row[:id], upload: nil, skipped: true, skip_reason: "file not found" }
|
||
|
end
|
||
|
|
||
|
def process_upload(row, _)
|
||
|
metadata = build_metadata(row)
|
||
|
data_file = nil
|
||
|
path = nil
|
||
|
|
||
|
if row[:data].present?
|
||
|
data_file = Tempfile.new("discourse-upload", binmode: true)
|
||
|
data_file.write(row[:data])
|
||
|
data_file.rewind
|
||
|
path = data_file.path
|
||
|
elsif row[:url].present?
|
||
|
path, metadata.original_filename = download_file(url: row[:url], id: row[:id])
|
||
|
metadata.origin_url = row[:url]
|
||
|
return if !path
|
||
|
else
|
||
|
path = find_file_in_paths(row)
|
||
|
return handle_missing_file(row) if path.nil?
|
||
|
end
|
||
|
|
||
|
error_message = nil
|
||
|
result =
|
||
|
with_retries do
|
||
|
upload =
|
||
|
copy_to_tempfile(path) do |file|
|
||
|
begin
|
||
|
UploadCreator.new(
|
||
|
file,
|
||
|
metadata.original_filename,
|
||
|
type: row[:type],
|
||
|
origin: metadata.origin_url,
|
||
|
).create_for(Discourse::SYSTEM_USER_ID)
|
||
|
rescue StandardError => e
|
||
|
error_message = e.message
|
||
|
nil
|
||
|
end
|
||
|
end
|
||
|
|
||
|
if (upload_okay = upload.present? && upload.persisted? && upload.errors.blank?)
|
||
|
upload_path = add_multisite_prefix(discourse_store.get_path_for_upload(upload))
|
||
|
|
||
|
unless file_exists?(upload_path)
|
||
|
upload.destroy
|
||
|
upload = nil
|
||
|
upload_okay = false
|
||
|
end
|
||
|
end
|
||
|
|
||
|
if upload_okay
|
||
|
{
|
||
|
id: row[:id],
|
||
|
upload: upload.attributes.to_json,
|
||
|
markdown:
|
||
|
UploadMarkdown.new(upload).to_markdown(display_name: metadata.description),
|
||
|
skip_reason: nil,
|
||
|
}
|
||
|
else
|
||
|
error_message =
|
||
|
upload&.errors&.full_messages&.join(", ") || error_message || "unknown error"
|
||
|
nil
|
||
|
end
|
||
|
end
|
||
|
|
||
|
if result.nil?
|
||
|
status_queue << {
|
||
|
id: row[:id],
|
||
|
upload: nil,
|
||
|
markdown: nil,
|
||
|
error: "too many retries: #{error_message}",
|
||
|
skip_reason: "too many retries",
|
||
|
}
|
||
|
else
|
||
|
status_queue << result
|
||
|
end
|
||
|
rescue StandardError => e
|
||
|
status_queue << {
|
||
|
id: row[:id],
|
||
|
upload: nil,
|
||
|
markdown: nil,
|
||
|
error: e.message,
|
||
|
skip_reason: "error",
|
||
|
}
|
||
|
ensure
|
||
|
data_file&.close!
|
||
|
end
|
||
|
|
||
|
def build_metadata(row)
|
||
|
UploadMetadata.new(
|
||
|
original_filename: row[:display_filename] || row[:filename],
|
||
|
description: row[:description].presence,
|
||
|
)
|
||
|
end
|
||
|
|
||
|
def delete_missing_uploads
|
||
|
puts "Deleting missing uploads from uploads database..."
|
||
|
|
||
|
uploads_db.db.execute(<<~SQL)
|
||
|
DELETE FROM uploads
|
||
|
WHERE upload IS NULL
|
||
|
SQL
|
||
|
end
|
||
|
|
||
|
def download_file(url:, id:, retry_count: 0)
|
||
|
path = download_cache_path(id)
|
||
|
original_filename = nil
|
||
|
|
||
|
if File.exist?(path) && (original_filename = get_original_filename(id))
|
||
|
return path, original_filename
|
||
|
end
|
||
|
|
||
|
fd = FinalDestination.new(url)
|
||
|
file = nil
|
||
|
|
||
|
fd.get do |response, chunk, uri|
|
||
|
if file.nil?
|
||
|
check_response!(response, uri)
|
||
|
original_filename = extract_filename_from_response(response, uri)
|
||
|
file = File.open(path, "wb")
|
||
|
end
|
||
|
|
||
|
file.write(chunk)
|
||
|
|
||
|
if file.size > MAX_FILE_SIZE
|
||
|
file.close
|
||
|
file.unlink
|
||
|
file = nil
|
||
|
throw :done
|
||
|
end
|
||
|
end
|
||
|
|
||
|
if file
|
||
|
file.close
|
||
|
uploads_db.insert(
|
||
|
"INSERT INTO downloads (id, original_filename) VALUES (?, ?)",
|
||
|
[id, original_filename],
|
||
|
)
|
||
|
return path, original_filename
|
||
|
end
|
||
|
|
||
|
nil
|
||
|
end
|
||
|
|
||
|
def download_cache_path(id)
|
||
|
id = id.gsub("/", "_").gsub("=", "-")
|
||
|
File.join(settings[:download_cache_path], id)
|
||
|
end
|
||
|
|
||
|
def get_original_filename(id)
|
||
|
uploads_db.db.query_single_splat("SELECT original_filename FROM downloads WHERE id = ?", id)
|
||
|
end
|
||
|
|
||
|
def check_response!(response, uri)
|
||
|
if uri.blank?
|
||
|
code = response.code.to_i
|
||
|
|
||
|
if code >= 400
|
||
|
raise "#{code} Error"
|
||
|
else
|
||
|
throw :done
|
||
|
end
|
||
|
end
|
||
|
end
|
||
|
|
||
|
def extract_filename_from_response(response, uri)
|
||
|
filename =
|
||
|
if (header = response.header["Content-Disposition"].presence)
|
||
|
disposition_filename =
|
||
|
header[/filename\*=UTF-8''(\S+)\b/i, 1] || header[/filename=(?:"(.+)"|[^\s;]+)/i, 1]
|
||
|
if disposition_filename.present?
|
||
|
URI.decode_www_form_component(disposition_filename)
|
||
|
else
|
||
|
nil
|
||
|
end
|
||
|
end
|
||
|
|
||
|
filename = File.basename(uri.path).presence || "file" if filename.blank?
|
||
|
|
||
|
if File.extname(filename).blank? && response.content_type.present?
|
||
|
ext = MiniMime.lookup_by_content_type(response.content_type)&.extension
|
||
|
filename = "#{filename}.#{ext}" if ext.present?
|
||
|
end
|
||
|
|
||
|
filename
|
||
|
end
|
||
|
|
||
|
def copy_to_tempfile(source_path)
|
||
|
extension = File.extname(source_path)
|
||
|
|
||
|
Tempfile.open(["discourse-upload", extension]) do |tmpfile|
|
||
|
File.open(source_path, "rb") { |source_stream| IO.copy_stream(source_stream, tmpfile) }
|
||
|
tmpfile.rewind
|
||
|
yield(tmpfile)
|
||
|
end
|
||
|
end
|
||
|
|
||
|
def log_status
|
||
|
error_count_text = error_count > 0 ? "#{error_count} errors".red : "0 errors"
|
||
|
print "\r%7d / %7d (%s, %s skipped)" %
|
||
|
[current_count, @max_count, error_count_text, skipped_count]
|
||
|
end
|
||
|
end
|
||
|
end
|
||
|
end
|