FIX: S3 inventory data can be splitted into multiple csv files

This commit is contained in:
Vinoth Kannan 2019-02-14 03:41:52 +05:30
parent 0476169692
commit 1045bbc35b
2 changed files with 57 additions and 43 deletions

View File

@ -5,7 +5,7 @@ require "csv"
class S3Inventory class S3Inventory
attr_reader :inventory_id, :csv_filename, :model attr_reader :inventory_id, :model, :last_modified
CSV_KEY_INDEX ||= 1 CSV_KEY_INDEX ||= 1
CSV_ETAG_INDEX ||= 2 CSV_ETAG_INDEX ||= 2
@ -24,38 +24,29 @@ class S3Inventory
end end
end end
def file
@file ||= unsorted_files.sort_by { |file| -file.last_modified.to_i }.first
end
def list_missing def list_missing
if file.blank? if files.blank?
error("Failed to list inventory from S3") error("Failed to list inventory from S3")
return return
end end
DistributedMutex.synchronize("s3_inventory_list_missing_#{inventory_id}") do DistributedMutex.synchronize("s3_inventory_list_missing_#{inventory_id}") do
current_db = RailsMultisite::ConnectionManagement.current_db download_inventory_files_to_tmp_directory
timestamp = Time.now.strftime("%Y-%m-%d-%H%M%S") decompress_inventory_files
@tmp_directory = File.join(Rails.root, "tmp", INVENTORY_PREFIX, current_db, timestamp)
@archive_filename = File.join(@tmp_directory, File.basename(file.key))
@csv_filename = @archive_filename[0...-3]
FileUtils.mkdir_p(@tmp_directory)
download_inventory_file_to_tmp_directory
decompress_inventory_file
begin begin
table_name = "#{inventory_id}_inventory" table_name = "#{inventory_id}_inventory"
connection = ActiveRecord::Base.connection.raw_connection connection = ActiveRecord::Base.connection.raw_connection
connection.exec("CREATE TEMP TABLE #{table_name}(key text UNIQUE, etag text PRIMARY KEY)") connection.exec("CREATE TEMP TABLE #{table_name}(key text UNIQUE, etag text PRIMARY KEY)")
connection.copy_data("COPY #{table_name} FROM STDIN CSV") do connection.copy_data("COPY #{table_name} FROM STDIN CSV") do
CSV.foreach(csv_filename, headers: false) do |row| files.each do |file|
connection.put_copy_data("#{row[CSV_KEY_INDEX]},#{row[CSV_ETAG_INDEX]}\n") CSV.foreach(file[:filename][0...-3], headers: false) do |row|
connection.put_copy_data("#{row[CSV_KEY_INDEX]},#{row[CSV_ETAG_INDEX]}\n")
end
end end
end end
uploads = model.where("created_at < ?", file.last_modified) uploads = model.where("created_at < ?", last_modified)
missing_uploads = uploads.joins("LEFT JOIN #{table_name} ON #{table_name}.etag = #{model.table_name}.etag").where("#{table_name}.etag is NULL") missing_uploads = uploads.joins("LEFT JOIN #{table_name} ON #{table_name}.etag = #{model.table_name}.etag").where("#{table_name}.etag is NULL")
if (missing_count = missing_uploads.count) > 0 if (missing_count = missing_uploads.count) > 0
@ -71,18 +62,21 @@ class S3Inventory
end end
end end
def download_inventory_file_to_tmp_directory def download_inventory_files_to_tmp_directory
log "Downloading inventory file to tmp directory..." files.each do |file|
failure_message = "Failed to inventory file to tmp directory." log "Downloading inventory file '#{file[:key]}' to tmp directory..."
failure_message = "Failed to inventory file '#{file[:key]}' to tmp directory."
@s3_helper.download_file(file.key, @archive_filename, failure_message) @s3_helper.download_file(file[:key], file[:filename], failure_message)
end
end end
def decompress_inventory_file def decompress_inventory_files
log "Decompressing inventory file, this may take a while..." FileUtils.cd(tmp_directory) do
files.each do |file|
FileUtils.cd(@tmp_directory) do log "Decompressing inventory file '#{file[:filename]}', this may take a while..."
Discourse::Utils.execute_command('gzip', '--decompress', @archive_filename, failure_message: "Failed to decompress inventory file.") Discourse::Utils.execute_command('gzip', '--decompress', file[:filename], failure_message: "Failed to decompress inventory file '#{file[:filename]}'.")
end
end end
end end
@ -123,6 +117,34 @@ class S3Inventory
private private
def files
@files ||= begin
symlink_file = unsorted_files.sort_by { |file| -file.last_modified.to_i }.first
return [] if symlink_file.blank?
@last_modified = symlink_file.last_modified
log "Downloading symlink file to tmp directory..."
failure_message = "Failed to download symlink file to tmp directory."
filename = File.join(tmp_directory, File.basename(symlink_file.key))
@s3_helper.download_file(symlink_file.key, filename, failure_message)
File.readlines(filename).map do |key|
key = key.sub("s3://#{bucket_name}/").sub("\n", "")
{ key: key, filename: File.join(tmp_directory, File.basename(key)) }
end
end
end
def tmp_directory
@tmp_directory ||= begin
current_db = RailsMultisite::ConnectionManagement.current_db
timestamp = Time.now.strftime("%Y-%m-%d-%H%M%S")
directory = File.join(Rails.root, "tmp", INVENTORY_PREFIX, current_db, timestamp)
FileUtils.mkdir_p(directory)
directory
end
end
def inventory_configuration def inventory_configuration
filter_prefix = inventory_id filter_prefix = inventory_id
filter_prefix = File.join(bucket_folder_path, filter_prefix) if bucket_folder_path.present? filter_prefix = File.join(bucket_folder_path, filter_prefix) if bucket_folder_path.present?
@ -159,8 +181,9 @@ class S3Inventory
def unsorted_files def unsorted_files
objects = [] objects = []
@s3_helper.list(inventory_data_path).each do |obj| hive_path = File.join(inventory_path, bucket_name, inventory_id, "hive")
if obj.key.match?(/\.csv\.gz$/i) @s3_helper.list(hive_path).each do |obj|
if obj.key.match?(/symlink\.txt$/i)
objects << obj objects << obj
end end
end end
@ -170,10 +193,6 @@ class S3Inventory
log("Failed to list inventory from S3", e) log("Failed to list inventory from S3", e)
end end
def inventory_data_path
File.join(inventory_path, bucket_name, inventory_id, "data")
end
def inventory_path_arn def inventory_path_arn
File.join(bucket_arn, inventory_path) File.join(bucket_arn, inventory_path)
end end

View File

@ -17,8 +17,7 @@ describe "S3Inventory" do
SiteSetting.enable_s3_inventory = true SiteSetting.enable_s3_inventory = true
client.stub_responses(:list_objects, -> (context) { client.stub_responses(:list_objects, -> (context) {
inventory_data_path = "#{S3Inventory::INVENTORY_PREFIX}/#{S3Inventory::INVENTORY_VERSION}/bucket/original/data" expect(context.params[:prefix]).to eq("#{S3Inventory::INVENTORY_PREFIX}/#{S3Inventory::INVENTORY_VERSION}/bucket/original/hive")
expect(context.params[:prefix]).to eq(inventory_data_path)
{ {
contents: [ contents: [
@ -50,10 +49,6 @@ describe "S3Inventory" do
}) })
end end
it "should return the latest inventory file name" do
expect(inventory.file.key).to eq("example1.csv.gz")
end
it "should raise error if an inventory file is not found" do it "should raise error if an inventory file is not found" do
client.stub_responses(:list_objects, contents: []) client.stub_responses(:list_objects, contents: [])
output = capture_stdout { inventory.list_missing } output = capture_stdout { inventory.list_missing }
@ -69,14 +64,14 @@ describe "S3Inventory" do
upload = Fabricate(:upload, etag: "ETag", created_at: 1.days.ago) upload = Fabricate(:upload, etag: "ETag", created_at: 1.days.ago)
Fabricate(:upload, etag: "ETag2", created_at: Time.now) Fabricate(:upload, etag: "ETag2", created_at: Time.now)
inventory.expects(:decompress_inventory_file) inventory.expects(:decompress_inventory_files)
inventory.expects(:csv_filename).returns(csv_filename) inventory.expects(:files).returns([{ key: "Key", filename: "#{csv_filename}.gz" }]).at_least(1)
inventory.file.expects(:last_modified).returns(Time.now) inventory.expects(:last_modified).returns(Time.now)
output = capture_stdout do output = capture_stdout do
inventory.list_missing inventory.list_missing
end end
expect(output).to eq("Downloading inventory file to tmp directory...\n#{upload.url}\n1 of 4 uploads are missing\n") expect(output).to eq("Downloading inventory file 'Key' to tmp directory...\n#{upload.url}\n1 of 4 uploads are missing\n")
end end
end end