A "bad upload" in this context is a upload with a mismatched URL. This can happen when changing the S3 bucket used for uploads and the upload records in the database have not been remapped correctly.
# frozen_string_literal: true

require "aws-sdk-s3"
require "csv"

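# Reconciles Discourse upload records against an S3 inventory listing.
# A minimal usage sketch (the bucket name is illustrative):
#
#   S3Inventory.new(:upload, s3_inventory_bucket: "my-inventory-bucket")
#     .backfill_etags_and_list_missing
#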
class S3Inventory
  attr_reader :type, :inventory_date, :s3_helper

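  # Zero-based column offsets into each inventory CSV row. In AWS's
  # inventory format column 0 is the bucket name, so the object key comes
  # next; the etag offset assumes ETag is the first optional field
  # configured for the inventory.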
  CSV_KEY_INDEX = 1
  CSV_ETAG_INDEX = 2
  INVENTORY_PREFIX = "inventory"
  INVENTORY_LAG = 2.days
  WAIT_AFTER_RESTORE_DAYS = 2

  def initialize(
    type,
    s3_inventory_bucket:,
    preloaded_inventory_file: nil,
    preloaded_inventory_date: nil,
    s3_options: {}
  )
    @s3_helper = S3Helper.new(s3_inventory_bucket, "", s3_options)

    if preloaded_inventory_file && preloaded_inventory_date
      # Data preloaded, so we don't need to fetch it again
      @preloaded_inventory_file = preloaded_inventory_file
      @inventory_date = preloaded_inventory_date
    end

    if type == :upload
      @type = "original"
      @model = Upload
      @scope = @model.by_users.without_s3_file_missing_confirmed_verification_status
    elsif type == :optimized
      @type = "optimized"
      @scope = @model = OptimizedImage
    else
      raise "Invalid type: #{type}"
    end
  end

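  # Loads the inventory into a temp table, backfills etags onto records
  # that lack one, then reports records whose etag/url has no inventory
  # match and (for uploads) updates their verification_status.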
  def backfill_etags_and_list_missing
    if !@preloaded_inventory_file && files.blank?
      error("Failed to list inventory from S3")
      return
    end

    DistributedMutex.synchronize("s3_inventory_list_missing_#{type}", validity: 30.minutes) do
      begin
        download_and_decompress_files if !@preloaded_inventory_file

        multisite_prefix = Discourse.store.upload_path

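        # Stream matching inventory rows into a session-local temp table via
        # COPY FROM STDIN so the reconciliation below runs as set-based SQL
        # instead of row-by-row Ruby.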
        ActiveRecord::Base.transaction do
          begin
            connection.exec(
              "CREATE TEMP TABLE #{tmp_table_name}(url text UNIQUE, etag text, PRIMARY KEY(etag, url))",
            )

            connection.copy_data("COPY #{tmp_table_name} FROM STDIN CSV") do
              for_each_inventory_row do |row|
                key = row[CSV_KEY_INDEX]

                next if Rails.configuration.multisite && key.exclude?(multisite_prefix)
                next if key.exclude?("#{type}/")

                url = File.join(Discourse.store.absolute_base_url, key)
                connection.put_copy_data("#{url},#{row[CSV_ETAG_INDEX]}\n")
              end
            end

            table_name = @model.table_name

            # backfilling etags
            connection.async_exec(
              "UPDATE #{table_name}
              SET etag = #{tmp_table_name}.etag
              FROM #{tmp_table_name}
              WHERE #{table_name}.etag IS NULL AND
                #{table_name}.url = #{tmp_table_name}.url",
            )

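            # A record counts as missing when no inventory row shares its
            # etag, or when the etag matches but the url does not.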
            uploads = @scope.where("updated_at < ?", inventory_date)

            missing_uploads =
              uploads.joins(
                "LEFT JOIN #{tmp_table_name} ON #{tmp_table_name}.etag = #{table_name}.etag",
              ).where(
                "#{tmp_table_name}.etag IS NULL OR #{tmp_table_name}.url != #{table_name}.url",
              )

            exists_with_different_etag =
              missing_uploads
                .joins(
                  "LEFT JOIN #{tmp_table_name} inventory2 ON inventory2.url = #{table_name}.url",
                )
                .where("inventory2.etag IS NOT NULL")
                .pluck(:id)

            exists_with_different_url =
              missing_uploads
                .joins(
                  "LEFT JOIN #{tmp_table_name} inventory3 ON inventory3.etag = #{table_name}.etag",
                )
                .where("inventory3.url != #{table_name}.url")
                .pluck(:id)

            # marking as verified/not verified
            if @model == Upload
              sql_params = {
                inventory_date: inventory_date,
                invalid_etag: Upload.verification_statuses[:invalid_etag],
                invalid_url: Upload.verification_statuses[:invalid_url],
                s3_file_missing_confirmed: Upload.verification_statuses[:s3_file_missing_confirmed],
                verified: Upload.verification_statuses[:verified],
                seeded_id_threshold: @model::SEEDED_ID_THRESHOLD,
              }

              DB.exec(<<~SQL, sql_params)
                UPDATE #{table_name}
                SET verification_status = :verified
                WHERE etag IS NOT NULL
                AND verification_status <> :verified
                AND verification_status <> :s3_file_missing_confirmed
                AND updated_at < :inventory_date
                AND id > :seeded_id_threshold
                AND EXISTS
                  (
                    SELECT 1
                    FROM #{tmp_table_name}
                    WHERE #{tmp_table_name}.etag = #{table_name}.etag
                  )
              SQL

              DB.exec(<<~SQL, sql_params)
                UPDATE #{table_name}
                SET verification_status = :invalid_etag
                WHERE verification_status <> :invalid_etag
                AND verification_status <> :s3_file_missing_confirmed
                AND updated_at < :inventory_date
                AND id > :seeded_id_threshold
                AND NOT EXISTS
                  (
                    SELECT 1
                    FROM #{tmp_table_name}
                    WHERE #{tmp_table_name}.etag = #{table_name}.etag
                  )
              SQL

              DB.exec(<<~SQL, sql_params)
                UPDATE #{table_name}
                SET verification_status = :invalid_url
                WHERE verification_status <> :invalid_url
                AND verification_status <> :invalid_etag
                AND verification_status <> :s3_file_missing_confirmed
                AND updated_at < :inventory_date
                AND id > :seeded_id_threshold
                AND NOT EXISTS
                  (
                    SELECT 1
                    FROM #{tmp_table_name}
                    WHERE #{tmp_table_name}.url = #{table_name}.url
                  )
              SQL
            end

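            # Log each missing record, annotated with the likely cause.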
            if (missing_count = missing_uploads.count) > 0
              missing_uploads
                .select(:id, :url)
                .find_each do |upload|
                  if exists_with_different_etag.include?(upload.id)
                    log "#{upload.url} has different etag"
                  elsif exists_with_different_url.include?(upload.id)
                    log "#{upload.url} has different url"
                  else
                    log upload.url
                  end
                end

              log "#{missing_count} of #{uploads.count} #{@scope.name.underscore.pluralize} are missing"
              if exists_with_different_etag.present?
                log "#{exists_with_different_etag.count} of these are caused by differing etags"
                log "Null the etag column and re-run for automatic backfill"
              end
              if exists_with_different_url.present?
                log "#{exists_with_different_url.count} of these are caused by differing urls"
                log "Empty the url column and re-run for automatic backfill"
              end
            end

            set_missing_s3_discourse_stats(missing_count)
          ensure
            connection.exec("DROP TABLE #{tmp_table_name}") unless connection.nil?
          end
        end
      ensure
        cleanup!
      end
    end
  end

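  # Yields every CSV row, from the preloaded file if one was given,
  # otherwise from each decompressed inventory file (the filename minus
  # its trailing ".gz").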
  def for_each_inventory_row
    if @preloaded_inventory_file
      CSV.foreach(@preloaded_inventory_file) { |row| yield(row) }
    else
      files.each { |file| CSV.foreach(file[:filename][0...-3]) { |row| yield(row) } }
    end
  end

  def download_inventory_file_to_tmp_directory(file)
    return if File.exist?(file[:filename])

    log "Downloading inventory file '#{file[:key]}' to tmp directory..."
    failure_message = "Failed to download inventory file '#{file[:key]}' to tmp directory."

    @s3_helper.download_file(file[:key], file[:filename], failure_message)
  end

  def decompress_inventory_file(file)
    log "Decompressing inventory file '#{file[:filename]}', this may take a while..."
    Discourse::Utils.execute_command(
      "gzip",
      "--decompress",
      file[:filename],
      failure_message: "Failed to decompress inventory file '#{file[:filename]}'.",
      chdir: tmp_directory,
    )
  end

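  # Splits a combined multisite inventory into one Tempfile per database,
  # keyed on the "uploads/<db>/" segment of each object key.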
  def prepare_for_all_sites
    db_names = RailsMultisite::ConnectionManagement.all_dbs
    db_files = {}

    db_names.each { |db| db_files[db] = Tempfile.new("#{db}-inventory.csv") }

    download_and_decompress_files
    for_each_inventory_row do |row|
      key = row[CSV_KEY_INDEX]
      row_db = key.match(%r{uploads/([^/]+)/})&.[](1)
      if row_db && (file = db_files[row_db])
        file.write(row.to_csv)
      end
    end

    db_names.each { |db| db_files[db].rewind }

    db_files
  ensure
    cleanup!
  end

  def s3_client
    @s3_helper.s3_client
  end

  private

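  # Deletes both the downloaded .gz files and their decompressed
  # counterparts (the filename minus its ".gz" suffix).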
  def cleanup!
    return if @preloaded_inventory_file
    files.each do |file|
      File.delete(file[:filename]) if File.exist?(file[:filename])
      File.delete(file[:filename][0...-3]) if File.exist?(file[:filename][0...-3])
    end
  end

  def connection
    @connection ||= ActiveRecord::Base.connection.raw_connection
  end

  def tmp_table_name
    "#{type}_inventory"
  end

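  # Finds the newest symlink.txt manifest, downloads it, and maps each key
  # it lists to a local filename under tmp_directory. Returns [] when no
  # manifest exists or when the inventory predates a recent restore.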
  def files
    return if @preloaded_inventory_file

    @files ||=
      begin
        symlink_file = unsorted_files.sort_by { |file| -file.last_modified.to_i }.first

        return [] if symlink_file.blank?

        if BackupMetadata.last_restore_date.present? &&
             (symlink_file.last_modified - WAIT_AFTER_RESTORE_DAYS.days) <
               BackupMetadata.last_restore_date
          set_missing_s3_discourse_stats(0)
          return []
        end

        @inventory_date = symlink_file.last_modified - INVENTORY_LAG
        log "Downloading symlink file to tmp directory..."
        failure_message = "Failed to download symlink file to tmp directory."
        filename = File.join(tmp_directory, File.basename(symlink_file.key))

        @s3_helper.download_file(symlink_file.key, filename, failure_message)

        return [] if !File.exist?(filename)

        File
          .readlines(filename)
          .map do |key|
            key = key.sub("s3://#{bucket_name}/", "").sub("\n", "")
            { key: key, filename: File.join(tmp_directory, File.basename(key)) }
          end
      end
  end

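  # Skips any inventory file whose decompressed form is already on disk.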
  def download_and_decompress_files
    files.each do |file|
      next if File.exist?(file[:filename][0...-3])

      download_inventory_file_to_tmp_directory(file)
      decompress_inventory_file(file)
    end
  end

  def tmp_directory
    @tmp_directory ||=
      begin
        current_db = RailsMultisite::ConnectionManagement.current_db
        directory = File.join(Rails.root, "tmp", INVENTORY_PREFIX, current_db)
        FileUtils.mkdir_p(directory)
        directory
      end
  end

  def bucket_name
    @s3_helper.s3_bucket_name
  end

  def bucket_folder_path
    @s3_helper.s3_bucket_folder_path
  end

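  # Lists the symlink.txt manifests under the bucket's "hive" prefix (the
  # location AWS uses for Hive-format inventory manifests).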
  def unsorted_files
    objects = []

    hive_path = File.join(bucket_folder_path, "hive")
    @s3_helper.list(hive_path).each { |obj| objects << obj if obj.key.match?(/symlink\.txt\z/i) }

    objects
  rescue Aws::Errors::ServiceError => e
    log("Failed to list inventory from S3", e)
    []
  end

  def log(message, ex = nil)
    puts(message)
    Rails.logger.error("#{ex}\n" + (ex.backtrace || []).join("\n")) if ex
  end

  def error(message)
    log(message, StandardError.new(message))
  end

  def set_missing_s3_discourse_stats(count)
    Discourse.stats.set("missing_s3_#{@model.table_name}", count)
  end
end