mirror of
https://github.com/discourse/discourse.git
synced 2025-02-21 04:10:22 +08:00

This reverts commit 97769f3d0226642905cf0605aeb0bc69d7295ca1. The code is confusing but this change is quite risky. Defer for now until we can look at it properly.
290 lines
8.4 KiB
Ruby
290 lines
8.4 KiB
Ruby
# frozen_string_literal: true
|
|
|
|
require "aws-sdk-s3"
|
|
require "csv"
|
|
|
|
class S3Inventory
|
|
|
|
attr_reader :type, :model, :inventory_date
|
|
|
|
CSV_KEY_INDEX ||= 1
|
|
CSV_ETAG_INDEX ||= 2
|
|
INVENTORY_PREFIX ||= "inventory"
|
|
INVENTORY_VERSION ||= "1"
|
|
|
|
def initialize(s3_helper, type)
|
|
@s3_helper = s3_helper
|
|
|
|
if type == :upload
|
|
@type = "original"
|
|
@model = Upload
|
|
elsif type == :optimized
|
|
@type = "optimized"
|
|
@model = OptimizedImage
|
|
end
|
|
end
|
|
|
|
def backfill_etags_and_list_missing
|
|
if files.blank?
|
|
error("Failed to list inventory from S3")
|
|
return
|
|
end
|
|
|
|
DistributedMutex.synchronize("s3_inventory_list_missing_#{type}") do
|
|
download_inventory_files_to_tmp_directory
|
|
decompress_inventory_files
|
|
|
|
multisite_prefix = "uploads/#{RailsMultisite::ConnectionManagement.current_db}/"
|
|
ActiveRecord::Base.transaction do
|
|
begin
|
|
connection.exec("CREATE TEMP TABLE #{table_name}(url text UNIQUE, etag text, PRIMARY KEY(etag, url))")
|
|
connection.copy_data("COPY #{table_name} FROM STDIN CSV") do
|
|
files.each do |file|
|
|
CSV.foreach(file[:filename][0...-3], headers: false) do |row|
|
|
key = row[CSV_KEY_INDEX]
|
|
next if Rails.configuration.multisite && key.exclude?(multisite_prefix)
|
|
url = File.join(Discourse.store.absolute_base_url, key)
|
|
connection.put_copy_data("#{url},#{row[CSV_ETAG_INDEX]}\n")
|
|
end
|
|
end
|
|
end
|
|
|
|
# backfilling etags
|
|
connection.async_exec("UPDATE #{model.table_name}
|
|
SET etag = #{table_name}.etag
|
|
FROM #{table_name}
|
|
WHERE #{model.table_name}.etag IS NULL
|
|
AND #{model.table_name}.url = #{table_name}.url")
|
|
|
|
list_missing_post_uploads if type == "original"
|
|
|
|
uploads = (model == Upload) ? model.by_users.where("created_at < ?", inventory_date) : model
|
|
missing_uploads = uploads
|
|
.joins("LEFT JOIN #{table_name} ON #{table_name}.etag = #{model.table_name}.etag")
|
|
.where("#{table_name}.etag IS NULL AND #{model.table_name}.etag IS NOT NULL")
|
|
|
|
if (missing_count = missing_uploads.count) > 0
|
|
missing_uploads.select(:id, :url).find_each do |upload|
|
|
log upload.url
|
|
end
|
|
|
|
log "#{missing_count} of #{uploads.count} #{model.name.underscore.pluralize} are missing"
|
|
end
|
|
|
|
Discourse.stats.set("missing_s3_#{model.table_name}", missing_count)
|
|
ensure
|
|
connection.exec("DROP TABLE #{table_name}") unless connection.nil?
|
|
end
|
|
end
|
|
end
|
|
end
|
|
|
|
def list_missing_post_uploads
|
|
log "Listing missing post uploads..."
|
|
|
|
missing = Post.find_missing_uploads(include_local_upload: false) do |post, _, _, sha1|
|
|
next if sha1.blank?
|
|
|
|
upload_id = nil
|
|
result = connection.exec("SELECT * FROM #{table_name} WHERE url LIKE '%original/%/#{sha1}%'")
|
|
|
|
if result.count >= 1
|
|
begin
|
|
url = result[0]["url"]
|
|
key = url.sub(/^#{Discourse.store.absolute_base_url}\//, "")
|
|
data = @s3_helper.object(key).data
|
|
filename = (data.content_disposition&.match(/filename=\"(.*)\"/) || [])[1]
|
|
|
|
upload = Upload.new(
|
|
user_id: Discourse.system_user.id,
|
|
original_filename: filename || File.basename(key),
|
|
filesize: data.content_length,
|
|
url: url,
|
|
sha1: sha1,
|
|
etag: result[0]["etag"]
|
|
)
|
|
upload.save!(validate: false)
|
|
upload_id = upload.id
|
|
post.link_post_uploads
|
|
rescue Aws::S3::Errors::NotFound
|
|
next
|
|
end
|
|
end
|
|
|
|
upload_id
|
|
end
|
|
|
|
Discourse.stats.set("missing_post_uploads", missing[:count])
|
|
log "#{missing[:count]} post uploads are missing."
|
|
end
|
|
|
|
def download_inventory_files_to_tmp_directory
|
|
files.each do |file|
|
|
next if File.exists?(file[:filename])
|
|
|
|
log "Downloading inventory file '#{file[:key]}' to tmp directory..."
|
|
failure_message = "Failed to inventory file '#{file[:key]}' to tmp directory."
|
|
|
|
@s3_helper.download_file(file[:key], file[:filename], failure_message)
|
|
end
|
|
end
|
|
|
|
def decompress_inventory_files
|
|
files.each do |file|
|
|
log "Decompressing inventory file '#{file[:filename]}', this may take a while..."
|
|
Discourse::Utils.execute_command('gzip', '--decompress', file[:filename], failure_message: "Failed to decompress inventory file '#{file[:filename]}'.", chdir: tmp_directory)
|
|
end
|
|
end
|
|
|
|
def update_bucket_policy
|
|
@s3_helper.s3_client.put_bucket_policy(
|
|
bucket: bucket_name,
|
|
policy: {
|
|
"Version": "2012-10-17",
|
|
"Statement": [
|
|
{
|
|
"Sid": "InventoryAndAnalyticsPolicy",
|
|
"Effect": "Allow",
|
|
"Principal": { "Service": "s3.amazonaws.com" },
|
|
"Action": ["s3:PutObject"],
|
|
"Resource": ["#{inventory_path_arn}/*"],
|
|
"Condition": {
|
|
"ArnLike": {
|
|
"aws:SourceArn": bucket_arn
|
|
},
|
|
"StringEquals": {
|
|
"s3:x-amz-acl": "bucket-owner-full-control"
|
|
}
|
|
}
|
|
}
|
|
]
|
|
}.to_json
|
|
)
|
|
end
|
|
|
|
def update_bucket_inventory_configuration
|
|
@s3_helper.s3_client.put_bucket_inventory_configuration(
|
|
bucket: bucket_name,
|
|
id: inventory_id,
|
|
inventory_configuration: inventory_configuration,
|
|
use_accelerate_endpoint: false
|
|
)
|
|
end
|
|
|
|
private
|
|
|
|
def connection
|
|
@connection ||= ActiveRecord::Base.connection.raw_connection
|
|
end
|
|
|
|
def table_name
|
|
"#{type}_inventory"
|
|
end
|
|
|
|
def files
|
|
@files ||= begin
|
|
symlink_file = unsorted_files.sort_by { |file| -file.last_modified.to_i }.first
|
|
return [] if symlink_file.blank?
|
|
|
|
@inventory_date = symlink_file.last_modified - 1.day
|
|
log "Downloading symlink file to tmp directory..."
|
|
failure_message = "Failed to download symlink file to tmp directory."
|
|
filename = File.join(tmp_directory, File.basename(symlink_file.key))
|
|
|
|
@s3_helper.download_file(symlink_file.key, filename, failure_message)
|
|
File.readlines(filename).map do |key|
|
|
key = key.sub("s3://#{bucket_name}/", "").sub("\n", "")
|
|
{ key: key, filename: File.join(tmp_directory, File.basename(key)) }
|
|
end
|
|
end
|
|
end
|
|
|
|
def tmp_directory
|
|
@tmp_directory ||= begin
|
|
current_db = RailsMultisite::ConnectionManagement.current_db
|
|
timestamp = Time.now.strftime("%Y-%m-%d-%H%M%S")
|
|
directory = File.join(Rails.root, "tmp", INVENTORY_PREFIX, current_db, timestamp)
|
|
FileUtils.mkdir_p(directory)
|
|
directory
|
|
end
|
|
end
|
|
|
|
def inventory_configuration
|
|
filter_prefix = type
|
|
filter_prefix = File.join(bucket_folder_path, filter_prefix) if bucket_folder_path.present?
|
|
|
|
{
|
|
destination: {
|
|
s3_bucket_destination: {
|
|
bucket: bucket_arn,
|
|
prefix: inventory_path,
|
|
format: "CSV"
|
|
}
|
|
},
|
|
filter: {
|
|
prefix: filter_prefix
|
|
},
|
|
is_enabled: SiteSetting.enable_s3_inventory,
|
|
id: inventory_id,
|
|
included_object_versions: "Current",
|
|
optional_fields: ["ETag"],
|
|
schedule: {
|
|
frequency: "Daily"
|
|
}
|
|
}
|
|
end
|
|
|
|
def bucket_name
|
|
@s3_helper.s3_bucket_name
|
|
end
|
|
|
|
def bucket_folder_path
|
|
@s3_helper.s3_bucket_folder_path
|
|
end
|
|
|
|
def unsorted_files
|
|
objects = []
|
|
|
|
hive_path = File.join(inventory_path, bucket_name, inventory_id, "hive")
|
|
@s3_helper.list(hive_path).each do |obj|
|
|
if obj.key.match?(/symlink\.txt$/i)
|
|
objects << obj
|
|
end
|
|
end
|
|
|
|
objects
|
|
rescue Aws::Errors::ServiceError => e
|
|
log("Failed to list inventory from S3", e)
|
|
end
|
|
|
|
def inventory_id
|
|
@inventory_id ||= begin
|
|
id = Rails.configuration.multisite ? "original" : type # TODO: rename multisite path to "uploads"
|
|
bucket_folder_path.present? ? "#{bucket_folder_path}-#{id}" : id
|
|
end
|
|
end
|
|
|
|
def inventory_path_arn
|
|
File.join(bucket_arn, inventory_path)
|
|
end
|
|
|
|
def inventory_path
|
|
path = File.join(INVENTORY_PREFIX, INVENTORY_VERSION)
|
|
path = File.join(bucket_folder_path, path) if bucket_folder_path.present?
|
|
path
|
|
end
|
|
|
|
def bucket_arn
|
|
"arn:aws:s3:::#{bucket_name}"
|
|
end
|
|
|
|
def log(message, ex = nil)
|
|
puts(message)
|
|
Rails.logger.error("#{ex}\n" + (ex.backtrace || []).join("\n")) if ex
|
|
end
|
|
|
|
def error(message)
|
|
log(message, StandardError.new(message))
|
|
end
|
|
end
|