mirror of
https://github.com/discourse/discourse.git
synced 2024-11-30 17:55:14 +08:00
112 lines
1.8 KiB
Ruby
112 lines
1.8 KiB
Ruby
# frozen_string_literal: true
|
|
|
|
require "monitor"
|
|
|
|
module WorkQueue
|
|
# Error raised by the queues in this file when a task is pushed without
# +force+ and the queue has already reached its limit.
class WorkQueueFull < StandardError; end
|
|
|
|
# Wraps a queue so that it can be shared between threads.
#
# Every method runs while holding this object's monitor. The wrapped queue is
# expected to respond to +push(task, force:)+, +shift+, +empty?+ and +size+.
class ThreadSafeWrapper
  include MonitorMixin

  # @param queue the queue to guard; it is only touched while the monitor is held
  def initialize(queue)
    mon_initialize

    @queue = queue
    # Condition variable that consumers blocked in #shift wait on.
    @has_items = new_cond
  end

  # Pushes +task+ onto the wrapped queue and wakes one blocked consumer.
  #
  # Any exception raised by the wrapped queue's +push+ propagates to the
  # caller, and no consumer is woken in that case.
  #
  # @param task the item handed to the wrapped queue
  # @param force passed through unchanged to the wrapped queue's +push+
  def push(task, force:)
    synchronize do
      @queue.push(task, force: force)

      # Signal on every successful push, not only on the empty -> non-empty
      # transition: when several consumers are waiting and several tasks
      # arrive before any of them wakes up, a single signal would leave
      # consumers asleep while tasks are still queued.
      @has_items.signal
    end
  end

  # Removes and returns the next task from the wrapped queue.
  #
  # @param block when truthy, wait until a task is available; otherwise
  #   return immediately
  # @return the task, or +nil+ when the queue has none and +block+ is falsy
  def shift(block:)
    synchronize do
      loop do
        task = @queue.shift
        break task if task
        break nil unless block

        # Releases the monitor while waiting and re-acquires it on wake-up;
        # the loop then checks the queue again.
        @has_items.wait
      end
    end
  end

  # @return whatever the wrapped queue's +empty?+ returns
  def empty?
    synchronize { @queue.empty? }
  end

  # @return whatever the wrapped queue's +size+ returns
  def size
    synchronize { @queue.size }
  end
end
|
|
|
|
# Queue that hands out tasks from different keys in rotation.
#
# Items are hashes with +:key+ and +:task+ entries. Each key gets its own
# sub-queue, created on demand by the block given to the constructor. #shift
# takes one task from the sub-queue at the front of the rotation and moves
# that sub-queue to the back, so a key with many pending tasks does not hold
# up the other keys.
class FairQueue
  # Number of tasks currently held across all sub-queues.
  attr_reader :size

  # @param limit maximum total number of tasks accepted by non-forced pushes
  # @yield called with no arguments to build the sub-queue for a new key; the
  #   result must respond to +push(task, force:)+, +shift+ and +empty?+
  def initialize(limit, &blk)
    @limit = limit
    @size = 0
    # Insertion order of this hash is the rotation order of the keys.
    @elements = Hash.new { |h, k| h[k] = blk.call }
  end

  # Adds a task to the sub-queue for its key.
  #
  # @param task [Hash] item with +:key+ and +:task+ entries
  # @param force when truthy, skip this queue's limit check; also passed on
  #   to the sub-queue
  # @return [nil]
  # @raise [WorkQueueFull] when +force+ is falsy and the limit is reached
  def push(task, force:)
    raise WorkQueueFull if !force && @size >= @limit

    key, task = task.values_at(:key, :task)
    queue = @elements[key]

    begin
      queue.push(task, force: force)
    rescue StandardError
      # The lookup above may have just created this sub-queue. If it is still
      # empty, remove it again; otherwise #empty? would report false with no
      # task stored and #shift would hand out a nil task.
      @elements.delete(key) if queue.empty?
      raise
    end

    @size += 1
    nil
  end

  # Removes the next task in rotation order.
  #
  # @return [Hash, nil] +{ key:, task: }+, or +nil+ when nothing is queued
  def shift
    return if @elements.empty?

    key, queue = @elements.shift
    task = queue.shift

    # Re-inserting puts the key at the back of the rotation; drained
    # sub-queues are dropped.
    @elements[key] = queue unless queue.empty?
    @size -= 1

    { key: key, task: task }
  end

  # @return [Boolean] true when no sub-queue is held
  def empty?
    @elements.empty?
  end
end
|
|
|
|
# Array-backed first-in, first-out queue that rejects non-forced pushes once
# it holds +limit+ tasks.
class BoundedQueue
  # @param limit number of stored tasks at which non-forced pushes are refused
  def initialize(limit)
    @elements = []
    @limit = limit
  end

  # Appends +task+ to the end of the queue.
  #
  # @param task the item to store
  # @param force when truthy, the limit is not checked
  # @return [nil]
  # @raise [WorkQueueFull] when +force+ is falsy and the limit is reached
  def push(task, force:)
    raise WorkQueueFull if !force && at_limit?

    @elements.push(task)
    nil
  end

  # @return the oldest task, or +nil+ when the queue is empty
  def shift
    @elements.shift
  end

  # @return [Boolean] true when no task is stored
  def empty?
    @elements.empty?
  end

  # @return [Integer] number of stored tasks
  def size
    @elements.length
  end

  private

  # True when the queue holds at least +@limit+ tasks.
  def at_limit?
    @elements.length >= @limit
  end
end
|
|
end
|