mirror of
https://github.com/discourse/discourse.git
synced 2025-01-16 18:32:46 +08:00
6c91148db8
* DEV: Implement uploads command entrypoint - Setup Thor UploadsCommand for CLI - First pass at modularizing various parts of the exising `uploads_import` script * DEV: First attempt at modularizing missing uploads fixer task Move missing upload fix to a dedicated uploads task implementation unit * DEV: First attempt at modularizing missing uploads uploader task Move uploader to a dedicated uploads task implementation unit * DEV: First attempt at modularizing missing uploads optimizer task Move optimizer to a dedicated uploads task implementation unit * DEV: Various follow up fixes to get optimization working - Start threads early - Improve "log" message formatting - Add missing `copy_to_tempfile` method on "uploader" task * DEV: Refactor a bit more Deduplicate and move most of threading premitives to base task as-is * DEV: Remove redundant condition in uploads db migration * DEV: More deduplication Move task retry logic to base class and tidy up other implementation details carried over from the existing script
245 lines
6.6 KiB
Ruby
245 lines
6.6 KiB
Ruby
# frozen_string_literal: true
|
|
|
|
module Migrations::Uploader
|
|
module Tasks
|
|
class Optimizer < Base
|
|
def initialize(databases, settings)
|
|
super(databases, settings)
|
|
|
|
initialize_existing_ids_tracking_sets
|
|
initialize_discourse_resources
|
|
@max_count = 0
|
|
end
|
|
|
|
def run!
|
|
puts "", "Creating optimized images..."
|
|
|
|
disable_optimized_image_lock
|
|
|
|
start_tracking_sets_loader_threads.each(&:join)
|
|
status_thread = start_status_thread
|
|
consumer_threads = start_consumer_threads
|
|
producer_thread = start_producer_thread
|
|
|
|
producer_thread.join
|
|
work_queue.close
|
|
consumer_threads.each(&:join)
|
|
status_queue.close
|
|
status_thread.join
|
|
end
|
|
|
|
private
|
|
|
|
def initialize_existing_ids_tracking_sets
|
|
@optimized_upload_ids = Set.new
|
|
@post_upload_ids = Set.new
|
|
@avatar_upload_ids = Set.new
|
|
end
|
|
|
|
def initialize_discourse_resources
|
|
@avatar_sizes = Discourse.avatar_sizes
|
|
@system_user = Discourse.system_user
|
|
@category_id = Category.last.id
|
|
end
|
|
|
|
def disable_optimized_image_lock
|
|
# allow more than 1 thread to optimized images at the same time
|
|
OptimizedImage.lock_per_machine = false
|
|
end
|
|
|
|
def start_tracking_sets_loader_threads
|
|
[
|
|
start_optimized_upload_ids_loader_thread,
|
|
start_post_upload_ids_loader_thread,
|
|
start_avatar_upload_ids_loader_thread,
|
|
start_max_count_loader_thread,
|
|
]
|
|
end
|
|
|
|
def handle_status_update(params)
|
|
@current_count += 1
|
|
|
|
case params.delete(:status)
|
|
when :ok
|
|
uploads_db.insert(<<~SQL, params)
|
|
INSERT INTO optimized_images (id, optimized_images)
|
|
VALUES (:id, :optimized_images)
|
|
SQL
|
|
when :error
|
|
@error_count += 1
|
|
when :skipped
|
|
@skipped_count += 1
|
|
end
|
|
end
|
|
|
|
def start_optimized_upload_ids_loader_thread
|
|
Thread.new do
|
|
@uploads_db
|
|
.db
|
|
.query("SELECT id FROM optimized_images") { |row| @optimized_upload_ids << row[:id] }
|
|
end
|
|
end
|
|
|
|
def start_post_upload_ids_loader_thread
|
|
Thread.new do
|
|
sql = <<~SQL
|
|
SELECT upload_ids
|
|
FROM posts
|
|
WHERE upload_ids IS NOT NULL
|
|
SQL
|
|
|
|
@intermediate_db
|
|
.db
|
|
.query(sql) { |row| JSON.parse(row[:upload_ids]).each { |id| @post_upload_ids << id } }
|
|
end
|
|
end
|
|
|
|
def start_avatar_upload_ids_loader_thread
|
|
Thread.new do
|
|
sql = <<~SQL
|
|
SELECT avatar_upload_id
|
|
FROM users
|
|
WHERE avatar_upload_id IS NOT NULL
|
|
SQL
|
|
|
|
@intermediate_db.db.query(sql) { |row| @avatar_upload_ids << row[:avatar_upload_id] }
|
|
end
|
|
end
|
|
|
|
def start_max_count_loader_thread
|
|
Thread.new do
|
|
@max_count =
|
|
@uploads_db.db.query_single_splat(
|
|
"SELECT COUNT(*) FROM uploads WHERE upload IS NOT NULL",
|
|
)
|
|
end
|
|
end
|
|
|
|
def enqueue_jobs
|
|
sql = <<~SQL
|
|
SELECT id AS upload_id, upload ->> 'sha1' AS upload_sha1, markdown
|
|
FROM uploads
|
|
WHERE upload IS NOT NULL
|
|
ORDER BY rowid
|
|
SQL
|
|
|
|
@uploads_db
|
|
.db
|
|
.query(sql) do |row|
|
|
upload_id = row[:upload_id]
|
|
|
|
if @optimized_upload_ids.include?(upload_id) || !row[:markdown].start_with?("![")
|
|
status_queue << { id: row[:upload_id], status: :skipped }
|
|
next
|
|
end
|
|
|
|
if @post_upload_ids.include?(upload_id)
|
|
row[:type] = "post"
|
|
work_queue << row
|
|
elsif @avatar_upload_ids.include?(upload_id)
|
|
row[:type] = "avatar"
|
|
work_queue << row
|
|
else
|
|
status_queue << { id: row[:upload_id], status: :skipped }
|
|
end
|
|
end
|
|
end
|
|
|
|
def start_consumer_threads
|
|
Jobs.run_immediately!
|
|
|
|
super
|
|
end
|
|
|
|
def log_status
|
|
error_count_text = error_count > 0 ? "#{error_count} errors".red : "0 errors"
|
|
|
|
print "\r%7d / %7d (%s, %d skipped)" %
|
|
[current_count, @max_count, error_count_text, skipped_count]
|
|
end
|
|
|
|
def instantiate_task_resource
|
|
PostCreator.new(
|
|
@system_user,
|
|
raw: "Topic created by uploads_importer",
|
|
acting_user: @system_user,
|
|
skip_validations: true,
|
|
title: "Topic created by uploads_importer - #{SecureRandom.hex}",
|
|
archetype: Archetype.default,
|
|
category: @category_id,
|
|
).create!
|
|
end
|
|
|
|
def process_upload(row, post)
|
|
result = with_retries { attempt_optimization(row, post) }
|
|
status_queue << (result || { id: row[:upload_id], status: :error })
|
|
end
|
|
|
|
def attempt_optimization(row, post)
|
|
upload = Upload.find_by(sha1: row[:upload_sha1])
|
|
optimized_images = create_optimized_images(row[:type], row[:markdown], upload, post)
|
|
|
|
return if optimized_images.blank?
|
|
|
|
processed_optimized_images = process_optimized_images(optimized_images)
|
|
|
|
if images_valid?(processed_optimized_images)
|
|
{
|
|
id: row[:upload_id],
|
|
optimized_images: serialize_optimized_images(processed_optimized_images),
|
|
status: :ok,
|
|
}
|
|
end
|
|
end
|
|
|
|
def images_valid?(images)
|
|
!images.nil? && images.all?(&:present?) && images.all?(&:persisted?) &&
|
|
images.all? { |o| o.errors.blank? }
|
|
end
|
|
|
|
def serialize_optimized_images(images)
|
|
images.presence&.to_json(only: OptimizedImage.column_names)
|
|
end
|
|
|
|
def create_optimized_images(type, markdown, upload, post)
|
|
case type
|
|
when "post"
|
|
post.update_columns(baked_at: nil, cooked: "", raw: markdown)
|
|
post.reload
|
|
post.rebake!
|
|
|
|
OptimizedImage.where(upload_id: upload.id).to_a
|
|
when "avatar"
|
|
@avatar_sizes.map { |size| OptimizedImage.create_for(upload, size, size) }
|
|
end
|
|
rescue StandardError => e
|
|
puts e.message
|
|
puts e.stacktrace
|
|
|
|
nil
|
|
end
|
|
|
|
def process_optimized_images(images)
|
|
begin
|
|
images.map! do |image|
|
|
next if image.blank?
|
|
|
|
image_path = add_multisite_prefix(discourse_store.get_path_for_optimized_image(image))
|
|
|
|
unless file_exists?(image_path)
|
|
image.destroy
|
|
image = nil
|
|
end
|
|
|
|
image
|
|
end
|
|
rescue StandardError
|
|
images = nil
|
|
end
|
|
|
|
images
|
|
end
|
|
end
|
|
end
|
|
end
|