discourse/lib/file_store/to_s3_migration.rb

FEATURE: Upload to S3 in parallel to speed up backup restores (#13391)
David Taylor · b0416cb1c1 · 2021-06-16 10:34:39 +01:00

Uploading lots of small files can be made significantly faster by parallelizing the `s3.put_object` calls. In testing, an UPLOAD_CONCURRENCY of 10 made a large restore 10x faster; an UPLOAD_CONCURRENCY of 20 made the same restore 18x faster.

This commit is careful to parallelize as little as possible, to reduce the chance of concurrency issues: the worker threads perform no database transactions, and all modification of shared objects is guarded by a mutex.

Unfortunately we do not have any existing tests for the `ToS3Migration` class. This change has been tested with a large site backup (120k uploads totalling 45GB).


# frozen_string_literal: true

require 'aws-sdk-s3'

module FileStore
  ToS3MigrationError = Class.new(RuntimeError)
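
  # Usage sketch (the options hash normally comes from s3_options_from_site_settings
  # or s3_options_from_env below; pass dry_run: true to only log what would change):
  #
  #   FileStore::ToS3Migration.new(
  #     s3_options: FileStore::ToS3Migration.s3_options_from_site_settings,
  #     dry_run: true
  #   ).migrate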
  class ToS3Migration
    MISSING_UPLOADS_RAKE_TASK_NAME ||= 'posts:missing_uploads'
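
    # Number of parallel S3 uploads. Per the commit message, a concurrency of 10
    # made a large restore ~10x faster, and 20 made it ~18x faster.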
    UPLOAD_CONCURRENCY ||= 20

    def initialize(s3_options:, dry_run: false, migrate_to_multisite: false, skip_etag_verify: false)
      @s3_bucket = s3_options[:bucket]
      @s3_client_options = s3_options[:client_options]
      @dry_run = dry_run
      @migrate_to_multisite = migrate_to_multisite
      @skip_etag_verify = skip_etag_verify
      @current_db = RailsMultisite::ConnectionManagement.current_db
    end

    def self.s3_options_from_site_settings
      {
        client_options: S3Helper.s3_options(SiteSetting),
        bucket: SiteSetting.Upload.s3_upload_bucket
      }
    end

    def self.s3_options_from_env
      unless ENV["DISCOURSE_S3_BUCKET"].present? &&
             ENV["DISCOURSE_S3_REGION"].present? &&
             (
               (
                 ENV["DISCOURSE_S3_ACCESS_KEY_ID"].present? &&
                 ENV["DISCOURSE_S3_SECRET_ACCESS_KEY"].present?
               ) || ENV["DISCOURSE_S3_USE_IAM_PROFILE"].present?
             )

        raise ToS3MigrationError.new(<<~TEXT)
          Please provide the following environment variables:
            - DISCOURSE_S3_BUCKET
            - DISCOURSE_S3_REGION
          and either
            - DISCOURSE_S3_ACCESS_KEY_ID
            - DISCOURSE_S3_SECRET_ACCESS_KEY
          or
            - DISCOURSE_S3_USE_IAM_PROFILE
        TEXT
      end

      opts = { region: ENV["DISCOURSE_S3_REGION"] }
      opts[:endpoint] = ENV["DISCOURSE_S3_ENDPOINT"] if ENV["DISCOURSE_S3_ENDPOINT"].present?

      if ENV["DISCOURSE_S3_USE_IAM_PROFILE"].blank?
        opts[:access_key_id] = ENV["DISCOURSE_S3_ACCESS_KEY_ID"]
        opts[:secret_access_key] = ENV["DISCOURSE_S3_SECRET_ACCESS_KEY"]
      end

      {
        client_options: opts,
        bucket: ENV["DISCOURSE_S3_BUCKET"]
      }
    end

    def migrate
      migrate_to_s3
    end

    def migration_successful?(should_raise: false)
      success = true

      failure_message = "S3 migration failed for db '#{@current_db}'."
      prefix = @migrate_to_multisite ? "uploads/#{@current_db}/original/" : "original/"
      base_url = File.join(SiteSetting.Upload.s3_base_url, prefix)
      count = Upload.by_users.where("url NOT LIKE '#{base_url}%'").count

      if count > 0
        error_message = "#{count} of #{Upload.count} uploads are not migrated to S3. #{failure_message}"
        raise_or_log(error_message, should_raise)
        success = false
      end

      cdn_path = SiteSetting.cdn_path("/uploads/#{@current_db}/original").sub(/https?:/, "")
      count = Post.where("cooked LIKE '%#{cdn_path}%'").count

      if count > 0
        error_message = "#{count} posts are not remapped to new S3 upload URL. #{failure_message}"
        raise_or_log(error_message, should_raise)
        success = false
      end

      Discourse::Application.load_tasks unless Rake::Task.task_defined?(MISSING_UPLOADS_RAKE_TASK_NAME)
      Rake::Task[MISSING_UPLOADS_RAKE_TASK_NAME].invoke
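
      # Count posts that the missing_uploads task flagged and that are not
      # marked as ignored.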
      count = DB.query_single(<<~SQL, Post::MISSING_UPLOADS, Post::MISSING_UPLOADS_IGNORED).first
        SELECT COUNT(1)
        FROM posts p
        WHERE EXISTS (
          SELECT 1
          FROM post_custom_fields f
          WHERE f.post_id = p.id AND f.name = ?
        ) AND NOT EXISTS (
          SELECT 1
          FROM post_custom_fields f
          WHERE f.post_id = p.id AND f.name = ?
        )
      SQL

      if count > 0
        error_message = "rake posts:missing_uploads identified #{count} issues. #{failure_message}"
        raise_or_log(error_message, should_raise)
        success = false
      end

      count = Post.where('baked_version <> ? OR baked_version IS NULL', Post::BAKED_VERSION).count
      if count > 0
        log("#{count} posts still require rebaking and will be rebaked by the regular job")
        log("To speed up the rebaking of posts we recommend you run 'rake posts:rebake_uncooked_posts'") if count > 100
        success = false
      else
        log("No posts require rebaking")
      end

      success
    end

    protected

    def log(message)
      puts message
    end

    def raise_or_log(message, should_raise)
      if should_raise
        raise ToS3MigrationError.new(message)
      else
        log(message)
      end
    end
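
    # True when every user upload URL is either already protocol-relative
    # (i.e. on S3) or uses the new path scheme. "_X" is a SQL LIKE wildcard
    # matching the depth digit in paths such as 1X/, 2X/, 3X/.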
    def uploads_migrated_to_new_scheme?
      seeded_image_url = "uploads/#{@current_db}/original/_X/"
      !Upload.by_users.where("url NOT LIKE '//%' AND url NOT LIKE '/%#{seeded_image_url}%'").exists?
    end

    def migrate_to_s3
      # we don't want a half-migrated state, so ensure all jobs run inline here
      Jobs.run_immediately!

      log "*" * 30 + " DRY RUN " + "*" * 30 if @dry_run
      log "Migrating uploads to S3 for '#{@current_db}'..."

      if !uploads_migrated_to_new_scheme?
        log "Some uploads were not migrated to the new scheme. Running the migration, this may take a while..."
        SiteSetting.migrate_to_new_scheme = true
        Upload.migrate_to_new_scheme

        if !uploads_migrated_to_new_scheme?
          raise ToS3MigrationError.new("Some uploads could not be migrated to the new scheme. " \
            "You need to fix this manually.")
        end
      end

      bucket_has_folder_path = true if @s3_bucket.include? "/"
      public_directory = Rails.root.join("public").to_s

      s3 = Aws::S3::Client.new(@s3_client_options)

      if bucket_has_folder_path
        bucket, folder = S3Helper.get_bucket_and_folder_path(@s3_bucket)
        folder = File.join(folder, "/")
      else
        bucket, folder = @s3_bucket, ""
      end
log "Uploading files to S3..."
log " - Listing local files"
local_files = []
IO.popen("cd #{public_directory} && find uploads/#{@current_db}/original -type f").each do |file|
local_files << file.chomp
putc "." if local_files.size % 1000 == 0
end
log " => #{local_files.size} files"
log " - Listing S3 files"
s3_objects = []
prefix = @migrate_to_multisite ? "uploads/#{@current_db}/original/" : "original/"
options = { bucket: bucket, prefix: folder + prefix }
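
      # list_objects_v2 returns at most 1,000 keys per call, so page through
      # the bucket with continuation tokens.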
      loop do
        response = s3.list_objects_v2(options)
        s3_objects.concat(response.contents)
        putc "."
        break if response.next_continuation_token.blank?
        options[:continuation_token] = response.next_continuation_token
      end
      log " => #{s3_objects.size} files"
log " - Syncing files to S3"
queue = Queue.new
synced = 0
failed = []
lock = Mutex.new
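
      # Worker threads do nothing but upload: the main thread prepares each
      # request and enqueues it, and all updates to the shared counters happen
      # under the mutex. The String#[] call checks that the (quoted) ETag
      # returned by S3 contains the locally computed MD5 digest.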
      upload_threads = UPLOAD_CONCURRENCY.times.map do
        Thread.new do
          while obj = queue.pop
            if s3.put_object(obj[:options]).etag[obj[:etag]]
              putc "."
              lock.synchronize { synced += 1 }
            else
              putc "X"
              lock.synchronize { failed << obj[:path] }
            end
          end
        end
      end

      local_files.each do |file|
        path = File.join(public_directory, file)
        name = File.basename(path)
        etag = Digest::MD5.file(path).hexdigest unless @skip_etag_verify
        key = file[file.index(prefix)..-1]
        key.prepend(folder) if bucket_has_folder_path
        original_path = file.sub("uploads/#{@current_db}", "")
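
        # Skip files that already exist on S3 with the same size (and, unless
        # etag verification is skipped, the same MD5).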
        if s3_object = s3_objects.find { |obj| obj.key.ends_with?(original_path) }
          next if File.size(path) == s3_object.size && (@skip_etag_verify || s3_object.etag[etag])
        end

        options = {
          acl: "public-read",
          body: File.open(path, "rb"),
          bucket: bucket,
          content_type: MiniMime.lookup_by_filename(name)&.content_type,
          key: key,
        }
        if !FileHelper.is_supported_image?(name)
          upload = Upload.find_by(url: "/#{file}")

          if upload&.original_filename
            options[:content_disposition] = ActionDispatch::Http::ContentDisposition.format(
              disposition: "attachment", filename: upload.original_filename
            )
          end

          if upload&.secure
            options[:acl] = "private"
          end
        elsif !FileHelper.is_inline_image?(name)
          upload = Upload.find_by(url: "/#{file}")

          options[:content_disposition] = ActionDispatch::Http::ContentDisposition.format(
            disposition: "attachment", filename: upload&.original_filename || name
          )
        end
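
        # The MD5 may not have been computed above (when etag verification is
        # skipped), but the workers still need it to check the upload response.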
        etag ||= Digest::MD5.file(path).hexdigest

        if @dry_run
          log "#{file} => #{options[:key]}"
          synced += 1
        else
          queue << { path: path, options: options, etag: etag }
        end
      end
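
      # Closing the queue makes queue.pop return nil once it is drained, which
      # terminates the worker loops.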
      queue.close
      upload_threads.each(&:join)
      puts

      failure_message = "S3 migration failed for db '#{@current_db}'."

      if failed.size > 0
        log "Failed to upload #{failed.size} files"
        log failed.join("\n")
        raise failure_message
      elsif s3_objects.size + synced >= local_files.size
log "Updating the URLs in the database..."
from = "/uploads/#{@current_db}/original/"
to = "#{SiteSetting.Upload.s3_base_url}/#{prefix}"
if @dry_run
log "REPLACING '#{from}' WITH '#{to}'"
else
DbHelper.remap(from, to, anchor_left: true)
end
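
        # Rewrite src/href/[img] references to new-scheme upload paths
        # (a depth prefix like 1X/ followed by a 40-character hex digest and
        # an optional extension) so they point at the S3 base URL.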
        [
          [
            "src=\"/uploads/#{@current_db}/original/(\\dX/(?:[a-f0-9]/)*[a-f0-9]{40}[a-z0-9\\.]*)",
            "src=\"#{SiteSetting.Upload.s3_base_url}/#{prefix}\\1"
          ],
          [
            "src='/uploads/#{@current_db}/original/(\\dX/(?:[a-f0-9]/)*[a-f0-9]{40}[a-z0-9\\.]*)",
            "src='#{SiteSetting.Upload.s3_base_url}/#{prefix}\\1"
          ],
          [
            "href=\"/uploads/#{@current_db}/original/(\\dX/(?:[a-f0-9]/)*[a-f0-9]{40}[a-z0-9\\.]*)",
            "href=\"#{SiteSetting.Upload.s3_base_url}/#{prefix}\\1"
          ],
          [
            "href='/uploads/#{@current_db}/original/(\\dX/(?:[a-f0-9]/)*[a-f0-9]{40}[a-z0-9\\.]*)",
            "href='#{SiteSetting.Upload.s3_base_url}/#{prefix}\\1"
          ],
          [
            "\\[img\\]/uploads/#{@current_db}/original/(\\dX/(?:[a-f0-9]/)*[a-f0-9]{40}[a-z0-9\\.]*)\\[/img\\]",
            "[img]#{SiteSetting.Upload.s3_base_url}/#{prefix}\\1[/img]"
          ]
        ].each do |from_url, to_url|
          if @dry_run
            log "REPLACING '#{from_url}' WITH '#{to_url}'"
          else
            DbHelper.regexp_replace(from_url, to_url)
          end
        end

        unless @dry_run
          # Legacy inline image format: ![](/uploads/...)
          Post.where("raw LIKE '%![](/uploads/#{@current_db}/original/%)%'").each do |post|
            regexp = /!\[\]\((\/uploads\/#{@current_db}\/original\/(\dX\/(?:[a-f0-9]\/)*[a-f0-9]{40}[a-z0-9\.]*))\)/

            post.raw.scan(regexp).each do |upload_url, _|
              upload = Upload.get_from_url(upload_url)
              post.raw = post.raw.gsub("![](#{upload_url})", "![](#{upload.short_url})")
            end

            post.save!(validate: false)
          end
        end

        if Discourse.asset_host.present?
          # Uploads that were on local CDN will now be on S3 CDN
          from = "#{Discourse.asset_host}/uploads/#{@current_db}/original/"
          to = "#{SiteSetting.Upload.s3_cdn_url}/#{prefix}"

          if @dry_run
            log "REMAPPING '#{from}' TO '#{to}'"
          else
            DbHelper.remap(from, to)
          end
        end

        # Uploads that were on base hostname will now be on S3 CDN
        from = "#{Discourse.base_url}/uploads/#{@current_db}/original/"
        to = "#{SiteSetting.Upload.s3_cdn_url}/#{prefix}"

        if @dry_run
          log "REMAPPING '#{from}' TO '#{to}'"
        else
          DbHelper.remap(from, to)
        end

        unless @dry_run
          log "Removing old optimized images..."

          OptimizedImage
            .joins("LEFT JOIN uploads u ON optimized_images.upload_id = u.id")
            .where("u.id IS NOT NULL AND u.url LIKE '//%' AND optimized_images.url NOT LIKE '//%'")
            .delete_all

          log "Flagging all posts containing lightboxes for rebake..."

          count = Post.where("cooked LIKE '%class=\"lightbox\"%'").update_all(baked_version: nil)
          log "#{count} posts were flagged for a rebake"
        end
      end

      migration_successful?(should_raise: true)

      log "Done!"

    ensure
      Jobs.run_later!
    end
  end
end