From 16c65a94f726a8f4cb330c8742eff887ffaaed6c Mon Sep 17 00:00:00 2001
From: David Taylor
Date: Wed, 29 Jul 2020 01:49:45 +0100
Subject: [PATCH] PERF: Preload S3 inventory data for multisite clusters

---
 .../scheduled/ensure_s3_uploads_existence.rb | 31 +++++++-
 lib/s3_inventory.rb                          | 76 +++++++++++++++----
 spec/components/s3_inventory_spec.rb         | 46 ++++++++++-
 spec/fixtures/csv/s3_inventory.csv           |  7 +-
 4 files changed, 139 insertions(+), 21 deletions(-)

diff --git a/app/jobs/scheduled/ensure_s3_uploads_existence.rb b/app/jobs/scheduled/ensure_s3_uploads_existence.rb
index ccde386f3e6..6e2d671c117 100644
--- a/app/jobs/scheduled/ensure_s3_uploads_existence.rb
+++ b/app/jobs/scheduled/ensure_s3_uploads_existence.rb
@@ -5,9 +5,38 @@ module Jobs
   class EnsureS3UploadsExistence < ::Jobs::Scheduled
     every 1.day

+    def perform(*args)
+      super
+    ensure
+      if @db_inventories
+        @db_inventories.values.each { |f| f.close; f.unlink }
+      end
+    end
+
+    def prepare_for_all_sites
+      inventory = S3Inventory.new(s3_helper, :upload)
+      @db_inventories = inventory.prepare_for_all_sites
+      @inventory_date = inventory.inventory_date
+    end
+
     def execute(args)
       return unless SiteSetting.enable_s3_inventory
-      Discourse.store.list_missing_uploads(skip_optimized: true)
+      require 's3_inventory'
+
+      if !@db_inventories && Rails.configuration.multisite && GlobalSetting.use_s3?
+        prepare_for_all_sites
+      end
+
+      if @db_inventories && preloaded_inventory_file = @db_inventories[RailsMultisite::ConnectionManagement.current_db]
+        S3Inventory.new(
+          s3_helper,
+          :upload,
+          preloaded_inventory_file: preloaded_inventory_file,
+          preloaded_inventory_date: @inventory_date
+        ).backfill_etags_and_list_missing
+      else
+        S3Inventory.new(s3_helper, :upload).backfill_etags_and_list_missing
+      end
     end
   end
 end
diff --git a/lib/s3_inventory.rb b/lib/s3_inventory.rb
index c448d81768c..1869f0e8e88 100644
--- a/lib/s3_inventory.rb
+++ b/lib/s3_inventory.rb
@@ -12,9 +12,15 @@ class S3Inventory
   INVENTORY_PREFIX ||= "inventory"
   INVENTORY_VERSION ||= "1"

-  def initialize(s3_helper, type)
+  def initialize(s3_helper, type, preloaded_inventory_file: nil, preloaded_inventory_date: nil)
     @s3_helper = s3_helper

+    if preloaded_inventory_file && preloaded_inventory_date
+      # Data preloaded, so we don't need to fetch it again
+      @preloaded_inventory_file = preloaded_inventory_file
+      @inventory_date = preloaded_inventory_date
+    end
+
     if type == :upload
       @type = "original"
       @model = Upload
@@ -25,32 +31,25 @@ class S3Inventory
     end
   end

   def backfill_etags_and_list_missing
-    if files.blank?
+    if !@preloaded_inventory_file && files.blank?
error("Failed to list inventory from S3") return end DistributedMutex.synchronize("s3_inventory_list_missing_#{type}", validity: 30.minutes) do begin - files.each do |file| - next if File.exists?(file[:filename][0...-3]) - - download_inventory_file_to_tmp_directory(file) - decompress_inventory_file(file) - end + download_and_decompress_files if !@preloaded_inventory_file multisite_prefix = Discourse.store.upload_path ActiveRecord::Base.transaction do begin connection.exec("CREATE TEMP TABLE #{table_name}(url text UNIQUE, etag text, PRIMARY KEY(etag, url))") connection.copy_data("COPY #{table_name} FROM STDIN CSV") do - files.each do |file| - CSV.foreach(file[:filename][0...-3], headers: false) do |row| - key = row[CSV_KEY_INDEX] - next if Rails.configuration.multisite && key.exclude?(multisite_prefix) - url = File.join(Discourse.store.absolute_base_url, key) - connection.put_copy_data("#{url},#{row[CSV_ETAG_INDEX]}\n") - end + for_each_inventory_row do |row| + key = row[CSV_KEY_INDEX] + next if Rails.configuration.multisite && key.exclude?(multisite_prefix) + url = File.join(Discourse.store.absolute_base_url, key) + connection.put_copy_data("#{url},#{row[CSV_ETAG_INDEX]}\n") end end @@ -87,6 +86,16 @@ class S3Inventory end end + def for_each_inventory_row + if @preloaded_inventory_file + CSV.foreach(@preloaded_inventory_file) { |row| yield(row) } + else + files.each do |file| + CSV.foreach(file[:filename][0...-3]) { |row| yield(row) } + end + end + end + def download_inventory_file_to_tmp_directory(file) return if File.exists?(file[:filename]) @@ -136,9 +145,36 @@ class S3Inventory ) end + def prepare_for_all_sites + db_names = RailsMultisite::ConnectionManagement.all_dbs + db_files = {} + + db_names.each do |db| + db_files[db] = Tempfile.new("#{db}-inventory.csv") + end + + download_and_decompress_files + for_each_inventory_row do |row| + key = row[CSV_KEY_INDEX] + row_db = key.match(/uploads\/([^\/]+)\//)&.[](1) + if row_db && file = db_files[row_db] + file.write(row.to_csv) + end + end + + db_names.each do |db| + db_files[db].rewind + end + + db_files + ensure + cleanup! + end + private def cleanup! + return if @preloaded_inventory_file files.each do |file| File.delete(file[:filename]) if File.exists?(file[:filename]) File.delete(file[:filename][0...-3]) if File.exists?(file[:filename][0...-3]) @@ -154,6 +190,7 @@ class S3Inventory end def files + return if @preloaded_inventory_file @files ||= begin symlink_file = unsorted_files.sort_by { |file| -file.last_modified.to_i }.first return [] if symlink_file.blank? 
@@ -171,6 +208,15 @@ class S3Inventory
     end
   end

+  def download_and_decompress_files
+    files.each do |file|
+      next if File.exists?(file[:filename][0...-3])
+
+      download_inventory_file_to_tmp_directory(file)
+      decompress_inventory_file(file)
+    end
+  end
+
   def tmp_directory
     @tmp_directory ||= begin
       current_db = RailsMultisite::ConnectionManagement.current_db
diff --git a/spec/components/s3_inventory_spec.rb b/spec/components/s3_inventory_spec.rb
index 4e70329741b..eb1f2c9cfe7 100644
--- a/spec/components/s3_inventory_spec.rb
+++ b/spec/components/s3_inventory_spec.rb
@@ -62,6 +62,7 @@ describe "S3Inventory" do
     freeze_time

     CSV.foreach(csv_filename, headers: false) do |row|
+      next unless row[S3Inventory::CSV_KEY_INDEX].include?("default")
       Fabricate(:upload, etag: row[S3Inventory::CSV_ETAG_INDEX], updated_at: 2.days.ago)
     end

@@ -82,8 +83,8 @@ describe "S3Inventory" do

   it "should backfill etags to uploads table correctly" do
     files = [
-      ["#{Discourse.store.absolute_base_url}/original/1X/0184537a4f419224404d013414e913a4f56018f2.jpg", "defcaac0b4aca535c284e95f30d608d0"],
-      ["#{Discourse.store.absolute_base_url}/original/1X/0789fbf5490babc68326b9cec90eeb0d6590db05.png", "25c02eaceef4cb779fc17030d33f7f06"]
+      ["#{Discourse.store.absolute_base_url}/uploads/default/original/1X/0184537a4f419224404d013414e913a4f56018f2.jpg", "defcaac0b4aca535c284e95f30d608d0"],
+      ["#{Discourse.store.absolute_base_url}/uploads/default/original/1X/0789fbf5490babc68326b9cec90eeb0d6590db05.png", "25c02eaceef4cb779fc17030d33f7f06"]
     ]
     files.each { |file| Fabricate(:upload, url: file[0]) }

@@ -95,4 +96,45 @@ describe "S3Inventory" do

     expect(Upload.by_users.order(:url).pluck(:url, :etag)).to eq(files)
   end
+
+  it "should work when passed preloaded data" do
+    freeze_time
+
+    CSV.foreach(csv_filename, headers: false) do |row|
+      next unless row[S3Inventory::CSV_KEY_INDEX].include?("default")
+      Fabricate(:upload, etag: row[S3Inventory::CSV_ETAG_INDEX], updated_at: 2.days.ago)
+    end
+
+    upload = Fabricate(:upload, etag: "ETag", updated_at: 1.days.ago)
+    Fabricate(:upload, etag: "ETag2", updated_at: Time.now)
+    no_etag = Fabricate(:upload, updated_at: 2.days.ago)
+
+    output = capture_stdout do
+      File.open(csv_filename) do |f|
+        preloaded_inventory = S3Inventory.new(
+          helper,
+          :upload,
+          preloaded_inventory_file: f,
+          preloaded_inventory_date: Time.now
+        )
+        preloaded_inventory.backfill_etags_and_list_missing
+      end
+    end
+
+    expect(output).to eq("#{upload.url}\n#{no_etag.url}\n2 of 5 uploads are missing\n")
+    expect(Discourse.stats.get("missing_s3_uploads")).to eq(2)
+  end
+
+  it "can create per-site files", type: :multisite do
+    freeze_time
+
+    inventory.stubs(:files).returns([{ key: "Key", filename: "#{csv_filename}.gz" }])
+
+    files = inventory.prepare_for_all_sites
+    db1 = files["default"].read
+    db2 = files["second"].read
+    expect(db1.lines.count).to eq(3)
+    expect(db2.lines.count).to eq(1)
+    files.values.each { |f| f.close; f.unlink }
+  end
 end
diff --git a/spec/fixtures/csv/s3_inventory.csv b/spec/fixtures/csv/s3_inventory.csv
index a45103c2a03..eb461b3ecc7 100644
--- a/spec/fixtures/csv/s3_inventory.csv
+++ b/spec/fixtures/csv/s3_inventory.csv
@@ -1,3 +1,4 @@
-"abc","original/1X/0184537a4f419224404d013414e913a4f56018f2.jpg","defcaac0b4aca535c284e95f30d608d0"
-"abc","original/1X/050afc0ab01debe8cf48fd2ce50fbbf5eb072815.jpg","0cdc623af39cde0adb382670a6dc702a"
-"abc","original/1X/0789fbf5490babc68326b9cec90eeb0d6590db05.png","25c02eaceef4cb779fc17030d33f7f06"
+"abc","uploads/default/original/1X/0184537a4f419224404d013414e913a4f56018f2.jpg","defcaac0b4aca535c284e95f30d608d0" +"abc","uploads/default/original/1X/050afc0ab01debe8cf48fd2ce50fbbf5eb072815.jpg","0cdc623af39cde0adb382670a6dc702a" +"abc","uploads/default/original/1X/0789fbf5490babc68326b9cec90eeb0d6590db05.png","25c02eaceef4cb779fc17030d33f7f06" +"abc","uploads/second/original/1X/f789fbf5490babc68326b9cec90eeb0d6590db03.png","15c02eaceef4cb779fc17030d33f7f04"