BUGFIXES: improved performance of import and export

We no longer generate the entire dump in memory; instead we generate one table at a time.
Added some basic progress logging.
Sam 2014-01-29 16:49:01 +11:00
parent 538a83e619
commit cce5fb3303
7 changed files with 159 additions and 112 deletions
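
The gist of the change: rather than accumulating every table in one big in-memory hash and dumping a single tables.json, the exporter now asks the encoder to pull rows one table at a time and writes one JSON file per table. A minimal sketch of the new shape, with illustrative model/encoder objects rather than the exact job code:

models.each do |model|
  encoder.write_table(model.table_name, model.columns) do |rows_written|
    # called repeatedly by the encoder; return the next batch of rows,
    # or an empty array once the table is exhausted
    next_batch_for(model, already_written: rows_written)   # hypothetical helper
  end
end
encoder.finish   # writes schema.json with each table's field names and row count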

View File

@@ -25,15 +25,24 @@ module Jobs
ordered_models_for_export.each do |model|
log " #{model.table_name}"
column_info = model.columns
order_col = column_info.map(&:name).find {|x| x == 'id'} || order_columns_for(model)
column_names = model.column_names
results = model.connection.raw_connection.async_exec("select * from #{model.table_name}").to_enum
@encoder.write_table(model.table_name, column_info) do |num_rows_written|
if order_col
model.connection.select_rows("select * from #{model.table_name} order by #{order_col} limit #{batch_size} offset #{num_rows_written}")
else
# Take the rows in the order the database returns them
log "WARNING: no order by clause is being used for #{model.name} (#{model.table_name}). Please update Jobs::Exporter order_columns_for for #{model.name}."
model.connection.select_rows("select * from #{model.table_name} limit #{batch_size} offset #{num_rows_written}")
log("#{num_rows_written} rows written") if num_rows_written > 0
rows = []
begin
while rows.count < batch_size
row = results.next
rows << column_names.map{|col| row[col]}
end
rescue StopIteration
# we are done
end
rows
end
end
"#{@output_base_filename}.tar.gz"
@@ -97,8 +106,10 @@ module Jobs
`tar cvf #{tar_filename} #{upload_directory}`
end
FileUtils.cd(File.dirname(filenames.first)) do
`tar --append --file=#{tar_filename} #{File.basename(filenames.first)}`
filenames.each do |filename|
FileUtils.cd(File.dirname(filename)) do
`tar --append --file=#{tar_filename} #{File.basename(filename)}`
end
end
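
The fix in this hunk is that every exported file gets appended to the archive, not just the first one. Worth noting: tar can only append to an uncompressed archive, so the per-table JSON files go into a plain .tar and compression has to happen as a separate final step (the job's output name ends in .tar.gz). A rough sketch of the loop, shelling out as the job does:

filenames.each do |filename|
  FileUtils.cd(File.dirname(filename)) do
    `tar --append --file=#{tar_filename} #{File.basename(filename)}`
  end
end
`gzip #{tar_filename}`   # assumed final step, outside this hunk; tar cannot append to a .gz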

View File

@@ -69,7 +69,7 @@ module Jobs
raise Import::FilenameMissingError
else
extract_files
@decoder = Import::JsonDecoder.new( File.join(tmp_directory('import'), 'tables.json') )
@decoder = Import::JsonDecoder.new( Dir[File.join(tmp_directory('import'), '*.json')] )
Import.set_import_started
Discourse.enable_maintenance_mode
end
@@ -78,7 +78,7 @@ module Jobs
def extract_files
FileUtils.cd( tmp_directory('import') ) do
`tar xvzf #{@archive_filename} tables.json`
`tar xvzf #{@archive_filename}`
end
end
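
With one JSON file per table, the importer no longer asks tar for a single tables.json; it extracts the whole archive and hands the decoder whatever *.json files end up in the import scratch directory, along the lines of:

json_files = Dir[File.join(tmp_directory('import'), '*.json')]
# => [".../schema.json", ".../topics.json", ".../users.json", ...]   (contents vary per export)
@decoder = Import::JsonDecoder.new(json_files)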
@@ -118,6 +118,10 @@ module Jobs
self
end
def batch_size
1000
end
def set_schema_info(arg)
if arg[:source] && arg[:source].downcase == 'discourse'
if arg[:version] && arg[:version] <= Export.current_schema_version
@@ -184,9 +188,16 @@ module Jobs
parameter_markers = fields.map {|x| "?"}.join(',')
sql_stmt = "INSERT INTO #{table_name} (#{fields.join(',')}) VALUES (#{parameter_markers})"
User.exec_sql("BEGIN TRANSACTION")
i = 0
rows.each do |row|
User.exec_sql(sql_stmt, *row)
if i % batch_size == 0 && i > 0
log "#{i} rows done"
end
User.exec_sql(sql_stmt, *row)
i += 1
end
User.exec_sql("COMMIT")
true
else
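
On the load side, each table's rows now go in inside a single transaction, with a progress line every batch_size rows (1000, per the method added above). Condensed with each_with_index, the new logic is roughly:

markers = fields.map { '?' }.join(',')
sql = "INSERT INTO #{table_name} (#{fields.join(',')}) VALUES (#{markers})"

User.exec_sql("BEGIN TRANSACTION")
rows.each_with_index do |row, i|
  log "#{i} rows done" if i > 0 && i % batch_size == 0
  User.exec_sql(sql, *row)
end
User.exec_sql("COMMIT")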

View File

@@ -5,56 +5,61 @@ module Export
class SchemaArgumentsError < RuntimeError; end
class JsonEncoder
attr_accessor :stream_creator
include DirectoryHelper
def initialize
@table_data = {}
def initialize(stream_creator = nil)
@stream_creator = stream_creator
@stream_creator ||= lambda do |filename|
File.new(filename, 'w+b' )
end
def json_output_stream
@json_output_stream ||= File.new( File.join( tmp_directory('export'), 'tables.json' ), 'w+b' )
@schema_data = {
schema: {}
}
@table_info = {}
end
def write_json(name, data)
filename = File.join( tmp_directory('export'), "#{name}.json")
filenames << filename
stream = stream_creator.call(filename)
Oj.to_stream(stream, data, :mode => :compat)
stream.close
end
def write_schema_info(args)
raise SchemaArgumentsError unless args[:source].present? && args[:version].present?
@schema_data = {
schema: {
source: args[:source],
version: args[:version]
}
}
@schema_data[:schema][:source] = args[:source]
@schema_data[:schema][:version] = args[:version]
end
def write_table(table_name, columns)
@table_data[table_name] ||= {}
@table_data[table_name][:fields] = columns.map(&:name)
@table_data[table_name][:rows] ||= []
rows ||= []
row_count = 0
begin
rows = yield(row_count)
if rows
row_count += rows.size
@table_data[table_name][:rows] << rows
while true
current_rows = yield(rows.count)
break unless current_rows && current_rows.size > 0
rows.concat current_rows
end
# TODO: write to multiple files as needed.
# one file per table? multiple files per table?
# TODO still way too big a chunk, needs to be split up
write_json(table_name, rows)
end while rows && rows.size > 0
@table_info[table_name] ||= {
fields: columns.map(&:name),
row_count: rows.size
}
@table_data[table_name][:rows].flatten!(1)
@table_data[table_name][:row_count] = @table_data[table_name][:rows].size
end
def finish
@schema_data[:schema][:table_count] = @table_data.keys.count
json_output_stream.write( Oj.dump(@schema_data.merge(@table_data),
:mode => :compat) )
json_output_stream.close
@filenames = [File.join( tmp_directory('export'), 'tables.json' )]
@schema_data[:schema][:table_count] = @table_info.keys.count
write_json("schema", @schema_data.merge(@table_info))
end
def filenames
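
The encoder change worth calling out is the injectable stream_creator: production code gets the default File-backed streams, while a spec can pass a lambda that returns StringIO objects and capture each table's JSON without touching disk. Each table is serialized straight onto its stream with Oj.to_stream, so no whole-dump string is ever built. A small usage sketch (mirroring the spec further down):

require 'stringio'

# Production: the default stream_creator opens real files in the export tmp directory.
encoder = Export::JsonEncoder.new

# Test: capture streams by basename instead of writing files.
captured = {}
test_encoder = Export::JsonEncoder.new(lambda { |filename|
  captured[File.basename(filename, '.*')] = StringIO.new
})
# After test_encoder.finish, captured['schema'].string holds the schema JSON
# (StringIO#string still works after the encoder closes the stream).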

View File

@@ -2,23 +2,39 @@ module Import
class JsonDecoder
def initialize(input_filename)
@input_filename = input_filename
def initialize(filenames, loader = nil)
@filemap = Hash[*
filenames.map do |filename|
[File.basename(filename, '.*'), filename]
end.flatten
]
@loader = loader || lambda{|filename| Oj.load_file(filename)}
end
def load_schema
@loader.call(@filemap['schema'])
end
def each_table
@filemap.each do |name, filename|
next if name == 'schema'
yield name, @loader.call(filename)
end
end
def input_stream
@input_stream ||= begin
File.open( @input_filename, 'rb' )
end
end
def start( opts )
@json = JSON.parse(input_stream.read)
opts[:callbacks][:schema_info].call( source: @json['schema']['source'], version: @json['schema']['version'], table_count: @json.keys.size - 1)
@json.each do |key, val|
next if key == 'schema'
opts[:callbacks][:table_data].call( key, val['fields'], val['rows'], val['row_count'] )
schema = load_schema
opts[:callbacks][:schema_info].call( source: schema['schema']['source'], version: schema['schema']['version'], table_count: schema.keys.size - 1)
each_table do |name, data|
info = schema[name]
opts[:callbacks][:table_data].call( name, info['fields'], data, info['row_count'] )
end
end
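
The Hash[* ... .flatten] construction above just maps each file's basename to its path, so schema.json can be loaded up front and every other file replayed through each_table; the injectable loader defaults to Oj.load_file. An equivalent sketch using the spec's illustrative xyz/ paths:

filenames = ['xyz/schema.json', 'xyz/categories.json', 'xyz/notifications.json']

filemap = Hash[filenames.map { |f| [File.basename(f, '.*'), f] }]
# => { "schema" => "xyz/schema.json", "categories" => "xyz/categories.json", ... }

schema = Oj.load_file(filemap['schema'])
filemap.each do |name, path|
  next if name == 'schema'            # metadata, not row data
  rows = Oj.load_file(path)
  # hand (name, schema[name]['fields'], rows, schema[name]['row_count']) to the table_data callback
end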

View File

@@ -4,10 +4,14 @@ require 'export/json_encoder'
describe Export::JsonEncoder do
describe "exported data" do
before do
@encoder = Export::JsonEncoder.new
@testIO = StringIO.new
@encoder.stubs(:json_output_stream).returns(@testIO)
@encoder.stubs(:tmp_directory).returns( File.join(Rails.root, 'tmp', 'json_encoder_spec') )
@streams = {}
@encoder = Export::JsonEncoder.new(lambda{ |filename|
@streams[File.basename(filename, ".*")] = StringIO.new
})
end
let :schema do
JSON.parse(@streams['schema'].string)
end
describe "write_schema_info" do
@@ -15,10 +19,9 @@ describe Export::JsonEncoder do
version = '20121216230719'
@encoder.write_schema_info( source: 'discourse', version: version )
@encoder.finish
json = JSON.parse( @testIO.string )
json.should have_key('schema')
json['schema']['source'].should == 'discourse'
json['schema']['version'].should == version
schema.should have_key('schema')
schema['schema']['source'].should == 'discourse'
schema['schema']['version'].should == version
end
it "should raise an exception when its arguments are invalid" do
@@ -88,15 +91,13 @@ describe Export::JsonEncoder do
it "should have a table count of 0 when no tables were exported" do
@encoder.finish
json = JSON.parse( @testIO.string )
json['schema']['table_count'].should == 0
schema['schema']['table_count'].should == 0
end
it "should have a table count of 1 when one table was exported" do
@encoder.write_table(Topic.table_name, Topic.columns) { |row_count| [] }
@encoder.finish
json = JSON.parse( @testIO.string )
json['schema']['table_count'].should == 1
schema['schema']['table_count'].should == 1
end
it "should have a table count of 3 when three tables were exported" do
@@ -104,15 +105,13 @@ describe Export::JsonEncoder do
@encoder.write_table(User.table_name, User.columns) { |row_count| [] }
@encoder.write_table(Post.table_name, Post.columns) { |row_count| [] }
@encoder.finish
json = JSON.parse( @testIO.string )
json['schema']['table_count'].should == 3
schema['schema']['table_count'].should == 3
end
it "should have a row count of 0 when no rows were exported" do
@encoder.write_table(Notification.table_name, Notification.columns) { |row_count| [] }
@encoder.finish
json = JSON.parse( @testIO.string )
json[Notification.table_name]['row_count'].should == 0
schema[Notification.table_name]['row_count'].should == 0
end
it "should have a row count of 1 when one row was exported" do
@@ -124,8 +123,7 @@ describe Export::JsonEncoder do
end
end
@encoder.finish
json = JSON.parse( @testIO.string )
json[Notification.table_name]['row_count'].should == 1
schema[Notification.table_name]['row_count'].should == 1
end
it "should have a row count of 2 when two rows were exported" do
@@ -138,8 +136,7 @@ describe Export::JsonEncoder do
end
end
@encoder.finish
json = JSON.parse( @testIO.string )
json[Notification.table_name]['row_count'].should == 2
schema[Notification.table_name]['row_count'].should == 2
end
end
end

View File

@@ -7,26 +7,40 @@ describe Import::JsonDecoder do
context "given valid arguments" do
before do
@version = '20121201205642'
@export_data = {
schema: { source: 'discourse', version: @version},
categories: {
@schema = {
"schema" => { 'source' => 'discourse', 'version' => @version},
"categories" => {
fields: Category.columns.map(&:name),
rows: [
row_count: 2
},
"notifications" => {
fields: Notification.columns.map(&:name),
row_count: 2
}
}
@categories = [
["3", "entertainment", "AB9364", "155", nil, nil, nil, nil, "19", "2012-07-12 18:55:56.355932", "2012-07-12 18:55:56.355932", "1186", "17", "0", "0", "entertainment"],
["4", "question", "AB9364", "164", nil, nil, nil, nil, "1", "2012-07-12 18:55:56.355932", "2012-07-12 18:55:56.355932", "1186", "1", "0", "0", "question"]
]
},
notifications: {
fields: Notification.columns.map(&:name),
rows: [
@notifications = [
["1416", "2", "1214", "{\"topic_title\":\"UI: Where did the 'Create a Topic' button go?\",\"display_username\":\"Lowell Heddings\"}", "t", "2012-12-09 18:05:09.862898", "2012-12-09 18:05:09.862898", "394", "2", nil],
["1415", "2", "1187", "{\"topic_title\":\"Jenkins Config.xml\",\"display_username\":\"Sam\"}", "t", "2012-12-08 10:11:17.599724", "2012-12-08 10:11:17.599724", "392", "3", nil]
]
}
}
@testIO = StringIO.new(@export_data.to_json, 'r')
@decoder = Import::JsonDecoder.new('json_decoder_spec.json.gz')
@decoder.stubs(:input_stream).returns(@testIO)
@decoder = Import::JsonDecoder.new(['xyz/schema.json', 'xyz/categories.json', 'xyz/notifications.json'], lambda{|filename|
case filename
when 'xyz/schema.json'
@schema
when 'xyz/categories.json'
@categories
when 'xyz/notifications.json'
@notifications
end
})
@valid_args = { callbacks: { schema_info: stub_everything, table_data: stub_everything } }
end
@@ -39,24 +53,24 @@ describe Import::JsonDecoder do
it "should call the schema_info callback with source and version parameters when export data is from discourse" do
@valid_args[:callbacks][:schema_info].expects(:call).with do |arg|
arg.should have_key(:source)
arg.should have_key(:version)
arg[:source].should == @export_data[:schema][:source]
arg[:version].should == @export_data[:schema][:version]
arg["source"].should == @schema["source"]
arg["version"].should == @schema["version"]
end
@decoder.start( @valid_args )
end
it "should call the table_data callback at least once for each table in the export file" do
@valid_args[:callbacks][:table_data].expects(:call).with('categories', @export_data[:categories][:fields], anything, anything).at_least_once
@valid_args[:callbacks][:table_data].expects(:call).with('notifications', @export_data[:notifications][:fields], anything, anything).at_least_once
@valid_args[:callbacks][:table_data].expects(:call).with('categories',
@schema['categories']['fields'],
anything, anything
).at_least_once
@valid_args[:callbacks][:table_data].expects(:call).with('notifications',
@schema['notifications']['fields'], anything, anything).at_least_once
@decoder.start( @valid_args )
end
end
context "given invalid arguments" do
end
end
end

View File

@@ -13,8 +13,10 @@ describe Jobs::Exporter do
describe "execute" do
context 'when no export or import is running' do
before do
@testIO = StringIO.new
Export::JsonEncoder.any_instance.stubs(:json_output_stream).returns(@testIO)
@streams = {}
Export::JsonEncoder.any_instance.stubs(:stream_creator).returns(lambda {|filename|
@streams[File.basename(filename, '.*')] = StringIO.new
})
Jobs::Exporter.any_instance.stubs(:ordered_models_for_export).returns([])
Export.stubs(:is_export_running?).returns(false)
Export.stubs(:is_import_running?).returns(false)
@@ -99,32 +101,23 @@ describe Jobs::Exporter do
it "should export all rows from the topics table in ascending id order" do
Jobs::Exporter.any_instance.stubs(:ordered_models_for_export).returns([Topic])
Jobs::Exporter.new.execute( @exporter_args )
json = JSON.parse( @testIO.string )
json.should have_key('topics')
json['topics'].should have_key('rows')
json['topics']['rows'].should have(3).rows
json['topics']['rows'][0][0].to_i.should == @topic1.id
json['topics']['rows'][1][0].to_i.should == @topic2.id
json['topics']['rows'][2][0].to_i.should == @topic3.id
topics = JSON.parse( @streams['topics'].string )
topics.should have(3).rows
topics.map{|row| row[0].to_i}.sort.should == [@topic1.id, @topic2.id, @topic3.id].sort
end
it "should export all rows from the post_replies table in ascending order by post_id, reply_id" do
# because post_replies doesn't have an id column, so order by one of its indexes
Jobs::Exporter.any_instance.stubs(:ordered_models_for_export).returns([PostReply])
Jobs::Exporter.new.execute( @exporter_args )
json = JSON.parse( @testIO.string )
json.should have_key('post_replies')
json['post_replies'].should have_key('rows')
json['post_replies']['rows'].should have(3).rows
json['post_replies']['rows'][0][1].to_i.should == @reply2.id
json['post_replies']['rows'][1][1].to_i.should == @reply1.id
json['post_replies']['rows'][2][1].to_i.should == @reply3.id
post_replies = JSON.parse( @streams['post_replies'].string )
post_replies.map{|row| row[1].to_i}.sort.should == [@reply1.id, @reply2.id, @reply3.id].sort
end
it "should export column names for each table" do
Jobs::Exporter.any_instance.stubs(:ordered_models_for_export).returns([Topic, TopicUser, PostReply])
Jobs::Exporter.new.execute( @exporter_args )
json = JSON.parse( @testIO.string )
json = JSON.parse( @streams['schema'].string )
json['topics'].should have_key('fields')
json['topic_users'].should have_key('fields')
json['post_replies'].should have_key('fields')