discourse/migrations/lib/uploader/tasks/base.rb
Selase Krakani 6c91148db8
DEV: Refactor uploads_importer script (#29292)
* DEV: Implement uploads command entrypoint

- Setup Thor UploadsCommand for CLI
- First pass at modularizing various parts of the exising `uploads_import` script

* DEV: First attempt at modularizing missing uploads fixer task

Move missing upload fix to a dedicated uploads task implementation unit

* DEV: First attempt at modularizing missing uploads uploader task

Move uploader to a dedicated uploads task implementation unit

* DEV: First attempt at modularizing missing uploads optimizer task

Move optimizer to a dedicated uploads task implementation unit

* DEV: Various follow up fixes to get optimization working

- Start threads early
- Improve "log" message formatting
- Add missing `copy_to_tempfile` method on "uploader" task

* DEV: Refactor a bit more

Deduplicate and move most of threading premitives to base task as-is

* DEV: Remove redundant condition in uploads db migration

* DEV: More deduplication

Move task retry logic to base class and tidy up other implementation
details carried over from the existing script
2024-10-31 13:31:12 +00:00

141 lines
3.1 KiB
Ruby

# frozen_string_literal: true
require "etc"
require "colored2"
module Migrations::Uploader
module Tasks
class Base
class NotImplementedError < StandardError
end
TRANSACTION_SIZE = 1000
QUEUE_SIZE = 1000
DEFAULT_THREAD_FACTOR = 1.5
attr_reader :uploads_db,
:intermediate_db,
:settings,
:work_queue,
:status_queue,
:discourse_store,
:error_count,
:current_count,
:missing_count,
:skipped_count
def initialize(databases, settings)
@uploads_db = databases[:uploads_db]
@intermediate_db = databases[:intermediate_db]
@settings = settings
@work_queue = SizedQueue.new(QUEUE_SIZE)
@status_queue = SizedQueue.new(QUEUE_SIZE)
@discourse_store = Discourse.store
@error_count = 0
@current_count = 0
@missing_count = 0
@skipped_count = 0
end
def run!
raise NotImplementedError
end
def self.run!(databases, settings)
new(databases, settings).run!
end
protected
def handle_status_update
raise NotImplementedError
end
def enqueue_jobs
raise NotImplementedError
end
def instantiate_task_resource
{}
end
def start_status_thread
Thread.new do
while !(result = status_queue.pop).nil?
handle_status_update(result)
log_status
end
end
end
def start_consumer_threads
thread_count.times.map { |index| consumer_thread(index) }
end
def consumer_thread(index)
Thread.new do
Thread.current.name = "worker-#{index}"
resource = instantiate_task_resource
while (row = work_queue.pop)
process_upload(row, resource)
end
end
end
def start_producer_thread
Thread.new { enqueue_jobs }
end
def thread_count
@thread_count ||= calculate_thread_count
end
def add_multisite_prefix(path)
return path if !Rails.configuration.multisite
File.join("uploads", RailsMultisite::ConnectionManagement.current_db, path)
end
def file_exists?(path)
if discourse_store.external?
discourse_store.object_from_path(path).exists?
else
File.exist?(File.join(discourse_store.public_dir, path))
end
end
def with_retries(max: 3)
count = 0
loop do
result = yield
break result if result
count += 1
break nil if count >= max
sleep(calculate_backoff(count))
end
end
private
def calculate_backoff(retry_count)
0.25 * retry_count
end
def calculate_thread_count
base = Etc.nprocessors
thread_count_factor = settings.fetch(:thread_count_factor, DEFAULT_THREAD_FACTOR)
store_factor = discourse_store.external? ? 2 : 1
(base * thread_count_factor * store_factor).to_i
end
end
end
end