Improved mailing list import.

Now uses a SQLite database to store messages rather than JSON files
for performance and memory considerations.
This commit is contained in:
Robin Ward 2016-06-14 11:44:35 -04:00
parent 470da6205c
commit b42f28d4c3
2 changed files with 188 additions and 81 deletions

View File

@ -198,10 +198,14 @@ class ImportScripts::Base
def all_records_exist?(type, import_ids)
return false if import_ids.empty?
Post.exec_sql('CREATE TEMP TABLE import_ids(val varchar(200) PRIMARY KEY)')
orig_conn = ActiveRecord::Base.connection
conn = orig_conn.raw_connection
conn.exec('CREATE TEMP TABLE import_ids(val varchar(200) PRIMARY KEY)')
import_id_clause = import_ids.map { |id| "('#{PG::Connection.escape_string(id.to_s)}')" }.join(",")
Post.exec_sql("INSERT INTO import_ids VALUES #{import_id_clause}")
conn.exec("INSERT INTO import_ids VALUES #{import_id_clause}")
existing = "#{type.to_s.classify}CustomField".constantize.where(name: 'import_id')
existing = existing.joins('JOIN import_ids ON val = value')
@ -211,7 +215,7 @@ class ImportScripts::Base
return true
end
ensure
Post.exec_sql('DROP TABLE import_ids')
conn.exec('DROP TABLE import_ids')
end
# Iterate through a list of user records to be imported.
@ -366,7 +370,7 @@ class ImportScripts::Base
params[:parent_category_id] = top.id if top
end
new_category = create_category(params, params[:id])
create_category(params, params[:id])
created += 1
end

View File

@ -1,3 +1,4 @@
require 'sqlite3'
require File.expand_path(File.dirname(__FILE__) + "/base.rb")
class ImportScripts::Mbox < ImportScripts::Base
@ -5,96 +6,169 @@ class ImportScripts::Mbox < ImportScripts::Base
BATCH_SIZE = 1000
CATEGORY_ID = 6
MBOX_DIR = "/tmp/mbox-input"
USER_INDEX_PATH = "#{MBOX_DIR}/user-index.json"
TOPIC_INDEX_PATH = "#{MBOX_DIR}/topic-index.json"
REPLY_INDEX_PATH = "#{MBOX_DIR}/replies-index.json"
MBOX_DIR = File.expand_path("~/import/site")
# Remove to not split individual files
SPLIT_AT = /^From owner-/
def execute
create_indices
create_email_indices
create_user_indices
massage_indices
import_users
create_forum_topics
import_replies
end
def all_messages
def open_db
SQLite3::Database.new("#{MBOX_DIR}/index.db")
end
files = Dir["#{MBOX_DIR}/*/*"]
def all_messages
files = Dir["#{MBOX_DIR}/messages/*"]
files.each_with_index do |f, idx|
raw = File.read(f)
mail = Mail.read_from_string(raw)
yield mail, f
print_status(idx, files.size)
if SPLIT_AT.present?
msg = ""
File.foreach(f).with_index do |line, line_num|
line = line.scrub
if line =~ SPLIT_AT
if !msg.empty?
mail = Mail.read_from_string(msg)
yield mail
print_status(idx, files.size)
msg = ""
end
end
msg << line
end
if !msg.empty?
mail = Mail.read_from_string(msg)
yield mail
print_status(idx, files.size)
msg = ""
end
else
raw = File.read(f)
mail = Mail.read_from_string(raw)
yield mail
print_status(idx, files.size)
end
end
end
def create_indices
return if File.exist?(USER_INDEX_PATH) && File.exist?(TOPIC_INDEX_PATH) && File.exist?(REPLY_INDEX_PATH)
puts "", "creating indices"
users = {}
def massage_indices
db = open_db
db.execute "UPDATE emails SET reply_to = null WHERE reply_to = ''"
topics = []
rows = db.execute "SELECT msg_id, title, reply_to FROM emails ORDER BY email_date ASC"
topic_lookup = {}
topic_titles = {}
replies = []
msg_ids = {}
titles = {}
rows.each do |row|
msg_ids[row[0]] = true
titles[row[1]] = row[0]
end
all_messages do |mail, filename|
users[mail.from.first] = mail[:from].display_names.first
msg_id = mail['Message-ID'].to_s
reply_to = mail['In-Reply-To'].to_s
title = clean_title(mail['Subject'].to_s)
date = Time.parse(mail['date'].to_s).to_i
# First, any replies where the parent doesn't exist should have that field cleared
not_found = []
rows.each do |row|
msg_id, _, reply_to = row
if reply_to.present?
topic = topic_lookup[reply_to] || reply_to
topic_lookup[msg_id] = topic
replies << {id: msg_id, topic: topic, file: filename, title: title, date: date}
else
topics << {id: msg_id, file: filename, title: title, date: date}
topic_titles[title] ||= msg_id
not_found << msg_id if msg_ids[reply_to].blank?
end
end
replies.sort! {|a, b| a[:date] <=> b[:date]}
topics.sort! {|a, b| a[:date] <=> b[:date]}
# Replies without parents should be hoisted to topics
to_hoist = []
replies.each do |r|
to_hoist << r if !topic_lookup[r[:topic]]
puts "#{not_found.size} records couldn't be associated with parents"
if not_found.present?
db.execute "UPDATE emails SET reply_to = NULL WHERE msg_id IN (#{not_found.map {|nf| "'#{nf}'"}.join(',')})"
end
to_hoist.each do |h|
replies.delete(h)
topics << {id: h[:id], file: h[:file], title: h[:title], date: h[:date]}
topic_titles[h[:title]] ||= h[:id]
dupe_titles = db.execute "SELECT title, COUNT(*) FROM emails GROUP BY title HAVING count(*) > 1"
puts "#{dupe_titles.size} replies to wire up"
dupe_titles.each do |t|
title = t[0]
first = titles[title]
db.execute "UPDATE emails SET reply_to = ? WHERE title = ? and msg_id <> ?", [first, title, first]
end
# Topics with duplicate replies should be replies
to_group = []
topics.each do |t|
first = topic_titles[t[:title]]
to_group << t if first && first != t[:id]
ensure
db.close
end
def create_email_indices
db = open_db
db.execute "DROP TABLE IF EXISTS emails"
db.execute <<-SQL
CREATE TABLE emails (
msg_id VARCHAR(995) PRIMARY KEY,
from_email VARCHAR(255) NOT NULL,
from_name VARCHAR(255) NOT NULL,
title VARCHAR(255) NOT NULL,
reply_to VARCHAR(955) NULL,
email_date DATETIME NOT NULL,
message TEXT NOT NULL
);
SQL
db.execute "CREATE INDEX by_title ON emails (title)"
db.execute "CREATE INDEX by_email ON emails (from_email)"
puts "", "creating indices"
all_messages do |mail|
msg_id = mail['Message-ID'].to_s
# Many ways to get a name
from = mail[:from]
from_name = nil
from_email = nil
if mail.from.present?
from_email = mail.from.first
end
display_names = from.try(:display_names)
if display_names.present?
from_name = display_names.first
end
if from_name.blank? && from.to_s =~ /\(([^\)]+)\)/
from_name = Regexp.last_match[1]
end
from_name = from.to_s if from_name.blank?
title = clean_title(mail['Subject'].to_s)
reply_to = mail['In-Reply-To'].to_s
email_date = mail['date'].to_s
db.execute "INSERT OR IGNORE INTO emails (msg_id, from_email, from_name, title, reply_to, email_date, message)
VALUES (?, ?, ?, ?, ?, ?, ?)",
[msg_id, from_email, from_name, title, reply_to, email_date, mail.to_s]
end
ensure
db.close
end
to_group.each do |t|
topics.delete(t)
replies << {id: t[:id], topic: topic_titles[t[:title]], file: t[:file], title: t[:title], date: t[:date]}
end
def create_user_indices
db = open_db
db.execute "DROP TABLE IF EXISTS users"
db.execute <<-SQL
CREATE TABLE users (
email VARCHAR(995) PRIMARY KEY,
name VARCHAR(255) NOT NULL
);
SQL
replies.sort! {|a, b| a[:date] <=> b[:date]}
topics.sort! {|a, b| a[:date] <=> b[:date]}
File.write(USER_INDEX_PATH, {users: users}.to_json)
File.write(TOPIC_INDEX_PATH, {topics: topics}.to_json)
File.write(REPLY_INDEX_PATH, {replies: replies}.to_json)
db.execute "INSERT OR IGNORE INTO users (email, name) SELECT from_email, from_name FROM emails"
ensure
db.close
end
def clean_title(title)
title ||= ""
#Strip mailing list name from subject
title = title.gsub(/\[[^\]]+\]+/, '').strip
@ -125,24 +199,26 @@ class ImportScripts::Mbox < ImportScripts::Base
def import_users
puts "", "importing users"
db = open_db
all_users = ::JSON.parse(File.read(USER_INDEX_PATH))['users']
user_keys = all_users.keys
total_count = user_keys.size
all_users = db.execute("SELECT name, email FROM users")
total_count = all_users.size
batches(BATCH_SIZE) do |offset|
users = user_keys[offset..offset+BATCH_SIZE-1]
users = all_users[offset..offset+BATCH_SIZE-1]
break if users.nil?
next if all_records_exist? :users, users
next if all_records_exist? :users, users.map {|u| u[1]}
create_users(users, total: total_count, offset: offset) do |email|
create_users(users, total: total_count, offset: offset) do |u|
{
id: email,
email: email,
name: all_users[email]
id: u[1],
email: u[1],
name: u[0]
}
end
end
ensure
db.close
end
def parse_email(msg)
@ -157,23 +233,33 @@ class ImportScripts::Mbox < ImportScripts::Base
def create_forum_topics
puts "", "creating forum topics"
all_topics = ::JSON.parse(File.read(TOPIC_INDEX_PATH))['topics']
db = open_db
all_topics = db.execute("SELECT msg_id,
from_email,
from_name,
title,
email_date,
message
FROM emails
WHERE reply_to IS NULL")
topic_count = all_topics.size
batches(BATCH_SIZE) do |offset|
topics = all_topics[offset..offset+BATCH_SIZE-1]
break if topics.nil?
next if all_records_exist? :posts, topics.map {|t| t['id']}
next if all_records_exist? :posts, topics.map {|t| t[0]}
create_posts(topics, total: topic_count, offset: offset) do |t|
raw_email = File.read(t['file'])
raw_email = t[5]
receiver = Email::Receiver.new(raw_email)
mail = Mail.read_from_string(raw_email)
mail.body
selected = receiver.select_body
next unless selected
selected = selected.join('') if selected.kind_of?(Array)
raw = selected.force_encoding(selected.encoding).encode("UTF-8")
@ -195,7 +281,7 @@ class ImportScripts::Mbox < ImportScripts::Base
end
end
{ id: t['id'],
{ id: t[0],
title: clean_title(title),
user_id: user_id_from_imported_user_id(mail.from.first) || Discourse::SYSTEM_USER_ID,
created_at: mail.date,
@ -204,34 +290,49 @@ class ImportScripts::Mbox < ImportScripts::Base
cook_method: Post.cook_methods[:email] }
end
end
ensure
db.close
end
def import_replies
puts "", "creating topic replies"
replies = ::JSON.parse(File.read(REPLY_INDEX_PATH))['replies']
db = open_db
replies = db.execute("SELECT msg_id,
from_email,
from_name,
title,
email_date,
message,
reply_to
FROM emails
WHERE reply_to IS NOT NULL")
post_count = replies.size
batches(BATCH_SIZE) do |offset|
posts = replies[offset..offset+BATCH_SIZE-1]
break if posts.nil?
next if all_records_exist? :posts, posts.map {|p| p['id']}
next if all_records_exist? :posts, posts.map {|p| p[0]}
create_posts(posts, total: post_count, offset: offset) do |p|
parent_id = p['topic']
id = p['id']
parent_id = p[6]
id = p[0]
topic = topic_lookup_from_imported_post_id(parent_id)
topic_id = topic[:topic_id] if topic
next unless topic_id
raw_email = File.read(p['file'])
raw_email = p[5]
receiver = Email::Receiver.new(raw_email)
mail = Mail.read_from_string(raw_email)
mail.body
selected = receiver.select_body
selected = selected.join('') if selected.kind_of?(Array)
next unless selected
raw = selected.force_encoding(selected.encoding).encode("UTF-8")
# import the attachments
@ -258,6 +359,8 @@ class ImportScripts::Mbox < ImportScripts::Base
cook_method: Post.cook_methods[:email] }
end
end
ensure
db.close
end
end