mirror of
https://github.com/discourse/discourse.git
synced 2025-01-06 00:34:13 +08:00
112 lines
1.8 KiB
Ruby
112 lines
1.8 KiB
Ruby
|
# frozen_string_literal: true
|
||
|
|
||
|
require "monitor"
|
||
|
|
||
|
module WorkQueue
|
||
|
# Raised by the queue implementations when a non-forced push would exceed
# the queue's limit.
class WorkQueueFull < StandardError; end
|
||
|
|
||
|
# Serializes access to a wrapped queue with a monitor and adds an optional
# blocking read.
#
# The wrapped queue must respond to +push(task, force:)+, +shift+, +empty?+
# and +size+.
class ThreadSafeWrapper
  include MonitorMixin

  # @param queue the queue object to guard
  def initialize(queue)
    mon_initialize

    @queue = queue
    @has_items = new_cond
  end

  # Pushes +task+ onto the wrapped queue while holding the monitor.
  #
  # Any exception raised by the wrapped queue's +push+ propagates to the
  # caller; no waiter is signalled in that case.
  #
  # @param task the item to enqueue
  # @param force passed through to the wrapped queue's +push+
  # @return [nil]
  def push(task, force:)
    synchronize do
      @queue.push(task, force: force)

      # Signal on every successful push. Signalling only on the
      # empty -> non-empty transition loses wakeups: if several consumers
      # are waiting and two pushes happen before the first woken consumer
      # reacquires the monitor, the second push would see a non-empty queue,
      # skip the signal, and leave a consumer asleep with work available.
      @has_items.signal
    end

    nil
  end

  # Removes and returns the next task from the wrapped queue.
  #
  # A nil/false result from the wrapped queue's +shift+ is treated as
  # "nothing available".
  #
  # @param block when truthy, wait until a task is available; when falsy,
  #   return nil immediately if nothing is available
  # @return the task, or nil when non-blocking and nothing is available
  def shift(block:)
    synchronize do
      loop do
        task = @queue.shift
        break task if task
        break nil unless block

        # Releases the monitor while waiting; the loop re-checks the queue
        # after waking because another consumer may have taken the item.
        @has_items.wait
      end
    end
  end

  # @return whatever the wrapped queue's +empty?+ returns, read under the monitor
  def empty?
    synchronize { @queue.empty? }
  end

  # @return whatever the wrapped queue's +size+ returns, read under the monitor
  def size
    synchronize { @queue.size }
  end
end
|
||
|
|
||
|
# Queue that keeps one sub-queue per key and hands tasks out one key at a
# time, so that a key with many pending tasks does not starve the others.
#
# Items are hashes with +:key+ and +:task+ entries. Sub-queues are created on
# demand by the block given to the constructor and must respond to
# +push(task, force:)+, +shift+ and +empty?+.
class FairQueue
  # Number of tasks currently held across all keys.
  attr_reader :size

  # @param limit total number of tasks accepted by non-forced pushes
  # @yieldreturn a new, empty sub-queue for a key seen for the first time
  def initialize(limit, &blk)
    @limit = limit
    @size = 0
    @elements = Hash.new { |h, k| h[k] = blk.call }
  end

  # Adds a task to the sub-queue of its key.
  #
  # @param task [Hash] must provide +:key+ and +:task+
  # @param force when truthy, the overall limit is not enforced; the flag is
  #   also passed on to the sub-queue's +push+
  # @raise [WorkQueueFull] when not forced and the overall limit is reached;
  #   exceptions raised by the sub-queue's +push+ propagate as well
  # @return [nil]
  def push(task, force:)
    raise WorkQueueFull if !force && @size >= @limit

    key, task = task.values_at(:key, :task)
    bucket = @elements[key] # the default proc registers a new sub-queue here

    begin
      bucket.push(task, force: force)
    rescue StandardError
      # A sub-queue created just for this push must not stay registered when
      # the push failed: an empty sub-queue would make #empty? report false
      # and let #shift return a nil task while driving @size negative.
      @elements.delete(key) if bucket.empty?
      raise
    end

    @size += 1
    nil
  end

  # Removes one task from the sub-queue of the first key in the hash. If that
  # sub-queue still has tasks, its key is re-inserted, which places it after
  # all other keys.
  #
  # @return [Hash, nil] +{ key:, task: }+, or nil when there is nothing queued
  def shift
    return if @elements.empty?

    key, bucket = @elements.shift
    task = bucket.shift

    # Only keep keys that still have pending tasks.
    @elements[key] = bucket unless bucket.empty?
    @size -= 1

    { key: key, task: task }
  end

  # @return [Boolean] true when no key has pending tasks
  def empty?
    @elements.empty?
  end
end
|
||
|
|
||
|
# Array-backed first-in, first-out queue whose non-forced pushes are rejected
# once +limit+ items are stored.
class BoundedQueue
  # @param limit number of items after which non-forced pushes are rejected
  def initialize(limit)
    @limit = limit
    @elements = []
  end

  # Appends +task+ to the queue.
  #
  # @param task the item to store
  # @param force when truthy, the limit is not checked
  # @raise [WorkQueueFull] when not forced and the queue already holds
  #   +limit+ or more items
  # @return [nil]
  def push(task, force:)
    at_capacity = !force && @elements.length >= @limit
    raise WorkQueueFull if at_capacity

    @elements.push(task)
    nil
  end

  # @return the oldest item, or nil when the queue is empty
  def shift
    @elements.shift
  end

  # @return [Boolean] true when no items are stored
  def empty?
    @elements.length.zero?
  end

  # @return [Integer] number of stored items
  def size
    @elements.length
  end
end
|
||
|
end
|