DEV: Uploads import script can download files (#26816)

Gerhard Schlager 2024-05-04 22:48:16 +02:00 committed by GitHub
parent f230767722
commit 1872047053
3 changed files with 118 additions and 12 deletions


@@ -7,16 +7,17 @@ class UploadMarkdown
   def to_markdown(display_name: nil)
     if FileHelper.is_supported_image?(@upload.original_filename)
-      image_markdown
+      image_markdown(display_name: display_name)
     elsif FileHelper.is_supported_playable_media?(@upload.original_filename)
-      playable_media_markdown
+      playable_media_markdown(display_name: display_name)
     else
       attachment_markdown(display_name: display_name)
     end
   end

-  def image_markdown
-    "![#{@upload.original_filename}|#{@upload.width}x#{@upload.height}](#{@upload.short_url})"
+  def image_markdown(display_name: nil)
+    display_name ||= @upload.original_filename
+    "![#{display_name}|#{@upload.width}x#{@upload.height}](#{@upload.short_url})"
   end

   def attachment_markdown(display_name: nil, with_filesize: true)
@@ -26,7 +27,7 @@ class UploadMarkdown
     "[#{display_name}|attachment](#{@upload.short_url})#{human_filesize}"
   end

-  def playable_media_markdown
+  def playable_media_markdown(display_name: nil)
     type =
       if FileHelper.is_supported_audio?(@upload.original_filename)
         "audio"
@@ -34,6 +35,7 @@ class UploadMarkdown
         "video"
       end
     return attachment_markdown if !type
-    "![#{@upload.original_filename}|#{type}](#{@upload.short_url})"
+    display_name ||= @upload.original_filename
+    "![#{display_name}|#{type}](#{@upload.short_url})"
   end
 end
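
A note on the resulting markdown: display_name now flows through to images and playable media too, not just attachments. A minimal sketch with an invented upload (original_filename "IMG_1234.jpg", 800x600, short_url "upload://abc123.jpg"):

    UploadMarkdown.new(upload).to_markdown(display_name: "Vacation photo")
    # => "![Vacation photo|800x600](upload://abc123.jpg)"

    UploadMarkdown.new(upload).to_markdown
    # => "![IMG_1234.jpg|800x600](upload://abc123.jpg)"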


@@ -20,6 +20,10 @@ rescue LoadError
 end

 module BulkImport
+  MAX_FILE_SIZE = 1.gigabyte
+
+  UploadMetadata = Struct.new(:original_filename, :origin_url, :description)
+
   class UploadsImporter
     TRANSACTION_SIZE = 1000
     QUEUE_SIZE = 1000
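
UploadMetadata is a bare Struct, but the importer below instantiates it with keyword arguments; that works on Ruby 3.2+ (older Rubies would need keyword_init: true). For illustration, with invented values:

    metadata = UploadMetadata.new(original_filename: "report.pdf", description: "Q1 report")
    metadata.origin_url = "https://example.com/report.pdf" # hypothetical source URL
    metadata.original_filename # => "report.pdf"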
@@ -109,7 +113,7 @@ module BulkImport
       producer_thread =
         Thread.new do
-          query("SELECT * FROM uploads", @source_db).tap do |result_set|
+          query("SELECT * FROM uploads ORDER BY id", @source_db).tap do |result_set|
             result_set.each { |row| queue << row unless output_existing_ids.include?(row["id"]) }
             result_set.close
           end
         end
@ -131,7 +135,7 @@ module BulkImport
puts "", "Failed to create upload: #{params[:id]} (#{error_message})", "" puts "", "Failed to create upload: #{params[:id]} (#{error_message})", ""
end end
@output_db.execute(<<~SQL, params) insert(<<~SQL, params)
INSERT INTO uploads (id, upload, markdown, skip_reason) INSERT INTO uploads (id, upload, markdown, skip_reason)
VALUES (:id, :upload, :markdown, :skip_reason) VALUES (:id, :upload, :markdown, :skip_reason)
SQL SQL
@@ -158,14 +162,23 @@ module BulkImport
             begin
               data_file = nil
               path = nil
+              metadata =
+                UploadMetadata.new(
+                  original_filename: row["display_filename"] || row["filename"],
+                  description: row["description"].presence,
+                )

               if row["data"].present?
                 data_file = Tempfile.new("discourse-upload", binmode: true)
                 data_file.write(row["data"])
                 data_file.rewind
                 path = data_file.path
+              elsif row["url"].present?
+                path, metadata.original_filename = download_file(url: row["url"], id: row["id"])
+                metadata.origin_url = row["url"]
+                next if !path
               else
-                relative_path = row["relative_path"]
+                relative_path = row["relative_path"] || ""
                 file_exists = false

                 @root_paths.each do |root_path|
@@ -198,8 +211,9 @@ module BulkImport
               begin
                 UploadCreator.new(
                   file,
-                  row["display_filename"] || row["filename"],
+                  metadata.original_filename,
                   type: row["type"],
+                  origin: metadata.origin_url,
                 ).create_for(Discourse::SYSTEM_USER_ID)
               rescue StandardError => e
                 error_message = e.message
@ -228,7 +242,8 @@ module BulkImport
status_queue << { status_queue << {
id: row["id"], id: row["id"],
upload: upload.attributes.to_json, upload: upload.attributes.to_json,
markdown: UploadMarkdown.new(upload).to_markdown, markdown:
UploadMarkdown.new(upload).to_markdown(display_name: metadata.description),
skip_reason: nil, skip_reason: nil,
} }
break break
@@ -269,6 +284,85 @@ module BulkImport
       status_thread.join
     end

+    def download_file(url:, id:, retry_count: 0)
+      path = download_cache_path(id)
+      original_filename = nil
+
+      if File.exist?(path) && (original_filename = get_original_filename(id))
+        return path, original_filename
+      end
+
+      fd = FinalDestination.new(url)
+      file = nil
+
+      fd.get do |response, chunk, uri|
+        if file.nil?
+          check_response!(response, uri)
+          original_filename = extract_filename_from_response(response, uri)
+          file = File.open(path, "wb")
+        end
+
+        file.write(chunk)
+
+        if file.size > MAX_FILE_SIZE
+          file.close
+          file.unlink
+          file = nil
+          throw :done
+        end
+      end
+
+      if file
+        file.close
+        insert(
+          "INSERT INTO downloads (id, original_filename) VALUES (?, ?)",
+          [id, original_filename],
+        )
+        return path, original_filename
+      end
+
+      nil
+    end
+
+    def download_cache_path(id)
+      id = id.gsub("/", "_").gsub("=", "-")
+      File.join(@settings[:download_cache_path], id)
+    end
+
+    def get_original_filename(id)
+      @output_db.get_first_value("SELECT original_filename FROM downloads WHERE id = ?", id)
+    end
+
+    def check_response!(response, uri)
+      if uri.blank?
+        code = response.code.to_i
+
+        if code >= 400
+          raise "#{code} Error"
+        else
+          throw :done
+        end
+      end
+    end
+
+    def extract_filename_from_response(response, uri)
+      filename =
+        if (header = response.header["Content-Disposition"].presence)
+          disposition_filename =
+            header[/filename\*=UTF-8''(\S+)\b/i, 1] || header[/filename=(?:"(.+)"|[^\s;]+)/i, 1]
+          disposition_filename.present? ? URI.decode_www_form_component(disposition_filename) : nil
+        end
+
+      filename = File.basename(uri.path).presence || "file" if filename.blank?
+
+      if File.extname(filename).blank? && response.content_type.present?
+        ext = MiniMime.lookup_by_content_type(response.content_type)&.extension
+        filename = "#{filename}.#{ext}" if ext.present?
+      end
+
+      filename
+    end
+
     def fix_missing
       queue = SizedQueue.new(QUEUE_SIZE)
       consumer_threads = []
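
For context on the new helpers: download_file streams via FinalDestination into a cache file keyed by a sanitized id and records the detected filename in the new downloads table, so re-runs return the cached copy without re-fetching. The two regexes in extract_filename_from_response cover RFC 5987 encoded filenames as well as plain quoted ones; a quick illustration with invented headers:

    header = "attachment; filename*=UTF-8''na%C3%AFve%20plan.txt"
    URI.decode_www_form_component(header[/filename\*=UTF-8''(\S+)\b/i, 1])
    # => "naïve plan.txt"

    header = 'attachment; filename="annual report.pdf"'
    header[/filename=(?:"(.+)"|[^\s;]+)/i, 1]
    # => "annual report.pdf"

If neither header form matches, the code falls back to the URI's basename (or "file") and appends an extension derived from the Content-Type via MiniMime.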
@ -419,7 +513,7 @@ module BulkImport
case params.delete(:status) case params.delete(:status)
when :ok when :ok
@output_db.execute(<<~SQL, params) insert(<<~SQL, params)
INSERT INTO optimized_images (id, optimized_images) INSERT INTO optimized_images (id, optimized_images)
VALUES (:id, :optimized_images) VALUES (:id, :optimized_images)
SQL SQL
@@ -604,6 +698,13 @@ module BulkImport
           optimized_images JSON_TEXT
         )
       SQL
+
+      @output_db.execute(<<~SQL)
+        CREATE TABLE IF NOT EXISTS downloads (
+          id TEXT PRIMARY KEY NOT NULL,
+          original_filename TEXT NOT NULL
+        )
+      SQL
     end

     def insert(sql, bind_vars = [])


@@ -5,6 +5,9 @@ root_paths:
   - "/path/to/your/files"
   - "/path/to/more/files"

+# Files that are downloaded from URLs are cached in this directory.
+download_cache_path: "/path/to/downloaded/files"
+
 # The number of threads to use for processing uploads is calculated as:
 #   thread_count = [number of cores] * [thread_count_factor]
 # The thread count will be doubled if uploads are stored on S3 because there's a higher latency.
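
For example, with 8 cores and a thread_count_factor of 1.5, the importer would use 12 threads, or 24 when uploads are stored on S3.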