From 30922855f260dea3b142f4dd2e395c4e625282dd Mon Sep 17 00:00:00 2001
From: Daniel Waterworth <me@danielwaterworth.com>
Date: Wed, 7 Feb 2024 13:47:50 -0600
Subject: [PATCH] PERF: Don't allow a single user to monopolize the defer queue
 (#25593)

---
 lib/hijack.rb                    |  1 +
 lib/scheduler/defer.rb           | 16 ++++++++++----
 lib/work_queue.rb                |  9 ++++----
 spec/lib/scheduler/defer_spec.rb | 36 ++++++++++++++++++++++++++++++++
 spec/lib/work_queue_spec.rb      |  2 +-
 5 files changed, 54 insertions(+), 10 deletions(-)

diff --git a/lib/hijack.rb b/lib/hijack.rb
index 775ad3b7d57..b7a76808440 100644
--- a/lib/hijack.rb
+++ b/lib/hijack.rb
@@ -29,6 +29,7 @@ module Hijack
         Scheduler::Defer.later(
           "hijack #{params["controller"]} #{params["action"]} #{info}",
           force: false,
+          current_user: current_user&.id,
           &scheduled.method(:resolve)
         )
       rescue WorkQueue::WorkQueueFull
diff --git a/lib/scheduler/defer.rb b/lib/scheduler/defer.rb
index 36856239165..feb3b1ce3da 100644
--- a/lib/scheduler/defer.rb
+++ b/lib/scheduler/defer.rb
@@ -10,7 +10,9 @@ module Scheduler
       @async = !Rails.env.test?
       @queue =
         WorkQueue::ThreadSafeWrapper.new(
-          WorkQueue::FairQueue.new(500) { WorkQueue::BoundedQueue.new(100) },
+          WorkQueue::FairQueue.new(:site, 500) do
+            WorkQueue::FairQueue.new(:user, 100) { WorkQueue::BoundedQueue.new(50) }
+          end,
         )
 
       @mutex = Mutex.new
@@ -48,7 +50,13 @@ module Scheduler
       @async = val
     end
 
-    def later(desc = nil, db = RailsMultisite::ConnectionManagement.current_db, force: true, &blk)
+    def later(
+      desc = nil,
+      db = RailsMultisite::ConnectionManagement.current_db,
+      force: true,
+      current_user: nil,
+      &blk
+    )
       @stats_mutex.synchronize do
         stats = (@stats[desc] ||= { queued: 0, finished: 0, duration: 0, errors: 0 })
         stats[:queued] += 1
@@ -56,7 +64,7 @@ module Scheduler
 
       if @async
         start_thread if !@thread&.alive? && !@paused
-        @queue.push({ key: db, task: [db, blk, desc] }, force: force)
+        @queue.push({ site: db, user: current_user, db: db, job: blk, desc: desc }, force: force)
       else
         blk.call
       end
@@ -93,7 +101,7 @@ module Scheduler
 
     # using non_block to match Ruby #deq
     def do_work(non_block = false)
-      db, job, desc = @queue.shift(block: !non_block)[:task]
+      db, job, desc = @queue.shift(block: !non_block).values_at(:db, :job, :desc)
 
       start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
       db ||= RailsMultisite::ConnectionManagement::DEFAULT
diff --git a/lib/work_queue.rb b/lib/work_queue.rb
index d4df4207908..ec4bee08d55 100644
--- a/lib/work_queue.rb
+++ b/lib/work_queue.rb
@@ -51,15 +51,16 @@ module WorkQueue
   class FairQueue
     attr_reader :size
 
-    def initialize(limit, &blk)
+    def initialize(key, limit, &blk)
       @limit = limit
       @size = 0
+      @key = key
       @elements = Hash.new { |h, k| h[k] = blk.call }
     end
 
     def push(task, force:)
       raise WorkQueueFull if !force && @size >= @limit
-      key, task = task.values_at(:key, :task)
+      key = task[@key]
       @elements[key].push(task, force: force)
       @size += 1
       nil
@@ -72,10 +73,8 @@ module WorkQueue
         task = queue.shift
 
         @elements[key] = queue unless queue.empty?
-
         @size -= 1
-
-        { key: key, task: task }
+        task
       end
     end
 
diff --git a/spec/lib/scheduler/defer_spec.rb b/spec/lib/scheduler/defer_spec.rb
index f886335fd2a..8f418beebae 100644
--- a/spec/lib/scheduler/defer_spec.rb
+++ b/spec/lib/scheduler/defer_spec.rb
@@ -112,4 +112,40 @@ RSpec.describe Scheduler::Defer do
 
     expect(s).to eq("good")
   end
+
+  describe "#later" do
+    let!(:ivar) { Concurrent::IVar.new }
+    let!(:responses) { Thread::Queue.new }
+
+    def later(db, current_user, request)
+      @defer.later(nil, db, current_user: current_user) do
+        ivar.value
+        responses.push([db, current_user, request])
+      end
+    end
+
+    it "runs jobs in a fair order" do
+      later("site1", 1, 1)
+      later("site1", 1, 2)
+      later("site1", 2, 3)
+      later("site2", 3, 4)
+      later("site2", 4, 5)
+      later("site2", 4, 6)
+
+      ivar.set(nil)
+
+      result = 6.times.map { responses.shift }
+
+      expect(result).to eq(
+        [
+          ["site1", 1, 1],
+          ["site2", 3, 4],
+          ["site1", 2, 3],
+          ["site2", 4, 5],
+          ["site1", 1, 2],
+          ["site2", 4, 6],
+        ],
+      )
+    end
+  end
 end
diff --git a/spec/lib/work_queue_spec.rb b/spec/lib/work_queue_spec.rb
index 964212c77d6..f88959ffc3b 100644
--- a/spec/lib/work_queue_spec.rb
+++ b/spec/lib/work_queue_spec.rb
@@ -74,7 +74,7 @@ end
 
 RSpec.describe WorkQueue::FairQueue do
   subject(:queue) do
-    WorkQueue::FairQueue.new(global_limit) { WorkQueue::BoundedQueue.new(per_key_limit) }
+    WorkQueue::FairQueue.new(:key, global_limit) { WorkQueue::BoundedQueue.new(per_key_limit) }
   end
 
   let(:global_limit) { 5 }