From cce5fb3303b39efdb308195fe3a8c9a58e68ffeb Mon Sep 17 00:00:00 2001
From: Sam
Date: Wed, 29 Jan 2014 16:49:01 +1100
Subject: [PATCH] BUGFIXES: improved performance of import and export

We no longer generate the entire dump in memory; instead we generate one
table at a time. Added some basic progress logging.
---
 app/jobs/regular/exporter.rb                | 29 ++++++---
 app/jobs/regular/importer.rb                | 15 ++++-
 lib/export/json_encoder.rb                  | 67 +++++++++++----------
 lib/import/json_decoder.rb                  | 34 ++++++++---
 spec/components/export/json_encoder_spec.rb | 37 ++++++------
 spec/components/import/json_decoder_spec.rb | 62 +++++++++++--------
 spec/jobs/exporter_spec.rb                  | 27 +++------
 7 files changed, 159 insertions(+), 112 deletions(-)

diff --git a/app/jobs/regular/exporter.rb b/app/jobs/regular/exporter.rb
index a230c4002e3..2e653df3554 100644
--- a/app/jobs/regular/exporter.rb
+++ b/app/jobs/regular/exporter.rb
@@ -25,15 +25,24 @@ module Jobs
       ordered_models_for_export.each do |model|
         log "  #{model.table_name}"
         column_info = model.columns
-        order_col = column_info.map(&:name).find {|x| x == 'id'} || order_columns_for(model)
+        column_names = model.column_names
+
+        results = model.connection.raw_connection.async_exec("select * from #{model.table_name}").to_enum
+
         @encoder.write_table(model.table_name, column_info) do |num_rows_written|
-          if order_col
-            model.connection.select_rows("select * from #{model.table_name} order by #{order_col} limit #{batch_size} offset #{num_rows_written}")
-          else
-            # Take the rows in the order the database returns them
-            log "WARNING: no order by clause is being used for #{model.name} (#{model.table_name}). Please update Jobs::Exporter order_columns_for for #{model.name}."
-            model.connection.select_rows("select * from #{model.table_name} limit #{batch_size} offset #{num_rows_written}")
+          log("#{num_rows_written} rows written") if num_rows_written > 0
+
+          rows = []
+          begin
+            while rows.count < batch_size
+              row = results.next
+              rows << column_names.map{|col| row[col]}
+            end
+          rescue StopIteration
+            # we are done
           end
+
+          rows
         end
       end
       "#{@output_base_filename}.tar.gz"
@@ -97,8 +106,10 @@ module Jobs
         `tar cvf #{tar_filename} #{upload_directory}`
       end

-      FileUtils.cd(File.dirname(filenames.first)) do
-        `tar --append --file=#{tar_filename} #{File.basename(filenames.first)}`
+      filenames.each do |filename|
+        FileUtils.cd(File.dirname(filename)) do
+          `tar --append --file=#{tar_filename} #{File.basename(filename)}`
+        end
       end

diff --git a/app/jobs/regular/importer.rb b/app/jobs/regular/importer.rb
index 4c105ba40e7..8b3aae78abf 100644
--- a/app/jobs/regular/importer.rb
+++ b/app/jobs/regular/importer.rb
@@ -69,7 +69,7 @@ module Jobs
           raise Import::FilenameMissingError
         else
           extract_files
-          @decoder = Import::JsonDecoder.new( File.join(tmp_directory('import'), 'tables.json') )
+          @decoder = Import::JsonDecoder.new( Dir[File.join(tmp_directory('import'), '*.json')] )
           Import.set_import_started
           Discourse.enable_maintenance_mode
         end
@@ -78,7 +78,7 @@ module Jobs

       def extract_files
         FileUtils.cd( tmp_directory('import') ) do
-          `tar xvzf #{@archive_filename} tables.json`
+          `tar xvzf #{@archive_filename}`
         end
       end

@@ -118,6 +118,10 @@ module Jobs
        self
      end

+     def batch_size
+       1000
+     end
+
      def set_schema_info(arg)
        if arg[:source] && arg[:source].downcase == 'discourse'
          if arg[:version] && arg[:version] <= Export.current_schema_version
@@ -184,9 +188,16 @@ module Jobs
          parameter_markers = fields.map {|x| "?"}.join(',')
          sql_stmt = "INSERT INTO #{table_name} (#{fields.join(',')}) VALUES (#{parameter_markers})"

User.exec_sql("BEGIN TRANSACTION") + i = 0 rows.each do |row| + if i % batch_size == 0 && i > 0 + log "#{i} rows done" + end User.exec_sql(sql_stmt, *row) + i += 1 end + User.exec_sql("COMMIT") true else diff --git a/lib/export/json_encoder.rb b/lib/export/json_encoder.rb index a27bde341b0..1312fcd6cd3 100644 --- a/lib/export/json_encoder.rb +++ b/lib/export/json_encoder.rb @@ -5,56 +5,61 @@ module Export class SchemaArgumentsError < RuntimeError; end class JsonEncoder + attr_accessor :stream_creator + include DirectoryHelper - def initialize - @table_data = {} + def initialize(stream_creator = nil) + @stream_creator = stream_creator + @stream_creator ||= lambda do |filename| + File.new(filename, 'w+b' ) + end + + @schema_data = { + schema: {} + } + + @table_info = {} end - def json_output_stream - @json_output_stream ||= File.new( File.join( tmp_directory('export'), 'tables.json' ), 'w+b' ) + + def write_json(name, data) + filename = File.join( tmp_directory('export'), "#{name}.json") + filenames << filename + stream = stream_creator.call(filename) + Oj.to_stream(stream, data, :mode => :compat) + stream.close end def write_schema_info(args) raise SchemaArgumentsError unless args[:source].present? && args[:version].present? - @schema_data = { - schema: { - source: args[:source], - version: args[:version] - } - } + @schema_data[:schema][:source] = args[:source] + @schema_data[:schema][:version] = args[:version] end def write_table(table_name, columns) - @table_data[table_name] ||= {} - @table_data[table_name][:fields] = columns.map(&:name) - @table_data[table_name][:rows] ||= [] + rows ||= [] - row_count = 0 - begin - rows = yield(row_count) - if rows - row_count += rows.size - @table_data[table_name][:rows] << rows - end + while true + current_rows = yield(rows.count) + break unless current_rows && current_rows.size > 0 + rows.concat current_rows + end - # TODO: write to multiple files as needed. - # one file per table? multiple files per table? 
+      # TODO still way too big a chunk, needs to be split up
+      write_json(table_name, rows)

-      end while rows && rows.size > 0
+      @table_info[table_name] ||= {
+        fields: columns.map(&:name),
+        row_count: rows.size
+      }

-      @table_data[table_name][:rows].flatten!(1)
-      @table_data[table_name][:row_count] = @table_data[table_name][:rows].size
     end

     def finish
-      @schema_data[:schema][:table_count] = @table_data.keys.count
-      json_output_stream.write( Oj.dump(@schema_data.merge(@table_data),
-                                        :mode => :compat) )
-      json_output_stream.close
-
-      @filenames = [File.join( tmp_directory('export'), 'tables.json' )]
+      @schema_data[:schema][:table_count] = @table_info.keys.count
+      write_json("schema", @schema_data.merge(@table_info))
     end

     def filenames
diff --git a/lib/import/json_decoder.rb b/lib/import/json_decoder.rb
index e1b876878a1..52d49ec205a 100644
--- a/lib/import/json_decoder.rb
+++ b/lib/import/json_decoder.rb
@@ -2,26 +2,42 @@
 module Import

   class JsonDecoder

-    def initialize(input_filename)
-      @input_filename = input_filename
+    def initialize(filenames, loader = nil)
+      @filemap = Hash[*
+        filenames.map do |filename|
+          [File.basename(filename, '.*'), filename]
+        end.flatten
+      ]
+      @loader = loader || lambda{|filename| Oj.load_file(filename)}
+    end
+
+    def load_schema
+      @loader.call(@filemap['schema'])
+    end
+
+    def each_table
+      @filemap.each do |name, filename|
+        next if name == 'schema'
+        yield name, @loader.call(filename)
+      end
     end

     def input_stream
       @input_stream ||= begin
-        File.open( @input_filename, 'rb' )
       end
     end

     def start( opts )
-      @json = JSON.parse(input_stream.read)
-      opts[:callbacks][:schema_info].call( source: @json['schema']['source'], version: @json['schema']['version'], table_count: @json.keys.size - 1)
-      @json.each do |key, val|
-        next if key == 'schema'
-        opts[:callbacks][:table_data].call( key, val['fields'], val['rows'], val['row_count'] )
+      schema = load_schema
+      opts[:callbacks][:schema_info].call( source: schema['schema']['source'], version: schema['schema']['version'], table_count: schema.keys.size - 1)
+
+      each_table do |name, data|
+        info = schema[name]
+        opts[:callbacks][:table_data].call( name, info['fields'], data, info['row_count'] )
       end
     end

   end
-end
\ No newline at end of file
+end
diff --git a/spec/components/export/json_encoder_spec.rb b/spec/components/export/json_encoder_spec.rb
index a2c5f624eca..2d5cfb279d6 100644
--- a/spec/components/export/json_encoder_spec.rb
+++ b/spec/components/export/json_encoder_spec.rb
@@ -4,10 +4,14 @@ require 'export/json_encoder'
 describe Export::JsonEncoder do
   describe "exported data" do
     before do
-      @encoder = Export::JsonEncoder.new
-      @testIO = StringIO.new
-      @encoder.stubs(:json_output_stream).returns(@testIO)
-      @encoder.stubs(:tmp_directory).returns( File.join(Rails.root, 'tmp', 'json_encoder_spec') )
+      @streams = {}
+      @encoder = Export::JsonEncoder.new(lambda{ |filename|
+        @streams[File.basename(filename, ".*")] = StringIO.new
+      })
+    end
+
+    let :schema do
+      JSON.parse(@streams['schema'].string)
     end

     describe "write_schema_info" do
@@ -15,10 +19,9 @@ describe Export::JsonEncoder do
         version = '20121216230719'
         @encoder.write_schema_info( source: 'discourse', version: version )
         @encoder.finish
-        json = JSON.parse( @testIO.string )
-        json.should have_key('schema')
-        json['schema']['source'].should == 'discourse'
-        json['schema']['version'].should == version
+        schema.should have_key('schema')
+        schema['schema']['source'].should == 'discourse'
+        schema['schema']['version'].should == version
       end

       it "should raise an exception when its arguments are invalid" do
are invalid" do @@ -88,15 +91,13 @@ describe Export::JsonEncoder do it "should have a table count of 0 when no tables were exported" do @encoder.finish - json = JSON.parse( @testIO.string ) - json['schema']['table_count'].should == 0 + schema['schema']['table_count'].should == 0 end it "should have a table count of 1 when one table was exported" do @encoder.write_table(Topic.table_name, Topic.columns) { |row_count| [] } @encoder.finish - json = JSON.parse( @testIO.string ) - json['schema']['table_count'].should == 1 + schema['schema']['table_count'].should == 1 end it "should have a table count of 3 when three tables were exported" do @@ -104,15 +105,13 @@ describe Export::JsonEncoder do @encoder.write_table(User.table_name, User.columns) { |row_count| [] } @encoder.write_table(Post.table_name, Post.columns) { |row_count| [] } @encoder.finish - json = JSON.parse( @testIO.string ) - json['schema']['table_count'].should == 3 + schema['schema']['table_count'].should == 3 end it "should have a row count of 0 when no rows were exported" do @encoder.write_table(Notification.table_name, Notification.columns) { |row_count| [] } @encoder.finish - json = JSON.parse( @testIO.string ) - json[Notification.table_name]['row_count'].should == 0 + schema[Notification.table_name]['row_count'].should == 0 end it "should have a row count of 1 when one row was exported" do @@ -124,8 +123,7 @@ describe Export::JsonEncoder do end end @encoder.finish - json = JSON.parse( @testIO.string ) - json[Notification.table_name]['row_count'].should == 1 + schema[Notification.table_name]['row_count'].should == 1 end it "should have a row count of 2 when two rows were exported" do @@ -138,8 +136,7 @@ describe Export::JsonEncoder do end end @encoder.finish - json = JSON.parse( @testIO.string ) - json[Notification.table_name]['row_count'].should == 2 + schema[Notification.table_name]['row_count'].should == 2 end end end diff --git a/spec/components/import/json_decoder_spec.rb b/spec/components/import/json_decoder_spec.rb index b132105791f..cb86a743a89 100644 --- a/spec/components/import/json_decoder_spec.rb +++ b/spec/components/import/json_decoder_spec.rb @@ -7,26 +7,40 @@ describe Import::JsonDecoder do context "given valid arguments" do before do @version = '20121201205642' - @export_data = { - schema: { source: 'discourse', version: @version}, - categories: { + @schema = { + "schema" => { 'source' => 'discourse', 'version' => @version}, + "categories" => { fields: Category.columns.map(&:name), - rows: [ + row_count: 2 + }, + "notifications" => { + fields: Notification.columns.map(&:name), + row_count: 2 + } + } + + @categories = [ ["3", "entertainment", "AB9364", "155", nil, nil, nil, nil, "19", "2012-07-12 18:55:56.355932", "2012-07-12 18:55:56.355932", "1186", "17", "0", "0", "entertainment"], ["4", "question", "AB9364", "164", nil, nil, nil, nil, "1", "2012-07-12 18:55:56.355932", "2012-07-12 18:55:56.355932", "1186", "1", "0", "0", "question"] - ] - }, - notifications: { - fields: Notification.columns.map(&:name), - rows: [ + ] + + @notifications = [ ["1416", "2", "1214", "{\"topic_title\":\"UI: Where did the 'Create a Topic' button go?\",\"display_username\":\"Lowell Heddings\"}", "t", "2012-12-09 18:05:09.862898", "2012-12-09 18:05:09.862898", "394", "2", nil], ["1415", "2", "1187", "{\"topic_title\":\"Jenkins Config.xml\",\"display_username\":\"Sam\"}", "t", "2012-12-08 10:11:17.599724", "2012-12-08 10:11:17.599724", "392", "3", nil] ] - } - } - @testIO = StringIO.new(@export_data.to_json, 'r') - @decoder = 
-      @decoder = Import::JsonDecoder.new('json_decoder_spec.json.gz')
-      @decoder.stubs(:input_stream).returns(@testIO)
+
+      @decoder = Import::JsonDecoder.new(['xyz/schema.json', 'xyz/categories.json', 'xyz/notifications.json'], lambda{|filename|
+        case filename
+        when 'xyz/schema.json'
+          @schema
+        when 'xyz/categories.json'
+          @categories
+        when 'xyz/notifications.json'
+          @notifications
+        end
+      })
+
+      @valid_args = { callbacks: { schema_info: stub_everything, table_data: stub_everything } }
     end
@@ -39,24 +53,24 @@ describe Import::JsonDecoder do

       it "should call the schema_info callback with source and version parameters when export data is from discourse" do
         @valid_args[:callbacks][:schema_info].expects(:call).with do |arg|
-          arg.should have_key(:source)
-          arg.should have_key(:version)
-          arg[:source].should == @export_data[:schema][:source]
-          arg[:version].should == @export_data[:schema][:version]
+          arg["source"].should == @schema["source"]
+          arg["version"].should == @schema["version"]
         end
         @decoder.start( @valid_args )
       end

       it "should call the table_data callback at least once for each table in the export file" do
-        @valid_args[:callbacks][:table_data].expects(:call).with('categories', @export_data[:categories][:fields], anything, anything).at_least_once
-        @valid_args[:callbacks][:table_data].expects(:call).with('notifications', @export_data[:notifications][:fields], anything, anything).at_least_once
+        @valid_args[:callbacks][:table_data].expects(:call).with('categories',
+            @schema['categories']['fields'],
+            anything, anything
+        ).at_least_once
+
+        @valid_args[:callbacks][:table_data].expects(:call).with('notifications',
+            @schema['notifications']['fields'], anything, anything).at_least_once
         @decoder.start( @valid_args )
       end
     end

-    context "given invalid arguments" do
-
-    end
   end
-end
\ No newline at end of file
+end
diff --git a/spec/jobs/exporter_spec.rb b/spec/jobs/exporter_spec.rb
index 2bc5affb56f..c620c733feb 100644
--- a/spec/jobs/exporter_spec.rb
+++ b/spec/jobs/exporter_spec.rb
@@ -13,8 +13,10 @@ describe Jobs::Exporter do
   describe "execute" do
     context 'when no export or import is running' do
       before do
-        @testIO = StringIO.new
-        Export::JsonEncoder.any_instance.stubs(:json_output_stream).returns(@testIO)
+        @streams = {}
+        Export::JsonEncoder.any_instance.stubs(:stream_creator).returns(lambda {|filename|
+          @streams[File.basename(filename, '.*')] = StringIO.new
+        })
         Jobs::Exporter.any_instance.stubs(:ordered_models_for_export).returns([])
         Export.stubs(:is_export_running?).returns(false)
         Export.stubs(:is_import_running?).returns(false)
@@ -99,32 +101,23 @@ describe Jobs::Exporter do
         it "should export all rows from the topics table in ascending id order" do
           Jobs::Exporter.any_instance.stubs(:ordered_models_for_export).returns([Topic])
           Jobs::Exporter.new.execute( @exporter_args )
-          json = JSON.parse( @testIO.string )
-          json.should have_key('topics')
-          json['topics'].should have_key('rows')
-          json['topics']['rows'].should have(3).rows
-          json['topics']['rows'][0][0].to_i.should == @topic1.id
-          json['topics']['rows'][1][0].to_i.should == @topic2.id
-          json['topics']['rows'][2][0].to_i.should == @topic3.id
+          topics = JSON.parse( @streams['topics'].string )
+          topics.should have(3).rows
+          topics.map{|row| row[0].to_i}.sort.should == [@topic1.id, @topic2.id, @topic3.id].sort
         end

         it "should export all rows from the post_replies table in ascending order by post_id, reply_id" do
           # because post_replies doesn't have an id column, so order by one of its indexes
           Jobs::Exporter.any_instance.stubs(:ordered_models_for_export).returns([PostReply])
           Jobs::Exporter.new.execute( @exporter_args )
-          json = JSON.parse( @testIO.string )
-          json.should have_key('post_replies')
-          json['post_replies'].should have_key('rows')
-          json['post_replies']['rows'].should have(3).rows
-          json['post_replies']['rows'][0][1].to_i.should == @reply2.id
-          json['post_replies']['rows'][1][1].to_i.should == @reply1.id
-          json['post_replies']['rows'][2][1].to_i.should == @reply3.id
+          post_replies = JSON.parse( @streams['post_replies'].string )
+          post_replies.map{|row| row[1].to_i}.sort.should == [@reply1.id, @reply2.id, @reply3.id].sort
         end

         it "should export column names for each table" do
           Jobs::Exporter.any_instance.stubs(:ordered_models_for_export).returns([Topic, TopicUser, PostReply])
           Jobs::Exporter.new.execute( @exporter_args )
-          json = JSON.parse( @testIO.string )
+          json = JSON.parse( @streams['schema'].string )
           json['topics'].should have_key('fields')
           json['topic_users'].should have_key('fields')
           json['post_replies'].should have_key('fields')
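
A note for reviewers on the two techniques this patch leans on. First, the exporter now issues a single "select * from <table>" per table and drains the result through an enumerator in fixed-size batches, instead of re-querying with LIMIT/OFFSET for every batch. The following is a minimal, runnable Ruby sketch of that control flow only; the results enumerator and its sample rows are stand-ins for the raw_connection.async_exec(...).to_enum call in the patch, and BATCH_SIZE is illustrative.

# Stand-in for the PG result enumerator the patch builds via
# model.connection.raw_connection.async_exec(...).to_enum; the
# hashes below mimic rows keyed by column name.
BATCH_SIZE   = 3
column_names = %w[id title]
results = [
  { "id" => 1, "title" => "first"  },
  { "id" => 2, "title" => "second" },
  { "id" => 3, "title" => "third"  },
  { "id" => 4, "title" => "fourth" },
].to_enum

num_rows_written = 0
loop do
  rows = []
  begin
    # Pull up to BATCH_SIZE rows; Enumerator#next raises StopIteration
    # once the result set is exhausted, which ends the batch early.
    while rows.count < BATCH_SIZE
      row = results.next
      rows << column_names.map { |col| row[col] }
    end
  rescue StopIteration
    # no more rows; fall through with a short (possibly empty) batch
  end

  break if rows.empty?
  num_rows_written += rows.size
  puts "#{num_rows_written} rows written"   # the basic progress logging
end

Memory stays bounded by one batch rather than one table, which is the point of the fix; the BEGIN/COMMIT wrapping on the import side is the same idea applied to writes.

Second, JsonEncoder now accepts an injectable stream_creator, which is what lets the specs capture per-table output in StringIO objects instead of stubbing json_output_stream. Here is a sketch of that seam under a hypothetical TinyEncoder class (the real encoder also records filenames and serializes with Oj.to_stream):

require 'json'
require 'stringio'

class TinyEncoder
  def initialize(stream_creator = nil)
    # default: write real files, as production does
    @stream_creator = stream_creator || lambda { |filename| File.new(filename, 'w+b') }
  end

  def write_json(name, data)
    stream = @stream_creator.call("#{name}.json")
    stream.write(data.to_json)   # the patch streams with Oj.to_stream instead
    stream.close
  end
end

# Test seam: collect output in memory, keyed by basename, as the specs do.
streams = {}
encoder = TinyEncoder.new(lambda { |filename|
  streams[File.basename(filename, '.*')] = StringIO.new
})
encoder.write_json('schema', { 'schema' => { 'source' => 'discourse' } })
puts streams['schema'].string   # => {"schema":{"source":"discourse"}}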