# frozen_string_literal: true

module Scheduler
  # ThreadPool manages a pool of worker threads that process tasks from a queue.
  # It maintains a minimum number of threads and can scale up to a maximum number
  # when there's more work to be done.
  #
  # Usage:
  #  pool = ThreadPool.new(min_threads: 0, max_threads: 4, idle_time: 0.1)
  #  pool.post { do_something }
  #  pool.stats (returns thread count, busy thread count, etc.)
  #
  #  pool.shutdown (do not accept new tasks)
  #  pool.wait_for_termination(timeout: 1) (optional timeout)

  class ThreadPool
    class ShutdownError < StandardError
    end

    def initialize(min_threads:, max_threads:, idle_time: nil)
      # 30 seconds is a reasonable default for idle time
      # it is particularly useful for the use case of:
      # ThreadPool.new(min_threads: 4, max_threads: 4)
      # operators would get confused about idle time cause why does it matter
      idle_time ||= 30
      raise ArgumentError, "min_threads must be 0 or larger" if min_threads < 0
      raise ArgumentError, "max_threads must be 1 or larger" if max_threads < 1
      raise ArgumentError, "max_threads must be >= min_threads" if max_threads < min_threads
      raise ArgumentError, "idle_time must be positive" if idle_time <= 0

      @min_threads = min_threads
      @max_threads = max_threads
      @idle_time = idle_time

      @threads = Set.new
      @busy_threads = Set.new

      @queue = Queue.new
      @mutex = Mutex.new
      @new_work = ConditionVariable.new
      @shutdown = false

      # Initialize minimum number of threads
      @min_threads.times { spawn_thread }
    end

    def post(&block)
      raise ShutdownError, "Cannot post work to a shutdown ThreadPool" if shutdown?

      db = RailsMultisite::ConnectionManagement.current_db
      wrapped_block = wrap_block(block, db)

      @mutex.synchronize do
        @queue << wrapped_block
        spawn_thread if @threads.length == 0

        @new_work.signal
      end
    end

    def wait_for_termination(timeout: nil)
      threads_to_join = nil
      @mutex.synchronize { threads_to_join = @threads.to_a }

      if timeout.nil?
        threads_to_join.each(&:join)
      else
        failed_to_shutdown = false

        deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
        threads_to_join.each do |thread|
          remaining_time = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
          break if remaining_time <= 0
          if !thread.join(remaining_time)
            Rails.logger.error "ThreadPool: Failed to join thread within timeout\n#{thread.backtrace.join("\n")}"
            failed_to_shutdown = true
          end
        end

        if failed_to_shutdown
          @mutex.synchronize { @threads.each(&:kill) }
          raise ShutdownError, "Failed to shutdown ThreadPool within timeout"
        end
      end
    end

    def shutdown
      @mutex.synchronize do
        return if @shutdown
        @shutdown = true
        @threads.length.times { @queue << :shutdown }
        @new_work.broadcast
      end
    end

    def shutdown?
      @mutex.synchronize { @shutdown }
    end

    def stats
      @mutex.synchronize do
        {
          thread_count: @threads.size,
          queued_tasks: @queue.size,
          shutdown: @shutdown,
          min_threads: @min_threads,
          max_threads: @max_threads,
          busy_thread_count: @busy_threads.size,
        }
      end
    end

    private

    def wrap_block(block, db)
      proc do
        begin
          RailsMultisite::ConnectionManagement.with_connection(db) { block.call }
        rescue StandardError => e
          Discourse.warn_exception(
            e,
            message: "Discourse Scheduler ThreadPool: Unhandled exception",
          )
        end
      end
    end

    def thread_loop
      done = false
      while !done
        work = nil

        @mutex.synchronize do
          # we may have already have work so no need
          # to wait for signals, this also handles the race
          # condition between spinning up threads and posting work
          work = @queue.pop(timeout: 0)
          @new_work.wait(@mutex, @idle_time) if !work

          if !work && @queue.empty?
            done = @threads.count > @min_threads
          else
            work ||= @queue.pop

            if work == :shutdown
              work = nil
              done = true
            end
          end

          @busy_threads << Thread.current if work

          if !done && work && @queue.length > 0 && @threads.length < @max_threads &&
               @busy_threads.length == @threads.length
            spawn_thread
          end

          @threads.delete(Thread.current) if done
        end

        if work
          begin
            work.call
          ensure
            @mutex.synchronize { @busy_threads.delete(Thread.current) }
          end
        end
      end
    end

    # Outside of constructor usage this is called from a synchronized block
    # we are already synchronized
    def spawn_thread
      thread = Thread.new { thread_loop }
      thread.abort_on_exception = true
      @threads << thread
    end
  end
end