discourse/lib/s3_inventory.rb
Alan Guo Xiang Tan 1a70d118a8
FIX: S3Inventory#backfill_etags_and_list_missing need to unescape key (#30787)
The `key` provided in the S3 inventory file will esacpe any special
characters in the filename of the key so we need to unescape. Otherwise,
uploads with extensions that conatins special characters will fail to
match records which we insert into the temporary table based off the
s3 inventory file.
2025-01-15 14:52:49 +08:00

326 lines
9.8 KiB
Ruby

# frozen_string_literal: true
require "aws-sdk-s3"
require "csv"
class S3Inventory
attr_reader :type, :inventory_date, :s3_helper
CSV_KEY_INDEX = 1
CSV_ETAG_INDEX = 2
INVENTORY_PREFIX = "inventory"
INVENTORY_LAG = 2.days
WAIT_AFTER_RESTORE_DAYS = 2
def initialize(
type,
s3_inventory_bucket:,
preloaded_inventory_file: nil,
preloaded_inventory_date: nil,
s3_options: {}
)
@s3_helper = S3Helper.new(s3_inventory_bucket, "", s3_options)
if preloaded_inventory_file && preloaded_inventory_date
# Data preloaded, so we don't need to fetch it again
@preloaded_inventory_file = preloaded_inventory_file
@inventory_date = preloaded_inventory_date
end
if type == :upload
@type = "original"
@model = Upload
@scope = @model.by_users.without_s3_file_missing_confirmed_verification_status
elsif type == :optimized
@type = "optimized"
@scope = @model = OptimizedImage
else
raise "Invalid type: #{type}"
end
end
def backfill_etags_and_list_missing
if !@preloaded_inventory_file && files.blank?
error("Failed to list inventory from S3")
return
end
DistributedMutex.synchronize("s3_inventory_list_missing_#{type}", validity: 30.minutes) do
begin
download_and_decompress_files if !@preloaded_inventory_file
multisite_prefix = Discourse.store.upload_path
ActiveRecord::Base.transaction do
begin
connection.exec(
"CREATE TEMP TABLE #{tmp_table_name}(url text UNIQUE, etag text, PRIMARY KEY(etag, url))",
)
connection.copy_data("COPY #{tmp_table_name} FROM STDIN CSV") do
for_each_inventory_row do |row|
key = row[CSV_KEY_INDEX]
next if Rails.configuration.multisite && key.exclude?(multisite_prefix)
next if key.exclude?("#{type}/")
url = File.join(Discourse.store.absolute_base_url, key)
connection.put_copy_data("#{CGI.unescape(url)},#{row[CSV_ETAG_INDEX]}\n")
end
end
table_name = @model.table_name
# backfilling etags
connection.async_exec(
"UPDATE #{table_name}
SET etag = #{tmp_table_name}.etag
FROM #{tmp_table_name}
WHERE #{table_name}.etag IS NULL AND
#{table_name}.url = #{tmp_table_name}.url",
)
uploads = @scope.where("updated_at < ?", inventory_date)
missing_uploads =
uploads.joins(
"LEFT JOIN #{tmp_table_name} ON #{tmp_table_name}.etag = #{table_name}.etag",
).where("#{tmp_table_name}.etag IS NULL")
exists_with_different_etag =
missing_uploads
.joins(
"LEFT JOIN #{tmp_table_name} inventory2 ON inventory2.url = #{table_name}.url",
)
.where("inventory2.etag IS NOT NULL")
.pluck(:id)
# marking as verified/not verified
if @model == Upload
sql_params = {
inventory_date: inventory_date,
invalid_etag: Upload.verification_statuses[:invalid_etag],
s3_file_missing_confirmed: Upload.verification_statuses[:s3_file_missing_confirmed],
verified: Upload.verification_statuses[:verified],
seeded_id_threshold: @model::SEEDED_ID_THRESHOLD,
}
DB.exec(<<~SQL, sql_params)
UPDATE #{table_name}
SET verification_status = :verified
WHERE etag IS NOT NULL
AND verification_status <> :verified
AND verification_status <> :s3_file_missing_confirmed
AND updated_at < :inventory_date
AND id > :seeded_id_threshold
AND EXISTS
(
SELECT 1
FROM #{tmp_table_name}
WHERE #{tmp_table_name}.etag = #{table_name}.etag
)
SQL
DB.exec(<<~SQL, sql_params)
UPDATE #{table_name}
SET verification_status = :invalid_etag
WHERE verification_status <> :invalid_etag
AND verification_status <> :s3_file_missing_confirmed
AND updated_at < :inventory_date
AND id > :seeded_id_threshold
AND NOT EXISTS
(
SELECT 1
FROM #{tmp_table_name}
WHERE #{tmp_table_name}.etag = #{table_name}.etag
)
SQL
end
if (missing_count = missing_uploads.count) > 0
missing_uploads
.select(:id, :url)
.find_each do |upload|
if exists_with_different_etag.include?(upload.id)
log "#{upload.url} has different etag"
else
log upload.url
end
end
log "#{missing_count} of #{uploads.count} #{@scope.name.underscore.pluralize} are missing"
if exists_with_different_etag.present?
log "#{exists_with_different_etag.count} of these are caused by differing etags"
log "Null the etag column and re-run for automatic backfill"
end
end
set_missing_s3_discourse_stats(missing_count)
ensure
connection.exec("DROP TABLE #{tmp_table_name}") unless connection.nil?
end
end
ensure
cleanup!
end
end
end
def for_each_inventory_row
if @preloaded_inventory_file
CSV.foreach(@preloaded_inventory_file) { |row| yield(row) }
else
files.each { |file| CSV.foreach(file[:filename][0...-3]) { |row| yield(row) } }
end
end
def download_inventory_file_to_tmp_directory(file)
return if File.exist?(file[:filename])
log "Downloading inventory file '#{file[:key]}' to tmp directory..."
failure_message = "Failed to inventory file '#{file[:key]}' to tmp directory."
@s3_helper.download_file(file[:key], file[:filename], failure_message)
end
def decompress_inventory_file(file)
log "Decompressing inventory file '#{file[:filename]}', this may take a while..."
Discourse::Utils.execute_command(
"gzip",
"--decompress",
file[:filename],
failure_message: "Failed to decompress inventory file '#{file[:filename]}'.",
chdir: tmp_directory,
)
end
def prepare_for_all_sites
db_names = RailsMultisite::ConnectionManagement.all_dbs
db_files = {}
db_names.each { |db| db_files[db] = Tempfile.new("#{db}-inventory.csv") }
download_and_decompress_files
for_each_inventory_row do |row|
key = row[CSV_KEY_INDEX]
row_db = key.match(%r{uploads/([^/]+)/})&.[](1)
if row_db && file = db_files[row_db]
file.write(row.to_csv)
end
end
db_names.each { |db| db_files[db].rewind }
db_files
ensure
cleanup!
end
def s3_client
@s3_helper.s3_client
end
private
def cleanup!
return if @preloaded_inventory_file
files.each do |file|
File.delete(file[:filename]) if File.exist?(file[:filename])
File.delete(file[:filename][0...-3]) if File.exist?(file[:filename][0...-3])
end
end
def connection
@connection ||= ActiveRecord::Base.connection.raw_connection
end
def tmp_table_name
"#{type}_inventory"
end
def files
return if @preloaded_inventory_file
@files ||=
begin
symlink_file = unsorted_files.sort_by { |file| -file.last_modified.to_i }.first
return [] if symlink_file.blank?
if BackupMetadata.last_restore_date.present? &&
(symlink_file.last_modified - WAIT_AFTER_RESTORE_DAYS.days) <
BackupMetadata.last_restore_date
set_missing_s3_discourse_stats(0)
return []
end
@inventory_date = symlink_file.last_modified - INVENTORY_LAG
log "Downloading symlink file to tmp directory..."
failure_message = "Failed to download symlink file to tmp directory."
filename = File.join(tmp_directory, File.basename(symlink_file.key))
@s3_helper.download_file(symlink_file.key, filename, failure_message)
return [] if !File.exist?(filename)
File
.readlines(filename)
.map do |key|
key = key.sub("s3://#{bucket_name}/", "").sub("\n", "")
{ key: key, filename: File.join(tmp_directory, File.basename(key)) }
end
end
end
def download_and_decompress_files
files.each do |file|
next if File.exist?(file[:filename][0...-3])
download_inventory_file_to_tmp_directory(file)
decompress_inventory_file(file)
end
end
def tmp_directory
@tmp_directory ||=
begin
current_db = RailsMultisite::ConnectionManagement.current_db
directory = File.join(Rails.root, "tmp", INVENTORY_PREFIX, current_db)
FileUtils.mkdir_p(directory)
directory
end
end
def bucket_name
@s3_helper.s3_bucket_name
end
def bucket_folder_path
@s3_helper.s3_bucket_folder_path
end
def unsorted_files
objects = []
hive_path = File.join(bucket_folder_path, "hive")
@s3_helper.list(hive_path).each { |obj| objects << obj if obj.key.match?(/symlink\.txt\z/i) }
objects
rescue Aws::Errors::ServiceError => e
log("Failed to list inventory from S3", e)
[]
end
def log(message, ex = nil)
puts(message)
Rails.logger.error("#{ex}\n" + (ex.backtrace || []).join("\n")) if ex
end
def error(message)
log(message, StandardError.new(message))
end
def set_missing_s3_discourse_stats(count)
Discourse.stats.set("missing_s3_#{@model.table_name}", count)
end
end