mirror of
https://github.com/discourse/discourse.git
synced 2024-11-27 05:23:37 +08:00
PERF: stream CSV exports so they don't fail when they are too big
This commit is contained in:
parent
a4cd068481
commit
86d50d0017
|
@ -6,56 +6,76 @@ module Jobs
|
||||||
class ExportCsvFile < Jobs::Base
|
class ExportCsvFile < Jobs::Base
|
||||||
include ActionView::Helpers::NumberHelper
|
include ActionView::Helpers::NumberHelper
|
||||||
|
|
||||||
HEADER_ATTRS_FOR = {}
|
|
||||||
HEADER_ATTRS_FOR['user_archive'] = ['topic_title','category','sub_category','is_pm','post','like_count','reply_count','url','created_at']
|
|
||||||
HEADER_ATTRS_FOR['user_list'] = ['id','name','username','email','title','created_at','last_seen_at','last_posted_at','last_emailed_at','trust_level','approved','suspended_at','suspended_till','blocked','active','admin','moderator','ip_address']
|
|
||||||
HEADER_ATTRS_FOR['user_stats'] = ['topics_entered','posts_read_count','time_read','topic_count','post_count','likes_given','likes_received']
|
|
||||||
HEADER_ATTRS_FOR['user_sso'] = ['external_id','external_email', 'external_username', 'external_name', 'external_avatar_url']
|
|
||||||
HEADER_ATTRS_FOR['staff_action'] = ['staff_user','action','subject','created_at','details', 'context']
|
|
||||||
HEADER_ATTRS_FOR['screened_email'] = ['email','action','match_count','last_match_at','created_at','ip_address']
|
|
||||||
HEADER_ATTRS_FOR['screened_ip'] = ['ip_address','action','match_count','last_match_at','created_at']
|
|
||||||
HEADER_ATTRS_FOR['screened_url'] = ['domain','action','match_count','last_match_at','created_at']
|
|
||||||
HEADER_ATTRS_FOR['report'] = ['date', 'value']
|
|
||||||
|
|
||||||
sidekiq_options retry: false
|
sidekiq_options retry: false
|
||||||
attr_accessor :current_user
|
|
||||||
|
HEADER_ATTRS_FOR ||= HashWithIndifferentAccess.new({
|
||||||
|
user_archive: ['topic_title','category','sub_category','is_pm','post','like_count','reply_count','url','created_at'],
|
||||||
|
user_list: ['id','name','username','email','title','created_at','last_seen_at','last_posted_at','last_emailed_at','trust_level','approved','suspended_at','suspended_till','blocked','active','admin','moderator','ip_address'],
|
||||||
|
user_stats: ['topics_entered','posts_read_count','time_read','topic_count','post_count','likes_given','likes_received'],
|
||||||
|
user_sso: ['external_id','external_email', 'external_username', 'external_name', 'external_avatar_url'],
|
||||||
|
staff_action: ['staff_user','action','subject','created_at','details', 'context'],
|
||||||
|
screened_email: ['email','action','match_count','last_match_at','created_at','ip_address'],
|
||||||
|
screened_ip: ['ip_address','action','match_count','last_match_at','created_at'],
|
||||||
|
screened_url: ['domain','action','match_count','last_match_at','created_at'],
|
||||||
|
report: ['date', 'value'],
|
||||||
|
})
|
||||||
|
|
||||||
def execute(args)
|
def execute(args)
|
||||||
@entity = args[:entity]
|
@entity = args[:entity]
|
||||||
@extra = HashWithIndifferentAccess.new(args[:args]) if args[:args]
|
@extra = HashWithIndifferentAccess.new(args[:args]) if args[:args]
|
||||||
@file_name = @entity
|
|
||||||
@current_user = User.find_by(id: args[:user_id])
|
@current_user = User.find_by(id: args[:user_id])
|
||||||
|
|
||||||
export_method = "#{@entity}_export".to_sym
|
export_method = :"#{@entity}_export"
|
||||||
data =
|
raise Discourse::InvalidParameters.new(:entity) unless respond_to?(export_method)
|
||||||
if respond_to?(export_method)
|
|
||||||
send(export_method)
|
|
||||||
else
|
|
||||||
raise Discourse::InvalidParameters.new(:entity)
|
|
||||||
end
|
|
||||||
|
|
||||||
if data && data.length > 0
|
file_name_prefix = if @entity == "user_archive"
|
||||||
set_file_path
|
"#{@entity.split('_').join('-')}-#{@current_user.username}-#{Time.now.strftime("%y%m%d-%H%M%S")}"
|
||||||
write_csv_file(data)
|
else
|
||||||
|
"#{@entity.split('_').join('-')}-#{Time.now.strftime("%y%m%d-%H%M%S")}"
|
||||||
end
|
end
|
||||||
|
|
||||||
|
file = UserExport.create(file_name: file_name_prefix, user_id: @current_user.id)
|
||||||
|
file_name = "#{file_name_prefix}-#{file.id}.csv"
|
||||||
|
absolute_path = "#{UserExport.base_directory}/#{file_name}"
|
||||||
|
|
||||||
|
# ensure directory exists
|
||||||
|
FileUtils.mkdir_p(UserExport.base_directory) unless Dir.exists?(UserExport.base_directory)
|
||||||
|
|
||||||
|
# write to CSV file
|
||||||
|
CSV.open(absolute_path, "w") do |csv|
|
||||||
|
csv << get_header
|
||||||
|
send(export_method).each { |d| csv << d }
|
||||||
|
end
|
||||||
|
|
||||||
|
# compress CSV file
|
||||||
|
`gzip -5 #{absolute_path}`
|
||||||
|
|
||||||
ensure
|
ensure
|
||||||
notify_user
|
notify_user(file_name, absolute_path)
|
||||||
end
|
end
|
||||||
|
|
||||||
def user_archive_export
|
def user_archive_export
|
||||||
user_archive_data = Post.includes(:topic => :category).where(user_id: @current_user.id).select(:topic_id, :post_number, :raw, :like_count, :reply_count, :created_at).order(:created_at).with_deleted.to_a
|
return enum_for(:user_archive_export) unless block_given?
|
||||||
user_archive_data.map do |user_archive|
|
|
||||||
get_user_archive_fields(user_archive)
|
Post.includes(topic: :category)
|
||||||
|
.where(user_id: @current_user.id)
|
||||||
|
.select(:topic_id, :post_number, :raw, :like_count, :reply_count, :created_at)
|
||||||
|
.order(:created_at)
|
||||||
|
.with_deleted
|
||||||
|
.each do |user_archive|
|
||||||
|
yield get_user_archive_fields(user_archive)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def user_list_export
|
def user_list_export
|
||||||
|
return enum_for(:user_list_export) unless block_given?
|
||||||
|
|
||||||
user_array = []
|
user_array = []
|
||||||
user_field_ids = UserField.pluck(:id)
|
user_field_ids = UserField.pluck(:id)
|
||||||
|
|
||||||
condition = {}
|
condition = {}
|
||||||
if @extra && @extra[:trust_level] && trust_level = TrustLevel.levels[@extra[:trust_level].to_sym]
|
if @extra && @extra[:trust_level] && trust_level = TrustLevel.levels[@extra[:trust_level].to_sym]
|
||||||
condition = {trust_level: trust_level}
|
condition = { trust_level: trust_level }
|
||||||
end
|
end
|
||||||
|
|
||||||
if SiteSetting.enable_sso
|
if SiteSetting.enable_sso
|
||||||
|
@ -65,8 +85,7 @@ module Jobs
|
||||||
user_info_string = add_single_sign_on(user, user_info_string)
|
user_info_string = add_single_sign_on(user, user_info_string)
|
||||||
user_info_string = add_custom_fields(user, user_info_string, user_field_ids)
|
user_info_string = add_custom_fields(user, user_info_string, user_field_ids)
|
||||||
user_info_string = add_group_names(user, user_info_string)
|
user_info_string = add_group_names(user, user_info_string)
|
||||||
user_array.push(user_info_string.split(","))
|
yield user_info_string.split(",")
|
||||||
user_info_string = nil
|
|
||||||
end
|
end
|
||||||
else
|
else
|
||||||
# SSO disabled
|
# SSO disabled
|
||||||
|
@ -74,78 +93,78 @@ module Jobs
|
||||||
user_info_string = get_base_user_string(user)
|
user_info_string = get_base_user_string(user)
|
||||||
user_info_string = add_custom_fields(user, user_info_string, user_field_ids)
|
user_info_string = add_custom_fields(user, user_info_string, user_field_ids)
|
||||||
user_info_string = add_group_names(user, user_info_string)
|
user_info_string = add_group_names(user, user_info_string)
|
||||||
user_array.push(user_info_string.split(","))
|
yield user_info_string.split(",")
|
||||||
user_info_string = nil
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
user_field_ids = nil
|
|
||||||
user_array
|
|
||||||
end
|
end
|
||||||
|
|
||||||
def staff_action_export
|
def staff_action_export
|
||||||
if @current_user.admin?
|
return enum_for(:staff_action_export) unless block_given?
|
||||||
staff_action_data = UserHistory.only_staff_actions.order('id DESC').to_a
|
|
||||||
|
staff_action_data = if @current_user.admin?
|
||||||
|
UserHistory.only_staff_actions.order('id DESC')
|
||||||
else
|
else
|
||||||
# moderator
|
UserHistory.where(admin_only: false).only_staff_actions.order('id DESC')
|
||||||
staff_action_data = UserHistory.where(admin_only: false).only_staff_actions.order('id DESC').to_a
|
|
||||||
end
|
end
|
||||||
|
|
||||||
staff_action_data.map do |staff_action|
|
staff_action_data.each do |staff_action|
|
||||||
get_staff_action_fields(staff_action)
|
yield get_staff_action_fields(staff_action)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def screened_email_export
|
def screened_email_export
|
||||||
screened_email_data = ScreenedEmail.order('last_match_at desc').to_a
|
return enum_for(:screened_email_export) unless block_given?
|
||||||
screened_email_data.map do |screened_email|
|
|
||||||
get_screened_email_fields(screened_email)
|
ScreenedEmail.order('last_match_at DESC').each do |screened_email|
|
||||||
|
yield get_screened_email_fields(screened_email)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def screened_ip_export
|
def screened_ip_export
|
||||||
screened_ip_data = ScreenedIpAddress.order('id desc').to_a
|
return enum_for(:screened_ip_export) unless block_given?
|
||||||
screened_ip_data.map do |screened_ip|
|
|
||||||
get_screened_ip_fields(screened_ip)
|
ScreenedIpAddress.order('id DESC').each do |screened_ip|
|
||||||
|
yield get_screened_ip_fields(screened_ip)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def screened_url_export
|
def screened_url_export
|
||||||
screened_url_data = ScreenedUrl.select("domain, sum(match_count) as match_count, max(last_match_at) as last_match_at, min(created_at) as created_at").group(:domain).order('last_match_at DESC').to_a
|
return enum_for(:screened_url_export) unless block_given?
|
||||||
screened_url_data.map do |screened_url|
|
|
||||||
get_screened_url_fields(screened_url)
|
ScreenedUrl.select("domain, sum(match_count) as match_count, max(last_match_at) as last_match_at, min(created_at) as created_at")
|
||||||
|
.group(:domain)
|
||||||
|
.order('last_match_at DESC')
|
||||||
|
.each do |screened_url|
|
||||||
|
yield get_screened_url_fields(screened_url)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def report_export
|
def report_export
|
||||||
@extra[:start_date] = @extra[:start_date].to_date if @extra[:start_date].is_a?(String)
|
return enum_for(:report_export) unless block_given?
|
||||||
@extra[:end_date] = @extra[:end_date].to_date if @extra[:end_date].is_a?(String)
|
|
||||||
@extra[:category_id] = @extra[:category_id].to_i if @extra[:category_id]
|
@extra[:start_date] = @extra[:start_date].to_date if @extra[:start_date].is_a?(String)
|
||||||
@extra[:group_id] = @extra[:group_id].to_i if @extra[:group_id]
|
@extra[:end_date] = @extra[:end_date].to_date if @extra[:end_date].is_a?(String)
|
||||||
r = Report.find(@extra[:name], @extra)
|
@extra[:category_id] = @extra[:category_id].to_i if @extra[:category_id]
|
||||||
r.data.map do |row|
|
@extra[:group_id] = @extra[:group_id].to_i if @extra[:group_id]
|
||||||
[row[:x].to_s(:db), row[:y].to_s(:db)]
|
Report.find(@extra[:name], @extra).data.each do |row|
|
||||||
|
yield [row[:x].to_s(:db), row[:y].to_s(:db)]
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def get_header
|
def get_header
|
||||||
|
if @entity == 'user_list'
|
||||||
case @entity
|
header_array = HEADER_ATTRS_FOR['user_list'] + HEADER_ATTRS_FOR['user_stats']
|
||||||
when 'user_list'
|
header_array.concat(HEADER_ATTRS_FOR['user_sso']) if SiteSetting.enable_sso
|
||||||
header_array = HEADER_ATTRS_FOR['user_list'] + HEADER_ATTRS_FOR['user_stats']
|
user_custom_fields = UserField.all
|
||||||
if SiteSetting.enable_sso
|
if user_custom_fields.present?
|
||||||
header_array.concat(HEADER_ATTRS_FOR['user_sso'])
|
user_custom_fields.each do |custom_field|
|
||||||
|
header_array.push("#{custom_field.name} (custom user field)")
|
||||||
end
|
end
|
||||||
user_custom_fields = UserField.all
|
|
||||||
if user_custom_fields.present?
|
|
||||||
user_custom_fields.each do |custom_field|
|
|
||||||
header_array.push("#{custom_field.name} (custom user field)")
|
|
||||||
end
|
|
||||||
end
|
|
||||||
header_array.push("group_names")
|
|
||||||
else
|
|
||||||
header_array = HEADER_ATTRS_FOR[@entity]
|
|
||||||
end
|
end
|
||||||
|
header_array.push("group_names")
|
||||||
|
else
|
||||||
|
header_array = HEADER_ATTRS_FOR[@entity]
|
||||||
|
end
|
||||||
|
|
||||||
header_array
|
header_array
|
||||||
end
|
end
|
||||||
|
@ -227,7 +246,7 @@ module Jobs
|
||||||
user.username if !user.nil?
|
user.username if !user.nil?
|
||||||
elsif attr == 'subject'
|
elsif attr == 'subject'
|
||||||
user = User.find_by(id: staff_action.attributes['target_user_id'])
|
user = User.find_by(id: staff_action.attributes['target_user_id'])
|
||||||
user.nil? ? staff_action.attributes[attr] : "#{user.username} #{staff_action.attributes[attr]}"
|
user.nil? ? staff_action.attributes[attr] : "#{user.username} #{staff_action.attributes[attr]}"
|
||||||
else
|
else
|
||||||
staff_action.attributes[attr]
|
staff_action.attributes[attr]
|
||||||
end
|
end
|
||||||
|
@ -289,43 +308,20 @@ module Jobs
|
||||||
screened_url_array
|
screened_url_array
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def notify_user(file_name, absolute_path)
|
||||||
def set_file_path
|
|
||||||
if @entity == "user_archive"
|
|
||||||
file_name_prefix = "#{@file_name.split('_').join('-')}-#{current_user.username}-#{Time.now.strftime("%y%m%d-%H%M%S")}"
|
|
||||||
else
|
|
||||||
file_name_prefix = "#{@file_name.split('_').join('-')}-#{Time.now.strftime("%y%m%d-%H%M%S")}"
|
|
||||||
end
|
|
||||||
@file = UserExport.create(file_name: file_name_prefix, user_id: @current_user.id)
|
|
||||||
@file_name = "#{file_name_prefix}-#{@file.id}.csv"
|
|
||||||
|
|
||||||
# ensure directory exists
|
|
||||||
dir = File.dirname("#{UserExport.base_directory}/#{@file_name}")
|
|
||||||
FileUtils.mkdir_p(dir) unless Dir.exists?(dir)
|
|
||||||
end
|
|
||||||
|
|
||||||
def write_csv_file(data)
|
|
||||||
# write to CSV file
|
|
||||||
CSV.open(File.expand_path("#{UserExport.base_directory}/#{@file_name}", __FILE__), "w") do |csv|
|
|
||||||
csv << get_header
|
|
||||||
data.each do |value|
|
|
||||||
csv << value
|
|
||||||
end
|
|
||||||
end
|
|
||||||
# compress CSV file
|
|
||||||
`gzip -5 #{File.expand_path("#{UserExport.base_directory}/#{@file_name}", __FILE__)}`
|
|
||||||
end
|
|
||||||
|
|
||||||
def notify_user
|
|
||||||
if @current_user
|
if @current_user
|
||||||
if @file_name != "" && File.exists?("#{UserExport.base_directory}/#{@file_name}.gz")
|
if file_name.present? && File.exists?("#{absolute_path}.gz")
|
||||||
SystemMessage.create_from_system_user(@current_user, :csv_export_succeeded, download_link: "#{Discourse.base_uri}/export_csv/#{@file_name}.gz", file_name: "#{@file_name}.gz", file_size: number_to_human_size(File.size("#{UserExport.base_directory}/#{@file_name}.gz")))
|
SystemMessage.create_from_system_user(
|
||||||
|
@current_user,
|
||||||
|
:csv_export_succeeded,
|
||||||
|
download_link: "#{Discourse.base_uri}/export_csv/#{file_name}.gz",
|
||||||
|
file_name: "#{file_name}.gz",
|
||||||
|
file_size: number_to_human_size(File.size("#{absolute_path}.gz"))
|
||||||
|
)
|
||||||
else
|
else
|
||||||
SystemMessage.create_from_system_user(@current_user, :csv_export_failed)
|
SystemMessage.create_from_system_user(@current_user, :csv_export_failed)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
end
|
end
|
||||||
|
|
||||||
end
|
end
|
||||||
|
|
|
@ -2,23 +2,16 @@ class UserExport < ActiveRecord::Base
|
||||||
|
|
||||||
def self.get_download_path(filename)
|
def self.get_download_path(filename)
|
||||||
path = File.join(UserExport.base_directory, filename)
|
path = File.join(UserExport.base_directory, filename)
|
||||||
if File.exists?(path)
|
File.exists?(path) ? path : nil
|
||||||
return path
|
|
||||||
else
|
|
||||||
nil
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
|
|
||||||
def self.remove_old_exports
|
def self.remove_old_exports
|
||||||
expired_exports = UserExport.where('created_at < ?', 2.days.ago).to_a
|
UserExport.where('created_at < ?', 2.days.ago).find_each do |expired_export|
|
||||||
expired_exports.map do |expired_export|
|
|
||||||
file_name = "#{expired_export.file_name}-#{expired_export.id}.csv.gz"
|
file_name = "#{expired_export.file_name}-#{expired_export.id}.csv.gz"
|
||||||
file_path = "#{UserExport.base_directory}/#{file_name}"
|
file_path = "#{UserExport.base_directory}/#{file_name}"
|
||||||
|
|
||||||
if File.exist?(file_path)
|
File.delete(file_path) if File.exist?(file_path)
|
||||||
File.delete(file_path)
|
expired_export.destroy
|
||||||
end
|
|
||||||
UserExport.find(expired_export.id).destroy
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user