CMT


CMT, which stands for Coroutine (or Cooperative, Collaborative) Multi-Threading (or Tasking), is a module providing abstractions over Lua coroutines (a.k.a. VM threads[1]), for example to build and synchronize asynchronous operations.

An asynchronous operation relative to a system thread can be synchronous relative to a coroutine.

General Rule

There are multiple ways to use coroutines; in this case, which is the general case for all my projects, they are used as a way to make asynchronous operations synchronous.

It assumes that a general rule is followed for interoperability:

Only the entity or system that yielded the coroutine should be the one responsible for resuming it.

This prevents resuming conflicts, so that different yielding operations can be used together.
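The rule can be illustrated with plain coroutines. The following is a minimal sketch, not CMT code: wait_event and emit are hypothetical names for a small event system that yields the coroutines itself and is therefore the only code that resumes them.

```lua
-- Coroutines suspended by this event system, grouped by event name.
local waiting = {}

-- Yield the current coroutine until the named event is emitted.
local function wait_event(name)
  local co, is_main = coroutine.running()
  assert(co and not is_main, "wait_event must be called from a coroutine")
  waiting[name] = waiting[name] or {}
  table.insert(waiting[name], co)
  return coroutine.yield() -- returns the values passed to emit()
end

-- Resume every coroutine that this system yielded for the named event.
local function emit(name, ...)
  local list = waiting[name]
  waiting[name] = nil
  if not list then return end
  for _, co in ipairs(list) do
    assert(coroutine.resume(co, ...))
  end
end

local received
local co = coroutine.create(function()
  received = wait_event("ready") -- yielded by the event system
end)
assert(coroutine.resume(co)) -- initial start, by the creator
emit("ready", 42)            -- resumed by the event system only
```

Because no other code resumes co while it is suspended in wait_event, a different yielding operation (a timer, a task, a mutex) can be used in the same coroutine without stealing the resume.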

Install

See src/, rockspecs/ or luarocks.

Warning: PUC Lua 5.1 is not supported (lack of xpcall arguments and yield across pcall).
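The two features named in the warning can be probed at run time. This is a small sketch with hypothetical function names; it is not part of CMT.

```lua
-- True if xpcall forwards extra arguments to the called function.
local function supports_xpcall_args()
  local ok, value = xpcall(function(x) return x end, debug.traceback, 42)
  return ok and value == 42
end

-- True if a coroutine can yield from inside a pcall.
local function supports_yield_across_pcall()
  local co = coroutine.create(function()
    return pcall(coroutine.yield, "yielded")
  end)
  local ok, value = coroutine.resume(co)
  return ok and value == "yielded" and coroutine.status(co) == "suspended"
end

print("xpcall arguments:", supports_xpcall_args())
print("yield across pcall:", supports_yield_across_pcall())
```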

Concept

Task

A task represents an asynchronous operation; it may be a standalone handle or an asynchronous function call wrapped as a coroutine.

Other operations can wait for the task completion, e.g. from multiple coroutines. This may be used to design complex asynchronous dependencies.

Mutex

A mutex (Mutual Exclusion) is useful even for cooperative VM threads (coroutines). While the fundamental operations are not executed in parallel, higher-level asynchronous operations semantically can be.

Warning: VM thread mutexes, like OS thread mutexes, must be used carefully to avoid deadlocks (in this case, between coroutines).

Semaphore

A semaphore for coroutines is also useful to implement some synchronization patterns.

Here, the semaphore terminology is about the management of resource units: demand, supply and units instead of POSIX's wait, post and value.
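To make the demand/supply/units vocabulary concrete, here is a minimal sketch of a coroutine semaphore built on plain coroutines. It is an illustration under simplifying assumptions, not CMT's implementation; new_semaphore and the field names are hypothetical.

```lua
local Semaphore = {}
Semaphore.__index = Semaphore

-- Create a semaphore holding the given number of available units.
local function new_semaphore(units)
  return setmetatable({units = units, waiting = {}}, Semaphore)
end

-- Take a unit, or yield the current coroutine until one is supplied.
function Semaphore:demand()
  if self.units > 0 then
    self.units = self.units - 1
  else
    table.insert(self.waiting, coroutine.running())
    coroutine.yield()
  end
end

-- Hand a unit to the oldest waiting coroutine, or store it.
function Semaphore:supply()
  local co = table.remove(self.waiting, 1)
  if co then
    assert(coroutine.resume(co))
  else
    self.units = self.units + 1
  end
end

local sem = new_semaphore(1)
local order = {}
for i = 1, 3 do
  local co = coroutine.create(function()
    sem:demand()
    order[#order + 1] = i
  end)
  assert(coroutine.resume(co))
end
sem:supply() -- hands the unit to the second coroutine
sem:supply() -- hands the unit to the third coroutine
```

Only the first coroutine finds a unit available; the other two are suspended by the semaphore and resumed by it, one per supply call.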

API

Warning: To prevent coroutine resume errors from being silently handled, they are propagated to the caller (e.g. task completion or mutex unlocking) and will interrupt resuming of the other waiting coroutines. Instead of catching errors from the resuming side, it is probably better to catch them from the coroutines themselves, where it matters. This is already handled by the async() API which propagates the errors to the task handle.

Note: Resume errors are recursively propagated using debug.traceback, which may result in multiple stack tracebacks.

Warning: Task wait/completion, mutex lock/unlocking or semaphore demand/supply, as with callbacks, may transfer the execution to "third-party" code; thus the execution state must be carefully analyzed.

Task

A task is a table where the array part is the list of waiting coroutines/callbacks.

When done, a field is added:

cmt.async(f, ...)

Asynchronous operation.

No arguments: create a standalone task handle.

With arguments: create a task wrapping an asynchronous function call. I.e. it executes the passed function as a coroutine, like a detached job.

Return created task.

task:wait([callback])

Wait for task completion (still usable when done).

No arguments (sync): yield the current coroutine if the task is not done yet. It returns the task return values or propagates the task error.

With arguments (async):

task(ok, ...)

Task return (completion or termination).

Waiting coroutines/callbacks are resumed in the same order as the task:wait calls. Subsequent calls to task(ok, ...) will throw an error.

task:complete(...)

Complete task (equivalent to task(true, ...)).

task:error(err)

Terminate task with an error (equivalent to task(false, err)).

task:done()

Check if the task is done (completed or terminated with an error). Return boolean.
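The calls above can be combined as follows. This is a sketch that assumes the cmt module is installed and relies only on the behavior described in this section (synchronous wait, completion, termination with an error); the variable names are illustrative.

```lua
local cmt = require("cmt") -- assumes the module is installed
local async = cmt.async

-- Standalone handle awaited by two coroutines.
local task = async()
local results = {}
for i = 1, 2 do
  async(function()
    local value = task:wait() -- yields until the task returns
    results[#results + 1] = i .. ":" .. value
  end)
end

local done_before = task:done()
task:complete("ready") -- resumes the waiting coroutines
local done_after = task:done()

-- A task terminated with an error propagates it to task:wait().
local failing = async()
local wait_failed
async(function()
  local ok = pcall(failing.wait, failing)
  wait_failed = not ok
end)
failing:error("boom")
```

Catching the error with pcall inside the waiting coroutine follows the advice given in the warning at the top of the API section.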

Mutex

A mutex is a table where the array part is the list of locking coroutines, the first being the active one followed by the waiting ones.

cmt.mutex([mode])

Create a mutex.

mutex:lock()

Lock the mutex.

mutex:unlock()

Unlock the mutex.

Waiting coroutines are resumed in the same order as the mutex:lock calls.

mutex:locked()

Check if the mutex is locked. Return boolean.
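A short sketch of two coroutines contending for the same mutex, assuming the cmt module is installed. The standalone task named gate is only there to suspend the lock holder; worker and log are illustrative names.

```lua
local cmt = require("cmt") -- assumes the module is installed
local async = cmt.async

local mutex = cmt.mutex()
local gate = async() -- standalone task used to suspend the lock holder
local log = {}

local function worker(name)
  mutex:lock()
  log[#log + 1] = name .. " enter"
  gate:wait() -- may yield while the lock is held
  log[#log + 1] = name .. " leave"
  mutex:unlock()
end

async(function() worker("a") end) -- takes the lock, then waits on gate
async(function() worker("b") end) -- waits for the lock
local locked_while_waiting = mutex:locked()

gate:complete() -- resumes "a"; its unlock then lets "b" proceed
print(table.concat(log, ", "))
```

The critical sections should not interleave: the second worker only enters after the first has unlocked.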

Semaphore

A semaphore is a table where the array part is the list of demanding/waiting coroutines.

cmt.semaphore(units)

Create a semaphore.

semaphore:supply()

Supply a unit.

Waiting coroutines are resumed in the same order as the demand calls, one per supply call.

semaphore:demand()

Demand a unit.

Yield the current coroutine if no unit is available.

Examples

Basic usage

If we have an asynchronous process, like fetching a URL:

local cmt = require("cmt")
local async = cmt.async

-- Create the async download function.
function download(url)
  local task = async() -- create task
  http_request(url, function(ok, content_or_error)
    task(ok, content_or_error) -- not simplified for clarity
  end)
  return task:wait() -- wait for the returned values
end

-- Download 10 URLs synchronously.
local download_task = async(function()
  for i=1,10 do
    local content = download("http://foo.bar/"..i..".txt")
    print(content)
  end
end)

Mutex

If we have an asynchronous process which saves data to a SQL database:

local cmt = require("cmt")
local async = cmt.async

local txn = cmt.mutex()

-- Save the state of something using a transaction.
-- query() could be asynchronous too.
function save(thing)
  txn:lock()
  query("START TRANSACTION")
  query("UPDATE ...")
  some_async_task()
  query("UPDATE ...")
  some_async_task()
  query("UPDATE ...")
  query("COMMIT")
  txn:unlock()
end

Now save(thing) can be called from concurrent (though not truly parallel) tasks without corrupting the transaction.
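If one of the queries throws, save(thing) as written never reaches txn:unlock(), and every later caller would wait forever. One way to guard against that is a small wrapper that always unlocks. This is a sketch, not part of CMT: with_lock is a hypothetical helper, and it relies on yielding across pcall, which the supported Lua versions allow.

```lua
-- Hypothetical helper: run f(...) while holding the mutex and always
-- unlock, whether f returns normally or throws.
local function with_lock(mutex, f, ...)
  mutex:lock()
  local function finish(ok, ...)
    mutex:unlock()
    if not ok then error((...), 0) end -- rethrow the original error value
    return ...
  end
  return finish(pcall(f, ...))
end
```

With this helper, the body of save(thing) could be passed as a function to with_lock(txn, ...) instead of calling txn:lock() and txn:unlock() directly. Rolling back the transaction on error is a separate concern and is not handled here.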

Semaphore

If we have work to queue, but only 4 processing units are available:

local cmt = require("cmt")
local async = cmt.async

local UNITS = 4
local sem = cmt.semaphore(UNITS)

local function some_async_operation(i, callback)
  -- ...
end

-- release the claimed unit when done
local function finished() sem:supply() end

local task = async(function()
  -- do all the work
  for i=1,1e3 do
    sem:demand() -- claim a unit
    some_async_operation(i, finished)
  end
  -- reclaim all units: wait for the end of processing
  for i=1,UNITS do sem:demand() end
end)

  1. With the exception of VM threads that are not coroutines, e.g. the main thread.