discourse/lib/s3_inventory.rb
Alan Guo Xiang Tan 8cf4ed5f88
DEV: Introduce hidden s3_inventory_bucket site setting (#27304)
This commit introduces a hidden `s3_inventory_bucket` site setting which
replaces the `enable_s3_inventory` and `s3_configure_inventory_policy`
site settings.

The reason the `enable_s3_inventory` and `s3_configure_inventory_policy`
site settings are removed is that this feature has technically been
broken since it was introduced. When the `enable_s3_inventory` feature
is turned on, the app will configure a daily inventory policy for the
`s3_upload_bucket` bucket and store the inventories under a prefix in
the bucket. The problem here is that once the inventories are created,
there is nothing cleaning up all these inventories, so whoever has
enabled this feature would have been paying the cost of storing a whole
bunch of inventory files which are never used. Given that we have not
received any complaints about inventory files inflating S3 storage costs,
we think that it is very likely that this feature is no longer being
used and we are looking to drop support for this feature in the not too
distant future.

For now, we will still support a hidden `s3_inventory_bucket` site
setting which site administrators can configure via the
`DISCOURSE_S3_INVENTORY_BUCKET` environment variable.
2024-06-10 13:16:00 +08:00

320 lines
9.6 KiB
Ruby

# frozen_string_literal: true

require "aws-sdk-s3"
require "csv"
require "set"
# Compares Upload / OptimizedImage records against an S3 Inventory report.
#
# The newest `symlink.txt` manifest found under `<bucket folder>/hive` in the
# inventory bucket lists gzipped CSV inventory files. Those files are downloaded
# and decompressed into `tmp/inventory/<current_db>`, streamed into a temporary
# table, and used to backfill missing etags, update upload verification
# statuses and log records that are absent from the inventory.
#
# A caller may instead pass an already-extracted CSV file together with its
# inventory date; nothing is then downloaded from S3 or cleaned up afterwards.
class S3Inventory
  attr_reader :type, :inventory_date, :s3_helper

  # Column positions within each inventory CSV row.
  CSV_KEY_INDEX = 1
  CSV_ETAG_INDEX = 2

  # Directory under tmp/ where inventory files are stored locally.
  INVENTORY_PREFIX = "inventory"

  # Subtracted from the manifest's last-modified time to obtain the inventory
  # date; only records updated before that date are checked.
  INVENTORY_LAG = 2.days

  # A manifest is ignored unless it was modified at least this many days after
  # the last backup restore.
  WAIT_AFTER_RESTORE_DAYS = 2

  # @param type [Symbol] :upload or :optimized
  # @param s3_inventory_bucket [String] handed to S3Helper; its bucket name and
  #   folder path locate the inventory manifest
  # @param preloaded_inventory_file [String, nil] path to an extracted inventory CSV
  # @param preloaded_inventory_date [Time, nil] inventory date of that CSV
  #   (the preloaded data is only used when both are given)
  # @raise [RuntimeError] when type is neither :upload nor :optimized
  def initialize(
    type,
    s3_inventory_bucket:,
    preloaded_inventory_file: nil,
    preloaded_inventory_date: nil
  )
    @s3_helper = S3Helper.new(s3_inventory_bucket)

    if preloaded_inventory_file && preloaded_inventory_date
      # Data preloaded, so we don't need to fetch it again
      @preloaded_inventory_file = preloaded_inventory_file
      @inventory_date = preloaded_inventory_date
    end

    if type == :upload
      @type = "original"
      @model = Upload
      @scope = @model.by_users.without_s3_file_missing_confirmed_verification_status
    elsif type == :optimized
      @type = "optimized"
      @scope = @model = OptimizedImage
    else
      raise "Invalid type: #{type}"
    end
  end

  # Loads the inventory into a temporary table, then:
  # * backfills NULL etags from rows whose url matches,
  # * (uploads only) updates verification statuses,
  # * logs records updated before the inventory date whose etag is not in the
  #   inventory, and records the count via Discourse.stats.
  #
  # Runs under a per-type DistributedMutex. Logs an error and returns early
  # when no inventory files can be listed.
  def backfill_etags_and_list_missing
    if !@preloaded_inventory_file && files.blank?
      error("Failed to list inventory from S3")
      return
    end

    DistributedMutex.synchronize("s3_inventory_list_missing_#{type}", validity: 30.minutes) do
      begin
        download_and_decompress_files if !@preloaded_inventory_file
        multisite_prefix = Discourse.store.upload_path

        ActiveRecord::Base.transaction do
          begin
            connection.exec(
              "CREATE TEMP TABLE #{tmp_table_name}(url text UNIQUE, etag text, PRIMARY KEY(etag, url))",
            )
            copy_inventory_to_tmp_table(multisite_prefix)

            table_name = @model.table_name

            # backfilling etags
            connection.async_exec(
              "UPDATE #{table_name}
              SET etag = #{tmp_table_name}.etag
              FROM #{tmp_table_name}
              WHERE #{table_name}.etag IS NULL AND
              #{table_name}.url = #{tmp_table_name}.url",
            )

            uploads = @scope.where("updated_at < ?", inventory_date)
            missing_uploads =
              uploads.joins(
                "LEFT JOIN #{tmp_table_name} ON #{tmp_table_name}.etag = #{table_name}.etag",
              ).where("#{tmp_table_name}.etag IS NULL")

            # Missing records whose url *is* in the inventory (under another
            # etag). Kept as a Set so the per-record lookup below is O(1).
            exists_with_different_etag =
              missing_uploads
                .joins(
                  "LEFT JOIN #{tmp_table_name} inventory2 ON inventory2.url = #{table_name}.url",
                )
                .where("inventory2.etag IS NOT NULL")
                .pluck(:id)
                .to_set

            # marking as verified/not verified
            update_verification_statuses(table_name) if @model == Upload

            if (missing_count = missing_uploads.count) > 0
              missing_uploads
                .select(:id, :url)
                .find_each do |upload|
                  if exists_with_different_etag.include?(upload.id)
                    log "#{upload.url} has different etag"
                  else
                    log upload.url
                  end
                end

              log "#{missing_count} of #{uploads.count} #{@scope.name.underscore.pluralize} are missing"
              if exists_with_different_etag.present?
                log "#{exists_with_different_etag.size} of these are caused by differing etags"
                log "Null the etag column and re-run for automatic backfill"
              end
            end

            Discourse.stats.set("missing_s3_#{table_name}", missing_count)
          ensure
            connection.exec("DROP TABLE #{tmp_table_name}") unless connection.nil?
          end
        end
      ensure
        cleanup!
      end
    end
  end

  # Yields each parsed CSV row of the inventory: the rows of the preloaded
  # file if one was given, otherwise the rows of every decompressed inventory
  # file in turn.
  def for_each_inventory_row
    if @preloaded_inventory_file
      CSV.foreach(@preloaded_inventory_file) { |row| yield(row) }
    else
      files.each { |file| CSV.foreach(decompressed_filename(file)) { |row| yield(row) } }
    end
  end

  # Downloads one gzipped inventory file from S3 unless it is already present
  # locally.
  def download_inventory_file_to_tmp_directory(file)
    return if File.exist?(file[:filename])

    log "Downloading inventory file '#{file[:key]}' to tmp directory..."
    failure_message = "Failed to download inventory file '#{file[:key]}' to tmp directory."
    @s3_helper.download_file(file[:key], file[:filename], failure_message)
  end

  # Runs `gzip --decompress` on a downloaded inventory file inside the tmp
  # directory.
  def decompress_inventory_file(file)
    log "Decompressing inventory file '#{file[:filename]}', this may take a while..."
    Discourse::Utils.execute_command(
      "gzip",
      "--decompress",
      file[:filename],
      failure_message: "Failed to decompress inventory file '#{file[:filename]}'.",
      chdir: tmp_directory,
    )
  end

  # Splits the inventory into one Tempfile per multisite database. Each row is
  # routed by the `uploads/<db>/` segment of its key; rows that match no known
  # database are dropped.
  #
  # @return [Hash{String => Tempfile}] db name => rewound Tempfile
  def prepare_for_all_sites
    db_names = RailsMultisite::ConnectionManagement.all_dbs
    db_files = db_names.to_h { |db| [db, Tempfile.new("#{db}-inventory.csv")] }

    download_and_decompress_files

    for_each_inventory_row do |row|
      row_db = row[CSV_KEY_INDEX][%r{uploads/([^/]+)/}, 1]
      file = row_db && db_files[row_db]
      file&.write(row.to_csv)
    end

    db_files.each_value(&:rewind)
    db_files
  ensure
    cleanup!
  end

  def s3_client
    @s3_helper.s3_client
  end

  private

  # Streams the inventory rows that belong to this site (when multisite) and
  # to this type into the temp table as "url,etag" CSV lines.
  def copy_inventory_to_tmp_table(multisite_prefix)
    multisite = Rails.configuration.multisite
    base_url = Discourse.store.absolute_base_url
    type_segment = "#{type}/"

    connection.copy_data("COPY #{tmp_table_name} FROM STDIN CSV") do
      for_each_inventory_row do |row|
        key = row[CSV_KEY_INDEX]
        next if multisite && key.exclude?(multisite_prefix)
        next if key.exclude?(type_segment)

        url = File.join(base_url, key)
        connection.put_copy_data("#{url},#{row[CSV_ETAG_INDEX]}\n")
      end
    end
  end

  # Marks uploads whose etag appears in the inventory as `verified`. Marks
  # those whose etag does not appear (including a still-NULL etag) as
  # `invalid_etag`. Both updates skip rows already in the target status, rows
  # in `s3_file_missing_confirmed`, rows updated on/after the inventory date
  # and ids at or below SEEDED_ID_THRESHOLD.
  def update_verification_statuses(table_name)
    sql_params = {
      inventory_date: inventory_date,
      invalid_etag: Upload.verification_statuses[:invalid_etag],
      s3_file_missing_confirmed: Upload.verification_statuses[:s3_file_missing_confirmed],
      verified: Upload.verification_statuses[:verified],
      seeded_id_threshold: @model::SEEDED_ID_THRESHOLD,
    }

    DB.exec(<<~SQL, sql_params)
      UPDATE #{table_name}
      SET verification_status = :verified
      WHERE etag IS NOT NULL
      AND verification_status <> :verified
      AND verification_status <> :s3_file_missing_confirmed
      AND updated_at < :inventory_date
      AND id > :seeded_id_threshold
      AND EXISTS
      (
        SELECT 1
        FROM #{tmp_table_name}
        WHERE #{tmp_table_name}.etag = #{table_name}.etag
      )
    SQL

    DB.exec(<<~SQL, sql_params)
      UPDATE #{table_name}
      SET verification_status = :invalid_etag
      WHERE verification_status <> :invalid_etag
      AND verification_status <> :s3_file_missing_confirmed
      AND updated_at < :inventory_date
      AND id > :seeded_id_threshold
      AND NOT EXISTS
      (
        SELECT 1
        FROM #{tmp_table_name}
        WHERE #{tmp_table_name}.etag = #{table_name}.etag
      )
    SQL
  end

  # Removes the downloaded and decompressed copies of every inventory file.
  # This is a no-op when a preloaded file is used, since that file belongs to
  # the caller.
  def cleanup!
    return if @preloaded_inventory_file

    files.each do |file|
      [file[:filename], decompressed_filename(file)].each do |path|
        File.delete(path) if File.exist?(path)
      end
    end
  end

  def connection
    @connection ||= ActiveRecord::Base.connection.raw_connection
  end

  def tmp_table_name
    "#{type}_inventory"
  end

  # Local path of the decompressed copy: the download path with its last three
  # characters removed. This assumes inventory files end in ".gz", which
  # `gzip --decompress` strips.
  def decompressed_filename(file)
    file[:filename][0...-3]
  end

  # Inventory files listed by the newest manifest, as `{ key:, filename: }`
  # hashes, where filename is the local path of the gzipped download.
  # Returns nil when a preloaded file is in use. The result, including an
  # empty list, is memoized so S3 is listed at most once per instance.
  def files
    return if @preloaded_inventory_file

    @files ||= fetch_inventory_files
  end

  # Downloads the most recently modified `symlink.txt` manifest and maps each
  # listed S3 URI to a local file path. Sets @inventory_date as a side effect.
  # Returns [] when:
  # * there is no manifest,
  # * the manifest is not at least WAIT_AFTER_RESTORE_DAYS days newer than the
  #   last backup restore, or
  # * the downloaded manifest is not found on disk.
  def fetch_inventory_files
    symlink_file = unsorted_files.max_by { |file| file.last_modified.to_i }
    return [] if symlink_file.blank?

    if BackupMetadata.last_restore_date.present? &&
         (symlink_file.last_modified - WAIT_AFTER_RESTORE_DAYS.days) <
           BackupMetadata.last_restore_date
      return []
    end

    @inventory_date = symlink_file.last_modified - INVENTORY_LAG

    log "Downloading symlink file to tmp directory..."
    failure_message = "Failed to download symlink file to tmp directory."
    filename = File.join(tmp_directory, File.basename(symlink_file.key))
    @s3_helper.download_file(symlink_file.key, filename, failure_message)
    return [] if !File.exist?(filename)

    File
      .readlines(filename, chomp: true)
      .reject(&:blank?)
      .map do |line|
        key = line.sub("s3://#{bucket_name}/", "")
        { key: key, filename: File.join(tmp_directory, File.basename(key)) }
      end
  end

  # Downloads and decompresses every inventory file whose decompressed copy
  # is not already present.
  def download_and_decompress_files
    files.each do |file|
      next if File.exist?(decompressed_filename(file))

      download_inventory_file_to_tmp_directory(file)
      decompress_inventory_file(file)
    end
  end

  # tmp/inventory/<current_db>, created on first use.
  def tmp_directory
    @tmp_directory ||=
      begin
        current_db = RailsMultisite::ConnectionManagement.current_db
        directory = File.join(Rails.root, "tmp", INVENTORY_PREFIX, current_db)
        FileUtils.mkdir_p(directory)
        directory
      end
  end

  def bucket_name
    @s3_helper.s3_bucket_name
  end

  def bucket_folder_path
    @s3_helper.s3_bucket_folder_path
  end

  # All `symlink.txt` manifests under the bucket's hive prefix, in listing
  # order. S3 service errors are logged and yield an empty list.
  def unsorted_files
    folder = bucket_folder_path
    # File.join raises TypeError on nil, which the rescue below would not
    # catch; fall back to a bare "hive" prefix when no folder path is set.
    hive_path = folder.present? ? File.join(folder, "hive") : "hive"
    @s3_helper.list(hive_path).select { |obj| obj.key.match?(/symlink\.txt\z/i) }
  rescue Aws::Errors::ServiceError => e
    log("Failed to list inventory from S3", e)
    []
  end

  # Prints the message and, when an exception is given, writes it and its
  # backtrace to the Rails error log.
  def log(message, ex = nil)
    puts(message)
    Rails.logger.error("#{ex}\n" + (ex.backtrace || []).join("\n")) if ex
  end

  def error(message)
    log(message, StandardError.new(message))
  end
end