Login
concurrency.cr at tip
Login

File src/remilib/concurrency.cr from the latest check-in


     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
   100
   101
   102
   103
   104
   105
   106
   107
   108
   109
   110
   111
   112
   113
   114
   115
   116
   117
   118
   119
   120
   121
   122
   123
   124
   125
   126
   127
   128
   129
   130
   131
   132
   133
#### libremiliacr
#### Copyright(C) 2020-2024 Remilia Scarlet <remilia@posteo.jp>
####
#### This program is free software: you can redistribute it and/or modify
#### it under the terms of the GNU General Public License as published by
#### the Free Software Foundation, either version 3 of the License, or
#### (at your option) any later version.
####
#### This program is distributed in the hope that it will be useful,
#### but WITHOUT ANY WARRANTY; without even the implied warranty of
#### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#### GNU General Public License for more details.
####
#### You should have received a copy of the GNU General Public License
#### along with this program.  If not, see <http://www.gnu.org/licenses/>.

module RemiLib
  # A `JobPool` allows you to perform work on an enumerable set of things,
  # spreading the work out over a set of fibers.
  #
  # Each element is handed to its own fiber.  At most `#workers` of those
  # fibers are allowed to be in flight at any one time.  Exceptions raised by
  # the work block do not abort the run; they are gathered into `#errors`.
  class JobPool(T)
    # The maximum number of jobs that this `JobPool` will run at the same time.
    getter workers : Int32 = 1

    # Holds the errors that occurred during `#run`.
    getter errors : Array(Exception) = [] of Exception

    # Optional name for the pool.  When set, it is used as the name of every
    # fiber spawned for a job.
    @name : String? = nil

    # Represents a single task for the `JobPool`.
    #
    # Creating a `Job` immediately spawns a fiber that calls the block with
    # `thing`.  When the block returns (or raises), a `{job, exception?}` tuple
    # is sent over `doneChan` so the pool can account for it.
    private class Job(T)
      def initialize(doneChan : Channel(Tuple(Job, Exception?)), thing : T, name : String?, &block : T -> Nil)
        # Spawn the fiber.
        spawn(name: name) do
          begin
            # Call the block, passing the thing to it.
            block.call(thing)

            # Notify that this job has finished successfully.
            doneChan.send({self, nil})
          rescue err : Exception
            # Notify that this job received an exception.
            doneChan.send({self, err})
          end
        end
      end
    end

    # Creates a new `JobPool` with the number of workers equal to the number of
    # logical CPUs in the system.  *name*, if given, is used to name the fibers
    # spawned by `#run`.
    def initialize(@name : String? = nil)
      @workers = System.cpu_count.to_i32
    end

    # Creates a new `JobPool` with the given number of workers.  This must be
    # greater than zero, otherwise an exception is raised.  *name*, if given,
    # is used to name the fibers spawned by `#run`.
    def initialize(@workers : Int32, @name : String? = nil)
      raise "Workers cannot be zero for a JobPool" if @workers <= 0
    end

    # :nodoc:
    # The time between checks to see if:
    #
    # 1. A job has finished.
    # 2. A slot is open.
    #
    # This can be tweaked if you want, but 10 milliseconds (the default) is
    # probably good enough for most cases.
    #
    # This is left with a :nodoc: because it probably doesn't need adjusting in
    # most cases.
    property sleepCheckInterval : Time::Span = 10.milliseconds

    # Runs the pool, assigning one element of `things` to each job.  Each job
    # will call the given block.  Any errors that occur during processing will
    # be collected in `#errors`; `#errors` is reset each time this is called.
    #
    # This method does not return until every submitted job has reported back.
    def run(things : Enumerable, &block : T -> Nil)
      # Keeps a reference to every job that has been started but not yet
      # collected.
      jobs : Array(Job(T)) = [] of Job(T)
      doneChan = Channel(Tuple(Job(T), Exception?)).new(4)
      collectorDone = Channel(Bool).new(1)

      # When non-zero, all jobs have been submitted.
      allSubmitted : Atomic(UInt8) = Atomic(UInt8).new(0u8)

      # Number of jobs currently running
      jobsRunning : Atomic(UInt64) = Atomic(UInt64).new(0u64)

      # Clear out old errors
      @errors.clear

      # Spawn the collector
      spawn do
        # Note about the logic: the first expression here may be true when we're
        # first starting (nothing has been submitted yet), so the loop only
        # ends once the submitter has also flagged that it is finished.
        until jobsRunning.get == 0 && allSubmitted.get != 0
          select
          when msg = doneChan.receive
            # Remove from the jobs array as we receive them.  Also store any
            # errors that may have occurred, then adjust how many are running.
            jobs.delete(msg[0])
            msg[1].try { |err| @errors << err }
            jobsRunning.sub(1)
          else
            # We specifically use a sleep here because, if we didn't, we could
            # run the risk of a deadlock.  Basically, the collector could go
            # back to `msg = doneChan.receive` and then get stuck before the
            # until loop's ending logic becomes true.  So we never let select
            # statement block indefinitely.
            sleep @sleepCheckInterval
          end
        end
        collectorDone.send(true) # Indicate the collector is done
      end

      # Spawn tasks
      things.each do |thing|
        # Wait for a slot to open.  A slot is only free while fewer than
        # `@workers` jobs are running; using `>=` here (rather than `>`) is
        # what keeps the pool from running `@workers + 1` jobs at once.
        while jobsRunning.get >= @workers
          sleep @sleepCheckInterval
        end

        # Spawn a new job
        jobsRunning.add(1)
        jobs << Job(T).new(doneChan, thing, @name, &block)
      end

      # Mark that we've submitted everything, then wait for the collector to
      # finish.
      allSubmitted.set(1)
      collectorDone.receive
    end
  end
end