mirror of
https://github.com/discourse/discourse.git
synced 2025-01-26 15:38:29 +08:00
8cf4ed5f88
This commit introduces a hidden `s3_inventory_bucket` site setting which replaces the `enable_s3_inventory` and `s3_configure_inventory_policy` site settings. The `enable_s3_inventory` and `s3_configure_inventory_policy` site settings are removed because this feature has technically been broken since it was introduced. When the `enable_s3_inventory` feature is turned on, the app will configure a daily inventory policy for the `s3_upload_bucket` bucket and store the inventories under a prefix in the bucket. The problem here is that once the inventories are created, nothing cleans them up, so whoever has enabled this feature would have been paying the cost of storing a whole bunch of inventory files which are never used. Given that we have not received any complaints about inventory files inflating S3 storage costs, we think that it is very likely that this feature is no longer being used, and we are looking to drop support for this feature in the not-too-distant future. For now, we will still support a hidden `s3_inventory_bucket` site setting which site administrators can configure via the `DISCOURSE_S3_INVENTORY_BUCKET` env.
320 lines
9.6 KiB
Ruby
320 lines
9.6 KiB
Ruby
# frozen_string_literal: true
|
|
|
|
require "aws-sdk-s3"
|
|
require "csv"
|
|
|
|
class S3Inventory
|
|
attr_reader :type, :inventory_date, :s3_helper
|
|
|
|
CSV_KEY_INDEX = 1
|
|
CSV_ETAG_INDEX = 2
|
|
INVENTORY_PREFIX = "inventory"
|
|
INVENTORY_LAG = 2.days
|
|
WAIT_AFTER_RESTORE_DAYS = 2
|
|
|
|
# Sets up the S3 helper, optional preloaded inventory data, and the model/scope
# that will be checked against the inventory.
#
# @param type [Symbol] :upload or :optimized
# @param s3_inventory_bucket [String] handed to S3Helper.new
# @param preloaded_inventory_file [Object, nil] already-prepared inventory CSV
# @param preloaded_inventory_date [Object, nil] date belonging to that CSV
# @raise [RuntimeError] when type is neither :upload nor :optimized
def initialize(
  type,
  s3_inventory_bucket:,
  preloaded_inventory_file: nil,
  preloaded_inventory_date: nil
)
  @s3_helper = S3Helper.new(s3_inventory_bucket)

  # Preloaded data is only honoured when BOTH the file and its date are given;
  # in that case the inventory does not need to be fetched again.
  if preloaded_inventory_file && preloaded_inventory_date
    @preloaded_inventory_file = preloaded_inventory_file
    @inventory_date = preloaded_inventory_date
  end

  case type
  when :upload
    @type = "original"
    @model = Upload
    @scope = @model.by_users.without_s3_file_missing_confirmed_verification_status
  when :optimized
    @type = "optimized"
    @scope = @model = OptimizedImage
  else
    raise "Invalid type: #{type}"
  end
end
|
|
|
|
# Compares the model's rows against the S3 inventory.
#
# Steps, all performed while holding a distributed mutex (30 minute validity):
#   1. download/decompress the inventory files unless an inventory was preloaded
#   2. load matching inventory rows (url, etag) into a temporary table
#   3. backfill NULL etags on the model's table from the inventory
#   4. for Upload only, update verification_status (verified / invalid_etag)
#   5. log every record with no matching etag in the inventory and record the
#      count via Discourse.stats
#
# Logs an error and returns nil without doing anything when no inventory was
# preloaded and no inventory files could be listed.
def backfill_etags_and_list_missing
  if !@preloaded_inventory_file && files.blank?
    error("Failed to list inventory from S3")
    return
  end

  DistributedMutex.synchronize("s3_inventory_list_missing_#{type}", validity: 30.minutes) do
    begin
      download_and_decompress_files if !@preloaded_inventory_file

      multisite_prefix = Discourse.store.upload_path

      # Loop invariants for the per-row COPY below. The base URL is resolved
      # lazily on the first accepted row and then reused for every other row.
      type_prefix = "#{type}/"
      base_url = nil

      ActiveRecord::Base.transaction do
        begin
          connection.exec(
            "CREATE TEMP TABLE #{tmp_table_name}(url text UNIQUE, etag text, PRIMARY KEY(etag, url))",
          )

          connection.copy_data("COPY #{tmp_table_name} FROM STDIN CSV") do
            for_each_inventory_row do |row|
              key = row[CSV_KEY_INDEX]

              # Skip rows belonging to other sites or to the other file type.
              next if Rails.configuration.multisite && key.exclude?(multisite_prefix)
              next if key.exclude?(type_prefix)

              base_url ||= Discourse.store.absolute_base_url
              url = File.join(base_url, key)
              connection.put_copy_data("#{url},#{row[CSV_ETAG_INDEX]}\n")
            end
          end

          table_name = @model.table_name

          # backfilling etags
          connection.async_exec(
            "UPDATE #{table_name}
            SET etag = #{tmp_table_name}.etag
            FROM #{tmp_table_name}
            WHERE #{table_name}.etag IS NULL AND
            #{table_name}.url = #{tmp_table_name}.url",
          )

          uploads = @scope.where("updated_at < ?", inventory_date)

          # Records whose etag has no counterpart in the inventory.
          missing_uploads =
            uploads.joins(
              "LEFT JOIN #{tmp_table_name} ON #{tmp_table_name}.etag = #{table_name}.etag",
            ).where("#{tmp_table_name}.etag IS NULL")

          # Subset of the above whose URL *is* in the inventory (etag mismatch).
          # Kept as a Set because it is probed once per missing record below.
          exists_with_different_etag =
            missing_uploads
              .joins(
                "LEFT JOIN #{tmp_table_name} inventory2 ON inventory2.url = #{table_name}.url",
              )
              .where("inventory2.etag IS NOT NULL")
              .pluck(:id)
              .to_set

          # marking as verified/not verified
          if @model == Upload
            sql_params = {
              inventory_date: inventory_date,
              invalid_etag: Upload.verification_statuses[:invalid_etag],
              s3_file_missing_confirmed: Upload.verification_statuses[:s3_file_missing_confirmed],
              verified: Upload.verification_statuses[:verified],
              seeded_id_threshold: @model::SEEDED_ID_THRESHOLD,
            }

            DB.exec(<<~SQL, sql_params)
              UPDATE #{table_name}
              SET verification_status = :verified
              WHERE etag IS NOT NULL
              AND verification_status <> :verified
              AND verification_status <> :s3_file_missing_confirmed
              AND updated_at < :inventory_date
              AND id > :seeded_id_threshold
              AND EXISTS
              (
                SELECT 1
                FROM #{tmp_table_name}
                WHERE #{tmp_table_name}.etag = #{table_name}.etag
              )
            SQL

            DB.exec(<<~SQL, sql_params)
              UPDATE #{table_name}
              SET verification_status = :invalid_etag
              WHERE verification_status <> :invalid_etag
              AND verification_status <> :s3_file_missing_confirmed
              AND updated_at < :inventory_date
              AND id > :seeded_id_threshold
              AND NOT EXISTS
              (
                SELECT 1
                FROM #{tmp_table_name}
                WHERE #{tmp_table_name}.etag = #{table_name}.etag
              )
            SQL
          end

          if (missing_count = missing_uploads.count) > 0
            missing_uploads
              .select(:id, :url)
              .find_each do |upload|
                if exists_with_different_etag.include?(upload.id)
                  log "#{upload.url} has different etag"
                else
                  log upload.url
                end
              end

            log "#{missing_count} of #{uploads.count} #{@scope.name.underscore.pluralize} are missing"
            if exists_with_different_etag.present?
              log "#{exists_with_different_etag.count} of these are caused by differing etags"
              log "Null the etag column and re-run for automatic backfill"
            end
          end

          Discourse.stats.set("missing_s3_#{table_name}", missing_count)
        ensure
          connection.exec("DROP TABLE #{tmp_table_name}") unless connection.nil?
        end
      end
    ensure
      # Remove downloaded/decompressed inventory files even when the run fails.
      cleanup!
    end
  end
end
|
|
|
|
# Yields every CSV row of the inventory to the given block.
#
# Rows come from the preloaded inventory file when one was supplied; otherwise
# from each entry of `files`, read from the file's path with its last three
# characters removed (the decompressed counterpart of the ".gz" download).
def for_each_inventory_row
  if (preloaded_path = @preloaded_inventory_file)
    return CSV.foreach(preloaded_path) { |row| yield(row) }
  end

  files.each do |file|
    csv_path = file[:filename][0...-3]
    CSV.foreach(csv_path) { |row| yield(row) }
  end
end
|
|
|
|
# Downloads a single inventory data file from S3, unless the destination file
# already exists locally.
#
# @param file [Hash] :key is the S3 object key, :filename the local target path
# @return [nil] when the file is already present; otherwise whatever the
#   helper's download_file returns
def download_inventory_file_to_tmp_directory(file)
  return if File.exist?(file[:filename])

  log "Downloading inventory file '#{file[:key]}' to tmp directory..."
  failure_message = "Failed to download inventory file '#{file[:key]}' to tmp directory."

  @s3_helper.download_file(file[:key], file[:filename], failure_message)
end
|
|
|
|
# Runs `gzip --decompress` on a downloaded inventory file, with the tmp
# directory as working directory.
#
# @param file [Hash] :filename is the path of the compressed file
def decompress_inventory_file(file)
  compressed_path = file[:filename]
  log "Decompressing inventory file '#{compressed_path}', this may take a while..."

  command = ["gzip", "--decompress", compressed_path]
  Discourse::Utils.execute_command(
    *command,
    failure_message: "Failed to decompress inventory file '#{compressed_path}'.",
    chdir: tmp_directory,
  )
end
|
|
|
|
# Splits the inventory into one Tempfile per multisite database.
#
# Each inventory row is routed by the database name found in the
# `uploads/<db>/` segment of its key; rows without that segment, or naming a
# database that is not in the list, are dropped.
#
# @return [Hash] database name => Tempfile (rewound, ready to be read)
def prepare_for_all_sites
  db_names = RailsMultisite::ConnectionManagement.all_dbs
  db_files =
    db_names.each_with_object({}) do |db, by_db|
      by_db[db] = Tempfile.new("#{db}-inventory.csv")
    end

  download_and_decompress_files

  for_each_inventory_row do |row|
    site_db = row[CSV_KEY_INDEX][%r{uploads/([^/]+)/}, 1]
    site_file = site_db && db_files[site_db]
    site_file.write(row.to_csv) if site_file
  end

  db_files.each_value(&:rewind)

  db_files
ensure
  # Downloaded inventory files are no longer needed once split per site.
  cleanup!
end
|
|
|
|
# Exposes the S3 client held by the underlying S3 helper.
def s3_client
  s3_helper.s3_client
end
|
|
|
|
private
|
|
|
|
# Deletes the downloaded inventory files: for every entry of `files`, both the
# compressed file and the path with its last three characters removed.
# Does nothing when a preloaded inventory file is in use.
def cleanup!
  unless @preloaded_inventory_file
    files.each do |file|
      compressed_path = file[:filename]

      [compressed_path, compressed_path[0...-3]].each do |path|
        File.delete(path) if File.exist?(path)
      end
    end
  end
end
|
|
|
|
# Raw database connection underneath ActiveRecord, looked up once and cached.
def connection
  return @connection if @connection

  @connection = ActiveRecord::Base.connection.raw_connection
end
|
|
|
|
# Name of the temporary table holding the inventory rows for this type.
def tmp_table_name
  format("%s_inventory", type)
end
|
|
|
|
# Inventory data files referenced by the most recent symlink file.
#
# @return [nil] when a preloaded inventory file is in use
# @return [Array<Hash>] one { key:, filename: } hash per data file; empty when
#   there is no usable inventory
def files
  return if @preloaded_inventory_file

  # An empty Array is truthy, so "no usable inventory" is cached as well and
  # later callers do not list the bucket again.
  @files ||= fetch_inventory_file_list
end

# Finds the most recently modified symlink file, downloads it to the tmp
# directory and turns each of its lines into a { key:, filename: } hash.
# Also sets @inventory_date (symlink modification time minus INVENTORY_LAG).
#
# Returns [] when no symlink file exists, when the symlink file is not yet
# WAIT_AFTER_RESTORE_DAYS newer than the last backup restore, or when the
# downloaded symlink file is not present on disk afterwards.
def fetch_inventory_file_list
  symlink_file = unsorted_files.max_by { |object| object.last_modified.to_i }

  return [] if symlink_file.blank?

  if BackupMetadata.last_restore_date.present? &&
       (symlink_file.last_modified - WAIT_AFTER_RESTORE_DAYS.days) <
         BackupMetadata.last_restore_date
    return []
  end

  @inventory_date = symlink_file.last_modified - INVENTORY_LAG
  log "Downloading symlink file to tmp directory..."
  failure_message = "Failed to download symlink file to tmp directory."
  filename = File.join(tmp_directory, File.basename(symlink_file.key))

  @s3_helper.download_file(symlink_file.key, filename, failure_message)

  return [] if !File.exist?(filename)

  File
    .readlines(filename)
    .map do |line|
      # Each line is an s3:// URL; strip the bucket prefix and the newline.
      key = line.sub("s3://#{bucket_name}/", "").sub("\n", "")
      { key: key, filename: File.join(tmp_directory, File.basename(key)) }
    end
end
|
|
|
|
# Downloads and decompresses every inventory file whose decompressed
# counterpart (filename minus its last three characters) is not on disk yet.
def download_and_decompress_files
  files.each do |file|
    decompressed_path = file[:filename][0...-3]

    unless File.exist?(decompressed_path)
      download_inventory_file_to_tmp_directory(file)
      decompress_inventory_file(file)
    end
  end
end
|
|
|
|
# Per-database working directory: <Rails.root>/tmp/<INVENTORY_PREFIX>/<current db>.
# Created on first use and cached afterwards.
def tmp_directory
  return @tmp_directory if @tmp_directory

  db_name = RailsMultisite::ConnectionManagement.current_db
  path = File.join(Rails.root, "tmp", INVENTORY_PREFIX, db_name)
  FileUtils.mkdir_p(path)
  @tmp_directory = path
end
|
|
|
|
# Bucket name as reported by the S3 helper.
def bucket_name
  s3_helper.s3_bucket_name
end
|
|
|
|
# Folder path inside the bucket as reported by the S3 helper.
def bucket_folder_path
  s3_helper.s3_bucket_folder_path
end
|
|
|
|
# Lists the objects under "<bucket folder path>/hive" and keeps those whose key
# ends in "symlink.txt" (case-insensitive), in listing order.
#
# An AWS service error is logged and turned into an empty result.
def unsorted_files
  symlink_objects = []
  listing_prefix = File.join(bucket_folder_path, "hive")

  @s3_helper.list(listing_prefix).each do |object|
    next unless object.key.match?(/symlink\.txt\z/i)

    symlink_objects.push(object)
  end

  symlink_objects
rescue Aws::Errors::ServiceError => e
  log("Failed to list inventory from S3", e)
  []
end
|
|
|
|
# Prints the message to stdout and, when an exception is supplied, writes the
# exception plus its backtrace (if any) to the Rails error log.
#
# @param message [String]
# @param ex [Exception, nil]
def log(message, ex = nil)
  puts(message)
  return unless ex

  backtrace_lines = ex.backtrace || []
  Rails.logger.error("#{ex}\n" + backtrace_lines.join("\n"))
end
|
|
|
|
# Logs the message together with a StandardError carrying the same text.
def error(message)
  exception = StandardError.new(message)
  log(message, exception)
end
|
|
end
|