#### libremiliacr
#### Copyright(C) 2020-2024 Remilia Scarlet <remilia@posteo.jp>
####
#### This program is free software: you can redistribute it and/or modify
#### it under the terms of the GNU General Public License as published by
#### the Free Software Foundation, either version 3 of the License, or
#### (at your option) any later version.
####
#### This program is distributed in the hope that it will be useful,
#### but WITHOUT ANY WARRANTY; without even the implied warranty of
#### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#### GNU General Public License for more details.
####
#### You should have received a copy of the GNU General Public License
#### along with this program.  If not, see <http://www.gnu.org/licenses/>.
module RemiLib
# A `JobPool` allows you to perform work on an enumerable set of things,
# spreading the work out over a set of fibers.
class JobPool(T)
  # The maximum number of jobs that this `JobPool` will run at the same time.
  getter workers : Int32 = 1

  # Holds the errors that occurred during the most recent `#run`.
  getter errors : Array(Exception) = [] of Exception

  # Optional name for the pool.  When set, it is passed to `spawn` as the name
  # of every job fiber.
  @name : String? = nil

  # Represents a single task for the `JobPool`.  Creating a `Job` immediately
  # spawns the fiber that performs the work.
  private class Job(T)
    # Spawns a fiber named *name* that calls *block* with *thing*.  When the
    # block returns, `{self, nil}` is sent on *doneChan*; when it raises,
    # `{self, exception}` is sent instead, so exactly one message is
    # delivered per job.
    def initialize(doneChan : Channel(Tuple(Job, Exception?)), thing : T, name : String?, &block : T -> Nil)
      spawn(name: name) do
        begin
          # Call the block, passing the thing to it.
          block.call(thing)

          # Notify that this job has finished successfully.
          doneChan.send({self, nil})
        rescue err : Exception
          # Notify that this job received an exception.
          doneChan.send({self, err})
        end
      end
    end
  end

  # Creates a new `JobPool` with the number of workers equal to the number of
  # logical CPUs in the system.  *name*, if given, is used to name the fibers
  # spawned by `#run`.
  def initialize(@name : String? = nil)
    # Clamp to at least one worker: `#run` waits while the number of running
    # jobs is >= `#workers`, so a reported CPU count of zero or less would
    # otherwise make it wait forever.
    @workers = Math.max(System.cpu_count.to_i32, 1)
  end

  # Creates a new `JobPool` with the given number of workers.  This must be
  # greater than zero, otherwise an exception is raised.  *name*, if given, is
  # used to name the fibers spawned by `#run`.
  def initialize(@workers : Int32, @name : String? = nil)
    raise "Workers cannot be zero for a JobPool" if @workers <= 0
  end

  # :nodoc:
  # The time between checks to see if:
  #
  # 1. A job has finished.
  # 2. A slot is open.
  #
  # This can be tweaked if you want, but 10 milliseconds (the default) is
  # probably good enough for most cases.
  #
  # This is left with a :nodoc: because it probably doesn't need adjusting in
  # most cases.
  property sleepCheckInterval : Time::Span = 10.milliseconds

  # Runs the pool, assigning one element of *things* to each job.  Each job
  # calls the given block with its element in its own fiber, and at most
  # `#workers` jobs are in flight at any time.  This method returns once every
  # job has finished.
  #
  # Any exception raised by the block is collected in `#errors` rather than
  # propagated; `#errors` is reset each time this is called.
  def run(things : Enumerable, &block : T -> Nil)
    # Jobs that have been started but not yet reported as finished.
    jobs : Array(Job(T)) = [] of Job(T)
    doneChan = Channel(Tuple(Job(T), Exception?)).new(4)
    collectorDone = Channel(Bool).new(1)

    # When non-zero, all jobs have been submitted.
    allSubmitted : Atomic(UInt8) = Atomic(UInt8).new(0u8)

    # Number of jobs currently running.
    jobsRunning : Atomic(UInt64) = Atomic(UInt64).new(0u64)

    # Clear out old errors.
    @errors.clear

    # Spawn the collector, which drains `doneChan` until all work is finished.
    spawn do
      # Note about the logic: the first expression here may be true when we're
      # first starting, but never the second one.
      until jobsRunning.get == 0 && allSubmitted.get != 0
        select
        when msg = doneChan.receive
          # Remove from the jobs array as we receive them.  Also store any
          # errors that may have occurred, then adjust how many are running.
          jobs.delete(msg[0])
          msg[1].try { |err| @errors << err }
          jobsRunning.sub(1)
        else
          # We specifically use a sleep here because, if we didn't, we could
          # run the risk of a deadlock.  Basically, the collector could go
          # back to `msg = doneChan.receive` and then get stuck before the
          # until loop's ending logic becomes true.  So we never let the
          # select statement block indefinitely.
          sleep @sleepCheckInterval
        end
      end

      collectorDone.send(true) # Indicate the collector is done
    end

    # Spawn tasks.
    things.each do |thing|
      # Wait for a slot to open.  `>=` (not `>`) so that no more than
      # `@workers` jobs are ever running at once.
      while jobsRunning.get >= @workers
        sleep @sleepCheckInterval
      end

      # Spawn a new job.  The counter is bumped before the job is created so
      # the collector cannot observe zero running jobs for a started job.
      jobsRunning.add(1)
      jobs << Job(T).new(doneChan, thing, @name, &block)
    end

    # Mark that we've submitted everything, then wait for the collector to
    # finish.
    allSubmitted.set(1)
    collectorDone.receive
  end
end
end