2013-02-06 03:16:51 +08:00
require_dependency 'import/json_decoder'
require_dependency 'import/import'
require_dependency 'import/adapter/base'
( Dir . entries ( File . join ( Rails . root , 'lib' , 'import' , 'adapter' ) ) - [ '.' , '..' , 'base.rb' ] ) . each do | f |
require_dependency " import/adapter/ #{ f } "
end
module Jobs
class Importer < Jobs :: Base
2013-03-23 23:02:59 +08:00
sidekiq_options retry : false
2013-02-06 03:16:51 +08:00
BACKUP_SCHEMA = 'backup'
def initialize
@index_definitions = { }
@format = :json
@warnings = [ ]
end
def execute ( args )
ordered_models_for_import . each { | model | model . primary_key } # a HACK to workaround cache problems
raise Import :: ImportDisabledError unless SiteSetting . allow_import?
raise Import :: ImportInProgressError if Import :: is_import_running?
raise Export :: ExportInProgressError if Export :: is_export_running?
# Disable printing of NOTICE, DETAIL and other unimportant messages from postgresql
User . exec_sql ( " SET client_min_messages TO WARNING " )
@format = args [ :format ] || :json
@archive_filename = args [ :filename ]
if args [ :user_id ]
# After the import is done, we'll need to reload the user record and make sure it's the same person
# before sending a notification
user = User . where ( id : args [ :user_id ] . to_i ) . first
@user_info = { user_id : user . id , email : user . email }
else
@user_info = nil
end
start_import
backup_tables
begin
load_data
create_indexes
extract_uploads
rescue
log " Performing a ROLLBACK because something went wrong! "
rollback
raise
end
ensure
finish_import
end
def ordered_models_for_import
Export . models_included_in_export
end
def start_import
if @format != :json
raise Import :: FormatInvalidError
elsif @archive_filename . nil?
raise Import :: FilenameMissingError
else
extract_files
@decoder = Import :: JsonDecoder . new ( File . join ( tmp_directory , 'tables.json' ) )
Import . set_import_started
Discourse . enable_maintenance_mode
end
self
end
def tmp_directory
@tmp_directory || = begin
f = File . join ( Rails . root , 'tmp' , Time . now . strftime ( 'import%Y%m%d%H%M%S' ) )
Dir . mkdir ( f ) unless Dir [ f ] . present?
f
end
end
def extract_files
FileUtils . cd ( tmp_directory ) do
` tar xvzf #{ @archive_filename } tables.json `
end
end
def backup_tables
log " Backing up tables "
ActiveRecord :: Base . transaction do
create_backup_schema
ordered_models_for_import . each do | model |
backup_and_setup_table ( model )
end
end
self
end
def create_backup_schema
User . exec_sql ( " DROP SCHEMA IF EXISTS #{ BACKUP_SCHEMA } CASCADE " )
User . exec_sql ( " CREATE SCHEMA #{ BACKUP_SCHEMA } " )
self
end
def backup_and_setup_table ( model )
log " #{ model . table_name } "
@index_definitions [ model . table_name ] = model . exec_sql ( " SELECT indexdef FROM pg_indexes WHERE tablename = ' #{ model . table_name } ' and schemaname = 'public'; " ) . map { | x | x [ 'indexdef' ] }
model . exec_sql ( " ALTER TABLE #{ model . table_name } SET SCHEMA #{ BACKUP_SCHEMA } " )
model . exec_sql ( " CREATE TABLE #{ model . table_name } (LIKE #{ BACKUP_SCHEMA } . #{ model . table_name } INCLUDING DEFAULTS INCLUDING CONSTRAINTS INCLUDING COMMENTS INCLUDING STORAGE); " )
self
end
def load_data
log " Importing data "
@decoder . start (
callbacks : {
schema_info : method ( :set_schema_info ) ,
table_data : method ( :load_table )
}
)
self
end
def set_schema_info ( arg )
2013-03-05 08:42:44 +08:00
if arg [ :source ] && arg [ :source ] . downcase == 'discourse'
if arg [ :version ] && arg [ :version ] < = Export . current_schema_version
2013-02-06 03:16:51 +08:00
@export_schema_version = arg [ :version ]
if arg [ :table_count ] == ordered_models_for_import . size
true
else
raise Import :: WrongTableCountError . new ( " Expected to find #{ ordered_models_for_import . size } tables, but export file has #{ arg [ :table_count ] } tables! " )
end
elsif arg [ :version ] . nil?
raise ArgumentError . new ( " The schema version must be provided. " )
else
raise Import :: UnsupportedSchemaVersion . new ( " Export file is from a newer version of Discourse. Upgrade and run migrations to import this file. " )
end
else
raise Import :: UnsupportedExportSource
end
end
def load_table ( table_name , fields_arg , row_data , row_count )
fields = fields_arg . dup
model = Export :: models_included_in_export . find { | m | m . table_name == table_name }
if model
@adapters || = Import . adapters_for_version ( @export_schema_version )
log " #{ table_name } : #{ row_count } rows "
if @adapters [ table_name ]
@adapters [ table_name ] . each do | adapter |
fields = adapter . apply_to_column_names ( table_name , fields )
end
end
if fields . size > model . columns . size
raise Import :: WrongFieldCountError . new ( " Table #{ table_name } is expected to have #{ model . columns . size } fields, but got #{ fields . size } ! Maybe your Discourse server is older than the server that this export file comes from? " )
end
# If there are fewer fields in the data than the model has, then insert only those fields and
# hope that the table uses default values or allows null for the missing columns.
# If the table doesn't have defaults or is not nullable, then a migration adapter should have been created
# along with the migration.
column_info = model . columns
col_num = - 1
rows = row_data . map do | row |
if @adapters [ table_name ]
@adapters [ table_name ] . each do | adapter |
row = adapter . apply_to_row ( table_name , row )
end
end
row
end . transpose . map do | col_values |
col_num += 1
case column_info [ col_num ] . type
when :boolean
col_values . map { | v | v . nil? ? nil : ( v == 'f' ? false : true ) }
else
col_values
end
end . transpose
parameter_markers = fields . map { | x | " ? " } . join ( ',' )
sql_stmt = " INSERT INTO #{ table_name } ( #{ fields . join ( ',' ) } ) VALUES ( #{ parameter_markers } ) "
rows . each do | row |
User . exec_sql ( sql_stmt , * row )
end
true
else
add_warning " Export file contained an unrecognized table named: #{ table_name } ! It was ignored. "
end
end
def create_indexes
log " Creating indexes "
ordered_models_for_import . each do | model |
log " #{ model . table_name } "
@index_definitions [ model . table_name ] . each do | indexdef |
model . exec_sql ( indexdef )
end
# The indexdef statements don't create the primary keys, so we need to find the primary key and do it ourselves.
pkey_index_def = @index_definitions [ model . table_name ] . find { | ixdef | ixdef =~ / ([ \ S]{1,}_pkey) / }
2013-03-05 08:42:44 +08:00
if pkey_index_def && pkey_index_name = / ([ \ S]{1,}_pkey) / . match ( pkey_index_def ) [ 1 ]
2013-02-06 03:16:51 +08:00
model . exec_sql ( " ALTER TABLE ONLY #{ model . table_name } ADD PRIMARY KEY USING INDEX #{ pkey_index_name } " )
end
if model . columns . map ( & :name ) . include? ( 'id' )
max_id = model . exec_sql ( " SELECT MAX(id) AS max FROM #{ model . table_name } " ) [ 0 ] [ 'max' ] . to_i + 1
seq_name = " #{ model . table_name } _id_seq "
model . exec_sql ( " CREATE SEQUENCE #{ seq_name } START WITH #{ max_id } INCREMENT BY 1 NO MINVALUE NO MAXVALUE CACHE 1 " )
model . exec_sql ( " ALTER SEQUENCE #{ seq_name } OWNED BY #{ model . table_name } .id " )
model . exec_sql ( " ALTER TABLE #{ model . table_name } ALTER COLUMN id SET DEFAULT nextval(' #{ seq_name } ') " )
end
end
self
end
def extract_uploads
if ` tar tf #{ @archive_filename } | grep "uploads/" ` . present?
FileUtils . cd ( File . join ( Rails . root , 'public' ) ) do
` tar -xz --keep-newer-files -f #{ @archive_filename } uploads/ `
end
end
end
def rollback
ordered_models_for_import . each do | model |
log " #{ model . table_name } "
model . exec_sql ( " DROP TABLE IF EXISTS #{ model . table_name } " ) rescue nil
begin
model . exec_sql ( " ALTER TABLE #{ BACKUP_SCHEMA } . #{ model . table_name } SET SCHEMA public " )
rescue = > e
log " Failed to restore. #{ e . class . name } : #{ e . message } "
end
end
end
def finish_import
Import . set_import_is_not_running
Discourse . disable_maintenance_mode
FileUtils . rm_rf ( tmp_directory ) if Dir [ tmp_directory ] . present?
if @warnings . size > 0
log " WARNINGS: "
@warnings . each do | message |
log " #{ message } "
end
end
# send_notification
end
def send_notification
# Doesn't work. "WARNING: Can't mass-assign protected attributes: created_at"
# Still a problem with the activerecord schema_cache I think.
2013-03-05 08:42:44 +08:00
# if @user_info && @user_info[:user_id]
2013-02-06 03:16:51 +08:00
# user = User.where(id: @user_info[:user_id]).first
2013-03-05 08:42:44 +08:00
# if user && user.email == @user_info[:email]
2013-02-06 03:16:51 +08:00
# SystemMessage.new(user).create('import_succeeded')
# end
# end
true
end
def add_warning ( message )
@warnings << message
end
def log ( * args )
puts args
args . each do | arg |
Rails . logger . info " #{ Time . now . to_formatted_s ( :db ) } : [IMPORTER] #{ arg } "
end
true
end
end
2013-03-23 23:02:59 +08:00
end