discourse/lib/file_store/to_s3_migration.rb
Gerhard Schlager 93b8b04b06 FIX: Migrating uploads to S3 could miss files
The rake task aborted the migration with "Already migrated" when all upload URLs linked to the correct S3 bucket even though the files didn't exist on S3. By removing the first check we force the rake task to check for the existance of uploads on S3.
2020-03-04 12:50:48 +01:00

356 lines
11 KiB
Ruby

# frozen_string_literal: true
module FileStore
ToS3MigrationError = Class.new(RuntimeError)
class ToS3Migration
def initialize(s3_options:, dry_run: false, migrate_to_multisite: false, skip_etag_verify: false)
@s3_bucket = s3_options[:bucket]
@s3_client_options = s3_options[:client_options]
@dry_run = dry_run
@migrate_to_multisite = migrate_to_multisite
@skip_etag_verify = skip_etag_verify
@current_db = RailsMultisite::ConnectionManagement.current_db
end
def self.s3_options_from_site_settings
{
client_options: S3Helper.s3_options(SiteSetting),
bucket: SiteSetting.s3_upload_bucket
}
end
def self.s3_options_from_env
unless ENV["DISCOURSE_S3_BUCKET"].present? &&
ENV["DISCOURSE_S3_REGION"].present? &&
(
(
ENV["DISCOURSE_S3_ACCESS_KEY_ID"].present? &&
ENV["DISCOURSE_S3_SECRET_ACCESS_KEY"].present?
) || ENV["DISCOURSE_S3_USE_IAM_PROFILE"].present?
)
raise ToS3MigrationError.new(<<~TEXT)
Please provide the following environment variables:
- DISCOURSE_S3_BUCKET
- DISCOURSE_S3_REGION
and either
- DISCOURSE_S3_ACCESS_KEY_ID
- DISCOURSE_S3_SECRET_ACCESS_KEY
or
- DISCOURSE_S3_USE_IAM_PROFILE
TEXT
end
opts = { region: ENV["DISCOURSE_S3_REGION"] }
opts[:endpoint] = ENV["DISCOURSE_S3_ENDPOINT"] if ENV["DISCOURSE_S3_ENDPOINT"].present?
if ENV["DISCOURSE_S3_USE_IAM_PROFILE"].blank?
opts[:access_key_id] = ENV["DISCOURSE_S3_ACCESS_KEY_ID"]
opts[:secret_access_key] = ENV["DISCOURSE_S3_SECRET_ACCESS_KEY"]
end
{
client_options: opts,
bucket: ENV["DISCOURSE_S3_BUCKET"]
}
end
def migrate
migrate_to_s3
end
def migration_successful?(should_raise: false)
success = true
failure_message = "S3 migration failed for db '#{@current_db}'."
prefix = @migrate_to_multisite ? "uploads/#{@current_db}/original/" : "original/"
base_url = File.join(SiteSetting.Upload.s3_base_url, prefix)
count = Upload.by_users.where("url NOT LIKE '#{base_url}%'").count
if count > 0
error_message = "#{count} of #{Upload.count} uploads are not migrated to S3. #{failure_message}"
raise_or_log(error_message, should_raise)
success = false
end
cdn_path = SiteSetting.cdn_path("/uploads/#{@current_db}/original").sub(/https?:/, "")
count = Post.where("cooked LIKE '%#{cdn_path}%'").count
if count > 0
error_message = "#{count} posts are not remapped to new S3 upload URL. #{failure_message}"
raise_or_log(error_message, should_raise)
success = false
end
Discourse::Application.load_tasks
Rake::Task['posts:missing_uploads']
count = DB.query_single(<<~SQL, Post::MISSING_UPLOADS, Post::MISSING_UPLOADS_IGNORED).first
SELECT COUNT(1)
FROM posts p
WHERE EXISTS (
SELECT 1
FROM post_custom_fields f
WHERE f.post_id = p.id AND f.name = ?
) AND NOT EXISTS (
SELECT 1
FROM post_custom_fields f
WHERE f.post_id = p.id AND f.name = ?
)
SQL
if count > 0
error_message = "rake posts:missing_uploads identified #{count} issues. #{failure_message}"
raise_or_log(error_message, should_raise)
success = false
end
count = Post.where('baked_version <> ? OR baked_version IS NULL', Post::BAKED_VERSION).count
if count > 0
log("#{count} posts still require rebaking and will be rebaked during regular job")
log("To speed up migrations of posts we recommend you run 'rake posts:rebake_uncooked_posts'") if count > 100
success = false
else
log("No posts require rebaking")
end
success
end
protected
def log(message)
puts message
end
def raise_or_log(message, should_raise)
if should_raise
raise ToS3MigrationError.new(message)
else
log(message)
end
end
def uploads_migrated_to_new_scheme?
seeded_image_url = "#{GlobalSetting.relative_url_root}/uploads/#{@current_db}/original/_X/"
!Upload.by_users.where("url NOT LIKE '//%' AND url NOT LIKE '#{seeded_image_url}%'").exists?
end
def migrate_to_s3
# we don't want have migrated state, ensure we run all jobs here
Jobs.run_immediately!
log "*" * 30 + " DRY RUN " + "*" * 30 if @dry_run
log "Migrating uploads to S3 for '#{@current_db}'..."
if !uploads_migrated_to_new_scheme?
log "Some uploads were not migrated to the new scheme. Running the migration, this may take a while..."
SiteSetting.migrate_to_new_scheme = true
Upload.migrate_to_new_scheme
if !uploads_migrated_to_new_scheme?
raise ToS3MigrationError.new("Some uploads could not be migrated to the new scheme. " \
"You need to fix this manually.")
end
end
bucket_has_folder_path = true if @s3_bucket.include? "/"
public_directory = Rails.root.join("public").to_s
s3 = Aws::S3::Client.new(@s3_client_options)
if bucket_has_folder_path
bucket, folder = S3Helper.get_bucket_and_folder_path(@s3_bucket)
folder = File.join(folder, "/")
else
bucket, folder = @s3_bucket, ""
end
log "Uploading files to S3..."
log " - Listing local files"
local_files = []
IO.popen("cd #{public_directory} && find uploads/#{@current_db}/original -type f").each do |file|
local_files << file.chomp
putc "." if local_files.size % 1000 == 0
end
log " => #{local_files.size} files"
log " - Listing S3 files"
s3_objects = []
prefix = @migrate_to_multisite ? "uploads/#{@current_db}/original/" : "original/"
options = { bucket: bucket, prefix: folder + prefix }
loop do
response = s3.list_objects_v2(options)
s3_objects.concat(response.contents)
putc "."
break if response.next_continuation_token.blank?
options[:continuation_token] = response.next_continuation_token
end
log " => #{s3_objects.size} files"
log " - Syncing files to S3"
synced = 0
failed = []
local_files.each do |file|
path = File.join(public_directory, file)
name = File.basename(path)
etag = Digest::MD5.file(path).hexdigest unless @skip_etag_verify
key = file[file.index(prefix)..-1]
key.prepend(folder) if bucket_has_folder_path
original_path = file.sub("uploads/#{@current_db}", "")
if s3_object = s3_objects.find { |obj| obj.key.ends_with?(original_path) }
next if File.size(path) == s3_object.size && (@skip_etag_verify || s3_object.etag[etag])
end
options = {
acl: "public-read",
body: File.open(path, "rb"),
bucket: bucket,
content_type: MiniMime.lookup_by_filename(name)&.content_type,
key: key,
}
if !FileHelper.is_supported_image?(name)
upload = Upload.find_by(url: "/#{file}")
if upload&.original_filename
options[:content_disposition] =
%Q{attachment; filename="#{upload.original_filename}"}
end
if upload&.secure
options[:acl] = "private"
end
end
etag ||= Digest::MD5.file(path).hexdigest
if @dry_run
log "#{file} => #{options[:key]}"
synced += 1
elsif s3.put_object(options).etag[etag]
putc "."
synced += 1
else
putc "X"
failed << path
end
end
puts
failure_message = "S3 migration failed for db '#{@current_db}'."
if failed.size > 0
log "Failed to upload #{failed.size} files"
log failed.join("\n")
raise failure_message
elsif s3_objects.size + synced >= local_files.size
log "Updating the URLs in the database..."
from = "/uploads/#{@current_db}/original/"
to = "#{SiteSetting.Upload.s3_base_url}/#{prefix}"
if @dry_run
log "REPLACING '#{from}' WITH '#{to}'"
else
DbHelper.remap(from, to, anchor_left: true)
end
[
[
"src=\"/uploads/#{@current_db}/original/(\\dX/(?:[a-f0-9]/)*[a-f0-9]{40}[a-z0-9\\.]*)",
"src=\"#{SiteSetting.Upload.s3_base_url}/#{prefix}\\1"
],
[
"src='/uploads/#{@current_db}/original/(\\dX/(?:[a-f0-9]/)*[a-f0-9]{40}[a-z0-9\\.]*)",
"src='#{SiteSetting.Upload.s3_base_url}/#{prefix}\\1"
],
[
"href=\"/uploads/#{@current_db}/original/(\\dX/(?:[a-f0-9]/)*[a-f0-9]{40}[a-z0-9\\.]*)",
"href=\"#{SiteSetting.Upload.s3_base_url}/#{prefix}\\1"
],
[
"href='/uploads/#{@current_db}/original/(\\dX/(?:[a-f0-9]/)*[a-f0-9]{40}[a-z0-9\\.]*)",
"href='#{SiteSetting.Upload.s3_base_url}/#{prefix}\\1"
],
[
"\\[img\\]/uploads/#{@current_db}/original/(\\dX/(?:[a-f0-9]/)*[a-f0-9]{40}[a-z0-9\\.]*)\\[/img\\]",
"[img]#{SiteSetting.Upload.s3_base_url}/#{prefix}\\1[/img]"
]
].each do |from_url, to_url|
if @dry_run
log "REPLACING '#{from_url}' WITH '#{to_url}'"
else
DbHelper.regexp_replace(from_url, to_url)
end
end
unless @dry_run
# Legacy inline image format
Post.where("raw LIKE '%![](/uploads/default/original/%)%'").each do |post|
regexp = /!\[\](\/uploads\/#{@current_db}\/original\/(\dX\/(?:[a-f0-9]\/)*[a-f0-9]{40}[a-z0-9\.]*))/
post.raw.scan(regexp).each do |upload_url, _|
upload = Upload.get_from_url(upload_url)
post.raw = post.raw.gsub("![](#{upload_url})", "![](#{upload.short_url})")
end
post.save!(validate: false)
end
end
if Discourse.asset_host.present?
# Uploads that were on local CDN will now be on S3 CDN
from = "#{Discourse.asset_host}/uploads/#{@current_db}/original/"
to = "#{SiteSetting.Upload.s3_cdn_url}/#{prefix}"
if @dry_run
log "REMAPPING '#{from}' TO '#{to}'"
else
DbHelper.remap(from, to)
end
end
# Uploads that were on base hostname will now be on S3 CDN
from = "#{Discourse.base_url}/uploads/#{@current_db}/original/"
to = "#{SiteSetting.Upload.s3_cdn_url}/#{prefix}"
if @dry_run
log "REMAPPING '#{from}' TO '#{to}'"
else
DbHelper.remap(from, to)
end
unless @dry_run
log "Removing old optimized images..."
OptimizedImage
.joins("LEFT JOIN uploads u ON optimized_images.upload_id = u.id")
.where("u.id IS NOT NULL AND u.url LIKE '//%' AND optimized_images.url NOT LIKE '//%'")
.delete_all
log "Flagging all posts containing lightboxes for rebake..."
count = Post.where("cooked LIKE '%class=\"lightbox\"%'").update_all(baked_version: nil)
log "#{count} posts were flagged for a rebake"
end
end
migration_successful?(should_raise: true)
log "Done!"
ensure
Jobs.run_later!
end
end
end