Add new, experimental version of mbox importer

This commit is contained in:
Gerhard Schlager 2017-05-26 22:26:18 +02:00
parent 781f364d86
commit 8299e7e8c3
No known key found for this signature in database
GPG Key ID: 7DACA3C95B36014B
8 changed files with 710 additions and 5 deletions

View File

@ -190,4 +190,5 @@ gem 'sassc', require: false
if ENV["IMPORT"] == "1"
gem 'mysql2'
gem 'redcarpet'
gem 'sqlite3', '~> 1.3.13'
end

View File

@ -28,6 +28,9 @@ module Email
class InvalidPostAction < ProcessingError; end
attr_reader :incoming_email
attr_reader :raw_email
attr_reader :mail
attr_reader :message_id
def initialize(mail_string)
raise EmptyEmailError if mail_string.blank?
@ -241,7 +244,7 @@ module Email
def try_to_encode(string, encoding)
encoded = string.encode("UTF-8", encoding)
encoded.present? && encoded.valid_encoding? ? encoded : nil
!encoded.nil? && encoded.valid_encoding? ? encoded : nil
rescue Encoding::InvalidByteSequenceError,
Encoding::UndefinedConversionError,
Encoding::ConverterNotFoundError
@ -622,10 +625,7 @@ module Email
# only add elided part in messages
if options[:elided].present? && (SiteSetting.always_show_trimmed_content || is_private_message)
options[:raw] << "\n\n" << "<details class='elided'>" << "\n"
options[:raw] << "<summary title='#{I18n.t('emails.incoming.show_trimmed_content')}'>&#183;&#183;&#183;</summary>" << "\n"
options[:raw] << options[:elided] << "\n"
options[:raw] << "</details>" << "\n"
options[:raw] << Email::Receiver.elided_html(options[:elided])
end
user = options.delete(:user)
@ -643,6 +643,14 @@ module Email
result.post
end
def self.elided_html(elided)
html = "\n\n" << "<details class='elided'>" << "\n"
html << "<summary title='#{I18n.t('emails.incoming.show_trimmed_content')}'>&#183;&#183;&#183;</summary>" << "\n"
html << elided << "\n"
html << "</details>" << "\n"
html
end
def add_other_addresses(topic, sender)
%i(to cc bcc).each do |d|
if @mail[d] && @mail[d].address_list && @mail[d].address_list.addresses

View File

@ -0,0 +1,17 @@
if ARGV.length != 1 || !File.exists?(ARGV[0])
STDERR.puts '', 'Usage of mbox importer:', 'bundle exec ruby mbox-experimental.rb <path/to/settings.yml>'
STDERR.puts '', "Use the settings file from #{File.expand_path('mbox/settings.yml', File.dirname(__FILE__))} as an example."
exit 1
end
module ImportScripts
module Mbox
require_relative 'mbox/support/settings'
@settings = Settings.load(ARGV[0])
require_relative 'mbox/importer'
Importer.new(@settings).perform
end
end

View File

@ -0,0 +1,161 @@
require_relative '../base'
require_relative 'support/database'
require_relative 'support/indexer'
require_relative 'support/settings'
module ImportScripts::Mbox
class Importer < ImportScripts::Base
# @param settings [ImportScripts::Mbox::Settings]
def initialize(settings)
@settings = settings
super()
@database = Database.new(@settings.data_dir, @settings.batch_size)
end
def change_site_settings
super
SiteSetting.enable_staged_users = true
end
protected
def execute
index_messages
import_categories
import_users
import_posts
end
def index_messages
puts '', 'creating index'
indexer = Indexer.new(@database, @settings)
indexer.execute
end
def import_categories
puts '', 'creating categories'
rows = @database.fetch_categories
create_categories(rows) do |row|
{
id: row['name'],
name: row['name']
}
end
end
def import_users
puts '', 'creating users'
total_count = @database.count_users
last_email = ''
batches do |offset|
rows, last_email = @database.fetch_users(last_email)
break if rows.empty?
next if all_records_exist?(:users, rows.map { |row| row['email'] })
create_users(rows, total: total_count, offset: offset) do |row|
{
id: row['email'],
email: row['email'],
name: row['name'],
trust_level: @settings.trust_level,
staged: true,
created_at: to_time(row['date_of_first_message'])
}
end
end
end
def batches
super(@settings.batch_size)
end
def import_posts
puts '', 'creating topics and posts'
total_count = @database.count_messages
last_row_id = 0
batches do |offset|
rows, last_row_id = @database.fetch_messages(last_row_id)
break if rows.empty?
next if all_records_exist?(:posts, rows.map { |row| row['msg_id'] })
create_posts(rows, total: total_count, offset: offset) do |row|
if row['in_reply_to'].blank?
map_first_post(row)
else
map_reply(row)
end
end
end
end
def map_post(row)
user_id = user_id_from_imported_user_id(row['from_email']) || Discourse::SYSTEM_USER_ID
body = row['body'] || ''
body << map_attachments(row['raw_message'], user_id) if row['attachment_count'].positive?
body << Email::Receiver.elided_html(row['elided']) if row['elided'].present?
{
id: row['msg_id'],
user_id: user_id,
created_at: to_time(row['email_date']),
raw: body,
raw_email: row['raw_message'],
via_email: true,
# cook_method: Post.cook_methods[:email] # this is slowing down the import by factor 4
}
end
def map_first_post(row)
mapped = map_post(row)
mapped[:category] = category_id_from_imported_category_id(row['category'])
mapped[:title] = row['subject'].strip[0...255]
mapped
end
def map_reply(row)
parent = @lookup.topic_lookup_from_imported_post_id(row['in_reply_to'])
if parent.blank?
puts "Parent message #{row['in_reply_to']} doesn't exist. Skipping #{row['msg_id']}: #{row['subject'][0..40]}"
return nil
end
mapped = map_post(row)
mapped[:topic_id] = parent[:topic_id]
mapped
end
def map_attachments(raw_message, user_id)
receiver = Email::Receiver.new(raw_message)
attachment_markdown = ''
receiver.attachments.each do |attachment|
tmp = Tempfile.new(['discourse-email-attachment', File.extname(attachment.filename)])
begin
File.open(tmp.path, 'w+b') { |f| f.write attachment.body.decoded }
upload = UploadCreator.new(tmp, attachment.filename).create_for(user_id)
if upload && upload.errors.empty?
attachment_markdown << "\n\n#{receiver.attachment_markdown(upload)}\n\n"
end
ensure
tmp.try(:close!)
end
end
attachment_markdown
end
def to_time(datetime)
Time.zone.at(DateTime.iso8601(datetime)) if datetime
end
end
end

View File

@ -0,0 +1,9 @@
# PostgreSQL mailing lists
#data_dir: /data/import/postgres
#split_regex: "^From .*@postgresql.org.*"
# ruby-talk mailing list
data_dir: /data/import/ruby-talk/news/gmane/comp/lang/ruby
split_regex: ""
default_trust_level: 1

View File

@ -0,0 +1,261 @@
require 'sqlite3'
module ImportScripts::Mbox
class Database
SCHEMA_VERSION = 1
def initialize(directory, batch_size)
@db = SQLite3::Database.new("#{directory}/index.db", results_as_hash: true)
@batch_size = batch_size
configure_database
upgrade_schema_version
create_table_for_categories
create_table_for_imported_files
create_table_for_emails
create_table_for_replies
create_table_for_users
end
def insert_category(category)
@db.execute(<<-SQL, category)
INSERT OR REPLACE INTO category (name, description)
VALUES (:name, :description)
SQL
end
def insert_imported_file(imported_file)
@db.execute(<<-SQL, imported_file)
INSERT OR REPLACE INTO imported_file (category, filename, checksum)
VALUES (:category, :filename, :checksum)
SQL
end
def insert_email(email)
@db.execute(<<-SQL, email)
INSERT OR REPLACE INTO email (msg_id, from_email, from_name, subject,
email_date, raw_message, body, elided, attachment_count, charset,
category, filename, first_line_number, last_line_number)
VALUES (:msg_id, :from_email, :from_name, :subject,
:email_date, :raw_message, :body, :elided, :attachment_count, :charset,
:category, :filename, :first_line_number, :last_line_number)
SQL
end
def insert_replies(msg_id, reply_message_ids)
sql = <<-SQL
INSERT OR REPLACE INTO reply (msg_id, in_reply_to)
VALUES (:msg_id, :in_reply_to)
SQL
@db.prepare(sql) do |stmt|
reply_message_ids.each do |in_reply_to|
stmt.execute(msg_id, in_reply_to)
end
end
end
def update_in_reply_to_of_emails
@db.execute <<-SQL
UPDATE email
SET in_reply_to = (
SELECT e.msg_id
FROM reply r
JOIN email e ON (r.in_reply_to = e.msg_id)
WHERE r.msg_id = email.msg_id
ORDER BY e.email_date DESC
LIMIT 1
)
SQL
end
def sort_emails
@db.execute 'DELETE FROM email_order'
@db.execute <<-SQL
WITH RECURSIVE
messages(msg_id, level, email_date) AS (
SELECT msg_id, 0 AS level, email_date
FROM email
WHERE in_reply_to IS NULL
UNION ALL
SELECT e.msg_id, m.level + 1, e.email_date
FROM email e
JOIN messages m ON e.in_reply_to = m.msg_id
ORDER BY level, email_date, msg_id
)
INSERT INTO email_order (msg_id)
SELECT msg_id
FROM messages
SQL
end
def fill_users_from_emails
@db.execute 'DELETE FROM user'
@db.execute <<-SQL
INSERT INTO user (email, name, date_of_first_message)
SELECT from_email, MIN(from_name) AS from_name, MIN(email_date)
FROM email
WHERE from_email IS NOT NULL
GROUP BY from_email
ORDER BY from_email
SQL
end
def fetch_imported_files(category)
@db.execute(<<-SQL, category)
SELECT filename, checksum
FROM imported_file
WHERE category = :category
SQL
end
def fetch_categories
@db.execute <<-SQL
SELECT name, description
FROM category
ORDER BY name
SQL
end
def count_users
@db.get_first_value <<-SQL
SELECT COUNT(*)
FROM user
SQL
end
def fetch_users(last_email)
rows = @db.execute(<<-SQL, last_email)
SELECT email, name, date_of_first_message
FROM user
WHERE email > :last_email
LIMIT #{@batch_size}
SQL
add_last_column_value(rows, 'email')
end
def count_messages
@db.get_first_value <<-SQL
SELECT COUNT(*)
FROM email
WHERE email_date IS NOT NULL
SQL
end
def fetch_messages(last_row_id)
rows = @db.execute(<<-SQL, last_row_id)
SELECT o.ROWID, e.msg_id, from_email, subject, email_date, in_reply_to,
raw_message, body, elided, attachment_count, category
FROM email e
JOIN email_order o USING (msg_id)
WHERE email_date IS NOT NULL AND
o.ROWID > :last_row_id
ORDER BY o.ROWID
LIMIT #{@batch_size}
SQL
add_last_column_value(rows, 'rowid')
end
private
def configure_database
@db.execute 'PRAGMA journal_mode = TRUNCATE'
end
def upgrade_schema_version
# current_version = query("PRAGMA user_version").last[0]
@db.execute "PRAGMA user_version = #{SCHEMA_VERSION}"
end
def create_table_for_categories
@db.execute <<-SQL
CREATE TABLE IF NOT EXISTS category (
name TEXT NOT NULL PRIMARY KEY,
description TEXT
)
SQL
end
def create_table_for_imported_files
@db.execute <<-SQL
CREATE TABLE IF NOT EXISTS imported_file (
category TEXT NOT NULL,
filename TEXT NOT NULL,
checksum TEXT NOT NULL,
PRIMARY KEY (category, filename),
FOREIGN KEY(category) REFERENCES category(name)
)
SQL
end
def create_table_for_emails
@db.execute <<-SQL
CREATE TABLE IF NOT EXISTS email (
msg_id TEXT NOT NULL PRIMARY KEY,
from_email TEXT,
from_name TEXT,
subject TEXT,
in_reply_to TEXT,
email_date DATETIME,
raw_message TEXT,
body TEXT,
elided TEXT,
attachment_count INTEGER NOT NULL DEFAULT 0,
charset TEXT,
category TEXT NOT NULL,
filename TEXT NOT NULL,
first_line_number INTEGER,
last_line_number INTEGER,
FOREIGN KEY(category) REFERENCES category(name)
)
SQL
@db.execute 'CREATE INDEX IF NOT EXISTS email_by_from ON email (from_email)'
@db.execute 'CREATE INDEX IF NOT EXISTS email_by_in_reply_to ON email (in_reply_to)'
@db.execute 'CREATE INDEX IF NOT EXISTS email_by_date ON email (email_date)'
@db.execute <<-SQL
CREATE TABLE IF NOT EXISTS email_order (
msg_id TEXT NOT NULL PRIMARY KEY
)
SQL
end
def create_table_for_replies
@db.execute <<-SQL
CREATE TABLE IF NOT EXISTS reply (
msg_id TEXT NOT NULL,
in_reply_to TEXT NOT NULL,
PRIMARY KEY (msg_id, in_reply_to),
FOREIGN KEY(msg_id) REFERENCES email(msg_id)
)
SQL
@db.execute 'CREATE INDEX IF NOT EXISTS reply_by_in_reply_to ON reply (in_reply_to)'
end
def create_table_for_users
@db.execute <<-SQL
CREATE TABLE IF NOT EXISTS user (
email TEXT NOT NULL PRIMARY KEY,
name TEXT,
date_of_first_message DATETIME NOT NULL
)
SQL
end
def add_last_column_value(rows, *last_columns)
return rows if last_columns.empty?
result = [rows]
last_row = rows.last
last_columns.each { |column| result.push(last_row ? last_row[column] : nil) }
result
end
end
end

View File

@ -0,0 +1,226 @@
require_relative 'database'
require 'json'
require 'yaml'
module ImportScripts::Mbox
class Indexer
# @param database [ImportScripts::Mbox::Database]
# @param settings [ImportScripts::Mbox::Settings]
def initialize(database, settings)
@database = database
@root_directory = settings.data_dir
@split_regex = settings.split_regex
end
def execute
directories = Dir.glob(File.join(@root_directory, '*'))
directories.select! { |f| File.directory?(f) }
directories.sort!
directories.each do |directory|
puts "indexing files in #{directory}"
category = index_category(directory)
index_emails(directory, category[:name])
end
puts '', 'indexing replies and users'
@database.update_in_reply_to_of_emails
@database.sort_emails
@database.fill_users_from_emails
end
private
METADATA_FILENAME = 'metadata.yml'.freeze
def index_category(directory)
metadata_file = File.join(directory, METADATA_FILENAME)
if File.exist?(metadata_file)
# workaround for YML files that contain classname in file header
yaml = File.read(metadata_file).sub(/^--- !.*$/, '---')
metadata = YAML.load(yaml)
else
metadata = {}
end
category = {
name: metadata['name'].presence || File.basename(directory),
description: metadata['description']
}
@database.insert_category(category)
category
end
def index_emails(directory, category_name)
all_messages(directory, category_name) do |receiver, filename, first_line_number, last_line_number|
msg_id = receiver.message_id
parsed_email = receiver.mail
from_email, from_display_name = receiver.parse_from_field(parsed_email)
body, elided = receiver.select_body
reply_message_ids = extract_reply_message_ids(parsed_email)
email = {
msg_id: msg_id,
from_email: from_email,
from_name: from_display_name,
subject: extract_subject(receiver, category_name),
email_date: parsed_email.date&.to_s,
raw_message: receiver.raw_email,
body: body,
elided: elided,
attachment_count: receiver.attachments.count,
charset: parsed_email.charset&.downcase,
category: category_name,
filename: File.basename(filename),
first_line_number: first_line_number,
last_line_number: last_line_number
}
@database.insert_email(email)
@database.insert_replies(msg_id, reply_message_ids) unless reply_message_ids.empty?
end
end
def imported_file_checksums(category_name)
rows = @database.fetch_imported_files(category_name)
rows.each_with_object({}) do |row, hash|
hash[row['filename']] = row['checksum']
end
end
def all_messages(directory, category_name)
checksums = imported_file_checksums(category_name)
Dir.foreach(directory) do |filename|
filename = File.join(directory, filename)
next if ignored_file?(filename, checksums)
puts "indexing #{filename}"
if @split_regex.present?
each_mail(filename) do |raw_message, first_line_number, last_line_number|
yield read_mail_from_string(raw_message), filename, first_line_number, last_line_number
end
else
yield read_mail_from_file(filename), filename
end
mark_as_fully_indexed(category_name, filename)
end
end
def mark_as_fully_indexed(category_name, filename)
imported_file = {
category: category_name,
filename: filename,
checksum: calc_checksum(filename)
}
@database.insert_imported_file(imported_file)
end
def each_mail(filename)
raw_message = ''
first_line_number = 1
last_line_number = 0
each_line(filename) do |line|
line = line.scrub
if line =~ @split_regex && last_line_number.positive?
yield raw_message, first_line_number, last_line_number
raw_message = ''
first_line_number = last_line_number + 1
else
raw_message << line
end
last_line_number += 1
end
yield raw_message, first_line_number, last_line_number if raw_message.present?
end
def each_line(filename)
raw_file = File.open(filename, 'r')
text_file = filename.end_with?('.gz') ? Zlib::GzipReader.new(raw_file) : raw_file
text_file.each_line do |line|
yield line
end
ensure
raw_file.close if raw_file
end
def read_mail_from_file(filename)
raw_message = File.read(filename)
read_mail_from_string(raw_message)
end
def read_mail_from_string(raw_message)
Email::Receiver.new(raw_message)
end
def extract_reply_message_ids(mail)
message_ids = [mail.in_reply_to, Email::Receiver.extract_references(mail.references)]
message_ids.flatten!
message_ids.select!(&:present?)
message_ids.uniq!
message_ids.first(20)
end
def extract_subject(receiver, list_name)
subject = receiver.subject
return nil if subject.blank?
# TODO: make the list name (or maybe multiple names) configurable
# Strip mailing list name from subject
subject = subject.gsub(/\[#{Regexp.escape(list_name)}\]/, '').strip
clean_subject(subject)
end
# TODO: refactor and move prefixes to settings
def clean_subject(subject)
original_length = subject.length
# Strip Reply prefix from title (Standard and localized)
subject = subject.gsub(/^Re: */i, '')
subject = subject.gsub(/^R: */i, '') #Italian
subject = subject.gsub(/^RIF: */i, '') #Italian
# Strip Forward prefix from title (Standard and localized)
subject = subject.gsub(/^Fwd: */i, '')
subject = subject.gsub(/^I: */i, '') #Italian
subject.strip
# In case of mixed localized prefixes there could be many of them
# if the mail client didn't strip the localized ones
if original_length > subject.length
clean_subject(subject)
else
subject
end
end
def ignored_file?(filename, checksums)
File.directory?(filename) || metadata_file?(filename) || fully_indexed?(filename, checksums)
end
def metadata_file?(filename)
File.basename(filename) == METADATA_FILENAME
end
def fully_indexed?(filename, checksums)
checksum = checksums[filename]
checksum.present? && calc_checksum(filename) == checksum
end
def calc_checksum(filename)
Digest::SHA256.file(filename).hexdigest
end
end
end

View File

@ -0,0 +1,22 @@
require 'yaml'
module ImportScripts::Mbox
class Settings
def self.load(filename)
yaml = YAML.load_file(filename)
Settings.new(yaml)
end
attr_reader :data_dir
attr_reader :split_regex
attr_reader :batch_size
attr_reader :trust_level
def initialize(yaml)
@data_dir = yaml['data_dir']
@split_regex = Regexp.new(yaml['split_regex']) unless yaml['split_regex'].empty?
@batch_size = 1000 # no need to make this actually configurable at the moment
@trust_level = yaml['default_trust_level']
end
end
end