Working with streams

Streams are one of the fundamental concepts one has to deal with when working with IO. Streams represent channels where data flows as slices of bytes respecting certain properties (e.g. ordering).

Emilua exposes two concepts to work with streams. Write streams are objects that implement the method write_some():

write_some(self, buffer: byte_span) → integer

Writes buffer into the stream and returns the number of bytes written.

Similarly, read streams are objects that implement the method read_some():

read_some(self, buffer: byte_span) → integer

Reads into buffer and returns the number of bytes read.

Exceptions are used to communicate errors.
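
For instance, a single raw read with explicit error handling might look like the following minimal sketch (sock stands for any connected read stream, e.g. a TCP socket, and is an assumption of the example):

local buf = byte_span.new(1024)

-- read_some() raises on failure, so wrap the call with pcall()
local ok, nread = pcall(function() return sock:read_some(buf) end)
if ok then
    -- `nread` bytes now sit at the front of `buf`
    local chunk = buf:slice(1, nread)
else
    -- `nread` now holds the error value (e.g. EOF) instead
    print('read failed:', nread)
end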

When the type of the stream is not specified (i.e. read or write), it’s safe to assume the stream object implements both interfaces. Pipes are unidirectional, and separate classes exist to deal with each end. On the other hand, TCP sockets are bidirectional and data can flow in either direction. Furthermore, many sockets allow one to shut down one communication end so they can work unidirectionally as well.

Short reads and short writes

Streams represent streams of bytes, with no implied message boundaries.

Each operation on a stream roughly maps to a single syscall[1], and it may transfer fewer bytes than requested. This is referred to as a short read or short write.

Short writes occur, for instance, when the kernel runs out of buffer space on systems that don’t expose proactors. The rationale for short reads is more obvious, and is left as an exercise for the reader (no pun intended).

To recover from short reads and short writes, one just has to try the operation again adjusting the buffer offsets. For instance, to fully drain the buffer for a write operation:

while #buffer > 0 do
    -- write_some() may transfer fewer bytes than requested (short write)
    local nwritten = stream:write_some(buffer)
    -- drop the bytes already written and retry with the rest
    buffer = buffer:slice(1 + nwritten)
end

The module stream already contains many such algorithms. You may also come up with your own algorithms taking the business rules of your application into consideration (e.g. merging newly arrived data into the next calls to write_some()). Alternatively, if you don’t need portable code, and the underlying system offers extra guarantees, you may do away with some of this complexity.
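
In fact, the loop above is what stream.write_all() packages for you (as discussed further below, it retries the operation until the buffer is drained), so the same effect can be achieved with (sock being the IO object):

local stream = require 'stream'

-- retries write_some() until the whole buffer has been written
-- (or an error is raised)
stream.write_all(sock, buffer)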

Layering

Streams of bytes by themselves are hardly useful for application developers. Many patterns exist to have structured data on top:

  • Fixed-length records (binary protocols).

  • Fixed-length header + variably-sized data payload (binary protocols; see the sketch after this list).

  • Records delimited by certain character sequences (textual protocols).

  • Combinations of the above (e.g. HTTP starts as a textual protocol of CRLF-delimited fields, might switch to a fixed-length payload to read the body, and may switch yet again to a textual protocol to extract the resulting JSON data).
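
For instance, the second pattern (fixed-length header + payload) could be handled along these lines. This is a minimal sketch of a hypothetical protocol with a 4-byte big-endian length prefix; it assumes stream.read_all(stream, buffer) fills the whole buffer (raising on premature EOF, as discussed at the end of this section) and that tostring() on a byte_span yields its raw bytes:

local stream = require 'stream'

local function read_record(sock)
    -- fixed-length part: a 4-byte big-endian payload size
    local header = byte_span.new(4)
    stream.read_all(sock, header)

    local hdr = tostring(header)
    local len = 0
    for i = 1, 4 do
        len = len * 256 + hdr:byte(i)
    end

    -- variably-sized part: the payload itself
    local payload = byte_span.new(len)
    stream.read_all(sock, payload)
    return payload
end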

Given that a single protocol might require multiple strategies, it’s important to offer algorithms that don’t monopolize the stream object. The algorithms should be composable. The algorithms found in the module stream follow this guideline.

This composition of algorithms naturally builds layers:

  • Raw IO. The IO interfaces exposed by the OS. There’s no interface for peeking data or putting data back. Once the data is extracted out of the stream, it’s your responsibility to save it until needed.

  • Buffered IO. Just as short reads might happen, so can "long" reads. Upon dispatching a message for processing that includes data up to the delimiter, you must be careful not to discard extra data that represents the start of the next message (see the sketch after this list). Buffered IO is built on top of raw IO by managing a user-space buffer (and an associated index for the current message) alongside the IO object.

  • Formatted IO. Built on top of buffered IO integrating a parser (for input), and/or a generator (for output). Now the user is no longer interacting with slices of bytes, but properly structured data and messages.
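
To make the buffered IO layer concrete, here's a minimal sketch of a line reader that keeps leftover bytes around between calls. The names read_line() and pending are invented for the example, and it assumes tostring() on a byte_span yields its raw bytes:

local pending = ''  -- user-space buffer surviving across calls

local function read_line(sock)
    while true do
        -- a full message may already sit in the buffer ("long" read)
        local idx = string.find(pending, '\n', 1, true)
        if idx then
            local line = string.sub(pending, 1, idx - 1)
            pending = string.sub(pending, idx + 1)  -- keep the excess
            return line
        end

        -- otherwise fetch more bytes from the raw IO layer
        local buf = byte_span.new(1024)
        local nread = sock:read_some(buf)
        pending = pending .. tostring(buf:slice(1, nread))
    end
end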

It’s always easier to work with high-level formatted IO than low-level raw IO. However, when an implementation for the target protocol doesn’t exist, you may have no other choice.

Emilua offers stream.scanner(3em) for generic formatted textual input.
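
A hedged usage sketch follows; the constructor fields and method names here are assumptions and should be double-checked against the stream.scanner(3em) page:

local stream = require 'stream'

-- assumed interface: a scanner wrapping a read stream and
-- splitting input into records
local scanner = stream.scanner.new{ stream = sock }

while true do
    local line = scanner:get_line()  -- assumed method; raises on EOF
    print(tostring(line))
end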

Composed operations

As may already be clear by now, many algorithms are compositions of raw IO operations. Unless the IO object synchronizes access on its own (and explicitly says so), you should be careful not to initiate extra IO operations that might affect the operations already in flight on that object.

Concurrent writers operating on the same IO object are a common gotcha that causes corrupt streams during high-load scenarios (if "atomic" writes are not guaranteed by the underlying system). Suppose you’re generating line-delimited JSON objects on a UNIX stream socket. You’re collecting info from various system services (e.g. "/run/acpid.socket"), and for each event you generate a new JSON object.

[figure muxing_services_info: information from several services multiplexed into a single stream]

In other words, you’re multiplexing information from assorted sources. The same can happen on the web when you’re orchestrating microservices and dumping information into a WebSocket channel. Now, back to our example: if a short write happens, you might end up in the following state:

[figure corrupt_stream_on_composed_short_reads: a short write leaves one message partially written while another write is already in flight]

In other words, one of the messages didn’t fit in the kernel buffer, so stream.write_all() retried the operation to drain the buffer. However, there was already another in-flight write operation, and it was scheduled before buf1:slice(N1). The end result will be a stream where the second message is inserted in the middle of another message (a corrupt stream):

[figure corrupt_stream_on_composed_short_reads_result: the second message spliced into the middle of the first]

This problem is not exclusive to async IO frameworks. The same behavior can be observed if you code for blocking APIs making use of threads to achieve concurrency.

To solve this problem, you should create a mutex to protect the write end of the stream:

scope(function()
    stream_write_mtx:lock()  -- stream_write_mtx: a mutex created alongside the socket
    scope_cleanup_push(function() stream_write_mtx:unlock() end)

    -- sock is the IO object; stream is the module
    stream.write_all(sock, event_json)
end)

Other network frameworks for scripting languages try to solve the problem transparently by making use of an unbounded write buffer under the hood. However, that solves the issue in the wrong layer. If a write buffer is always used, the network framework can no longer appropriately communicate which user-issued write operation caused an error. The way such frameworks implement this solution is actually far worse, as they face back-pressure issues as well, and have to band-aid patch the API all over.

Emilua will not inappropriately entangle all the IO layers (raw IO, buffered IO, formatted IO) together. When you do want to make use of shared write buffers, you can write your own abstraction combining the socket, the buffer (and a mutex) in a way that won’t cause problems to your application.
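
A minimal sketch of such an abstraction follows. SafeWriter and sock are hypothetical names, and the mutex module import is an assumption of the example:

local stream = require 'stream'
local mutex = require 'mutex'

local SafeWriter = {}
SafeWriter.__index = SafeWriter

function SafeWriter.new(sock)
    return setmetatable({ sock = sock, mtx = mutex.new() }, SafeWriter)
end

-- serialize whole messages so concurrent writers can't interleave
function SafeWriter:write(buf)
    scope(function()
        self.mtx:lock()
        scope_cleanup_push(function() self.mtx:unlock() end)
        stream.write_all(self.sock, buf)
    end)
end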

Do notice that such problems don’t exist when composed operations rely on underlying operations that don’t overlap each other. For instance, you can use stream.read_all() and stream.write_all() on the same object with no synchronization, because such use won’t perform concurrent write_some() calls nor concurrent read_some() calls.
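
For instance, a full-duplex session can keep one fiber reading while the main fiber writes, with no mutex involved (spawn() creates a new fiber; sock, incoming and outgoing are assumptions of the example):

local stream = require 'stream'

-- only this fiber issues read_some() calls
local reader = spawn(function()
    stream.read_all(sock, incoming)
end)

-- meanwhile, only the main fiber issues write_some() calls
stream.write_all(sock, outgoing)
reader:join()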

Why EOF is an error

Same rationale as Boost.Asio[2]:

  • The end of a stream can cause stream.read_all(3em), stream.read_at_least(3em), and other composed operations to violate their contract (e.g. a read of N bytes may finish early due to EOF).

  • An EOF error may be used to distinguish the end of a stream from a successful read of size 0.
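
In other words, a successful 0-byte read remains unambiguous because EOF travels through the error path instead:

local ok, nread = pcall(function() return sock:read_some(buf) end)
if ok and nread == 0 then
    -- an empty but successful read: the stream is still alive
elseif not ok then
    -- EOF (or another error): `nread` holds the error value
end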


1. This applies to IO objects that expose system resources (e.g. TCP sockets). It doesn’t hold for higher-level abstractions built in user space (e.g. TLS sockets).