Interruption API
Emilua also provides an interruption API that you can use to cancel fibers (for example, to free resources held by fibers stuck in IO requests that might never complete).
The main question that an interruption API needs to answer is how to keep the application in a consistent state. What counts as a consistent state is knowledge that lives in the application and in the programmer’s assumptions; it is not encoded in the Emilua source code itself. So it is okay to offload some of the responsibility onto the application itself.
One dumb’n’quick example that illustrates the problem of a consistent state follows:
local m = mutex.new()
local f = spawn(function()
    m:lock()
    sleep(2)
    m:unlock()
end)
sleep(1)
f:interrupt()
m:lock()
Before a fiber can be discarded at interruption, it needs to restore state invariants and free resources. The GC would be hopeless in the previous example (and many more) because the mutex is shared and still reachable even if we collect the interrupted fiber’s stack. There are other reasons why we can’t rely on the GC for the job.
The Windows approach to thread cancellation would be a contract. This contract requires the programmer to never call a blocking function directly and to always use WaitForMultipleObjects() instead. And another rule: pass a cancellation handle along the call chain for other functions that need to perform blocking calls.
Conceptually, this solution is just the same as Go’s:
select {
case job := <-queue:
    // ... do job ...
case <-ctx.Done():
    // goroutine cancelled
}
The difference is that Go’s Context is part of the standard library and a contract everybody adopts. The lesson here is that cancellation is part of the runtime, or else it just doesn’t work. In Emilua, the runtime is extended to provide a cancellation API inspired by POSIX’s thread cancellation.
I’ve looked at many environments, and the only difference I’ve observed between the terms cancellation and interruption is that interruption is used to convey the property of cancellation being implemented in terms of exceptions. When I refer to fiber cancellation within the Emilua runtime, I’ll stick to the term interruption.
The rest of this document will gloss over many details. As long as you stay on the common case, you won’t need to keep most of them in mind (the defaults are sensible), and for the details that you do need to remember, there is a shorter “recap” section at the end.
Do not copy-paste code snippets surrounded by WARNING blocks. They’re most likely to break your program. Do read the manual to the end. These code snippets are there as intermediate steps for the general picture.
The lua exception model
It is easy to find a try-catch construct in mainstream languages like so:
try {
    // code that might err
} catch (Exception e) {
    // error handler
}
// other code
And here’s the Lua translation of this pattern:
local ok = pcall(function()
    -- code that might err
end)
if not ok then
    -- error handler
end
-- other code
The main difference here is that Lua’s exception mechanism doesn’t integrate tightly with the type system (and that’s okay). So the catch block is always a catch-all really. Also, the structure initially suggests we don’t need special syntax for a finally block:
try {
    // code that might err
} catch (Exception e) {
    // error handler
} finally {
    // cleanup handler
}
// other code

local ok = pcall(function()
    -- code that might err
end)
if not ok then
    -- error handler
end
-- cleanup handler
-- other code
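One detail the translation leaves implicit: if the error should keep propagating after the cleanup step has run, it has to be re-raised by hand. The following plain-Lua sketch shows one way to do that. The helper name with_cleanup is hypothetical and exists only for this illustration.

```lua
-- Hypothetical helper: run `body`, always run `cleanup`, then re-raise any error.
local function with_cleanup(body, cleanup)
    local ok, err = pcall(body)
    cleanup()             -- plays the role of `finally`: runs on both exit paths
    if not ok then
        error(err, 0)     -- level 0: re-raise without adding position info
    end
end

local log = {}

-- Normal exit path: body, then cleanup.
with_cleanup(
    function() log[#log + 1] = "body" end,
    function() log[#log + 1] = "cleanup" end)

-- Error exit path: cleanup still runs, and the error reaches the caller.
local ok, err = pcall(with_cleanup,
    function() error("boom", 0) end,
    function() log[#log + 1] = "cleanup after error" end)

print(table.concat(log, ", "))
print(ok, err)
```

The cleanup function runs exactly once per call regardless of how the body exits, which is the property the rest of this document builds on.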
In sloppy terms, the interruption API just re-schedules the fiber to be resumed but with the fiber stack slightly modified to throw an exception when execution proceeds. This property will trigger stack unwinding to call all the error & cleanup handlers in the reverse order that they were registered.
The interruption protocol
The fiber handle returned by the spawn() function is the central means of communicating the intent to interrupt a fiber. To better accommodate support for structured concurrency and not introduce avoidable co-dependency between them, we follow the POSIX thread cancellation model (Java’s confusing state machine is ignored). Long story short, once a fiber has been interrupted, it cannot be un-interrupted.
To interrupt a fiber, just call the interrupt() function from a fiber handle:
fib:interrupt()
You can only interrupt joinable fibers (but the function is safe to call with any handle at any time).
Afterwards, you can safely join() or detach() the target fiber:
fib:join()
-- ...or
fib:detach()
If you don’t detach a fiber, the GC will do it for you.
It’s that easy. Your fiber doesn’t need to know the target fiber’s internal state and the target fiber doesn’t need to know your fiber’s internal state. On the other end, handling an interruption request is a little trickier.
Handling interruption requests
The key concept required to understand the interruption’s flow is the interruption point. Understand this, and you’ll have learnt how to handle interruption requests.
Definition: An interruption point is a point in your application where the Emilua runtime is allowed to stop the normal execution flow and raise an exception to trigger stack unwinding if an interruption request from another fiber has been received.
Once interruption is a possibility, your mental model has to take into account that calls to certain functions might now throw an error for no other reason than to unwind the stack before freeing the fiber.
The only places that are allowed to serve as interruption points are calls to suspending functions (plus the pcall() family and coroutine.resume() for reasons soon to be explained).
-- this snippet has no interruption points
-- exceptions are never raised here
local i = 0
while true do
    i = i + 1
end
The following function doesn’t need to worry about leaving the object self in an inconsistent state if the fiber gets interrupted. And the reason for this is quite simple: this function doesn’t have interruption points (which is usually the case for functions that are purely compute-bound). It won’t ever be interrupted in the middle of its work.
function mt:new_sample(sample)
    self.mean_ = self.a * sample + (1 - self.a) * self.mean_
    self.f = self.a + (1 - self.a) * self.f
end
Functions that suspend the fiber (e.g. IO and functions from the condition_variable module) configure interruption points. The function echo defined below has interruption points.
function echo(sock, buf)
    local nread = sock:read(buf) -- (1)
    sock:write(buf, nread) -- (2)
end
Now take the following code to orchestrate the interaction between two fibers.
local child_fib = spawn(function()
    local buf = buffer.new(1024)
    echo(global_sock, buf)
end)
child_fib:interrupt()
The mother-fiber doesn’t have interruption points, so it executes until the end. The child_fib fiber calls echo() and echo() will in turn act as an interruption point (i.e. the property of being an interruption point propagates up to the caller functions).
this_fiber.yield() can be used to introduce interruption points for fibers that otherwise would have none.
The mother-fiber doesn’t call any suspending function, so it’ll run until the end and only yields execution back to other fibers when it does end. At the last line, an interruption request is sent to the child fiber. The runtime’s scheduler doesn’t guarantee when the interruption request will be delivered and can schedule execution of the remaining fibers with plenty of freedom given we’re not using any synchronization primitives.
In this simple scenario, it’s quite likely that the interruption request will be delivered pretty quickly and the call to sock:read() inside echo() will suspend child_fib just to awake it again but with an exception being raised instead of the result being returned. The exception will unwind the whole stack and the fiber finishes.
Any of the interruption points can serve for the fiber to act on the interruption request. Another possible point where these mechanisms would be triggered is the sock:write() suspending function.
The uncaught-hook isn’t called when the exception is fiber_interrupted, so you don’t really have to care about trapping interruption exceptions. You’re free to just let the stack fully unwind.
To register a cleanup handler in case the fiber gets interrupted, all you need to do is handle the raised exceptions.
A fiber is always either interrupted or not interrupted. A fiber doesn’t go back to the un-interrupted state. Once the fiber has been interrupted, it’ll stay in this state. The task at hand is to unwind the stack, calling the cleanup handlers to keep the application state consistent after the GC collects the fiber (all done by the Emilua runtime).
So you can’t call more suspending functions after the fiber gets interrupted:
local ok, ex = pcall(function()
    -- lots of IO ops (1)
end)
if not ok then
    watchdog_sock:write(errored_msg) -- (2)
    error(ex)
end
(1) Lots of interruption points. All swallowed by pcall().
(2) If the fiber gets interrupted at (1), it won’t init any IO operation here but will instead throw another fiber_interrupted exception.
The previous snippet has an error. To properly achieve the desired behaviour, you have to temporarily disable interruptions in the cleanup handler like so:
local ok, ex = pcall(function()
    -- lots of IO ops
end)
if not ok then
    this_fiber.disable_interruption()
    pcall(function()
        watchdog_sock:write(errored_msg)
    end)
    this_fiber.restore_interruption()
    error(ex)
end
this_fiber.restore_interruption() has to be called as many times as this_fiber.disable_interruption() has been called to restore interruptibility.
It looks messy, but this behaviour actually helps the common case to stay clean. Were it not for these choices, a common fiber that doesn’t have to handle interruption, like the following, would accidentally swallow an interruption request and never get collected:
local ok = false
while not ok do
    ok = pcall(function()
        my_udp_sock:send(notify_msg)
    end)
end
And the pcall() family in itself also configures an interruption point exactly to make sure that loops like this won’t prevent the fiber from being properly interrupted. The pcall() family and coroutine.resume() are the only functions which aren’t suspending functions but introduce interruption points nevertheless.
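Going back to the cleanup handler that disables interruptions by hand: one way to make sure every disable_interruption() call is matched by exactly one restore_interruption() call, even when the protected code throws, is to wrap the pair in a helper. The following is a minimal sketch. with_interruption_disabled is a hypothetical name, not part of Emilua. So that the sketch can also be run with a plain Lua interpreter, it falls back to a stub that merely counts nesting depth when this_fiber isn’t available; the stub is an assumption made for illustration and doesn’t model real interruption.

```lua
-- Outside Emilua there is no `this_fiber`, so fall back to a stub that only
-- counts nesting depth (an assumption for illustration, not Emilua behaviour).
local depth = 0
local fiber = this_fiber or {
    disable_interruption = function() depth = depth + 1 end,
    restore_interruption = function() depth = depth - 1 end,
}

-- Hypothetical helper: run `fn` with interruptions disabled and balance the
-- disable call with exactly one restore call, even if `fn` raises an error.
local function with_interruption_disabled(fn, ...)
    fiber.disable_interruption()
    local ok, err = pcall(fn, ...)
    fiber.restore_interruption()
    if not ok then
        error(err, 0) -- re-raise the original error unchanged
    end
end

local seen = {}
with_interruption_disabled(function()
    seen[#seen + 1] = depth          -- 1 when the stub is in use
    with_interruption_disabled(function()
        seen[#seen + 1] = depth      -- 2 when the stub is in use: calls nest
    end)
end)

-- The counter is balanced again even when the wrapped function fails.
local ok = pcall(with_interruption_disabled, function() error("boom", 0) end)
print(ok, depth)
```

In real code, the scope() family described in the next section is usually the more convenient tool, since its handlers already run with interruptions disabled.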
It is guaranteed that This guarantee is useful to build certain concurrency patterns. |
The scope() facility
The control flow for the common case is good, but handling interruptions right now is tricky to say the least. To make matters less error-prone, the scope() family of functions exists:
- scope()
- scope_cleanup_push()
- scope_cleanup_pop()
The scope() function receives a closure and executes it, but it maintains a list of cleanup handlers to be called on the exit path (be it reached by the common exit flow or by a raised exception). When you call it, the list of cleanup handlers is empty, and you can use scope_cleanup_push() to register cleanup handlers. They are executed in the reverse order in which they were registered. The handlers are called with the interruptions disabled, so you don’t need to disable them yourself.
It is safe to have nested scope()s.
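To make the bookkeeping concrete, here is a toy model of the three functions written in plain Lua. It is only a sketch of the behaviour described above (handlers run on both exit paths, last registered first), not Emilua’s implementation. The names toy_scope, toy_cleanup_push and toy_cleanup_pop are made up for this illustration, there is no implicit root scope, and nothing here models fibers or the disabling of interruptions while handlers run.

```lua
-- Toy model in plain Lua; NOT Emilua's implementation. All names are hypothetical.
local handlers = nil -- cleanup list of the innermost active toy scope

local function toy_cleanup_push(fn)
    handlers[#handlers + 1] = fn
end

local function toy_cleanup_pop(execute)
    local fn = table.remove(handlers)
    if execute ~= false then fn() end -- the handler is executed by default
end

local function toy_scope(body)
    local outer = handlers
    local mine = {}
    handlers = mine
    local ok, err = pcall(body)
    -- Run whatever is still registered, last registered first.
    for i = #mine, 1, -1 do
        mine[i]()
    end
    handlers = outer
    if not ok then error(err, 0) end
end

local order = {}
local ok = pcall(toy_scope, function()
    toy_cleanup_push(function() order[#order + 1] = "first" end)
    toy_cleanup_push(function() order[#order + 1] = "second" end)
    toy_cleanup_push(function() order[#order + 1] = "third" end)
    toy_cleanup_pop(false)   -- drop "third" without running it
    error("interrupted", 0)  -- stand-in for an interruption
end)

print(ok, table.concat(order, ", "))
```

The handler that was popped with false never runs, and the two remaining handlers run in reverse registration order before the error leaves the scope.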
One of the previous examples can now be rewritten as follows:
local child_fib = spawn(function()
    local buf = buffer.new(1024)
    global_sock_mutex:lock()
    scope_cleanup_push(function() global_sock_mutex:unlock() end)
    echo(global_sock, buf)
end)
A hairy situation happens when a cleanup handler itself throws an error. The reason why the default uncaught-hook doesn’t terminate the VM when secondary fibers fail is that cleanup handlers are trusted to keep the program invariants. Once a cleanup handler fails, we can no longer hold this assumption, so the VM is terminated[1] (there’s no way to recover from this error without context, and conceptually by the time uncaught hooks are executed, the context was already lost). If you need some sort of protection against one complex module that will fail now and then, run it in a separate actor. In C++, this scenario is analogous to a destructor throwing an exception when the destructor itself was triggered by an exception-provoked stack unwinding. And the result is the same: the program is terminated.
If you want to call the last registered cleanup handler and pop it from the list, just call scope_cleanup_pop(). scope_cleanup_pop() receives an optional argument informing whether the cleanup handler must be executed after being removed from the list (defaulting to true).
scope(function()
    scope_cleanup_push(function()
        watchdog_sock:write(errored_msg)
    end)
    -- lots of IO ops
    scope_cleanup_pop(false)
end)
Every fiber has an implicit root scope, so you don’t always need to create one yourself. The standard Lua pcall() is also modified to act as a scope, which is a great convenience.
Given that pcall() is also an interruption point, the examples enclosed in WARNING blocks in the previous section had bugs related to maintaining invariants. The scope() family is the safest way to register cleanup handlers.
IO objects
It’s not unrealistic to share a single IO object among multiple fibers. The following snippets are based on real-world code (the original code was not written in Lua):
while true do
    sleep(20)
    write_mutex:lock()
    scope_cleanup_push(function() write_mutex:unlock() end)
    local ok = pcall(function() ws:ping() end)
    if not ok then
        return
    end
    scope_cleanup_pop()
end

while true do
    local ok = pcall(function()
        -- `app` may call `write_mutex:lock()`
        app:consume_subscriptions()
    end)
    if not ok then
        return
    end
    -- uses `condition_variable`
    app:wait_on_subscriptions()
end

local buffer = buffer.new(1024)
while true do
    local ok = pcall(function()
        local nread = ws:read(buffer)
        -- `app` may call `write_mutex:lock()`
        app:on_ws_read(buffer, nread)
    end)
    if not ok then
        break
    end
end
f1:interrupt()
f2:interrupt()
this_fiber.disable_interruption()
f1:join()
f2:join()
A fiber will never be interrupted in the middle (tricky concept to define) of some IO operation. If a fiber is suspended on some IO operation and is successfully interrupted, it means the operation was not delivered at all and can be tried again later as if it never happened in the first place. The following artificial example illustrates this guarantee (restricting the IO object to a single fiber to keep the code sample small and easy to follow):
scope_cleanup_push(function()
    my_sctp_sock:write(checksum.shutdown_msg)
end)
while true do
    sleep(20)
    my_sctp_sock:write(broadcast_msg)
    checksum:update(broadcast_msg)
end
If the interruption request arrives when the fiber is suspended at my_sctp_sock:write(), the runtime will schedule cancellation of the underlying IO operation and only resume the fiber when the reply for the cancellation request arrives. At this point, if the original IO operation already succeeded, the fiber_interrupted exception won’t be raised, so you have a chance to examine the result, and the interruption handling will be postponed to the next interruption point.
The pcall() family actually provides the same fundamental guarantee. Once it starts executing the argument passed, it won’t throw any fiber_interrupted exception, so you have a chance to examine the result of the executed code. The pcall() family only checks for interruption requests before executing the argument.
Some IO objects might use relaxed semantics here to avoid expensive implementations. For instance, HTTP sockets might close the underlying TCP socket if you cancel an IO operation to avoid bookkeeping state. Refer to their documentation to check when the behaviour uses relaxed semantics. All in all, they should never block indefinitely. That’s a guarantee you can rely on. Preferably, they won’t use a timeout to react to cancellations either (that would be just bad).
User-level coroutines
Interruptibility is not a property of the coroutine. The coroutine can be created in one fiber, started in a second fiber and resumed in a third one. Interruptibility is a property of the fiber.
fibonacci = coroutine.create(function()
    local a, b = 0, 1
    while true do
        a, b = b, a + b
        coroutine.yield(a)
    end
end)
coroutine.resume() swallows exceptions raised within the coroutine, just like pcall(). Therefore, the runtime guarantees coroutine.resume() enjoys the same properties found in pcall():
- coroutine.resume() is an interruption point.
- coroutine.resume() only checks for interruption requests before resuming the coroutine (i.e. the interruption notification is not fully asynchronous).
- Like pcall(), coroutine.create() will also create a new scope() for the closure. However, this scope (and any nested one) is independent from the parent fiber and tied not to the enclosing parent fiber’s lexical scopes but to the coroutine lifetime.
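The claim that coroutine.resume() swallows exceptions is easy to check in plain Lua, with no fibers involved. The snippet below is a small sketch showing that an error raised inside a coroutine doesn’t propagate out of coroutine.resume(); it comes back as a false status plus the error value, the same shape pcall() returns. It says nothing about interruption itself, which only exists inside the Emilua runtime.

```lua
local co = coroutine.create(function()
    coroutine.yield("first value")
    error("boom", 0) -- level 0: keep the message free of position info
end)

local ok1, v1 = coroutine.resume(co) -- runs until the yield
local ok2, v2 = coroutine.resume(co) -- the error is caught and returned
local ok3, v3 = coroutine.resume(co) -- a dead coroutine cannot be resumed

print(ok1, v1)
print(ok2, v2)
print(ok3, v3)
print(coroutine.status(co))
```

After the error, the coroutine is dead and any further resume attempt fails, so a caller that ignores the status returned by coroutine.resume() can lose errors just as a caller that ignores the result of pcall() can.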
We can’t guarantee deterministic resumption of zombie coroutines to (re-)deliver interruption requests (nor should we). Therefore, if the GC collects any of your unreachable coroutines that still have pending cleanup handlers (i.e. scope_cleanup_pop() calls yet to be made), it does nothing besides collecting the coroutine stack. You have to prepare your code to cope with this non-guarantee; otherwise you will most likely end up with buggy code.
local co = coroutine.create(function()
    m:lock()
    -- this handler will never be called
    scope_cleanup_push(function() m:unlock() end)
    coroutine.yield()
end)
coroutine.resume(co)
The safe bet is to just structure the code in a way that there is no need to call scope_cleanup_push() within user-created coroutines.
Recap
The fiber handle returned by spawn() has an interrupt() member-function that can be used to interrupt joinable fibers. The fiber only gets interrupted at interruption points. To preserve invariants your app relies on, register cleanup handlers with scope_cleanup_push().
The relationship between user-created coroutines and interruptions is tricky. Therefore, you should avoid creating (either manually or through some abstraction) cleanup handlers within them.
this_fiber.disable_interruption()
local numbers = {8, 42, 38, 111, 2, 39, 1}
local sleeper = spawn(function()
    local children = {}
    scope_cleanup_push(function()
        for _, f in pairs(children) do
            f:interrupt()
        end
    end)
    for _, n in pairs(numbers) do
        children[#children + 1] = spawn(function()
            sleep(n)
            print(n)
        end)
    end
    for _, f in pairs(children) do
        f:join()
    end
end)
local sigwaiter = spawn(function()
    local sigusr1 = signals.new(signals.SIGUSR1)
    sigusr1:wait()
    sleeper:interrupt()
end)
sleeper:join()
sigwaiter:interrupt()