Concurrency Oriented Programming in Lua
ConcurrentLua is a system that implements a concurrency model for the Lua programming language. It is based on the share-nothing asynchronous message-passing model that is employed in the Erlang programming language.
ConcurrentLua extends Lua's coroutines with message-passing primitives, in order to support concurrent programming. Distributed programming is supported transparently with the same message-passing primitives.
ConcurrentLua is implemented as a collection of Lua modules that can be loaded by any Lua program. Most of the code is written in Lua itself, with minor parts written in C.
One of the core elements of ConcurrentLua is the process. A process is a lightweight VM thread that plays the same role as a process in an operating system: processes do not share memory, but instead communicate through some form of interprocess communication. Processes can be created and destroyed on demand, and a simple round-robin scheduler passes control to them.
Each process is associated with a mailbox, a message queue for the temporary storage of messages that were sent to the process. A process can check its mailbox for new messages at any time, and if there are any, they can be read in the order of arrival.
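To make the arrival-order guarantee concrete, here is a minimal sketch of a FIFO mailbox in plain Lua. The names newmailbox, deliver and checkmailbox are hypothetical and are not part of ConcurrentLua; the real implementation may store messages differently.

```lua
-- Hypothetical FIFO mailbox sketch (not ConcurrentLua's internals).
-- Messages are appended at the tail and read from the head, so they are
-- read back in order of arrival.
function newmailbox()
  return { first = 1, last = 0, items = {} }
end

-- Store a message that was sent to the owning process.
function deliver(box, msg)
  box.last = box.last + 1
  box.items[box.last] = msg
end

-- Return the oldest message, or nil if the mailbox is empty.
function checkmailbox(box)
  if box.first > box.last then
    return nil
  end
  local msg = box.items[box.first]
  box.items[box.first] = nil
  box.first = box.first + 1
  return msg
end

local box = newmailbox()
deliver(box, 'first')
deliver(box, 'second')
print(checkmailbox(box)) --> first
print(checkmailbox(box)) --> second
print(checkmailbox(box)) --> nil
```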
Each process is identified by a unique numeric process identifier, or PID. In addition, aliases (process names) can be used instead of PIDs to refer to processes. These aliases and the processes they refer to are stored in a central repository, the registry. Processes can edit the registry by adding or deleting entries.
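Conceptually, the registry is a mapping from aliases to PIDs. The sketch below models it as a plain Lua table; register, unregister, whereis and resolve are hypothetical stand-ins that only loosely mirror the functions listed in the reference further down.

```lua
-- Hypothetical registry sketch: a table mapping string aliases to PIDs.
registry = {}

function register(name, pid)
  registry[name] = pid
end

function unregister(name)
  registry[name] = nil
end

function whereis(name)
  return registry[name]
end

-- A destination may be a numeric PID or a string alias; aliases are
-- looked up in the registry before a message would be delivered.
function resolve(dest)
  if type(dest) == 'string' then
    return registry[dest]
  end
  return dest
end

register('pong', 1)
print(resolve('pong')) --> 1
print(resolve(2))      --> 2
unregister('pong')
print(whereis('pong')) --> nil
```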
Error handling mechanisms are also provided in the form of monitors and links. With monitors, a process can monitor other processes and get notified if a monitored process terminates abnormally. With links, processes are bound together, and when one of them terminates abnormally the other one is signalled and terminates, too.
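The difference between the two mechanisms can be sketched as a small simulation in plain Lua: a monitoring process only receives a notification, while a linked process is terminated as well. This is a hypothetical model, not ConcurrentLua code; it assumes that the reason 'normal' marks a normal exit and that notifications are tables with signal = 'DOWN', a name chosen here for illustration.

```lua
-- Hypothetical sketch of how an abnormal exit could propagate.
-- Assumptions: reason 'normal' means a normal exit; monitor notifications
-- are tables with signal = 'DOWN' (illustrative name).
procs = {} -- pid -> process record

function newproc(pid)
  procs[pid] = { alive = true, links = {}, monitors = {}, mailbox = {} }
end

-- Links are bidirectional.
function link(a, b)
  procs[a].links[b] = true
  procs[b].links[a] = true
end

-- Monitors are one-way: watcher observes target.
function monitor(watcher, target)
  procs[target].monitors[watcher] = true
end

function terminate(pid, reason)
  local p = procs[pid]
  if not p.alive then return end
  p.alive = false
  if reason == 'normal' then return end
  -- Monitoring processes only get a notification message...
  for watcher in pairs(p.monitors) do
    local w = procs[watcher]
    if w.alive then
      table.insert(w.mailbox, { signal = 'DOWN', from = pid, reason = reason })
    end
  end
  -- ...while linked processes are terminated as well.
  for peer in pairs(p.links) do
    terminate(peer, reason)
  end
end

newproc(1); newproc(2); newproc(3)
link(1, 2)
monitor(3, 1)
terminate(1, 'crash')
print(procs[2].alive)             --> false
print(procs[3].alive)             --> true
print(procs[3].mailbox[1].signal) --> DOWN
```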
This system also supports distributed programming and all the properties that have been described map naturally onto a distributed system. Distributed processes communicate with the same primitives as local processes.
Distribution is based on a component that is called the node. A node represents a system runtime inside of which processes are executing. Nodes can be connected to each other and communicate, thus forming a virtual network. Distributed processes use this network in order to exchange messages.
Each node has a name associated with it. So that nodes can connect to each other using only this name, a port mapper daemon acts as a name server. The port mapper daemon keeps details about the nodes running on the network host that the daemon itself is bound to.
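The name-to-port lookup that the port mapper daemon performs can be sketched as follows. This is a hypothetical model in plain Lua: one table per host maps node names to listening ports, and the port number used below is a made-up example. It does not reflect the daemon's actual protocol.

```lua
-- Hypothetical port mapper sketch: hostname -> { nodename -> port }.
daemons = {}

function pmd_register(host, nodename, port)
  daemons[host] = daemons[host] or {}
  daemons[host][nodename] = port
end

function pmd_unregister(host, nodename)
  if daemons[host] then
    daemons[host][nodename] = nil
  end
end

-- Resolve 'nodename@hostname' to a host and port, or nil if unknown.
function lookup(fullname)
  local nodename, host = fullname:match('^([^@]+)@(.+)$')
  if not nodename or not daemons[host] then
    return nil
  end
  local port = daemons[host][nodename]
  if port then
    return host, port
  end
  return nil
end

pmd_register('gaia', 'pong', 40001) -- example port, chosen arbitrarily
local host, port = lookup('pong@gaia')
print(host, port)
```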
Just as processes can be created locally, it is also possible to request the creation of processes on remote nodes. A remote process can then be handled as if it were a local process.
If the nodes that form the virtual network are fully connected (every node is connected bidirectionally to every other node), global aliases can be used for processes. The nodes negotiate and maintain a virtual global registry, and each node keeps an up-to-date local copy of it.
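The idea of a global registry with local copies can be illustrated with a deliberately simplified sketch: every node holds its own copy, and a global registration is applied to all copies, so any node can resolve an alias locally. The negotiation between nodes is not modelled, and cluster, addnode, globalregister, globalunregister and globalwhereis are hypothetical names.

```lua
-- Hypothetical, simplified sketch: each node keeps a local copy of the
-- global registry; a global registration is applied to every copy.
-- The negotiation between nodes is NOT modelled here.
cluster = {} -- node name -> { registry = {} }

function addnode(name)
  cluster[name] = { registry = {} }
end

-- Entries use the remote-process form { pid, 'nodename@hostname' }.
function globalregister(alias, pid, owner)
  for _, node in pairs(cluster) do
    node.registry[alias] = { pid, owner }
  end
end

function globalunregister(alias)
  for _, node in pairs(cluster) do
    node.registry[alias] = nil
  end
end

-- Any node resolves the alias from its own copy, without a round trip.
function globalwhereis(nodename, alias)
  return cluster[nodename].registry[alias]
end

addnode('pong@gaia')
addnode('ping@selene')
globalregister('pong', 1, 'pong@gaia')
local entry = globalwhereis('ping@selene', 'pong')
print(entry[1], entry[2])
```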
Monitors and links for distributed processes are supported with the same semantics as for local processes. Nodes take care of the task of transparently handling errors between distributed processes. In addition, it is possible for processes to monitor nodes as a whole.
Nodes are required to authenticate before they can communicate. An authenticated node can then be part of the virtual network that the nodes form. A simple security mechanism takes care of this task.
The implementation of ConcurrentLua is based on the Lua component system. The system is organized as a collection of Lua modules and submodules. There are two main modules, which provide the concurrent and the distributed programming functionality respectively. It is possible to load only the concurrency module, and for each module there is the option of not loading submodules whose functionality is not needed. A stand-alone port mapper daemon utility is also included.
The processes in the system are implemented with Lua coroutines. A process is actually a Lua coroutine that yields control when the process suspends its execution and resumes control when the process continues its execution.
The scheduling of the processes is still based on the cooperative multithreading model that Lua uses. Processes voluntarily suspend their execution and thus other processes get the chance to run. Nevertheless, the suspending and resuming of processes is partly hidden under a higher level mechanism; a process suspends its execution when waiting for a message to arrive and becomes ready to be resumed when new messages have arrived in its mailbox. A simple round-robin scheduler resumes the processes.
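The cooperative model described above can be sketched with nothing but Lua's standard coroutine library. In this minimal, hypothetical sketch, spawn and run are illustrative names, not ConcurrentLua's API: each process is a coroutine, yielding puts it at the back of a ready queue, and the scheduler resumes processes round-robin until all have finished.

```lua
-- Minimal round-robin scheduler over plain Lua coroutines.
-- Hypothetical sketch: spawn() and run() are illustrative names only.
local unpack = table.unpack or unpack -- Lua 5.1 compatibility

readyqueue = {}

-- Create a process from a function and its arguments; it starts suspended.
function spawn(body, ...)
  local args = { n = select('#', ...), ... }
  local co = coroutine.create(function()
    return body(unpack(args, 1, args.n))
  end)
  table.insert(readyqueue, co)
  return co
end

-- Resume each live process in turn until all of them have finished.
function run()
  while #readyqueue > 0 do
    local co = table.remove(readyqueue, 1)
    local ok, err = coroutine.resume(co)
    if not ok then
      print('process failed: ' .. tostring(err))
    elseif coroutine.status(co) ~= 'dead' then
      table.insert(readyqueue, co) -- yielded voluntarily: back of the queue
    end
  end
end

output = {}

function worker(name, times)
  for i = 1, times do
    table.insert(output, name .. i)
    coroutine.yield() -- give the other processes a chance to run
  end
end

spawn(worker, 'a', 2)
spawn(worker, 'b', 2)
run()
print(table.concat(output, ' ')) --> a1 b1 a2 b2
```

Because control only changes hands at coroutine.yield(), a process that never yields would starve the others; this is the trade-off of cooperative scheduling.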
Any type of Lua data, with the exception of memory references, can be sent inside messages. Messages can be booleans, numbers, strings, tables or functions, or any combination of them. Data are automatically serialized on send and deserialized on receive, and everything is passed by value.
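Pass-by-value through serialization can be illustrated with a small serializer that turns a value into Lua source and loads it back. This is a minimal sketch, not ConcurrentLua's own format: it handles booleans, numbers, strings and acyclic tables, while functions, cycles and non-finite numbers are left out for brevity.

```lua
-- Minimal serializer sketch: booleans, numbers, strings and acyclic tables
-- are converted to Lua source and back. Functions, cycles and non-finite
-- numbers are not handled; this is not ConcurrentLua's wire format.
local loadchunk = loadstring or load -- loadstring in Lua 5.1, load in 5.2+

function serialize(v)
  local t = type(v)
  if t == 'boolean' then
    return tostring(v)
  elseif t == 'number' then
    return string.format('%.17g', v) -- enough digits to round-trip a double
  elseif t == 'string' then
    return string.format('%q', v)
  elseif t == 'table' then
    local parts = {}
    for k, val in pairs(v) do
      parts[#parts + 1] = '[' .. serialize(k) .. ']=' .. serialize(val)
    end
    return '{' .. table.concat(parts, ',') .. '}'
  end
  error('cannot serialize a value of type ' .. t)
end

function deserialize(s)
  return assert(loadchunk('return ' .. s))()
end

local original = { body = 'ping', seq = 1, flags = { true, false } }
local copy = deserialize(serialize(original))
copy.body = 'changed'  -- the copy is independent of the original
print(original.body)   --> ping
```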
Interprocess communication between nodes, and consequently between distributed processes, is based on an asynchronous socket handler. This translates to a networking model that uses non-blocking sockets and periodic polling, which is the approach most commonly used by Lua libraries today. Non-blocking semantics should also be used for other I/O, such as files and pipes.
Some examples will provide an introduction to the most essential properties of the system, from process creation and message passing to distributed programming and error handling.
Processes are created using the spawn() function. The spawn() function takes at least one argument: the function that contains the statements the process will execute. Any additional arguments are passed directly as arguments to that function.
The following example demonstrates the creation of a process. The process just prints a message as many times as specified:
concurrent = require 'concurrent'

function hello_world(times)
  for i = 1, times do
    print('hello world')
  end
  print('done')
end

concurrent.spawn(hello_world, 3)

concurrent.loop()
The output would be:
hello world
hello world
hello world
done
First the system is loaded:
concurrent = require 'concurrent'
The function that the process will execute is defined next:
function hello_world(times)
  for i = 1, times do
    print('hello world')
  end
  print('done')
end
A new process is created:
concurrent.spawn(hello_world, 3)
The system's infinite loop is called last:
concurrent.loop()
Processes can exchange messages by using the send() and receive() functions. Also, the self() function can be used to get the PID of the calling process.
The following program implements two processes that exchange messages and then terminate:
concurrent = require 'concurrent'

function pong()
  while true do
    local msg = concurrent.receive()
    if msg.body == 'finished' then
      break
    elseif msg.body == 'ping' then
      print('pong received ping')
      concurrent.send(msg.from, { body = 'pong' })
    end
  end
  print('pong finished')
end

function ping(n, pid)
  for i = 1, n do
    concurrent.send(pid, { from = concurrent.self(), body = 'ping' })
    local msg = concurrent.receive()
    if msg.body == 'pong' then
      print('ping received pong')
    end
  end
  concurrent.send(pid, { from = concurrent.self(), body = 'finished' })
  print('ping finished')
end

pid = concurrent.spawn(pong)
concurrent.spawn(ping, 3, pid)
concurrent.loop()
The output would be:
pong received ping
ping received pong
pong received ping
ping received pong
pong received ping
ping received pong
pong finished
ping finished
After the pong process is created, the ping process is supplied with the PID of the pong process:
pid = concurrent.spawn(pong) concurrent.spawn(ping, 3, pid)
The ping process sends a message:
concurrent.send(pid, { from = concurrent.self(), body = 'ping' })
The pong process waits for a message to arrive and saves it in a variable when it does:
local msg = concurrent.receive()
The pong process replies:
concurrent.send(msg.from, { body = 'pong' })
The pong process terminates after receiving the 'finished' notification from the ping process.
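How a receive() that blocks on an empty mailbox can be built on coroutines may be easier to see without the concurrent module. The following self-contained sketch in plain Lua rests on stated assumptions: spawn, send, receive, self and loop are hypothetical reimplementations (not ConcurrentLua's code), PIDs are small integers, and the processes append lines to a log table instead of printing.

```lua
-- Hypothetical sketch of blocking receive() on top of coroutines.
local unpack = table.unpack or unpack -- Lua 5.1 compatibility

procs = {}     -- pid -> { co = coroutine, mailbox = {}, waiting = bool }
ready = {}     -- FIFO of runnable pids
current = nil  -- pid of the running process
nextpid = 0
log = {}

function spawn(body, ...)
  local args = { n = select('#', ...), ... }
  nextpid = nextpid + 1
  procs[nextpid] = {
    co = coroutine.create(function() return body(unpack(args, 1, args.n)) end),
    mailbox = {},
    waiting = false,
  }
  table.insert(ready, nextpid)
  return nextpid
end

function self() return current end

-- Append to the destination mailbox; wake the receiver if it is blocked.
function send(pid, msg)
  local p = procs[pid]
  if p == nil then return false end
  table.insert(p.mailbox, msg)
  if p.waiting then
    p.waiting = false
    table.insert(ready, pid)
  end
  return true
end

-- Suspend until the mailbox is non-empty, then return the oldest message.
function receive()
  local p = procs[current]
  while #p.mailbox == 0 do
    p.waiting = true
    coroutine.yield()
  end
  return table.remove(p.mailbox, 1)
end

-- Run ready processes round-robin; blocked processes stay off the queue.
function loop()
  while #ready > 0 do
    current = table.remove(ready, 1)
    local p = procs[current]
    local ok, err = coroutine.resume(p.co)
    if not ok then error(err) end
    if coroutine.status(p.co) == 'dead' then
      procs[current] = nil
    elseif not p.waiting then
      table.insert(ready, current)
    end
  end
  current = nil
end

function pong()
  while true do
    local msg = receive()
    if msg.body == 'finished' then break
    elseif msg.body == 'ping' then
      table.insert(log, 'pong received ping')
      send(msg.from, { body = 'pong' })
    end
  end
  table.insert(log, 'pong finished')
end

function ping(n, pid)
  for i = 1, n do
    send(pid, { from = self(), body = 'ping' })
    local msg = receive()
    if msg.body == 'pong' then table.insert(log, 'ping received pong') end
  end
  send(pid, { from = self(), body = 'finished' })
  table.insert(log, 'ping finished')
end

local pongpid = spawn(pong)
spawn(ping, 3, pongpid)
loop()
print(table.concat(log, '\n'))
```

In this sketch send() never yields, so ping logs 'ping finished' before pong runs again and logs 'pong finished'; the real scheduler may order those last two lines differently.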
Instead of using process PIDs for sending messages, process names can also be used. The register() function can be used to create an alias for a process in the registry:
concurrent = require 'concurrent'

function pong()
  while true do
    local msg = concurrent.receive()
    if msg.body == 'finished' then
      break
    elseif msg.body == 'ping' then
      print('pong received ping')
      concurrent.send(msg.from, { body = 'pong' })
    end
  end
  print('pong finished')
end

function ping(n)
  for i = 1, n do
    concurrent.send('pong', { from = concurrent.self(), body = 'ping' })
    local msg = concurrent.receive()
    if msg.body == 'pong' then
      print('ping received pong')
    end
  end
  concurrent.send('pong', { from = concurrent.self(), body = 'finished' })
  print('ping finished')
end

pid = concurrent.spawn(pong)
concurrent.register('pong', pid)
concurrent.spawn(ping, 3)
concurrent.loop()
The main change from the previous example is the destination that the ping process sends messages to:
concurrent.send('pong', { from = concurrent.self(), body = 'ping' })
And:
concurrent.send('pong', { from = concurrent.self(), body = 'finished' })
And the pong process is now registered under a name:
concurrent.register('pong', pid)
Therefore the ping process no longer needs to be supplied with the PID of the pong process.
Processes in different nodes can still communicate with the same message passing primitives. Remote processes are denoted by their PID or alias and the node they are executing under. The previous example could be broken into two programs, one for each process.
The code for the pong process:
concurrent = require 'concurrent'

function pong()
  while true do
    local msg = concurrent.receive()
    if msg.body == 'finished' then
      break
    elseif msg.body == 'ping' then
      print('pong received ping')
      concurrent.send(msg.from, { body = 'pong' })
    end
  end
  print('pong finished')
end

concurrent.init('pong@gaia')
pid = concurrent.spawn(pong)
concurrent.register('pong', pid)
concurrent.loop()
concurrent.shutdown()
And the code for the ping process:
concurrent = require 'concurrent'

function ping(n)
  for i = 1, n do
    concurrent.send({ 'pong', 'pong@gaia' },
                    { from = { concurrent.self(), concurrent.node() }, body = 'ping' })
    local msg = concurrent.receive()
    if msg.body == 'pong' then
      print('ping received pong')
    end
  end
  concurrent.send({ 'pong', 'pong@gaia' },
                  { from = { concurrent.self(), concurrent.node() }, body = 'finished' })
  print('ping finished')
end

concurrent.spawn(ping, 3)
concurrent.init('ping@selene')
concurrent.loop()
concurrent.shutdown()
The output of the pong process would be:
pong received ping
pong received ping
pong received ping
pong finished
And the output of the ping process would be:
ping received pong
ping received pong
ping received pong
ping finished
In this example the runtime system is running in distributed mode. For this to work, the port mapper daemon has to be started first. This can be done by typing in a command-line shell:
$ clpmd
The code that initializes the node that the pong process is running on:
concurrent.init('pong@gaia')
And the code that initializes the node that the ping process is running on:
concurrent.init('ping@selene')
The previous two code snippets register with the port mapper daemon the port that each node is listening on. Both nodes unregister their ports with:
concurrent.shutdown()
The only other changes in this example are the destination that the messages are sent to, along with the introduction of the node() function, which returns the name of the node that the calling process is running on:
concurrent.send({ 'pong', 'pong@gaia' }, { from = { concurrent.self(), concurrent.node() }, body = 'ping' })
And later:
concurrent.send({ 'pong', 'pong@gaia' }, { from = { concurrent.self(), concurrent.node() }, body = 'finished' })
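The three forms of destination used so far (a numeric PID, a registered name, and a two-element table holding a PID or name plus a 'nodename@hostname' string) can be told apart with a few type checks. The classify function below is a hypothetical helper for illustration only; ConcurrentLua does not necessarily expose or implement anything like it.

```lua
-- Hypothetical helper: classify a destination as a local PID (number),
-- a local alias (string), or a remote process given as the table
-- { pid-or-alias, 'nodename@hostname' }.
function classify(dest)
  local t = type(dest)
  if t == 'number' then
    return 'local pid', dest
  elseif t == 'string' then
    return 'local alias', dest
  elseif t == 'table' and #dest == 2 and type(dest[2]) == 'string' then
    local nodename, host = dest[2]:match('^([^@]+)@(.+)$')
    if nodename then
      return 'remote', dest[1], nodename, host
    end
  end
  return nil, 'invalid destination'
end

print(classify({ 'pong', 'pong@gaia' }))
print(classify('pong'))
print(classify(42))
```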
One approach to handling errors in processes is the notion of linked processes. Two processes are bound together, and if one of them terminates abnormally, the other one terminates, too. The link() function can be used to link processes:
concurrent = require 'concurrent'

function ping(n, pid)
  concurrent.link(pid)
  for i = 1, n do
    concurrent.send(pid, { from = concurrent.self(), body = 'ping' })
    local msg = concurrent.receive()
    if msg.body == 'pong' then
      print('ping received pong')
    end
  end
  print('ping finished')
  concurrent.exit('finished')
end

function pong()
  while true do
    local msg = concurrent.receive()
    if msg.body == 'ping' then
      print('pong received ping')
      concurrent.send(msg.from, { body = 'pong' })
    end
  end
  print('pong finished')
end

pid = concurrent.spawn(pong)
concurrent.spawn(ping, 3, pid)
concurrent.loop()
The output would be:
pong received ping
ping received pong
pong received ping
ping received pong
pong received ping
ping received pong
ping finished
The pong process never reaches its last line, because it terminates when the ping process exits.
The code that links the processes is:
concurrent.link(pid)
The exit() function is used to make the calling process quit abnormally:
concurrent.exit('finished')
It is also possible to trap the exit signal of the terminating process. In this case a special message is received:
concurrent = require 'concurrent'

concurrent.setoption('trapexit', true)

function pong()
  while true do
    local msg = concurrent.receive()
    if msg.signal == 'EXIT' then
      break
    elseif msg.body == 'ping' then
      print('pong received ping')
      concurrent.send(msg.from, { body = 'pong' })
    end
  end
  print('pong finished')
end

function ping(n, pid)
  concurrent.link(pid)
  for i = 1, n do
    concurrent.send(pid, { from = concurrent.self(), body = 'ping' })
    local msg = concurrent.receive()
    if msg.body == 'pong' then
      print('ping received pong')
    end
  end
  print('ping finished')
  concurrent.exit('finished')
end

pid = concurrent.spawn(pong)
concurrent.spawn(ping, 3, pid)
concurrent.loop()
The output would be:
pong received ping
ping received pong
pong received ping
ping received pong
pong received ping
ping received pong
ping finished
pong finished
There is an option related to process linking that can be set with the setoption() function, specifically the trapexit option:
concurrent.setoption('trapexit', true)
Then the pong process receives a special exit message:
if msg.signal == 'EXIT' then break
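The effect of the trapexit option on a single linked peer can be summarized in a short, hypothetical sketch: without trapping, the peer terminates with the same reason; with trapping, the exit turns into an ordinary mailbox message. Only signal = 'EXIT' comes from the example above; the deliverexit function and the from and reason fields are assumptions made for illustration.

```lua
-- Hypothetical sketch of the 'trapexit' choice for one linked peer.
-- Only signal = 'EXIT' comes from the text; from/reason are assumed fields.
function deliverexit(peer, from, reason)
  if peer.trapexit then
    -- Trapping: the exit becomes an ordinary message in the mailbox.
    table.insert(peer.mailbox, { signal = 'EXIT', from = from, reason = reason })
  else
    -- Not trapping: the peer terminates with the same reason.
    peer.alive = false
    peer.reason = reason
  end
end

local plain = { alive = true, trapexit = false, mailbox = {} }
local trapping = { alive = true, trapexit = true, mailbox = {} }
deliverexit(plain, 1, 'finished')
deliverexit(trapping, 1, 'finished')
print(plain.alive)                --> false
print(trapping.alive)             --> true
print(trapping.mailbox[1].signal) --> EXIT
```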
Alternatively, monitors, which are based on notification messages, can also be used for error handling.
A list of all the available functions and their descriptions.
spawn(body, ...)
Creates a process which will execute the body function. Any extra arguments are passed to the executing function. The PID of the new process is returned. In case of error, nil and an error message are returned.
spawn(node, body, ...)
Creates a process on a remote node, given as a string in the format 'nodename@hostname'; the new process will execute the body function. The PID of the new process is returned. In case of error, nil and an error message are returned.
self()
Returns the PID of the calling process.
isalive(process)
Checks if the process, which can be specified by PID or by its registered string name, is alive. Returns true if the process is alive, and false otherwise.
exit(reason)
Terminates the calling process abnormally, with the specified reason string as the cause of exit.
receive([timeout])
Receives the next message in the mailbox of the calling process. If the mailbox is empty, it waits indefinitely for a message to arrive, unless a timeout in milliseconds is specified.
The message is returned; its type depends on what was sent.
send(process, message)
Sends to the destination process a message, which can be a boolean, number, string, table, or function. Returns true if the message was sent successfully, and false if not.
sleep(time)
Implicitly suspends the calling process for the specified time, in milliseconds.
loop([timeout])
Calls the system's infinite loop, which executes the process scheduler until all the processes have terminated, or until the specified timeout, in milliseconds, has expired.
interrupt()
Interrupts the infinite loop of the process scheduler.
step([timeout])
Executes one step of the process scheduler, unless the specified timeout, in milliseconds, has expired.
tick()
Forwards the system's clock by one tick.
setoption(key, value)
Sets the key string option to the specified value, the type of which depends on the option.
getoption(key)
Returns the value of the key string option.
init(node)
Makes the runtime system a distributed node. The argument node is the name string of the node, which can be in either 'nodename' or 'nodename@hostname' format.
If the 'shortnames' option is set to true, then short names are used instead of fully qualified domain names. If the 'connectall' option is set to false, then a fully connected virtual network between the nodes will not be maintained.
shutdown()
Makes the runtime system stop being a distributed node.
node()
Returns the name of the node the calling process is running on.
nodes()
Returns a table with the nodes that the node of the calling process is connected to.
isnodealive()
Returns true if the local node has been initialized, and false otherwise.
monitornode(node)
The calling process starts monitoring the specified node, which is a string of the format 'nodename@hostname'.
demonitornode(node)
The calling process stops monitoring the specified node, which is a string of the format 'nodename@hostname'.
setcookie(secret)
Sets the pre-shared secret key, a string also known as the magic cookie, that will be used for node authentication.
getcookie()
Returns the pre-shared secret key, also known as the magic cookie, that is being used for node authentication.
register(name, pid)
Registers the name string for the given process pid.
unregister(name)
Unregisters the process with the name string.
whereis(name)
Returns the PID of the process with the registered name string.
registered()
Returns a table with all the registered process names.
link(process)
The calling process gets linked with the specified process, which can be either a PID, a registered name, or a remote process. A remote process is a table with two elements: the remote process PID or registered name, and the node's name in the format 'nodename@hostname'.
The 'trapexit' option can be set to true if exit signals between linked processes are to be trapped.
unlink(process)
The calling process gets unlinked from the specified process, which can be either a PID, a registered name, or a remote process. A remote process is a table with two elements: the remote process PID or registered name, and the node's name in the format 'nodename@hostname'.
spawnlink(body, ...)
Creates a process which will execute the body function, and the calling process also gets linked to the new process. Any extra arguments are passed to the executing function. The PID of the new process is returned. In case of error, nil and an error message are returned.
The 'trapexit' option can be set to true if exit signals between linked processes are to be trapped.
spawnlink(node, body, ...)
Creates a process on a remote node, given as a string in the format 'nodename@hostname'; the new process will execute the body function, and the calling process also gets linked to the newly created process. The PID of the new process is returned. In case of error, nil and an error message are returned.
The 'trapexit' option can be set to true if exit signals between linked processes are to be trapped.
monitor(process)
The calling process starts monitoring the specified process, which can be either a PID, a registered name, or a remote process. A remote process is a table with two elements: the remote process PID or registered name, and the node's name in the format 'nodename@hostname'.
demonitor(process)
The calling process stops monitoring the specified process, which can be either a PID, a registered name, or a remote process. A remote process is a table with two elements: the remote process PID or registered name, and the node's name in the format 'nodename@hostname'.
spawnmonitor(body, ...)
Creates a process which will execute the body function, and the calling process also starts monitoring the new process. Any extra arguments are passed to the executing function. The PID of the new process is returned. In case of error, nil and an error message are returned.
spawnmonitor(node, body, ...)
Creates a process on a remote node, given as a string in the format 'nodename@hostname'; the new process will execute the body function, and the calling process also starts monitoring the newly created process. The PID of the new process is returned. In case of error, nil and an error message are returned.