mirror of
https://github.com/discourse/discourse.git
synced 2025-01-23 07:22:02 +08:00
b0416cb1c1
Uploading lots of small files can be made significantly faster by parallelizing the `s3.put_object` calls. In testing, an UPLOAD_CONCURRENCY of 10 made a large restore 10x faster. An UPLOAD_CONCURRENCY of 20 made the same restore 18x faster. This commit is careful to parallelize as little as possible, to reduce the chance of concurrency issues. In the worker threads, no database transactions are performed. All modification of shared objects is controlled with a mutex. Unfortunately we do not have any existing tests for the `ToS3Migration` class. This change has been tested with a large site backup (120k uploads totalling 45GB)
382 lines
12 KiB
Ruby
382 lines
12 KiB
Ruby
# frozen_string_literal: true
|
|
|
|
require 'aws-sdk-s3'
|
|
|
|
module FileStore
|
|
ToS3MigrationError = Class.new(RuntimeError)
|
|
|
|
class ToS3Migration
|
|
MISSING_UPLOADS_RAKE_TASK_NAME ||= 'posts:missing_uploads'
|
|
UPLOAD_CONCURRENCY ||= 20
|
|
|
|
def initialize(s3_options:, dry_run: false, migrate_to_multisite: false, skip_etag_verify: false)
|
|
|
|
@s3_bucket = s3_options[:bucket]
|
|
@s3_client_options = s3_options[:client_options]
|
|
@dry_run = dry_run
|
|
@migrate_to_multisite = migrate_to_multisite
|
|
@skip_etag_verify = skip_etag_verify
|
|
@current_db = RailsMultisite::ConnectionManagement.current_db
|
|
end
|
|
|
|
def self.s3_options_from_site_settings
|
|
{
|
|
client_options: S3Helper.s3_options(SiteSetting),
|
|
bucket: SiteSetting.Upload.s3_upload_bucket
|
|
}
|
|
end
|
|
|
|
def self.s3_options_from_env
|
|
unless ENV["DISCOURSE_S3_BUCKET"].present? &&
|
|
ENV["DISCOURSE_S3_REGION"].present? &&
|
|
(
|
|
(
|
|
ENV["DISCOURSE_S3_ACCESS_KEY_ID"].present? &&
|
|
ENV["DISCOURSE_S3_SECRET_ACCESS_KEY"].present?
|
|
) || ENV["DISCOURSE_S3_USE_IAM_PROFILE"].present?
|
|
)
|
|
|
|
raise ToS3MigrationError.new(<<~TEXT)
|
|
Please provide the following environment variables:
|
|
- DISCOURSE_S3_BUCKET
|
|
- DISCOURSE_S3_REGION
|
|
and either
|
|
- DISCOURSE_S3_ACCESS_KEY_ID
|
|
- DISCOURSE_S3_SECRET_ACCESS_KEY
|
|
or
|
|
- DISCOURSE_S3_USE_IAM_PROFILE
|
|
TEXT
|
|
end
|
|
|
|
opts = { region: ENV["DISCOURSE_S3_REGION"] }
|
|
opts[:endpoint] = ENV["DISCOURSE_S3_ENDPOINT"] if ENV["DISCOURSE_S3_ENDPOINT"].present?
|
|
|
|
if ENV["DISCOURSE_S3_USE_IAM_PROFILE"].blank?
|
|
opts[:access_key_id] = ENV["DISCOURSE_S3_ACCESS_KEY_ID"]
|
|
opts[:secret_access_key] = ENV["DISCOURSE_S3_SECRET_ACCESS_KEY"]
|
|
end
|
|
|
|
{
|
|
client_options: opts,
|
|
bucket: ENV["DISCOURSE_S3_BUCKET"]
|
|
}
|
|
end
|
|
|
|
def migrate
|
|
migrate_to_s3
|
|
end
|
|
|
|
def migration_successful?(should_raise: false)
|
|
success = true
|
|
|
|
failure_message = "S3 migration failed for db '#{@current_db}'."
|
|
prefix = @migrate_to_multisite ? "uploads/#{@current_db}/original/" : "original/"
|
|
|
|
base_url = File.join(SiteSetting.Upload.s3_base_url, prefix)
|
|
count = Upload.by_users.where("url NOT LIKE '#{base_url}%'").count
|
|
if count > 0
|
|
error_message = "#{count} of #{Upload.count} uploads are not migrated to S3. #{failure_message}"
|
|
raise_or_log(error_message, should_raise)
|
|
success = false
|
|
end
|
|
|
|
cdn_path = SiteSetting.cdn_path("/uploads/#{@current_db}/original").sub(/https?:/, "")
|
|
count = Post.where("cooked LIKE '%#{cdn_path}%'").count
|
|
if count > 0
|
|
error_message = "#{count} posts are not remapped to new S3 upload URL. #{failure_message}"
|
|
raise_or_log(error_message, should_raise)
|
|
success = false
|
|
end
|
|
|
|
Discourse::Application.load_tasks unless Rake::Task.task_defined?(MISSING_UPLOADS_RAKE_TASK_NAME)
|
|
Rake::Task[MISSING_UPLOADS_RAKE_TASK_NAME]
|
|
count = DB.query_single(<<~SQL, Post::MISSING_UPLOADS, Post::MISSING_UPLOADS_IGNORED).first
|
|
SELECT COUNT(1)
|
|
FROM posts p
|
|
WHERE EXISTS (
|
|
SELECT 1
|
|
FROM post_custom_fields f
|
|
WHERE f.post_id = p.id AND f.name = ?
|
|
) AND NOT EXISTS (
|
|
SELECT 1
|
|
FROM post_custom_fields f
|
|
WHERE f.post_id = p.id AND f.name = ?
|
|
)
|
|
SQL
|
|
if count > 0
|
|
error_message = "rake posts:missing_uploads identified #{count} issues. #{failure_message}"
|
|
raise_or_log(error_message, should_raise)
|
|
success = false
|
|
end
|
|
|
|
count = Post.where('baked_version <> ? OR baked_version IS NULL', Post::BAKED_VERSION).count
|
|
if count > 0
|
|
log("#{count} posts still require rebaking and will be rebaked during regular job")
|
|
log("To speed up migrations of posts we recommend you run 'rake posts:rebake_uncooked_posts'") if count > 100
|
|
success = false
|
|
else
|
|
log("No posts require rebaking")
|
|
end
|
|
|
|
success
|
|
end
|
|
|
|
protected
|
|
|
|
def log(message)
|
|
puts message
|
|
end
|
|
|
|
def raise_or_log(message, should_raise)
|
|
if should_raise
|
|
raise ToS3MigrationError.new(message)
|
|
else
|
|
log(message)
|
|
end
|
|
end
|
|
|
|
def uploads_migrated_to_new_scheme?
|
|
seeded_image_url = "uploads/#{@current_db}/original/_X/"
|
|
!Upload.by_users.where("url NOT LIKE '//%' AND url NOT LIKE '/%#{seeded_image_url}%'").exists?
|
|
end
|
|
|
|
def migrate_to_s3
|
|
# we don't want have migrated state, ensure we run all jobs here
|
|
Jobs.run_immediately!
|
|
|
|
log "*" * 30 + " DRY RUN " + "*" * 30 if @dry_run
|
|
log "Migrating uploads to S3 for '#{@current_db}'..."
|
|
|
|
if !uploads_migrated_to_new_scheme?
|
|
log "Some uploads were not migrated to the new scheme. Running the migration, this may take a while..."
|
|
SiteSetting.migrate_to_new_scheme = true
|
|
Upload.migrate_to_new_scheme
|
|
|
|
if !uploads_migrated_to_new_scheme?
|
|
raise ToS3MigrationError.new("Some uploads could not be migrated to the new scheme. " \
|
|
"You need to fix this manually.")
|
|
end
|
|
end
|
|
|
|
bucket_has_folder_path = true if @s3_bucket.include? "/"
|
|
public_directory = Rails.root.join("public").to_s
|
|
|
|
s3 = Aws::S3::Client.new(@s3_client_options)
|
|
|
|
if bucket_has_folder_path
|
|
bucket, folder = S3Helper.get_bucket_and_folder_path(@s3_bucket)
|
|
folder = File.join(folder, "/")
|
|
else
|
|
bucket, folder = @s3_bucket, ""
|
|
end
|
|
|
|
log "Uploading files to S3..."
|
|
log " - Listing local files"
|
|
|
|
local_files = []
|
|
IO.popen("cd #{public_directory} && find uploads/#{@current_db}/original -type f").each do |file|
|
|
local_files << file.chomp
|
|
putc "." if local_files.size % 1000 == 0
|
|
end
|
|
|
|
log " => #{local_files.size} files"
|
|
log " - Listing S3 files"
|
|
|
|
s3_objects = []
|
|
prefix = @migrate_to_multisite ? "uploads/#{@current_db}/original/" : "original/"
|
|
|
|
options = { bucket: bucket, prefix: folder + prefix }
|
|
|
|
loop do
|
|
response = s3.list_objects_v2(options)
|
|
s3_objects.concat(response.contents)
|
|
putc "."
|
|
break if response.next_continuation_token.blank?
|
|
options[:continuation_token] = response.next_continuation_token
|
|
end
|
|
|
|
log " => #{s3_objects.size} files"
|
|
log " - Syncing files to S3"
|
|
|
|
queue = Queue.new
|
|
synced = 0
|
|
failed = []
|
|
|
|
lock = Mutex.new
|
|
upload_threads = UPLOAD_CONCURRENCY.times.map do
|
|
Thread.new do
|
|
while obj = queue.pop
|
|
if s3.put_object(obj[:options]).etag[obj[:etag]]
|
|
putc "."
|
|
lock.synchronize { synced += 1 }
|
|
else
|
|
putc "X"
|
|
lock.synchronize { failed << obj[:path] }
|
|
end
|
|
end
|
|
end
|
|
end
|
|
|
|
local_files.each do |file|
|
|
path = File.join(public_directory, file)
|
|
name = File.basename(path)
|
|
etag = Digest::MD5.file(path).hexdigest unless @skip_etag_verify
|
|
key = file[file.index(prefix)..-1]
|
|
key.prepend(folder) if bucket_has_folder_path
|
|
original_path = file.sub("uploads/#{@current_db}", "")
|
|
|
|
if s3_object = s3_objects.find { |obj| obj.key.ends_with?(original_path) }
|
|
next if File.size(path) == s3_object.size && (@skip_etag_verify || s3_object.etag[etag])
|
|
end
|
|
|
|
options = {
|
|
acl: "public-read",
|
|
body: File.open(path, "rb"),
|
|
bucket: bucket,
|
|
content_type: MiniMime.lookup_by_filename(name)&.content_type,
|
|
key: key,
|
|
}
|
|
|
|
if !FileHelper.is_supported_image?(name)
|
|
upload = Upload.find_by(url: "/#{file}")
|
|
|
|
if upload&.original_filename
|
|
options[:content_disposition] = ActionDispatch::Http::ContentDisposition.format(
|
|
disposition: "attachment", filename: upload.original_filename
|
|
)
|
|
end
|
|
|
|
if upload&.secure
|
|
options[:acl] = "private"
|
|
end
|
|
elsif !FileHelper.is_inline_image?(name)
|
|
upload = Upload.find_by(url: "/#{file}")
|
|
options[:content_disposition] = ActionDispatch::Http::ContentDisposition.format(
|
|
disposition: "attachment", filename: upload&.original_filename || name
|
|
)
|
|
end
|
|
|
|
etag ||= Digest::MD5.file(path).hexdigest
|
|
|
|
if @dry_run
|
|
log "#{file} => #{options[:key]}"
|
|
synced += 1
|
|
else
|
|
queue << { path: path, options: options, etag: etag }
|
|
end
|
|
end
|
|
|
|
queue.close
|
|
upload_threads.each(&:join)
|
|
|
|
puts
|
|
|
|
failure_message = "S3 migration failed for db '#{@current_db}'."
|
|
|
|
if failed.size > 0
|
|
log "Failed to upload #{failed.size} files"
|
|
log failed.join("\n")
|
|
raise failure_message
|
|
elsif s3_objects.size + synced >= local_files.size
|
|
log "Updating the URLs in the database..."
|
|
|
|
from = "/uploads/#{@current_db}/original/"
|
|
to = "#{SiteSetting.Upload.s3_base_url}/#{prefix}"
|
|
|
|
if @dry_run
|
|
log "REPLACING '#{from}' WITH '#{to}'"
|
|
else
|
|
DbHelper.remap(from, to, anchor_left: true)
|
|
end
|
|
|
|
[
|
|
[
|
|
"src=\"/uploads/#{@current_db}/original/(\\dX/(?:[a-f0-9]/)*[a-f0-9]{40}[a-z0-9\\.]*)",
|
|
"src=\"#{SiteSetting.Upload.s3_base_url}/#{prefix}\\1"
|
|
],
|
|
[
|
|
"src='/uploads/#{@current_db}/original/(\\dX/(?:[a-f0-9]/)*[a-f0-9]{40}[a-z0-9\\.]*)",
|
|
"src='#{SiteSetting.Upload.s3_base_url}/#{prefix}\\1"
|
|
],
|
|
[
|
|
"href=\"/uploads/#{@current_db}/original/(\\dX/(?:[a-f0-9]/)*[a-f0-9]{40}[a-z0-9\\.]*)",
|
|
"href=\"#{SiteSetting.Upload.s3_base_url}/#{prefix}\\1"
|
|
],
|
|
[
|
|
"href='/uploads/#{@current_db}/original/(\\dX/(?:[a-f0-9]/)*[a-f0-9]{40}[a-z0-9\\.]*)",
|
|
"href='#{SiteSetting.Upload.s3_base_url}/#{prefix}\\1"
|
|
],
|
|
[
|
|
"\\[img\\]/uploads/#{@current_db}/original/(\\dX/(?:[a-f0-9]/)*[a-f0-9]{40}[a-z0-9\\.]*)\\[/img\\]",
|
|
"[img]#{SiteSetting.Upload.s3_base_url}/#{prefix}\\1[/img]"
|
|
]
|
|
].each do |from_url, to_url|
|
|
|
|
if @dry_run
|
|
log "REPLACING '#{from_url}' WITH '#{to_url}'"
|
|
else
|
|
DbHelper.regexp_replace(from_url, to_url)
|
|
end
|
|
end
|
|
|
|
unless @dry_run
|
|
# Legacy inline image format
|
|
Post.where("raw LIKE '%![](/uploads/default/original/%)%'").each do |post|
|
|
regexp = /!\[\](\/uploads\/#{@current_db}\/original\/(\dX\/(?:[a-f0-9]\/)*[a-f0-9]{40}[a-z0-9\.]*))/
|
|
|
|
post.raw.scan(regexp).each do |upload_url, _|
|
|
upload = Upload.get_from_url(upload_url)
|
|
post.raw = post.raw.gsub("![](#{upload_url})", "![](#{upload.short_url})")
|
|
end
|
|
|
|
post.save!(validate: false)
|
|
end
|
|
end
|
|
|
|
if Discourse.asset_host.present?
|
|
# Uploads that were on local CDN will now be on S3 CDN
|
|
from = "#{Discourse.asset_host}/uploads/#{@current_db}/original/"
|
|
to = "#{SiteSetting.Upload.s3_cdn_url}/#{prefix}"
|
|
|
|
if @dry_run
|
|
log "REMAPPING '#{from}' TO '#{to}'"
|
|
else
|
|
DbHelper.remap(from, to)
|
|
end
|
|
end
|
|
|
|
# Uploads that were on base hostname will now be on S3 CDN
|
|
from = "#{Discourse.base_url}/uploads/#{@current_db}/original/"
|
|
to = "#{SiteSetting.Upload.s3_cdn_url}/#{prefix}"
|
|
|
|
if @dry_run
|
|
log "REMAPPING '#{from}' TO '#{to}'"
|
|
else
|
|
DbHelper.remap(from, to)
|
|
end
|
|
|
|
unless @dry_run
|
|
log "Removing old optimized images..."
|
|
|
|
OptimizedImage
|
|
.joins("LEFT JOIN uploads u ON optimized_images.upload_id = u.id")
|
|
.where("u.id IS NOT NULL AND u.url LIKE '//%' AND optimized_images.url NOT LIKE '//%'")
|
|
.delete_all
|
|
|
|
log "Flagging all posts containing lightboxes for rebake..."
|
|
|
|
count = Post.where("cooked LIKE '%class=\"lightbox\"%'").update_all(baked_version: nil)
|
|
log "#{count} posts were flagged for a rebake"
|
|
end
|
|
end
|
|
|
|
migration_successful?(should_raise: true)
|
|
|
|
log "Done!"
|
|
|
|
ensure
|
|
Jobs.run_later!
|
|
end
|
|
end
|
|
end
|