ConcurrentLua

Concurrency Oriented Programming in Lua

Overview

ConcurrentLua is a system that implements a concurrency model for the Lua programming language. It is based on the share-nothing asynchronous message-passing model that is employed in the Erlang programming language.

ConcurrentLua extends Lua's coroutines with message-passing primitives, in order to support concurrent programming. Distributed programming is supported transparently with the same message-passing primitives.

ConcurrentLua is implemented as a collection of Lua modules that can be loaded by any Lua program. Most of the code is written in Lua itself, with minor parts written in C.

Model description

One of the core elements of ConcurrentLua is the process. A process is a lightweight VM thread that plays the same role as a process in an operating system: processes don't share memory, but instead communicate through some kind of interprocess communication. Processes can be created and destroyed on demand, and a simple round-robin scheduler passes control to them.

Each process is associated with a mailbox, a message queue for the temporary storage of messages that were sent to the process. A process can check its mailbox for new messages at any time, and if there are any, they can be read in the order of arrival.

Each process is identified by a unique numeric process identifier, or PID. In addition, aliases, or process names, can be used instead of PIDs to refer to processes. These aliases and their references are stored in a central repository, the registry. Processes can edit the registry by adding or deleting entries.

Error handling mechanisms are also provided in the form of monitors and links. With monitors, processes can monitor other processes and get notified if the monitored processes terminate abnormally. With links, processes are bound together, and when one of them terminates abnormally the other one is signalled and terminates, too.

This system also supports distributed programming and all the properties that have been described map naturally onto a distributed system. Distributed processes communicate with the same primitives as local processes.

Distribution is based on a component that is called the node. A node represents a system runtime inside of which processes are executing. Nodes can be connected to each other and communicate, thus forming a virtual network. Distributed processes use this network in order to exchange messages.

Each node has a name associated with it. So that nodes can connect to each other by using only this name, a port mapper daemon acts as a nameserver. The port mapper daemon has details about the nodes running on the network host that the daemon itself is bound to.

Just as processes can be created locally, it is also possible to request the creation of processes on remote nodes. A remote process can then be handled as if it were a local process.

If the nodes that form the virtual network are fully connected (every node is connected bidirectionally to every other node), global aliases can be used for the processes. The nodes negotiate and maintain a virtual global registry and also keep updated local copies of the registry.

Monitors and links for distributed processes are supported with the same semantics as for local processes. Nodes take care of the task of transparently handling errors between distributed processes. In addition, it is possible for processes to monitor nodes as a whole.

Nodes are required to authenticate before they can communicate. An authenticated node can then be part of the virtual network that the nodes form. A simple security mechanism takes care of this task.

Implementation details

The implementation of ConcurrentLua is based on the Lua component system. The system is organized as a collection of Lua modules and submodules. There are two main modules, which provide the concurrent and the distributed programming functionality respectively. It is possible to load only the concurrency module, and for each module there is the option of not loading some of its submodules if the functionality they provide is not needed. A stand-alone port mapper daemon utility is also included.

The processes in the system are implemented with Lua coroutines. A process is actually a Lua coroutine that yields control when the process suspends its execution and resumes control when the process continues its execution.

The scheduling of the processes is still based on the cooperative multithreading model that Lua uses. Processes voluntarily suspend their execution and thus other processes get the chance to run. Nevertheless, the suspending and resuming of processes is partly hidden under a higher level mechanism; a process suspends its execution when waiting for a message to arrive and becomes ready to be resumed when new messages have arrived in its mailbox. A simple round-robin scheduler resumes the processes.
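The scheduling described above can be sketched in plain Lua. This is only a minimal illustration and assumes nothing about ConcurrentLua's internals: the functions spawn, send, receive, self and loop merely mirror the names used in this document, and the tables processes, mailboxes and waiting are hypothetical.

```lua
-- Sketch only: plain Lua coroutines, not ConcurrentLua's actual code.
local unpack = table.unpack or unpack   -- Lua 5.2+ or Lua 5.1

local processes = {}   -- PID -> coroutine
local mailboxes = {}   -- PID -> queue of pending messages
local waiting = {}     -- PID -> true while blocked in receive()
local lastpid = 0      -- PIDs are handed out sequentially
local current = nil    -- PID of the process that is running

local function spawn(body, ...)
    local args, n = { ... }, select('#', ...)
    lastpid = lastpid + 1
    processes[lastpid] = coroutine.create(function ()
        body(unpack(args, 1, n))
    end)
    mailboxes[lastpid] = {}
    return lastpid
end

local function self() return current end

local function send(pid, message)
    local box = mailboxes[pid]
    if not box then return false end    -- unknown or terminated process
    box[#box + 1] = message
    return true
end

local function receive()
    local box = mailboxes[current]
    while #box == 0 do
        waiting[current] = true
        coroutine.yield()               -- hand control back to the scheduler
    end
    waiting[current] = nil
    return table.remove(box, 1)         -- oldest message first
end

-- Round-robin: resume every process that is new or has mail, and stop
-- when a whole round passes without any process being resumed.
local function loop()
    local progressed = true
    while progressed do
        progressed = false
        for pid = 1, lastpid do
            local co = processes[pid]
            if co and (not waiting[pid] or #mailboxes[pid] > 0) then
                current = pid
                assert(coroutine.resume(co))
                if coroutine.status(co) == 'dead' then
                    processes[pid], mailboxes[pid], waiting[pid] = nil, nil, nil
                end
                progressed = true
            end
        end
    end
end

-- Usage: two processes exchange messages and record what they saw.
local log = {}

local function pong()
    while true do
        local msg = receive()
        if msg.body == 'finished' then break end
        log[#log + 1] = 'pong received ' .. msg.body
        send(msg.from, { body = 'pong' })
    end
end

local function ping(n, pid)
    for i = 1, n do
        send(pid, { from = self(), body = 'ping' })
        local msg = receive()
        log[#log + 1] = 'ping received ' .. msg.body
    end
    send(pid, { from = self(), body = 'finished' })
end

local pid = spawn(pong)
spawn(ping, 2, pid)
loop()
print(table.concat(log, '\n'))
```

A process only gives up control inside receive(), which is what makes the scheduling cooperative. The sketch leaves out timeouts, sleeping, serialization and error handling.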

Any type of Lua data, with the exception of memory references, can be sent inside messages. Messages can be booleans, numbers, strings, tables or functions, and any combination of them. Data are automatically serialized on send and deserialized on receive, and everything is passed by value.

Interprocess communication between nodes, and subsequently between distributed processes, is based on an asynchronous socket handler. This translates to a networking model that uses non-blocking sockets and periodic polling. This is the approach that is mostly used today by Lua libraries. Non-blocking semantics should also be used for IO such as files, pipes, etc.
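The effect of passing by value can be illustrated with a small stand-alone sketch. The serialize and deserialize helpers below are hypothetical and are not ConcurrentLua's serializer; they handle only booleans, numbers, strings and tables without cycles, and skip functions entirely.

```lua
-- Sketch only: a toy serializer, not the one ConcurrentLua uses.
local load_chunk = loadstring or load   -- Lua 5.1 or Lua 5.2+

local function serialize(value)
    local t = type(value)
    if t == 'boolean' or t == 'number' then
        return tostring(value)
    elseif t == 'string' then
        return string.format('%q', value)
    elseif t == 'table' then
        local parts = {}
        for k, v in pairs(value) do
            parts[#parts + 1] = '[' .. serialize(k) .. ']=' .. serialize(v)
        end
        return '{' .. table.concat(parts, ',') .. '}'
    end
    error('cannot serialize a ' .. t)
end

local function deserialize(text)
    return assert(load_chunk('return ' .. text))()
end

-- The receiver works on a copy, so its changes never reach the sender.
local original = { body = 'ping', hops = { 1, 2, 3 } }
local copy = deserialize(serialize(original))
copy.hops[1] = 99
print(original.hops[1], copy.hops[1])
```

After the round trip the original table still holds 1 in hops[1], while the copy holds 99. This is the sense in which a message is a value rather than a shared reference.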

Introduction

Some examples will provide an introduction to the most essential properties of the system, from process creation and message passing to distributed programming and error handling.

Creating processes

Processes are created using the spawn() function. The spawn() function takes at least one argument; the function that contains the command set that the process will execute. Any additional arguments are passed directly as arguments of the function.

The following example demonstrates the creation of a process. The process just prints a message as many times as specified:

concurrent = require 'concurrent'

function hello_world(times)
    for i = 1, times do print('hello world') end
    print('done')
end

concurrent.spawn(hello_world, 3)

concurrent.loop()

The output would be:

hello world
hello world
hello world
done

First the system is loaded:

concurrent = require 'concurrent'

The function that the process will execute is defined next:

function hello_world(times)
    for i = 1, times do print('hello world') end
    print('done')
end

A new process is created:

concurrent.spawn(hello_world, 3)

The system's infinite loop is called last:

concurrent.loop()

Exchanging messages

Processes can exchange messages by using the send() and receive() functions. Also, the self() function can be used to get the PID of the calling process.

The following program implements two processes that exchange messages and then terminate:

concurrent = require 'concurrent'

function pong()
    while true do
        local msg = concurrent.receive()
        if msg.body == 'finished' then
            break
        elseif msg.body == 'ping' then
            print('pong received ping')
            concurrent.send(msg.from, { body = 'pong' })
        end
    end
    print('pong finished')
end

function ping(n, pid)
    for i = 1, n do
        concurrent.send(pid, { from = concurrent.self(), body = 'ping' })
        local msg = concurrent.receive()
        if msg.body == 'pong' then print('ping received pong') end
    end
    concurrent.send(pid, { from = concurrent.self(), body = 'finished' })
    print('ping finished')
end

pid = concurrent.spawn(pong)
concurrent.spawn(ping, 3, pid)

concurrent.loop()

The output would be:

pong received ping
ping received pong
pong received ping
ping received pong
pong received ping
ping received pong
pong finished
ping finished

After the pong process is created, the ping process is supplied with the PID of the pong process:

pid = concurrent.spawn(pong)
concurrent.spawn(ping, 3, pid)

The ping process sends a message:

concurrent.send(pid, { from = concurrent.self(), body = 'ping' })

The pong process waits for a message to arrive and saves it in a variable when it does:

local msg = concurrent.receive()

The pong process replies:

concurrent.send(msg.from, { body = 'pong' })

The pong process terminates after having received a notification from the ping process.

Registering process names

Instead of using process PIDs for sending messages, process names can also be used. The register() function can be used to create an alias for a process in the registry:

concurrent = require 'concurrent'

function pong()
    while true do
        local msg = concurrent.receive()
        if msg.body == 'finished' then
            break
        elseif msg.body == 'ping' then
            print('pong received ping')
            concurrent.send(msg.from, { body = 'pong' })
        end
    end
    print('pong finished')
end

function ping(n)
    for i = 1, n do
        concurrent.send('pong', { from = concurrent.self(), body = 'ping' })
        local msg = concurrent.receive()
        if msg.body == 'pong' then print('ping received pong') end
    end
    concurrent.send('pong', { from = concurrent.self(), body = 'finished' })
    print('ping finished')
end

pid = concurrent.spawn(pong)
concurrent.register('pong', pid)
concurrent.spawn(ping, 3)

concurrent.loop()

The only change from the previous example is the destination that the ping process sends messages to:

concurrent.send('pong', { from = concurrent.self(), body = 'ping' })

And:

concurrent.send('pong', { from = concurrent.self(), body = 'finished' })

And the name of the pong process is now registered:

concurrent.register('pong', pid)

Therefore the ping process isn't supplied with the PID of the pong process.

Distributed message passing

Processes in different nodes can still communicate with the same message passing primitives. Remote processes are denoted by their PID or alias and the node they are executing under. The previous example could be broken into two programs, one for each process.

The code for the pong process:

concurrent = require 'concurrent'

function pong()
    while true do
        local msg = concurrent.receive()
        if msg.body == 'finished' then
            break
        elseif msg.body == 'ping' then
            print('pong received ping')
            concurrent.send(msg.from, { body = 'pong' })
        end
    end
    print('pong finished')
end

concurrent.init('pong@gaia')

pid = concurrent.spawn(pong)

concurrent.register('pong', pid)
concurrent.loop()
concurrent.shutdown()

And the code for the ping process:

concurrent = require 'concurrent'

function ping(n)
    for i = 1, n do
        concurrent.send({ 'pong', 'pong@gaia' },
                        { from = { concurrent.self(), concurrent.node() },
                          body = 'ping' })
        local msg = concurrent.receive()
        if msg.body == 'pong' then print('ping received pong') end
    end
    concurrent.send({ 'pong', 'pong@gaia' },
                    { from = { concurrent.self(), concurrent.node() },
                      body = 'finished' })
    print('ping finished')
end

concurrent.spawn(ping, 3)

concurrent.init('ping@selene')
concurrent.loop()
concurrent.shutdown()

The output of the pong process would be:

pong received ping
pong received ping
pong received ping
pong finished

And the output of the ping process would be:

ping received pong
ping received pong
ping received pong
ping finished

In this example the runtime system is running in distributed mode. In order for this to happen, the port mapper daemon has to be started first. This can be done by typing in a command line shell:

$ clpmd

The code that initializes the node that the pong process is running on:

concurrent.init('pong@gaia')

And the code that initializes the node that the ping process is running on:

concurrent.init('ping@selene')

Each of the previous two code snippets registers, with the port mapper daemon, the port that its node is listening on. Both nodes unregister their port with:

concurrent.shutdown()

The only other changes in this example are the destination that the messages are sent to, along with the introduction of the node() function that returns the name of the node that the calling process is running on:

concurrent.send({ 'pong', 'pong@gaia' },
                { from = { concurrent.self(), concurrent.node() },
                  body = 'ping' })

And later:

concurrent.send({ 'pong', 'pong@gaia' },
                { from = { concurrent.self(), concurrent.node() },
                  body = 'finished' })

Handling errors

One approach to handling errors in processes is the notion of linked processes. Two processes are bound together, and if one of them terminates abnormally the other one terminates, too. The link() function can be used to link processes:

concurrent = require 'concurrent'

function ping(n, pid)
    concurrent.link(pid)
    for i = 1, n do
        concurrent.send(pid, { from = concurrent.self(), body = 'ping' })
        local msg = concurrent.receive()
        if msg.body == 'pong' then print('ping received pong') end
    end
    print('ping finished')
    concurrent.exit('finished')
end

function pong()
    while true do
        local msg = concurrent.receive()
        if msg.body == 'ping' then
            print('pong received ping')
            concurrent.send(msg.from, { body = 'pong' })
        end
    end
    print('pong finished')
end

pid = concurrent.spawn(pong)
concurrent.spawn(ping, 3, pid)

concurrent.loop()

The output would be:

pong received ping
ping received pong
pong received ping
ping received pong
pong received ping
ping received pong
ping finished

The pong process never reaches its last line, because it terminates when the ping process exits.

The code that links the processes is:

concurrent.link(pid)

The exit() function is used to make the calling process quit abnormally:

concurrent.exit('finished')

It is also possible to trap the exit signal of the terminating process. In this case a special message is received:

concurrent = require 'concurrent'

concurrent.setoption('trapexit', true)

function pong()
    while true do
        local msg = concurrent.receive()
        if msg.signal == 'EXIT' then
            break
        elseif msg.body == 'ping' then
            print('pong received ping')
            concurrent.send(msg.from, { body = 'pong' })
        end
    end
    print('pong finished')
end

function ping(n, pid)
    concurrent.link(pid)
    for i = 1, n do
        concurrent.send(pid, { from = concurrent.self(), body = 'ping' })
        local msg = concurrent.receive()
        if msg.body == 'pong' then print('ping received pong') end
    end
    print('ping finished')
    concurrent.exit('finished')
end

pid = concurrent.spawn(pong)
concurrent.spawn(ping, 3, pid)

concurrent.loop()

The output would be:

pong received ping
ping received pong
pong received ping
ping received pong
pong received ping
ping received pong
ping finished
pong finished

There is an option related to process linking that can be set with the setoption() function, specifically the trapexit option:

concurrent.setoption('trapexit', true)

Then the pong process receives a special exit message:

if msg.signal == 'EXIT' then
    break

Alternatively, monitors, which are based on notification messages, can also be used for error handling.
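A monitor can be set up in much the same way as a link. The following is a sketch that uses only the monitor(), send(), receive() and exit() functions listed in the reference. The layout of the notification message is not described in this document, so the example makes no assumption about its fields and simply prints whatever arrives:

```lua
concurrent = require 'concurrent'

function worker()
    concurrent.receive()                  -- wait for the go-ahead
    concurrent.exit('worker failed')      -- terminate abnormally
end

function watcher(pid)
    concurrent.monitor(pid)
    concurrent.send(pid, { body = 'go' })
    local msg = concurrent.receive()      -- the notification message
    if type(msg) == 'table' then
        -- Field names are not documented here, so print all of them.
        for key, value in pairs(msg) do print(key, value) end
    else
        print(msg)
    end
    print('watcher finished')
end

pid = concurrent.spawn(worker)
concurrent.spawn(watcher, pid)

concurrent.loop()
```

Unlike a link, a monitor does not terminate the watcher. The watcher is only notified and decides for itself what to do next.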

Reference

A list of all the available functions and their descriptions.

Processes

spawn(body, ...)

Creates a process which will execute the body function. Any extra arguments can be passed to the executing function. The PID of the new process is returned. In case of error nil and an error message are returned.

spawn(node, body, ...)

Creates a process on a remote node, which is specified as a string in the format 'nodename@hostname'. The new process will execute the body function. The PID of the new process is returned. In case of error nil and an error message are returned.
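As a sketch of the remote form of spawn(), the program below asks another node to run a function. It assumes that the port mapper daemon is running and that a node named 'pong@gaia' has already been initialized and is running its loop. The node names are borrowed from the earlier examples, and the hello function is hypothetical. The function only uses the print global, so it does not depend on any state of the sending node:

```lua
concurrent = require 'concurrent'

function hello(times)
    for i = 1, times do print('hello from a remote process') end
end

function launcher()
    -- nil and an error message are returned if the process cannot be created.
    local pid, err = concurrent.spawn('pong@gaia', hello, 3)
    if pid then
        print('remote process created')
    else
        print('remote spawn failed', err)
    end
end

concurrent.init('ping@selene')

concurrent.spawn(launcher)

concurrent.loop()
concurrent.shutdown()
```

If the request succeeds, the messages are expected to be printed by the 'pong@gaia' node rather than by the node that issued the request.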

self()

Returns the PID of the calling process.

isalive(process)

Checks if the process, which can be specified by PID or by its registered string name, is alive. Returns true if the process is alive, and false otherwise.
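A short sketch of how the return values of spawn() and isalive() might be used together. The worker and supervisor functions are hypothetical, and the 100 millisecond pause is an arbitrary choice that gives the worker time to terminate:

```lua
concurrent = require 'concurrent'

function worker()
    concurrent.receive()                  -- runs until any message arrives
end

function supervisor()
    local pid, err = concurrent.spawn(worker)
    if not pid then
        print('spawn failed', err)
        return
    end
    print(concurrent.isalive(pid))        -- the worker is still waiting
    concurrent.send(pid, { body = 'stop' })
    concurrent.sleep(100)                 -- let the worker run and finish
    print(concurrent.isalive(pid))        -- the worker should be gone by now
end

concurrent.spawn(supervisor)

concurrent.loop()
```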

exit(reason)

Terminates the calling process abnormally, with the specified reason string as the cause of exit.

Messages

receive([timeout])

Receives the next message in the mailbox of the calling process. If the mailbox is empty, it waits indefinitely for a message to arrive, unless a timeout in milliseconds is specified. The message is returned, and its type depends on what was sent.
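The optional timeout might be used as in the following sketch. What receive() returns when the timeout expires is not stated in this reference; the example assumes that it returns nil in that case, which should be verified against the implementation:

```lua
concurrent = require 'concurrent'

function waiter()
    local msg = concurrent.receive(1000)  -- wait for at most one second
    if msg == nil then
        -- Assumption: nil signals that the timeout expired.
        print('no message arrived in time')
    else
        print('received a message')
    end
end

concurrent.spawn(waiter)

concurrent.loop()
```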

send(process, message)

Sends a message to the destination process. The message can be one of: boolean, number, string, table, function. Returns true if the message was sent successfully, and false if not.

Scheduling

sleep(time)

Suspends the calling process for the specified time, given as a number of milliseconds.

loop([timeout])

Calls the system's infinite loop, which executes the process scheduler until all the processes have terminated or until the specified timeout, given as a number of milliseconds, has expired.
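The following sketch combines sleep() with the timeout argument of loop(). The ticker process never terminates on its own, so the timeout is what ends the loop here. The ticker function and the chosen intervals are arbitrary:

```lua
concurrent = require 'concurrent'

function ticker(interval)
    while true do
        print('tick')
        concurrent.sleep(interval)        -- suspend for interval milliseconds
    end
end

concurrent.spawn(ticker, 100)

concurrent.loop(1000)                     -- give up after about one second
print('scheduler returned')
```

How many times the message is printed depends on timing, so the count should not be relied upon.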

interrupt()

Interrupts the infinite loop of the process scheduler.

step([timeout])

Executes one step of the process scheduler, unless the specified timeout, given as a number of milliseconds, has expired.

tick()

Forwards the system's clock by one tick.

Options

setoption(key, value)

Sets the key string option to the specified value, the type of which depends on the option.

getoption(key)

Returns the value of the key string option.

Node

init(node)

Makes the runtime system a distributed node. The first argument is the name string of the node, which can be either in 'nodename' or 'nodename@hostname' format.

If the 'shortnames' option is set to true, then short names are used instead of fully qualified domain names. If the 'connectall' option is set to false, then a fully connected virtual network between the nodes will not be maintained.

shutdown()

Makes the runtime system stop being a distributed node.

node()

Returns the name of the node the calling process is running on.

nodes()

Returns a table with the nodes that the local node, that is the node the calling process is running on, is connected to.

isnodealive()

Returns true if the local node has been initialized, and false otherwise.
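A sketch of how the node functions might fit together. It assumes that the port mapper daemon is running on the local host, and that the options have to be set before init() is called, as the description of init() suggests. The node name 'reporter' and the report function are hypothetical, and the layout of the table returned by nodes() is not assumed:

```lua
concurrent = require 'concurrent'

concurrent.setoption('shortnames', true)
concurrent.setoption('connectall', false)

function report()
    print(concurrent.node())              -- name of the local node
    print(concurrent.isnodealive())       -- whether the node is initialized
    for key, value in pairs(concurrent.nodes()) do
        print(key, value)                 -- nodes connected so far, if any
    end
end

concurrent.init('reporter')

concurrent.spawn(report)

concurrent.loop()
concurrent.shutdown()
```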

monitornode(node)

The calling process starts monitoring the specified node, which is a string of the format 'nodename@hostname'.

demonitornode(node)

The calling process stops monitoring the specified node, which is a string of the format 'nodename@hostname'.

Security

setcookie(secret)

Sets the pre-shared secret key, a string, also known as the magic cookie, that will be used for node authentication.

getcookie()

Returns the pre-shared secret key, also known as the magic cookie, that is being used for node authentication.

Registering

register(name, pid)

Registers the name string for the given process pid.

unregister(name)

Unregisters the process with the name string.

whereis(name)

Returns the PID of the process with the registered name string.

registered()

Returns a table with all the registered process names.
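The registry functions might be combined as in the sketch below. The worker and manager functions are hypothetical, and since the layout of the table returned by registered() is not described here, both its keys and values are printed:

```lua
concurrent = require 'concurrent'

function worker()
    concurrent.receive()                  -- runs until any message arrives
    print('worker finished')
end

function manager()
    local pid = concurrent.whereis('worker')
    print(pid)
    for key, value in pairs(concurrent.registered()) do
        print(key, value)
    end
    concurrent.send(pid, { body = 'stop' })
    concurrent.unregister('worker')       -- the alias is no longer needed
end

pid = concurrent.spawn(worker)
concurrent.register('worker', pid)
concurrent.spawn(manager)

concurrent.loop()
```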

Linking

link(process)

The calling process gets linked with the specified process, which can be either a PID, a registered name, or a remote process. A remote process is a table with two elements, the remote process PID or registered name and the node's name in the format 'nodename@hostname'.

The 'trapexit' option can be set to true, if exit signals between linked processes are to be trapped.

unlink(process)

The calling process gets unlinked from the specified process, which can be either a PID, a registered name, or a remote process. A remote process is a table with two elements, the remote process PID or registered name and the node's name in the format 'nodename@hostname'.

spawnlink(body, ...)

Creates a process which will execute the body function, and the calling process also gets linked to the new process. Any extra arguments can be passed to the executing function. The PID of the new process is returned. In case of error nil and an error message are returned.

The 'trapexit' option can be set to true, if exit signals between linked processes are to be trapped.

spawnlink(node, body, ...)

Creates a process in a remote node which is a string in the format 'nodename@hostname', the new process will execute the body function, and also the calling process gets linked to the newly created process. The PID of the new process is returned. In case of error nil and an error message are returned.

The 'trapexit' option can be set to true, if exit signals between linked processes are to be trapped.
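A sketch of spawnlink() together with the 'trapexit' option. The child and parent functions are hypothetical. The check on msg.signal follows the exit message shown in the error handling example earlier in this document:

```lua
concurrent = require 'concurrent'

concurrent.setoption('trapexit', true)

function child()
    concurrent.receive()                  -- wait for the go-ahead
    concurrent.exit('child failed')       -- terminate abnormally
end

function parent()
    local pid, err = concurrent.spawnlink(child)
    if not pid then
        print('spawnlink failed', err)
        return
    end
    concurrent.send(pid, { body = 'go' })
    local msg = concurrent.receive()
    if msg.signal == 'EXIT' then
        print('parent trapped the exit signal of the child')
    end
end

concurrent.spawn(parent)

concurrent.loop()
```

Without the 'trapexit' option the parent would be expected to terminate along with the child instead of receiving a message.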

Monitoring

monitor(process)

The calling process starts monitoring the specified process, which can be either a PID, a registered name, or a remote process. A remote process is a table with two elements, the remote process PID or registered name and the node's name in the format 'nodename@hostname'.

demonitor(process)

The calling process stops monitoring the specified process, which can be either a PID, a registered name, or a remote process. A remote process is a table with two elements, the remote process PID or registered name and the node's name in the format 'nodename@hostname'.

spawnmonitor(body, ...)

Creates a process which will execute the body function, and the calling process also starts monitoring the new process. Any extra arguments can be passed to the executing function. The PID of the new process is returned. In case of error nil and an error message are returned.

spawnmonitor(node, body, ...)

Creates a process in a remote node which is a string in the format 'nodename@hostname', the new process will execute the body function, and also the calling process starts monitoring the newly created process. The PID of the new process is returned. In case of error nil and an error message are returned.