Login
Artifact [9c04cc6f9b]
Login

Artifact 9c04cc6f9b1381f9f681f3a559afbe7de868f5aa1210358b287595ac8d57eba9:


#### libremiliacr
#### Copyright(C) 2020-2024 Remilia Scarlet <remilia@posteo.jp>
####
#### This program is free software: you can redistribute it and/or modify
#### it under the terms of the GNU General Public License as published
#### by the Free Software Foundation, either version 3 of the License, or
#### (at your option) any later version.
####
#### This program is distributed in the hope that it will be useful,
#### but WITHOUT ANY WARRANTY; without even the implied warranty of
#### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#### GNU General Public License for more details.
####
#### You should have received a copy of the GNU General Public License
#### along with this program.  If not, see <http://www.gnu.org/licenses/>.

module RemiLib
  # A `JobPool` allows you to perform work on an enumerable set of things,
  # spreading the work out over a set of fibers, with at most `#workers`
  # jobs running at any one time.
  class JobPool(T)
    # The maximum number of jobs that this `JobPool` will run concurrently.
    getter workers : Int32 = 1

    # Holds the errors that occurred during the most recent call to `#run`.
    getter errors : Array(Exception) = [] of Exception

    # Optional name for the pool.  When set, it is used as the name of every
    # fiber the pool spawns.
    @name : String? = nil

    # :nodoc:
    # Formerly the polling interval used by `#run` while waiting for a slot to
    # open or a job to finish.  `#run` now blocks on a channel instead of
    # polling, so this value is no longer consulted; the property is kept so
    # existing code that sets it continues to compile.
    property sleepCheckInterval : Time::Span = 10.milliseconds

    # Creates a new `JobPool` with the number of workers equal to the number of
    # logical CPUs in the system (never fewer than one).
    def initialize(@name : String? = nil)
      # Guard against the platform reporting a zero or negative CPU count.
      @workers = Math.max(System.cpu_count.to_i32, 1)
    end

    # Creates a new `JobPool` with the given number of workers.  This must be
    # greater than zero, otherwise an `ArgumentError` is raised.
    def initialize(@workers : Int32, @name : String? = nil)
      raise ArgumentError.new("Workers must be greater than zero for a JobPool") if @workers <= 0
    end

    # Runs the pool, assigning one element of `things` to each job.  Each job
    # calls the given block in its own fiber, and no more than `#workers` jobs
    # run at the same time.  This returns only after every started job has
    # finished.  Any errors raised by the block are collected in `#errors`;
    # `#errors` is reset each time this is called.
    #
    # Always returns `true` (preserved from earlier versions of this method).
    def run(things : Enumerable, &block : T -> Nil)
      @errors.clear

      # Every job reports exactly once on this channel.  Its capacity equals the
      # maximum number of outstanding jobs, so a finishing job never blocks on
      # `send`, even if iterating `things` raises and nobody is left to receive.
      doneChan = Channel(Exception?).new(@workers)

      # Only the calling fiber reads or writes this, so it needs no atomics.
      running = 0

      things.each do |thing|
        # Pool is full: wait for one job to finish before starting another.
        if running >= @workers
          awaitJob(doneChan)
          running -= 1
        end

        spawnJob(doneChan, thing, block)
        running += 1
      end

      # Wait for the jobs that are still outstanding.
      running.times { awaitJob(doneChan) }
      true
    end

    # Starts a fiber that calls *block* with *thing*, then reports on
    # *doneChan*: `nil` on success, or the exception the block raised.
    # Receiving *thing* as a method argument gives each fiber's closure its
    # own value rather than a variable shared across loop iterations.
    private def spawnJob(doneChan : Channel(Exception?), thing : T, block : T -> Nil) : Nil
      spawn(name: @name) do
        error = begin
          block.call(thing)
          nil
        rescue err : Exception
          err
        end
        doneChan.send(error)
      end
    end

    # Blocks until one job reports on *doneChan*, recording its error, if any,
    # in `#errors`.
    private def awaitJob(doneChan : Channel(Exception?)) : Nil
      if err = doneChan.receive
        @errors << err
      end
    end
  end
end