# frozen_string_literal: true

require "digest"

module Email
  class Receiver
    # If you add a new error, you need to
    #   * add it to Email::Processor#handle_failure()
    #   * add text to server.en.yml (parent key: "emails.incoming.errors")
    class ProcessingError < StandardError
    end
    class EmptyEmailError < ProcessingError
    end
    class ScreenedEmailError < ProcessingError
    end
    class UserNotFoundError < ProcessingError
    end
    class AutoGeneratedEmailError < ProcessingError
    end
    class BouncedEmailError < ProcessingError
    end
    class NoBodyDetectedError < ProcessingError
    end
    class NoSenderDetectedError < ProcessingError
    end
    class FromReplyByAddressError < ProcessingError
    end
    class InactiveUserError < ProcessingError
    end
    class SilencedUserError < ProcessingError
    end
    class BadDestinationAddress < ProcessingError
    end
    class StrangersNotAllowedError < ProcessingError
    end
    class ReplyNotAllowedError < ProcessingError
    end
    class InsufficientTrustLevelError < ProcessingError
    end
    class ReplyUserNotMatchingError < ProcessingError
    end
    class TopicNotFoundError < ProcessingError
    end
    class TopicClosedError < ProcessingError
    end
    class InvalidPost < ProcessingError
    end
    class TooShortPost < ProcessingError
    end
    class InvalidPostAction < ProcessingError
    end
    class UnsubscribeNotAllowed < ProcessingError
    end
    class EmailNotAllowed < ProcessingError
    end
    class OldDestinationError < ProcessingError
    end
    class ReplyToDigestError < ProcessingError
    end

    class TooManyRecipientsError < ProcessingError
      attr_reader :recipients_count

      def initialize(recipients_count:)
        @recipients_count = recipients_count
      end
    end

    attr_reader :incoming_email
    attr_reader :raw_email
    attr_reader :mail
    attr_reader :message_id

    COMMON_ENCODINGS ||= [-"utf-8", -"windows-1252", -"iso-8859-1"]

    def self.formats
      @formats ||= Enum.new(plaintext: 1, markdown: 2)
    end

    def initialize(mail_string, opts = {})
      raise EmptyEmailError if mail_string.blank?
      @staged_users = []
      @created_staged_users = []
      @raw_email = mail_string

      COMMON_ENCODINGS.each do |encoding|
        fixed = try_to_encode(mail_string, encoding)
        break @raw_email = fixed if fixed.present?
      end

      @mail = Mail.new(@raw_email)
      @message_id = @mail.message_id.presence || Digest::MD5.hexdigest(mail_string)
      @opts = opts
      @destinations ||= opts[:destinations]
    end

    def process!
      return if is_blocked?

      id_hash = Digest::SHA1.hexdigest(@message_id)

      DistributedMutex.synchronize("process_email_#{id_hash}") do
        begin
          # If we find an existing incoming email record with the exact same `message_id`
          # do not create a new `IncomingEmail` record to avoid double ups.
          return if @incoming_email = find_existing_and_update_imap

          Email::Validator.ensure_valid!(@mail)

          @from_email, @from_display_name = parse_from_field(@mail)
          @from_user = User.find_by_email(@from_email)
          @incoming_email = create_incoming_email

          post = process_internal

          raise BouncedEmailError if is_bounce?

          post
        rescue Exception => e
          @incoming_email.update_columns(error: e.class.name) if @incoming_email
          delete_created_staged_users
          raise
        end
      end
    end

    def find_existing_and_update_imap
      return unless incoming_email = IncomingEmail.find_by(message_id: @message_id)

      # If we are not doing this for IMAP purposes just return the record.
      return incoming_email if @opts[:imap_uid].blank?

      # If the message_id matches the post id regexp then we
      # generated the message_id not the imap server, e.g. in GroupSmtpEmail,
      # so we want to update the incoming email because it will
      # be missing IMAP details.
      #
      # Otherwise the incoming email is a completely new one from the IMAP
      # server (e.g. a message_id generated by Gmail) and does not need to
      # be updated, because message_ids from the IMAP server are not guaranteed
      # to be unique.
      return unless Email::MessageIdService.discourse_generated_message_id?(@message_id)

      incoming_email.update(
        imap_uid_validity: @opts[:imap_uid_validity],
        imap_uid: @opts[:imap_uid],
        imap_group_id: @opts[:imap_group_id],
        imap_sync: false,
      )

      incoming_email
    end

    def is_blocked?
      return false if SiteSetting.ignore_by_title.blank?
      Regexp.new(SiteSetting.ignore_by_title, Regexp::IGNORECASE) =~ @mail.subject
    end

    def create_incoming_email
      cc_addresses = Array.wrap(@mail.cc)
      cc_addresses.concat(embedded_email.cc) if has_been_forwarded? && embedded_email&.cc
      IncomingEmail.create(
        message_id: @message_id,
        raw: Email::Cleaner.new(@raw_email).execute,
        subject: subject,
        from_address: @from_email,
        to_addresses: @mail.to,
        cc_addresses: cc_addresses,
        imap_uid_validity: @opts[:imap_uid_validity],
        imap_uid: @opts[:imap_uid],
        imap_group_id: @opts[:imap_group_id],
        imap_sync: false,
        created_via: IncomingEmail.created_via_types[@opts[:source] || :unknown],
      )
    end

    def process_internal
      handle_bounce if is_bounce?
      raise NoSenderDetectedError if @from_email.blank?
      raise FromReplyByAddressError if is_from_reply_by_email_address?
      raise ScreenedEmailError if ScreenedEmail.should_block?(@from_email)

      user = @from_user

      if user.present?
        log_and_validate_user(user)
      else
        raise UserNotFoundError unless SiteSetting.enable_staged_users
      end

      recipients = get_all_recipients(@mail)
      if recipients.size > SiteSetting.maximum_recipients_per_new_group_email
        raise TooManyRecipientsError.new(recipients_count: recipients.size)
      end

      body, elided = select_body
      body ||= ""

      raise NoBodyDetectedError if body.blank? && attachments.empty? && !is_bounce?

      if is_auto_generated? && !sent_to_mailinglist_mirror?
        @incoming_email.update_columns(is_auto_generated: true)

        if SiteSetting.block_auto_generated_emails? && !is_bounce? && !@opts[:allow_auto_generated]
          raise AutoGeneratedEmailError
        end
      end

      if action = subscription_action_for(body, subject)
        raise UnsubscribeNotAllowed if user.nil?
        send_subscription_mail(action, user)
        return
      end

      if post = find_related_post
        # Most of the time, it is impossible to **reply** without a reply key, so exit early
        if user.blank?
          if sent_to_mailinglist_mirror? || !SiteSetting.find_related_post_with_key
            user = stage_from_user
          elsif user.blank?
            raise BadDestinationAddress
          end
        end

        create_reply(
          user: user,
          raw: body,
          elided: elided,
          post: post,
          topic: post.topic,
          skip_validations: user.staged?,
          bounce: is_bounce?,
        )
      else
        first_exception = nil

        destinations.each do |destination|
          begin
            return process_destination(destination, user, body, elided)
          rescue => e
            first_exception ||= e
          end
        end

        raise first_exception if first_exception

        # We don't stage new users for emails to reply addresses, exit if user is nil
        raise BadDestinationAddress if user.blank?

        # We only get here if there are no destinations (the email is not going to
        # a Category, Group, or PostReplyKey)
        post = find_related_post(force: true)

        if post && Guardian.new(user).can_see_post?(post)
          if destination_too_old?(post)
            raise OldDestinationError.new("#{Discourse.base_url}/p/#{post.id}")
          end
        end

        if EmailLog.where(email_type: "digest", message_id: @mail.in_reply_to).exists?
          raise ReplyToDigestError
        end
        raise BadDestinationAddress
      end
    end

    def log_and_validate_user(user)
      @incoming_email.update_columns(user_id: user.id)

      raise InactiveUserError if !user.active && !user.staged
      raise SilencedUserError if user.silenced?
    end

    def get_all_recipients(mail)
      recipients = Set.new

      %i[to cc bcc].each do |field|
        next if mail[field].blank?

        mail[field].each do |address_field|
          begin
            address_field.decoded
            recipients << address_field.address.downcase
          end
        end
      end

      recipients
    end

    def is_bounce?
      @mail.bounced? || bounce_key
    end

    def handle_bounce
      @incoming_email.update_columns(is_bounce: true)
      mail_error_statuses = Array.wrap(@mail.error_status)

      if email_log.present?
        email_log.update_columns(bounced: true, bounce_error_code: mail_error_statuses.first)
        post = email_log.post
        topic = email_log.topic
      end

      DiscourseEvent.trigger(:email_bounce, @mail, @incoming_email, @email_log)

      if mail_error_statuses.any? { |s| s.start_with?(Email::SMTP_STATUS_TRANSIENT_FAILURE) }
        Email::Receiver.update_bounce_score(@from_email, SiteSetting.soft_bounce_score)
      else
        Email::Receiver.update_bounce_score(@from_email, SiteSetting.hard_bounce_score)
      end

      if SiteSetting.whispers_allowed_groups.present? && @from_user&.staged?
        return if email_log.blank?

        if post.present? && topic.present? && topic.archetype == Archetype.private_message
          body, elided = select_body
          body ||= ""

          create_reply(
            user: @from_user,
            raw: body,
            elided: elided,
            post: post,
            topic: topic,
            skip_validations: true,
            bounce: true,
          )
        end
      end

      raise BouncedEmailError
    end

    def is_from_reply_by_email_address?
      Email::Receiver.reply_by_email_address_regex.match(@from_email)
    end

    def bounce_key
      @bounce_key ||=
        begin
          verp = all_destinations.select { |to| to[/\+verp-\h{32}@/] }.first
          verp && verp[/\+verp-(\h{32})@/, 1]
        end
    end

    def email_log
      return nil if bounce_key.blank?
      @email_log ||= EmailLog.find_by(bounce_key: bounce_key)
    end

    def self.update_bounce_score(email, score)
      if user = User.find_by_email(email)
        old_bounce_score = user.user_stat.bounce_score
        new_bounce_score = old_bounce_score + score
        range = (old_bounce_score + 1..new_bounce_score)

        user.user_stat.bounce_score = new_bounce_score
        user.user_stat.reset_bounce_score_after =
          SiteSetting.reset_bounce_score_after_days.days.from_now
        user.user_stat.save!

        if range === SiteSetting.bounce_score_threshold
          # NOTE: we check bounce_score before sending emails
          # So log we revoked the email...
          reason =
            I18n.t(
              "user.email.revoked",
              email: user.email,
              date: user.user_stat.reset_bounce_score_after,
            )
          StaffActionLogger.new(Discourse.system_user).log_revoke_email(user, reason)
          # ... and PM the user
          SystemMessage.create_from_system_user(user, :email_revoked)
        end
      end
    end

    def is_auto_generated?
      return false if SiteSetting.auto_generated_allowlist.split("|").include?(@from_email)
      @mail[:precedence].to_s[/list|junk|bulk|auto_reply/i] ||
        @mail[:from].to_s[/(mailer[\-_]?daemon|post[\-_]?master|no[\-_]?reply)@/i] ||
        @mail[:subject].to_s[
          /\A\s*(Auto:|Automatic reply|Autosvar|Automatisk svar|Automatisch antwoord|Abwesenheitsnotiz|Risposta Non al computer|Automatisch antwoord|Auto Response|Respuesta automática|Fuori sede|Out of Office|Frånvaro|Réponse automatique)/i
        ] ||
        @mail.header.reject { |h| h.name.downcase == "x-auto-response-suppress" }.to_s[
          /auto[\-_]?(response|submitted|replied|reply|generated|respond)|holidayreply|machinegenerated/i
        ]
    end

    def is_spam?
      case SiteSetting.email_in_spam_header
      when "X-Spam-Flag"
        @mail[:x_spam_flag].to_s[/YES/i]
      when "X-Spam-Status"
        @mail[:x_spam_status].to_s[/\AYes, /i]
      when "X-SES-Spam-Verdict"
        @mail[:x_ses_spam_verdict].to_s[/FAIL/i]
      else
        false
      end
    end

    def auth_res_action
      @auth_res_action ||= AuthenticationResults.new(@mail.header[:authentication_results]).action
    end

    def select_body
      text = nil
      html = nil
      text_content_type = nil

      if @mail.multipart?
        text = fix_charset(@mail.text_part)
        html = fix_charset(@mail.html_part)
        text_content_type = @mail.text_part&.content_type
      elsif @mail.content_type.to_s["text/html"]
        html = fix_charset(@mail)
      elsif @mail.content_type.blank? || @mail.content_type["text/plain"]
        text = fix_charset(@mail)
        text_content_type = @mail.content_type
      end

      return unless text.present? || html.present?

      if text.present?
        text = trim_discourse_markers(text)
        text, elided_text = trim_reply_and_extract_elided(text)

        if @opts[:convert_plaintext] || sent_to_mailinglist_mirror?
          text_content_type ||= ""
          converter_opts = {
            format_flowed: !!(text_content_type =~ /format\s*=\s*["']?flowed["']?/i),
            delete_flowed_space: !!(text_content_type =~ /DelSp\s*=\s*["']?yes["']?/i),
          }
          text = PlainTextToMarkdown.new(text, converter_opts).to_markdown
          elided_text = PlainTextToMarkdown.new(elided_text, converter_opts).to_markdown
        end
      end

      markdown, elided_markdown =
        if html.present?
          # use the first html extracter that matches
          if html_extracter = HTML_EXTRACTERS.select { |_, r| html[r] }.min_by { |_, r| html =~ r }
            doc = Nokogiri::HTML5.fragment(html)
            self.public_send(:"extract_from_#{html_extracter[0]}", doc)
          else
            markdown =
              HtmlToMarkdown.new(html, keep_img_tags: true, keep_cid_imgs: true).to_markdown
            markdown = trim_discourse_markers(markdown)
            trim_reply_and_extract_elided(markdown)
          end
        end

      text_format = Receiver.formats[:plaintext]
      if text.blank? || (SiteSetting.incoming_email_prefer_html && markdown.present?)
        text, elided_text, text_format = markdown, elided_markdown, Receiver.formats[:markdown]
      end

      if SiteSetting.strip_incoming_email_lines && text.present?
        in_code = nil

        text =
          text
            .lines
            .map! do |line|
              stripped = line.strip << "\n"

              # Do not strip list items.
              if (stripped[0] == "*" || stripped[0] == "-" || stripped[0] == "+") &&
                   stripped[1] == " "
                next line
              end

              # Match beginning and ending of code blocks.
              if !in_code && stripped[0..2] == "```"
                in_code = "```"
              elsif in_code == "```" && stripped[0..2] == "```"
                in_code = nil
              elsif !in_code && stripped[0..4] == "[code"
                in_code = "[code]"
              elsif in_code == "[code]" && stripped[0..6] == "[/code]"
                in_code = nil
              end

              # Strip only lines outside code blocks.
              in_code ? line : stripped
            end
            .join
      end

      [text, elided_text, text_format]
    end

    def to_markdown(html, elided_html)
      markdown = HtmlToMarkdown.new(html, keep_img_tags: true, keep_cid_imgs: true).to_markdown
      elided_markdown =
        HtmlToMarkdown.new(elided_html, keep_img_tags: true, keep_cid_imgs: true).to_markdown
      [EmailReplyTrimmer.trim(markdown), elided_markdown]
    end

    HTML_EXTRACTERS ||= [
      [:gmail, /class="gmail_(signature|extra)/],
      [:outlook, /id="(divRplyFwdMsg|Signature)"/],
      [:word, /class="WordSection1"/],
      [:exchange, /name="message(Body|Reply)Section"/],
      [:apple_mail, /id="AppleMailSignature"/],
      [:mozilla, /class="moz-/],
      [:protonmail, /class="protonmail_/],
      [:zimbra, /data-marker="__/],
      [:newton, /(id|class)="cm_/],
      [:front, /class="front-/],
    ]

    def extract_from_gmail(doc)
      # GMail adds a bunch of 'gmail_' prefixed classes like: gmail_signature, gmail_extra, gmail_quote, gmail_default...
      elided = doc.css(".gmail_signature, .gmail_extra").remove
      to_markdown(doc.to_html, elided.to_html)
    end

    def extract_from_outlook(doc)
      # Outlook properly identifies the signature and any replied/forwarded email
      # Use their id to remove them and anything that comes after
      elided = doc.css("#Signature, #Signature ~ *, hr, #divRplyFwdMsg, #divRplyFwdMsg ~ *").remove
      to_markdown(doc.to_html, elided.to_html)
    end

    def extract_from_word(doc)
      # Word (?) keeps the content in the 'WordSection1' class and uses <p> tags
      # When there's something else (<table>, <div>, etc..) there's high chance it's a signature or forwarded email
      elided =
        doc.css(
          ".WordSection1 > :not(p):not(ul):first-of-type, .WordSection1 > :not(p):not(ul):first-of-type ~ *",
        ).remove
      to_markdown(doc.at(".WordSection1").to_html, elided.to_html)
    end

    def extract_from_exchange(doc)
      # Exchange is using the 'messageReplySection' class for forwarded emails
      # And 'messageBodySection' for the actual email
      elided = doc.css("div[name='messageReplySection']").remove
      to_markdown(doc.css("div[name='messageReplySection']").to_html, elided.to_html)
    end

    def extract_from_apple_mail(doc)
      # AppleMail is the worst. It adds 'AppleMailSignature' ids (!) to several div/p with no deterministic rules
      # Our best guess is to elide whatever comes after that.
      elided = doc.css("#AppleMailSignature:last-of-type ~ *").remove
      to_markdown(doc.to_html, elided.to_html)
    end

    def extract_from_mozilla(doc)
      # Mozilla (Thunderbird ?) properly identifies signature and forwarded emails
      # Remove them and anything that comes after
      elided =
        doc.css(
          "*[class^='moz-cite'], *[class^='moz-cite'] ~ *, " \
            "*[class^='moz-signature'], *[class^='moz-signature'] ~ *, " \
            "*[class^='moz-forward'], *[class^='moz-forward'] ~ *",
        ).remove
      to_markdown(doc.to_html, elided.to_html)
    end

    def extract_from_protonmail(doc)
      # Removes anything that has a class starting with "protonmail_" and everything after that
      elided = doc.css("*[class^='protonmail_'], *[class^='protonmail_'] ~ *").remove
      to_markdown(doc.to_html, elided.to_html)
    end

    def extract_from_zimbra(doc)
      # Removes anything that has a 'data-marker' attribute
      elided = doc.css("*[data-marker]").remove
      to_markdown(doc.to_html, elided.to_html)
    end

    def extract_from_newton(doc)
      # Removes anything that has an id or a class starting with 'cm_'
      elided = doc.css("*[id^='cm_'], *[class^='cm_']").remove
      to_markdown(doc.to_html, elided.to_html)
    end

    def extract_from_front(doc)
      # Removes anything that has a class starting with 'front-'
      elided = doc.css("*[class^='front-']").remove
      to_markdown(doc.to_html, elided.to_html)
    end

    def trim_reply_and_extract_elided(text)
      return text, "" if @opts[:skip_trimming] || !SiteSetting.trim_incoming_emails
      EmailReplyTrimmer.trim(text, true)
    end

    def fix_charset(mail_part)
      return nil if mail_part.blank? || mail_part.body.blank?

      string =
        begin
          mail_part.body.decoded
        rescue StandardError
          nil
        end

      return nil if string.blank?

      # common encodings
      encodings = COMMON_ENCODINGS.dup
      encodings.unshift(mail_part.charset) if mail_part.charset.present?

      # mail (>=2.5) decodes mails with 8bit transfer encoding to utf-8, so always try UTF-8 first
      if mail_part.content_transfer_encoding == "8bit"
        encodings.delete("UTF-8")
        encodings.unshift("UTF-8")
      end

      encodings.uniq.each do |encoding|
        fixed = try_to_encode(string, encoding)
        return fixed if fixed.present?
      end

      nil
    end

    def try_to_encode(string, encoding)
      encoded = string.encode("UTF-8", encoding)
      !encoded.nil? && encoded.valid_encoding? ? encoded : nil
    rescue Encoding::InvalidByteSequenceError,
           Encoding::UndefinedConversionError,
           Encoding::ConverterNotFoundError
      nil
    end

    def previous_replies_regex
      strings =
        I18n
          .available_locales
          .map do |locale|
            I18n.with_locale(locale) { I18n.t("user_notifications.previous_discussion") }
          end
          .uniq

      @previous_replies_regex ||=
        /\A--[- ]\n\*(?:#{strings.map { |x| Regexp.escape(x) }.join("|")})\*\n/im
    end

    def reply_above_line_regex
      strings =
        I18n
          .available_locales
          .map do |locale|
            I18n.with_locale(locale) { I18n.t("user_notifications.reply_above_line") }
          end
          .uniq

      @reply_above_line_regex ||= /\n(?:#{strings.map { |x| Regexp.escape(x) }.join("|")})\n/im
    end

    def trim_discourse_markers(reply)
      return "" if reply.blank?
      reply = reply.split(previous_replies_regex)[0]
      reply.split(reply_above_line_regex)[0]
    end

    def parse_from_field(mail, process_forwarded_emails: true)
      if email_log.present?
        email = email_log.to_address || email_log.user&.email
        return email, email_log.user&.username
      elsif mail.bounced?
        # "Final-Recipient" has a specific format (<name> ; <address>)
        # cf. https://www.ietf.org/rfc/rfc2298.html#section-3.2.4
        address_type, generic_address = mail.final_recipient.to_s.split(";").map { _1.to_s.strip }
        return generic_address, nil if generic_address.include?("@") && address_type == "rfc822"
      end

      return unless mail[:from]

      # For forwarded emails, where the from address matches a group incoming
      # email, we want to use the from address of the original email sender,
      # which we can extract from embedded_email_raw.
      if process_forwarded_emails && has_been_forwarded?
        if mail[:from].to_s =~ group_incoming_emails_regex
          if embedded_email && embedded_email[:from].errors.blank?
            from_address, from_display_name =
              Email::Receiver.extract_email_address_and_name(embedded_email[:from])
            return from_address, from_display_name if from_address
          end
        end
      end

      # extract proper sender when using mailman mailing list
      if mail[:x_mailman_version].present?
        address, name = Email::Receiver.extract_email_address_and_name_from_mailman(mail)
        return address, name if address
      end

      # For now we are only using the Reply-To header if the email has
      # been forwarded via Google Groups, which is why we are checking the
      # X-Original-From header too. In future we may want to use the Reply-To
      # header in more cases.
      if mail[:x_original_from].present? && mail[:reply_to].present?
        original_from_address, _ =
          Email::Receiver.extract_email_address_and_name(mail[:x_original_from])
        reply_to_address, reply_to_name =
          Email::Receiver.extract_email_address_and_name(mail[:reply_to])
        return reply_to_address, reply_to_name if original_from_address == reply_to_address
      end

      Email::Receiver.extract_email_address_and_name(mail[:from])
    rescue StandardError
      nil
    end

    def self.extract_email_address_and_name_from_mailman(mail)
      list_address, _ = Email::Receiver.extract_email_address_and_name(mail[:list_post])
      list_address, _ =
        Email::Receiver.extract_email_address_and_name(mail[:x_beenthere]) if list_address.blank?

      return if list_address.blank?

      # the CC header often includes the name of the sender
      address_to_name = mail[:cc]&.element&.addresses&.to_h { [_1.address, _1.name] } || {}

      %i[from reply_to x_mailfrom x_original_from].each do |header|
        next if mail[header].blank?
        email, name = Email::Receiver.extract_email_address_and_name(mail[header])
        if email.present? && email != list_address
          return email, name.presence || address_to_name[email]
        end
      end
    end

    def self.extract_email_address_and_name(value)
      begin
        # ensure the email header value is a string
        value = value.to_s
        # in embedded emails, converts [mailto:foo@bar.com] to <foo@bar.com>
        value = value.gsub(/\[mailto:([^\[\]]+?)\]/, "<\\1>")
        # 'mailto:' suffix isn't supported by Mail::Address parsing
        value = value.gsub("mailto:", "")
        # parse the email header value
        parsed = Mail::Address.new(value)
        # extract the email address and name
        mail = parsed.address.to_s.downcase.strip
        name = parsed.name.to_s.strip
        # ensure the email address is "valid"
        if mail.include?("@")
          # remove surrounding quotes from the name
          name = name[1...-1] if name.size > 2 && name[/\A(['"]).+(\1)\z/]
          # return the email address and name
          [mail, name]
        end
      rescue Mail::Field::ParseError, Mail::Field::IncompleteParseError => e
        # something went wrong parsing the email header value, return nil
      end
    end

    def subject
      @subject ||=
        if mail_subject = @mail.subject
          mail_subject.delete("\u0000")[0..254]
        else
          I18n.t("emails.incoming.default_subject", email: @from_email)
        end
    end

    def find_or_create_user(email, display_name, raise_on_failed_create: false, user: nil)
      User.transaction do
        user ||= User.find_by_email(email)

        if user.nil? && SiteSetting.enable_staged_users
          raise EmailNotAllowed unless EmailValidator.allowed?(email)

          username = UserNameSuggester.sanitize_username(display_name) if display_name.present?
          begin
            user =
              User.create!(
                email: email,
                username: UserNameSuggester.suggest(username.presence || email),
                name: display_name.presence || User.suggest_name(email),
                staged: true,
              )
            @created_staged_users << user
          rescue PG::UniqueViolation, ActiveRecord::RecordNotUnique, ActiveRecord::RecordInvalid
            raise if raise_on_failed_create
            user = nil
          end
        end

        @staged_users << user if user&.staged?
      end

      user
    end

    def find_or_create_user!(email, display_name)
      find_or_create_user(email, display_name, raise_on_failed_create: true)
    end

    def all_destinations
      @all_destinations ||= [
        @mail.destinations,
        [@mail[:x_forwarded_to]].flatten.compact.map(&:decoded),
        [@mail[:delivered_to]].flatten.compact.map(&:decoded),
      ].flatten.select(&:present?).uniq.lazy
    end

    def destinations
      @destinations ||=
        all_destinations.map { |d| Email::Receiver.check_address(d, is_bounce?) }.reject(&:blank?)
    end

    def sent_to_mailinglist_mirror?
      @sent_to_mailinglist_mirror ||=
        begin
          destinations.each do |destination|
            return true if destination.is_a?(Category) && destination.mailinglist_mirror?
          end

          false
        end
    end

    def self.check_address(address, include_verp = false)
      # only check for a group/category when 'email_in' is enabled
      if SiteSetting.email_in
        group = Group.find_by_email(address)
        return group if group

        category = Category.find_by_email(address)
        return category if category
      end

      # reply
      match = Email::Receiver.reply_by_email_address_regex(true, include_verp).match(address)
      if match && match.captures
        match.captures.each do |c|
          next if c.blank?
          post_reply_key = PostReplyKey.find_by(reply_key: c)
          return post_reply_key if post_reply_key
        end
      end
      nil
    end

    def process_destination(destination, user, body, elided)
      if SiteSetting.forwarded_emails_behaviour != "hide" && has_been_forwarded? &&
           process_forwarded_email(destination, user)
        return
      end

      return if is_bounce? && !destination.is_a?(PostReplyKey)

      if destination.is_a?(Group)
        user ||= stage_from_user
        create_group_post(destination, user, body, elided)
      elsif destination.is_a?(Category)
        if (user.nil? || user.staged?) && !destination.email_in_allow_strangers
          raise StrangersNotAllowedError
        end

        user ||= stage_from_user

        if !user.has_trust_level?(SiteSetting.email_in_min_trust) && !sent_to_mailinglist_mirror?
          raise InsufficientTrustLevelError
        end

        create_topic(
          user: user,
          raw: body,
          elided: elided,
          title: subject,
          category: destination.id,
          skip_validations: user.staged?,
        )
      elsif destination.is_a?(PostReplyKey)
        # We don't stage new users for emails to reply addresses, exit if user is nil
        raise BadDestinationAddress if user.blank?

        post = Post.with_deleted.find(destination.post_id)
        raise ReplyNotAllowedError if !Guardian.new(user).can_create_post?(post&.topic)

        if destination.user_id != user.id && !forwarded_reply_key?(destination, user)
          raise ReplyUserNotMatchingError,
                "post_reply_key.user_id => #{destination.user_id.inspect}, user.id => #{user.id.inspect}"
        end

        create_reply(
          user: user,
          raw: body,
          elided: elided,
          post: post,
          topic: post&.topic,
          skip_validations: user.staged?,
          bounce: is_bounce?,
        )
      end
    end

    def create_group_post(group, user, body, elided)
      message_ids = Email::Receiver.extract_reply_message_ids(@mail, max_message_id_count: 5)

      # incoming emails with matching message ids, and then cross references
      # these with any email addresses for the user vs to/from/cc of the
      # incoming emails. in effect, any incoming email record for these
      # message ids where the user is involved in any way will be returned
      incoming_emails = IncomingEmail.where(message_id: message_ids)
      if !group.allow_unknown_sender_topic_replies
        incoming_emails = incoming_emails.addressed_to_user(user)
      end
      post_ids = incoming_emails.pluck(:post_id) || []

      # if the user is directly replying to an email send to them from discourse,
      # there will be a corresponding EmailLog record, so we can use that as the
      # reply post if it exists
      if Email::MessageIdService.discourse_generated_message_id?(mail.in_reply_to)
        post_id_from_email_log =
          EmailLog
            .where(message_id: mail.in_reply_to)
            .addressed_to_user(user)
            .order(created_at: :desc)
            .limit(1)
            .pluck(:post_id)
            .last
        post_ids << post_id_from_email_log if post_id_from_email_log
      end

      target_post = post_ids.any? && Post.where(id: post_ids).order(:created_at).last
      too_old_for_group_smtp = (destination_too_old?(target_post) && group.smtp_enabled)

      if target_post.blank? || too_old_for_group_smtp
        create_topic(
          user: user,
          raw: new_group_topic_body(body, target_post, too_old_for_group_smtp),
          elided: elided,
          title: subject,
          archetype: Archetype.private_message,
          target_group_names: [group.name],
          is_group_message: true,
          skip_validations: true,
        )
      else
        # This must be done for the unknown user (who is staged) to
        # be allowed to post a reply in the topic.
        if group.allow_unknown_sender_topic_replies
          target_post.topic.topic_allowed_users.find_or_create_by!(user_id: user.id)
        end

        create_reply(
          user: user,
          raw: body,
          elided: elided,
          post: target_post,
          topic: target_post.topic,
          skip_validations: true,
        )
      end
    end

    def new_group_topic_body(body, target_post, too_old_for_group_smtp)
      return body if !too_old_for_group_smtp
      body + "\n\n----\n\n" +
        I18n.t(
          "emails.incoming.continuing_old_discussion",
          url: target_post.topic.url,
          title: target_post.topic.title,
          count: SiteSetting.disallow_reply_by_email_after_days,
        )
    end

    def forwarded_reply_key?(post_reply_key, user)
      incoming_emails =
        IncomingEmail
          .joins(:post)
          .where("posts.topic_id = ?", post_reply_key.post.topic_id)
          .addressed_to(post_reply_key.reply_key)
          .addressed_to_user(user)
          .pluck(:to_addresses, :cc_addresses)

      incoming_emails.each do |to_addresses, cc_addresses|
        unless contains_email_address_of_user?(to_addresses, user) ||
                 contains_email_address_of_user?(cc_addresses, user)
          next
        end

        if contains_reply_by_email_address(to_addresses, post_reply_key.reply_key) ||
             contains_reply_by_email_address(cc_addresses, post_reply_key.reply_key)
          return true
        end
      end

      false
    end

    def contains_email_address_of_user?(addresses, user)
      return false if addresses.blank?

      addresses = addresses.split(";")
      user.user_emails.any? { |user_email| addresses.include?(user_email.email) }
    end

    def contains_reply_by_email_address(addresses, reply_key)
      return false if addresses.blank?

      addresses
        .split(";")
        .each do |address|
          match = Email::Receiver.reply_by_email_address_regex.match(address)
          return true if match && match.captures&.include?(reply_key)
        end

      false
    end

    def has_been_forwarded?
      subject[/\A[[:blank:]]*(fwd?|tr)[[:blank:]]?:/i] && embedded_email_raw.present?
    end

    def embedded_email_raw
      return @embedded_email_raw if @embedded_email_raw
      text = fix_charset(@mail.multipart? ? @mail.text_part : @mail)
      @embedded_email_raw, @before_embedded = EmailReplyTrimmer.extract_embedded_email(text)
      @embedded_email_raw
    end

    def embedded_email
      @embedded_email ||=
        if embedded_email_raw.present?
          mail = Mail.new(embedded_email_raw)
          Email::Validator.ensure_valid_address_lists!(mail)
          mail
        else
          nil
        end
    end

    def process_forwarded_email(destination, user)
      user ||= stage_from_user
      case SiteSetting.forwarded_emails_behaviour
      when "create_replies"
        forwarded_email_create_replies(destination, user)
      when "quote"
        forwarded_email_quote_forwarded(destination, user)
      else
        false
      end
    end

    def forwarded_email_create_topic(
      destination:,
      user:,
      raw:,
      title:,
      date: nil,
      embedded_user: nil
    )
      if destination.is_a?(Group)
        topic_user = embedded_user&.call || user
        create_topic(
          user: topic_user,
          raw: raw,
          title: title,
          archetype: Archetype.private_message,
          target_usernames: [user.username],
          target_group_names: [destination.name],
          is_group_message: true,
          skip_validations: true,
          created_at: date,
        )
      elsif destination.is_a?(Category)
        return false if user.staged? && !destination.email_in_allow_strangers
        return false if !user.has_trust_level?(SiteSetting.email_in_min_trust)

        topic_user = embedded_user&.call || user
        create_topic(
          user: topic_user,
          raw: raw,
          title: title,
          category: destination.id,
          skip_validations: topic_user.staged?,
          created_at: date,
        )
      else
        false
      end
    end

    def forwarded_email_create_replies(destination, user)
      forwarded_by_address, forwarded_by_name =
        Email::Receiver.extract_email_address_and_name(@mail[:from])

      if forwarded_by_address && forwarded_by_name
        @forwarded_by_user = stage_sender_user(forwarded_by_address, forwarded_by_name)
      end

      email_address, display_name =
        parse_from_field(embedded_email, process_forwarded_emails: false)

      return false if email_address.blank? || !email_address.include?("@")

      post =
        forwarded_email_create_topic(
          destination: destination,
          user: user,
          raw: try_to_encode(embedded_email.decoded, "UTF-8").presence || embedded_email.to_s,
          title: embedded_email.subject.presence || subject,
          date: embedded_email.date,
          embedded_user: lambda { find_or_create_user(email_address, display_name) },
        )

      return false unless post

      if post.topic
        # mark post as seen for the forwarder
        PostTiming.record_timing(
          user_id: user.id,
          topic_id: post.topic_id,
          post_number: post.post_number,
          msecs: 5000,
        )

        # create reply when available
        if @before_embedded.present?
          post_type = Post.types[:regular]
          post_type = Post.types[:whisper] if post.topic.private_message? &&
            destination.usernames[user.username]

          create_reply(
            user: user,
            raw: @before_embedded,
            post: post,
            topic: post.topic,
            post_type: post_type,
            skip_validations: user.staged?,
          )
        else
          if @forwarded_by_user
            post.topic.topic_allowed_users.find_or_create_by!(user_id: @forwarded_by_user.id)
          end
          post.topic.add_small_action(@forwarded_by_user || user, "forwarded")
        end
      end

      true
    end

    def forwarded_email_quote_forwarded(destination, user)
      raw = <<~MD
        #{@before_embedded}

        [quote]
        #{PlainTextToMarkdown.new(@embedded_email_raw).to_markdown}
        [/quote]
      MD

      if forwarded_email_create_topic(
           destination: destination,
           user: user,
           raw: raw,
           title: subject,
         )
        true
      end
    end

    def self.reply_by_email_address_regex(extract_reply_key = true, include_verp = false)
      reply_addresses = [SiteSetting.reply_by_email_address]
      reply_addresses << (SiteSetting.alternative_reply_by_email_addresses.presence || "").split(
        "|",
      )

      if include_verp && SiteSetting.reply_by_email_address.present? &&
           SiteSetting.reply_by_email_address["+"]
        reply_addresses << SiteSetting.reply_by_email_address.sub(
          "%{reply_key}",
          "verp-%{reply_key}",
        )
      end

      reply_addresses.flatten!
      reply_addresses.select!(&:present?)
      reply_addresses.map! { |a| Regexp.escape(a) }
      reply_addresses.map! { |a| a.gsub("\+", "\+?") }
      reply_addresses.map! { |a| a.gsub(Regexp.escape("%{reply_key}"), "(\\h{32})?") }
      if reply_addresses.empty?
        /$a/ # a regex that can never match
      else
        /#{reply_addresses.join("|")}/
      end
    end

    def group_incoming_emails_regex
      @group_incoming_emails_regex =
        Regexp.union(DB.query_single(<<~SQL).map { |e| e.split("|") }.flatten.compact_blank.uniq)
          SELECT CONCAT(incoming_email, '|', email_username)
          FROM groups
          WHERE incoming_email IS NOT NULL OR email_username IS NOT NULL
        SQL
    end

    def category_email_in_regex
      @category_email_in_regex ||=
        Regexp.union Category
                       .pluck(:email_in)
                       .select(&:present?)
                       .map { |e| e.split("|") }
                       .flatten
                       .uniq
    end

    def find_related_post(force: false)
      return if !force && SiteSetting.find_related_post_with_key && !sent_to_mailinglist_mirror?

      message_ids = Email::Receiver.extract_reply_message_ids(@mail, max_message_id_count: 5)
      return if message_ids.empty?

      Email::MessageIdService.find_post_from_message_ids(message_ids)
    end

    def self.extract_reply_message_ids(mail, max_message_id_count:)
      message_ids = [mail.in_reply_to, Email::Receiver.extract_references(mail.references)]
      message_ids.flatten!
      message_ids.select!(&:present?)
      message_ids.uniq!
      message_ids.first(max_message_id_count)
    end

    def self.extract_references(references)
      if Array === references
        references
      elsif references.present?
        references.split(/[\s,]/).map { |r| Email::MessageIdService.message_id_clean(r) }
      end
    end

    def likes
      @likes ||= Set.new ["+1", "<3", "❤", I18n.t("post_action_types.like.title").downcase]
    end

    def subscription_action_for(body, subject)
      return unless SiteSetting.unsubscribe_via_email
      return if sent_to_mailinglist_mirror?

      if ([subject, body].compact.map(&:to_s).map(&:downcase) & ["unsubscribe"]).any?
        :confirm_unsubscribe
      end
    end

    def post_action_for(body)
      PostActionType.types[:like] if likes.include?(body.strip.downcase)
    end

    def create_topic(options = {})
      enable_email_pm_setting(options[:user]) if options[:archetype] == Archetype.private_message

      create_post_with_attachments(options)
    end

    def notification_level_for(body)
      # since we are stripping save all this work on long replies
      return nil if body.length > 40

      body = body.strip.downcase
      case body
      when "mute"
        NotificationLevels.topic_levels[:muted]
      when "track"
        NotificationLevels.topic_levels[:tracking]
      when "watch"
        NotificationLevels.topic_levels[:watching]
      else
        nil
      end
    end

    def create_reply(options = {})
      raise TopicNotFoundError if options[:topic].nil? || options[:topic].trashed?
      if options[:bounce] && options[:topic].archetype != Archetype.private_message
        raise BouncedEmailError
      end

      options[:post] = nil if options[:post]&.trashed?
      if options[:topic].archetype == Archetype.private_message
        enable_email_pm_setting(options[:user])
      end

      if post_action_type = post_action_for(options[:raw])
        create_post_action(options[:user], options[:post], post_action_type)
      elsif notification_level = notification_level_for(options[:raw])
        TopicUser.change(
          options[:user].id,
          options[:post].topic_id,
          notification_level: notification_level,
        )
      else
        raise TopicClosedError if options[:topic].closed?
        options[:topic_id] = options[:topic].id
        options[:reply_to_post_number] = options[:post]&.post_number
        options[:is_group_message] = options[:topic].private_message? &&
          options[:topic].allowed_groups.exists?
        create_post_with_attachments(options)
      end
    end

    def create_post_action(user, post, type)
      result = PostActionCreator.new(user, post, type).perform
      raise InvalidPostAction.new if result.failed? && result.forbidden
    end

    def is_allowed?(attachment)
      attachment.content_type !~ SiteSetting.blocked_attachment_content_types_regex &&
        attachment.filename !~ SiteSetting.blocked_attachment_filenames_regex
    end

    def attachments
      @attachments ||=
        begin
          attachments = @mail.attachments.select { |attachment| is_allowed?(attachment) }
          attachments << @mail if @mail.attachment? && is_allowed?(@mail)

          @mail.parts.each { |part| attachments << part if part.attachment? && is_allowed?(part) }

          attachments.uniq!
          attachments
        end
    end

    def create_post_with_attachments(options = {})
      add_elided_to_raw!(options)
      options[:raw] = add_attachments(options[:raw], options[:user], options)
      create_post(options)
    end

    def add_attachments(raw, user, options = {})
      raw = raw.dup

      rejected_attachments = []
      attachments.each do |attachment|
        tmp = Tempfile.new(["discourse-email-attachment", File.extname(attachment.filename)])
        begin
          # read attachment
          File.open(tmp.path, "w+b") { |f| f.write attachment.body.decoded }
          # create the upload for the user
          opts = { for_group_message: options[:is_group_message] }
          upload = UploadCreator.new(tmp, attachment.filename, opts).create_for(user.id)
          if upload.errors.empty?
            # try to inline images
            if attachment.content_type&.start_with?("image/")
              if raw[attachment.url]
                raw.sub!(attachment.url, upload.url)

                InlineUploads.match_img(
                  raw,
                  uploads: {
                    upload.url => upload,
                  },
                ) do |match, src, replacement, _|
                  raw = raw.sub(match, replacement) if src == upload.url
                end
              elsif raw[/\[image:[^\]]*\]/i]
                raw.sub!(/\[image:[^\]]*\]/i, UploadMarkdown.new(upload).to_markdown)
              else
                raw << "\n\n#{UploadMarkdown.new(upload).to_markdown}\n\n"
              end
            else
              raw << "\n\n#{UploadMarkdown.new(upload).to_markdown}\n\n"
            end
          else
            rejected_attachments << upload
            raw << "\n\n#{I18n.t("emails.incoming.missing_attachment", filename: upload.original_filename)}\n\n"
          end
        ensure
          tmp&.close!
        end
      end
      if rejected_attachments.present? && !user.staged?
        notify_about_rejected_attachment(rejected_attachments)
      end

      raw
    end

    def notify_about_rejected_attachment(attachments)
      errors = []

      attachments.each do |a|
        error = a.errors.messages.values[0][0]
        errors << "#{a.original_filename}: #{error}"
      end

      message = Mail::Message.new(@mail)
      template_args = {
        former_title: message.subject,
        destination: message.to,
        site_name: SiteSetting.title,
        rejected_errors: errors.join("\n"),
      }

      client_message =
        RejectionMailer.send_rejection(:email_reject_attachment, message.from, template_args)
      Email::Sender.new(client_message, :email_reject_attachment).send
    end

    def add_elided_to_raw!(options)
      is_private_message =
        options[:archetype] == Archetype.private_message || options[:topic].try(:private_message?)

      # only add elided part in messages
      if options[:elided].present? &&
           (SiteSetting.always_show_trimmed_content || is_private_message)
        options[:raw] << Email::Receiver.elided_html(options[:elided])
        options[:elided] = ""
      end
    end

    def create_post(options = {})
      options[:import_mode] = @opts[:import_mode]

      options[:via_email] = true
      options[:raw_email] = @raw_email

      options[:created_at] ||= @mail.date
      options[:created_at] = DateTime.now if options[:created_at] > DateTime.now

      add_elided_to_raw!(options)

      if sent_to_mailinglist_mirror?
        options[:skip_validations] = true
        options[:skip_guardian] = true
      else
        options[:email_spam] = is_spam?
        options[:first_post_checks] = true if is_spam?
        options[:email_auth_res_action] = auth_res_action
      end

      user = options.delete(:user)

      if options[:bounce]
        options[:raw] = I18n.t(
          "system_messages.email_bounced",
          email: user.email,
          raw: options[:raw],
        )
        user = Discourse.system_user
        options[:post_type] = Post.types[:whisper]
      end

      # To avoid race conditions with the post alerter and Group SMTP
      # emails, we skip the jobs here and enqueue them only _after_
      # the incoming email has been updated with the post and topic.
      options[:skip_jobs] = true
      options[:skip_events] = true
      result = NewPostManager.new(user, options).perform

      errors = result.errors.full_messages
      if errors.any? { |message|
           message.include?(I18n.t("activerecord.attributes.post.raw").strip) &&
             message.include?(
               I18n.t("errors.messages.too_short", count: SiteSetting.min_post_length).strip,
             )
         }
        raise TooShortPost
      end

      raise InvalidPost, errors.join("\n") if result.errors.present?

      if result.post
        IncomingEmail.transaction do
          @incoming_email.update_columns(topic_id: result.post.topic_id, post_id: result.post.id)
          result.post.update(outbound_message_id: @incoming_email.message_id)
        end

        if result.post.topic&.private_message? && !is_bounce?
          add_other_addresses(result.post, user, @mail)

          if has_been_forwarded?
            add_other_addresses(result.post, @forwarded_by_user || user, embedded_email)
          end
        end

        # Alert the people involved in the topic now that the incoming email
        # has been linked to the post.
        PostJobsEnqueuer.new(
          result.post,
          result.post.topic,
          options[:topic_id].blank?,
          import_mode: options[:import_mode],
          post_alert_options: options[:post_alert_options],
        ).enqueue_jobs
        if result.post.is_first_post?
          DiscourseEvent.trigger(:topic_created, result.post.topic, options, user)
        end
        DiscourseEvent.trigger(:post_created, result.post, options, user)
      end

      result.post
    end

    def self.elided_html(elided)
      html = +"\n\n" << "<details class='elided'>" << "\n"
      html << "<summary title='#{I18n.t("emails.incoming.show_trimmed_content")}'>&#183;&#183;&#183;</summary>" <<
        "\n\n"
      html << elided << "\n\n"
      html << "</details>" << "\n"
      html
    end

    def add_other_addresses(post, sender, mail_object)
      max_staged_users_post = nil

      %i[to cc bcc].each do |d|
        next if mail_object[d].blank?

        mail_object[d].each do |address_field|
          begin
            address_field.decoded
            email = address_field.address.downcase
            display_name = address_field.display_name.try(:to_s)
            next if !email.include?("@")

            if should_invite?(email)
              user = User.find_by_email(email)

              # cap number of staged users created per email
              if (!user || user.staged) &&
                   @staged_users.count >= SiteSetting.maximum_staged_users_per_email
                max_staged_users_post ||=
                  post.topic.add_moderator_post(
                    sender,
                    I18n.t("emails.incoming.maximum_staged_user_per_email_reached"),
                    import_mode: @opts[:import_mode],
                  )
                next
              end

              user = find_or_create_user(email, display_name, user: user)
              if user && can_invite?(post.topic, user)
                post.topic.topic_allowed_users.create!(user_id: user.id)
                TopicUser.auto_notification_for_staging(
                  user.id,
                  post.topic_id,
                  TopicUser.notification_reasons[:auto_watch],
                )
                post.topic.add_small_action(
                  sender,
                  "invited_user",
                  user.username,
                  import_mode: @opts[:import_mode],
                )
              end
            end
          rescue ActiveRecord::RecordInvalid, EmailNotAllowed
            # don't care if user already allowed or the user's email address is not allowed
          end
        end
      end
    end

    def should_invite?(email)
      email !~ Email::Receiver.reply_by_email_address_regex &&
        email !~ group_incoming_emails_regex && email !~ category_email_in_regex
    end

    def can_invite?(topic, user)
      !topic.topic_allowed_users.where(user_id: user.id).exists? &&
        !topic
          .topic_allowed_groups
          .where("group_id IN (SELECT group_id FROM group_users WHERE user_id = ?)", user.id)
          .exists?
    end

    def send_subscription_mail(action, user)
      message = SubscriptionMailer.public_send(action, user)
      Email::Sender.new(message, :subscription).send
    end

    def stage_from_user
      @from_user ||= stage_sender_user(@from_email, @from_display_name)
    end

    def stage_sender_user(email, display_name)
      find_or_create_user!(email, display_name).tap { |u| log_and_validate_user(u) }
    end

    def delete_created_staged_users
      @created_staged_users.each do |user|
        @incoming_email.update_columns(user_id: nil) if @incoming_email.user&.id == user.id

        UserDestroyer.new(Discourse.system_user).destroy(user, quiet: true) if user.posts.count == 0
      end
    end

    def enable_email_pm_setting(user)
      # ensure user PM emails are enabled (since user is posting via email)
      if !user.staged &&
           user.user_option.email_messages_level == UserOption.email_level_types[:never]
        user.user_option.update!(email_messages_level: UserOption.email_level_types[:always])
      end
    end

    def destination_too_old?(post)
      return false if post.blank?
      num_of_days = SiteSetting.disallow_reply_by_email_after_days
      num_of_days > 0 && post.created_at < num_of_days.days.ago
    end
  end
end