CMT, which stands for Coroutine (or Cooperative, Collaborative) Multi-Threading (or Tasking), is a module providing abstractions over Lua coroutines (aka. VM threads1) to, for example, nicely build and synchronize asynchronous operations.
An asynchronous operation relative to a system thread can be synchronous relative to a coroutine.
General Rule
There are multiple ways to use coroutines; in this case, which is the general case for all my projects, they are used as a way to make asynchronous operations synchronous.
It assumes that a general rule is followed for interoperability:
Only the entity or system that yielded the coroutine should be the one responsible for resuming it.
This prevents resuming conflicts, so that different yielding operations can be used together.
Install
See src/
, rockspecs/
or
luarocks.
Warning: PUC Lua 5.1 is not supported (lack of xpcall arguments and yield across pcall).
Concept
Task
A task represents an asynchronous operation; it may be a standalone handle or an asynchronous function call wrapped as a coroutine.
Other operations can wait for the task completion, e.g. from multiple coroutines. This may be used to design complex asynchronous dependencies.
Mutex
A mutex (Mutual Exclusion) is useful even for cooperative VM threads (coroutines). While the fundamental operations are not executed in parallel, higher asynchronous operations semantically can.
Warning: VM thread mutexes, as for OS thread mutexes, must be carefully used to avoid deadlocks (in this case, between coroutines).
Semaphore
A semaphore for coroutines is also useful to implement some synchronization patterns.
In this case, the terminology of the semaphore is about the management of
resource units. E.g. demand
, supply
and units
instead of POSIX's wait
,
post
and value
.
API
Warning: To prevent coroutine resume errors from being silently handled,
they are propagated to the caller (e.g. task completion or mutex unlocking) and
will interrupt resuming of the other waiting coroutines. Instead of catching
errors from the resuming side, it is probably better to catch them from the
coroutines themselves, where it matters. This is already handled by the
async()
API which propagates the errors to the task handle.
Note: Resume errors are recursively propagated using debug.traceback
,
which may result in multiple stack tracebacks.
Warning: Task wait/completion, mutex lock/unlocking or semaphore demand/supply, as with callbacks, may transfer the execution to "third-party" code; thus the execution state must be carefully analyzed.
Task
A task is a table where the array part is the list of waiting coroutines/callbacks.
When done, a field is added:
- task.r: packed task values
(ok, ...)
(seetable.pack()
)
cmt.async(f, ...)
Asynchronous operation.
No arguments: create a standalone task handle.
With arguments: create a task wrapping an asynchronous function call. I.e. it executes the passed function as a coroutine, like a detached job.
- f: function
- ...: arguments
Return created task.
task:wait([callback])
Wait for task completion (still usable when done).
No arguments (sync): yield the current coroutine if the task is not done yet. It returns the task return values or propagates the task error.
With arguments (async):
- callback(task): called when the task is done (completion or error)
task(ok, ...)
Task return (completion or termination).
Waiting coroutines/callbacks are resumed in the same order of task:wait
calls.
Subsequent calls will throw an error.
- (ok, ...): Common soft error handling interface. When ok is truthy, ... holds the return values, otherwise an error message.
task:complete(...)
Complete task (equivalent to task(true, ...)
).
- ...: task return values
task:error(err)
Terminate task with an error (equivalent to task(false, err)
).
- err: error message
task:done()
Check if the task is done (completed or terminated with an error). Return boolean.
Mutex
A mutex is a table where the array part is the list of locking coroutines, the first being the active one followed by the waiting ones.
- mutex.locks: number of active thread locks
- mutex.reentrant: exist/true if reentrant
cmt.mutex([mode])
Create a mutex.
- mode:
"reentrant"
to make the mutex reentrant
mutex:lock()
Lock the mutex.
mutex:unlock()
Unlock the mutex.
Waiting coroutines are resumed in the same order of mutex:lock
calls.
mutex:locked()
Check if the mutex is locked. Return boolean
Semaphore
A semaphore is a table where the array part is the list of demanding/waiting coroutines.
- semaphore.units: amount of available units
cmt.semaphore(units)
Create a semaphore.
- units: initial amount of units, must be
>= 0
semaphore:supply()
Supply a unit.
Waiting coroutines are resumed in the same order of demand
calls, one per
supply
call.
semaphore:demand()
Demand a unit.
Yield the current coroutine if no unit is available.
Examples
Basic usage
If we have an asynchronous process, like fetching an URL:
local cmt = require("cmt")
async = cmt.async
-- Create the async download function.
function download(url)
local task = async() -- create task
http_request(url, function(ok, content_or_error)
task(ok, content_or_error) -- not simplified for clarity
end)
return task:wait() -- wait for the returned values
end
-- Download 10 URLs synchronously.
local download_task = async(function()
for i=1,10 do
local content = download("http://foo.bar/"..i..".txt")
print(content)
end
end)
Mutex
If we have an asynchronous process which saves data to a SQL database:
local cmt = require("cmt")
async = cmt.async
local txn = cmt.mutex()
-- Save the state of something using a transaction.
-- query() could be asynchronous too.
function save(thing)
txn:lock()
query("START TRANSACTION")
query("UPDATE ...")
some_async_task()
query("UPDATE ...")
some_async_task()
query("UPDATE ...")
query("COMMIT")
txn:unlock()
end
Now save(thing)
can be called from parallel (not fundamentally) tasks without
corrupting the transaction.
Semaphore
If we have work to queue, but only 4 processing units are available:
local cmt = require("cmt")
async = cmt.async
local UNITS = 4
local sem = cmt.semaphore(UNITS)
local function some_async_operation(i, callback)
-- ...
end
-- release the claimed unit when done
local function finished() sem:supply() end
local task = async(function()
-- do all the work
for i=1,1e3 do
sem:demand() -- claim a unit
some_async_operation(i, finished)
end
-- reclaim all units: wait end of processing
for i=1,UNITS do sem:demand() end
end)
- ^ At the exception of VM threads which are not coroutines, e.g. the main thread.