For the latest stable version, please use Emilua API 0.10! |
Working with streams
Streams are one of the fundamental concepts one has to deal with when working on IO. Streams represent channels where data flows as slices of bytes respecting certain properties (e.g. ordering).
Emilua exposes two concepts to work with streams. Write streams are objects that
implement the method write_some()
:
write_some(self, buffer: byte_span) → integer
-
Writes
buffer
into the stream and returns the number of bytes written.
Similarly, read streams are objects that implement the method read_some()
:
read_some(self, buffer: byte_span) → integer
-
Reads into
buffer
and returns the number of bytes read.
Exceptions are used to communicate errors.
When the type of the stream is not informed (i.e. read or write), it’s safe to assume the stream object implements both interfaces. Pipes are unidirectional, and separate classes exist to deal with each. On the other hand, TCP sockets are bidirectional and data can flow from any direction. Furthermore, many sockets allow one to shutdown one communication end so they can work unidirectionally as well.
Short reads and short writes
Streams represent streams of bytes, with no implied message boundaries.
Each operation on a stream roughly maps to a single syscall[1], and it may transfer fewer bytes than requested. This is referred to as a short read or short write.
Reasons why short writes occur include out of buffer space in kernels that don’t expose proactors. The rationale for short reads is more obvious, and it should stay as an exercise for the reader (no pun intended).
To recover from short reads and short writes, one just has to try the operation again adjusting the buffer offsets. For instance, to fully drain the buffer for a write operation:
while #buffer > 0 do
local nwritten = stream:write_some(buffer)
buffer = buffer:slice(1 + nwritten)
end
The module stream
already contains many of such algorithms. You may come up
with your own algorithms as well taking the business rules of your application
into consideration (e.g. combining newly arrived data into the next calls to
write_some()
). Alternatively, if you don’t need portable code, and the
underlying system offers extra guarantees, you may do away with some of this
complexity.
Layering
Streams of bytes by themselves are hardly useful for application developers. Many patterns exist to have structured data on top:
-
Fixed-length records (binary protocols).
-
Fixed-length header + variably-sized data payload (binary protocols).
-
Records delimited by certain character sequences (textual protocols).
-
Combinations of the above (e.g. HTTP starts with a textual protocol of CRLF-delimited fields, and it might change to a fixed-length payload to read the body, and maybe change yet again to a textual protocol to extract the resulting JSON data).
Given a single protocol might require multiple strategies, it’s important to
offer algorithms that don’t monopolize the stream object to themselves. The
algorithms should be composable. The algorithms found in the module stream
follow this guideline.
This composition of algorithms naturally build layers:
-
Raw IO. The IO interfaces exposed by the OS. There’s no interface for peeking data or putting data back. Once the data is extracted out of the stream, it’s your responsibility to save it until needed.
-
Buffered IO. Just as short reads might happen, so can "long" reads. Upon dispatching the message for processing that includes data until the delimiter, you must be careful to not discard extra data that represents the start of the next message. Buffered IO is built on top of raw IO by managing an user-space buffer (and an associated index for the current message) alongside with the IO object.
-
Formatted IO. Built on top of buffered IO integrating a parser (for input), and/or a generator (for output). Now the user is no longer interacting with slices of bytes, but properly structured data and messages.
It’s always easier to work with high-level formatted IO than low-level raw IO. However, when an implementation for the target protocol doesn’t exist, you may have no other choice.
Emilua offers stream.scanner(3em) for generic formatted textual input.
Composed operations
As it may already be clear by now, many algorithms are compositions of raw IO operations. Unless the IO object synchronizes access on its own (and explicitly says so), you should be careful to not initiate extra IO operations that might affect the already in-flight operations for that object.
Concurrent writers operating on the same IO object is a common gotcha that
causes corrupt streams during high-load scenarios (if "atomic" writes are not
guaranteed by the underlying system). Suppose you’re generating line-delimited
JSON objects on a UNIX stream socket. You’re collecting info from various system
services (e.g. "/run/acpid.socket"
), and for each event, you generate a new
JSON object.
In other words, you’re multiplexing information from assorted sources. The same can happen on the web when you’re orchestrating microservices and dumping information on a WebSocket channel. Now, back to our example, if a short write happens, you might end up in the following state:
In other words, one of the messages didn’t fit in the kernel buffer, then
stream.write_all()
retried the operation to drain the buffer. However there
was already another in-flight write operation, and it was scheduled first than
buf1:slice(N1))
. The end result will be a stream where the second message is
inserted in the middle of another message (a corrupt stream):
This problem is not exclusive to async IO frameworks. The same behavior can be observed if you code for blocking APIs making use of threads to achieve concurrency. |
To solve this problem, you should create a mutex to protect the write end of the stream:
scope(function()
stream_write_mtx:lock()
scope_cleanup_push(function() stream_write_mtx:unlock() end)
stream.write_all(stream, event_json)
end)
Other network frameworks for scripting languages try to solve the problem transparently by making use of an unbounded write buffer under the hood. However that’s solving the issue in the wrong layer. If a write buffer is always used, the network framework can no longer appropriately communicate which user-issued write operation caused an error. The way such frameworks implement this solution is actually way worse as they face back-pressure issues as well, and have to band-aid patch the API all over.
Emilua will not inappropriately entangle all IO layers — raw IO, buffered IO, formatted IO — together. When you do want to make use of shared write buffers, you can write your own socket + the buffer (and mutex) to abstract this pattern in a way that won’t cause problems to your application.
Do notice that such problems don’t exist when composed operations use operations
that don’t overlap each other. For instance, you can use stream.read_all()
and
stream.write_all()
on the same object with no synchronization because such use
won’t perform concurrent write_some()
calls nor concurrent read_some()
calls.
Why EOF is an error
Same rationale as Boost.Asio[2]:
-
The end of a stream can cause stream.read_all(3em), stream.read_at_least(3em), and other composed operations to violate their contract (e.g. a read of N bytes may finish early due to EOF).
-
An EOF error may be used to distinguish the end of a stream from a successful read of size 0.