In a previous article I discussed using Generators for streaming data back to the caller. A Generator function is a synchronous function that allows lazy iteration over a collection of items as the caller requests them. Even though a standard Generator is synchronous, as the previous article showed, a Generator can be returned from a Promise. This allows you to perform some asynchronous action first and then use the Generator to yield the resulting items one at a time. This is extremely useful for things like performing a database query and then using a streaming API to yield each record.
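To recap that technique, here is a minimal hypothetical sketch (the `queryRecords` name and the hard-coded rows are illustrative, not from the previous article) of an async function that resolves to a Generator over asynchronously fetched results:

```typescript
// Hypothetical sketch: perform the async work (e.g. a database query)
// first, then return a synchronous Generator over the results.
async function queryRecords(): Promise<Generator<string>> {
  // Stand-in for a real asynchronous query.
  const rows: string[] = await Promise.resolve(["alice", "bob", "carol"]);
  // Return a Generator that lazily yields each fetched row.
  return (function* () {
    for (const row of rows) {
      yield row;
    }
  })();
}
```

The caller awaits the Promise to get the Generator, then iterates it synchronously with `for...of`.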
We run into limitations, however, when we need to perform some asynchronous action inside the Generator function itself. The previous article showed how this fails when converting a file stream into a Generator. Fortunately, AsyncGenerators come to the rescue!
This article will discuss the basics of AsyncGenerators and then dig into where the previous article left off: converting a Stream into an AsyncGenerator.
Meet AsyncGenerator
Let's start with a simple example. An AsyncGenerator function is actually very similar to a synchronous Generator function. Recall that a basic Generator looks something like:
function* numGen() {
  for (let i = 1; i <= 10; i++) {
    yield i;
  }
}
We can use it most easily with a for...of loop:
function run() {
  let gen = numGen();
  for (let num of gen) {
    console.log(num);
  }
}
Working with AsyncGenerators is actually pretty straightforward. There are two differences:
- The Generator function is marked as async
- We use the for await...of iterator syntax
Our AsyncGenerator then becomes something like:
async function* numGen() {
  for (let i = 1; i <= 10; i++) {
    yield i;
  }
}
Now this example doesn't do anything asynchronous in the Generator function, but if we wanted to, we could await the completion of some async processing between each yield, or anywhere else in the Generator function.
And we now consume the AsyncGenerator with:
async function run() {
  let gen = numGen();
  for await (let num of gen) {
    console.log(num);
  }
}
So let's add some async code into the mix and see what happens. Instead of numGen simply iterating the values 1..10, we will have it retrieve each number from an async function.
function getNum() {
  return new Promise(resolve => {
    setTimeout(() => {
      resolve(Math.random());
    }, 100);
  });
}
getNum is a simple async function that resolves with a random number after a short delay.
We change the AsyncGenerator function to call getNum at each step in the loop:
async function* numGen() {
  for (let i = 1; i <= 10; i++) {
    let val = await getNum();
    yield val;
  }
}
Because our AsyncGenerator function is an async function, we can use await to pause execution and retrieve a value from our async function getNum. Next we yield that value with yield val. This again suspends execution of the function until a consumer reads a value.
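To see that suspension in action, consider this small sketch (the `steps` name and the log strings are illustrative): the Generator body only runs up to each yield when the consumer requests the next value.

```typescript
// Records the order of execution so we can observe where the
// AsyncGenerator suspends.
async function* steps(log: string[]): AsyncGenerator<number> {
  log.push("start");
  yield 1;
  log.push("resumed");
  yield 2;
}
```

Calling next() once runs the body only as far as the first yield; the "resumed" entry appears in the log only after a second next() call.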
We consume the data in exactly the same manner as before, using the for await...of syntax:
async function run() {
  let gen = numGen();
  for await (let num of gen) {
    console.log(num);
  }
}
Now that you have an understanding of the mechanics of AsyncGenerators, we will revisit where we left off in the prior article by trying to convert a file stream into a byte Generator. AsyncGenerators will allow us to get past the 65k limit we encountered with a standard Generator.
Streams to AsyncGenerator
A quick refresher on Node.js streams: a Node.js stream can operate in two modes, flowing and paused. In paused mode, we directly read data off the stream using the read() method. This gives us finer-grained control over the reading process but comes with more complexity. When we call read(), if data is not available on the stream, it will return null and we must wait for the readable event before we can read more data.
For instance, below is a method that uses paused mode to construct a single Buffer from the contents of the stream.
import * as stream from "stream";

// Reads the data from the stream in chunks and returns a Buffer with the
// concatenated chunks of data. We wrap the eventing results in a
// Promise to signal completion once the reading is finished.
function readToEnd(reader: stream.Readable, chunkSize: number): Promise<Buffer> {
  return new Promise((resolve, reject) => {
    // Capture the chunks and we will combine them at the end.
    let buffers: Buffer[] = [];
    // Read function expression that will read chunks until the
    // stream is no longer readable. This method gets called each
    // time there is a readable event.
    const read = () => {
      while (reader.readable) {
        // First try to read the chunk size, but if that fails
        // then try reading the remainder of the stream.
        let buf = reader.read(chunkSize) || reader.read();
        // Either add the contents or there was no data and we
        // are no longer readable and need to break.
        if (buf) buffers.push(buf);
        else break;
      }
    };
    // Every readable event we trigger the read method.
    reader.on("readable", read);
    // Reject our promise if we have failures.
    reader.on("error", e => reject(e));
    // Resolve our promise by concatenating all of our chunks into
    // a single Buffer.
    reader.on("end", () => resolve(Buffer.concat(buffers)));
  });
}
This code uses the readable event to trigger reads. These reads consume all of the data on the stream until the stream is no longer readable. When all of the data that the stream will ever have has been consumed, the end event fires and we combine the chunks into a single Buffer.
Looking at this code, you can see that we trigger processing when readable fires. We can abstract the firing of this event into a single method that blocks until the readable event fires.
Let's consider how we would create a signal that waits for the stream to enter the readable state. The stream is initially not in the readable state. It then transitions to readable when data is available. We can then read chunks of data until we have consumed the information on the stream. The stream will then receive more data and fire the readable event again.
The cool thing is we can simply wrap this event in a Promise that resolves when the event has fired. We can then await this function and it will pause execution until the event fires.
// Resolves when the stream fires its next `readable` event. We use the
// `once` method so that it only ever fires on the next `readable`
// event.
async function signalReadable(reader: stream.Readable) {
  return new Promise(resolve => {
    reader.once("readable", resolve);
  });
}
This method will allow us to await signalReadable(stream); and pause the current loop until the stream is readable again.
We will create one other helper that waits for the end event in the same manner.
// Resolves when the stream fires the `end` event. We use the `once`
// method so that the promise only resolves once.
async function signalEnd(reader: stream.Readable) {
  return new Promise(resolve => {
    reader.once("end", resolve);
  });
}
We now have all the pieces we need to convert a stream into an AsyncGenerator. The function below will return an AsyncGenerator that allows us to read from the stream in chunks of data. For example, if we wanted to read 1 byte at a time:
async function runner() {
  const byteGen = streamToAsyncGenerator(reader, 1);
  let i = 0;
  for await (const byte of byteGen) {
    console.log(++i, byte);
  }
}
The function below is responsible for delivering this functionality.
// Converts a stream into an AsyncGenerator that allows reading bytes
// of data from the stream in the chunk size specified. This function
// has some similarities to the `streamToGenerator` function.
function streamToAsyncGenerator<T>(
  reader: stream.Readable,
  chunkSize?: number,
): AsyncGenerator<T, void, unknown> {
  // Immediately invoke the AsyncGenerator function which will closure
  // scope the stream and return the AsyncGenerator instance
  return (async function* genFn() {
    // Construct a promise that will resolve when the stream has
    // ended. We use it below as a conditional resolution of the
    // readable and end events.
    const endPromise = signalEnd(reader);
    // Loop until the readable stream stops being readable! This
    // property is available in Node 12.9. We could also check the
    // status of the endPromise to see if it is resolved yet.
    while (!reader.readableEnded) {
      // Next, similar to the readToEnd function, we loop on the
      // stream until we have read all of the data that we
      // can from the stream.
      while (reader.readable) {
        // First try to read the chunk size, but if that fails
        // then try reading the remainder of the stream.
        let val = reader.read(chunkSize) || reader.read();
        // Either yield the contents to our generator or there
        // was no data and we are no longer readable and need
        // to wait for more info
        if (val) yield val;
        else break;
      }
      // We are no longer readable and one of two things will
      // happen now: `readable` or `end` will fire. We construct
      // a new `readable` signal to wait for the next signal.
      const readablePromise = signalReadable(reader);
      // We wait for either the `end` or `readable` event to fire
      await Promise.race([endPromise, readablePromise]);
    }
  })();
}
This function immediately invokes our AsyncGenerator function and returns the AsyncGenerator. This AsyncGenerator simply yields the contents of the stream as they become available.
The function periodically uses our stream signal functions signalReadable and signalEnd to check the state of the stream when there is no more data to read. This is where the async nature of the AsyncGenerator comes into play. We use the neat Promise.race feature to wait on either one of these conditions.
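If Promise.race is new to you, here is a tiny self-contained sketch (the `delay` helper and the string values are illustrative): it settles with whichever promise settles first, just as our loop resumes on whichever of `readable` or `end` fires first.

```typescript
// Resolves with `value` after `ms` milliseconds.
function delay<T>(ms: number, value: T): Promise<T> {
  return new Promise(resolve => setTimeout(() => resolve(value), ms));
}

// Whichever promise settles first wins the race; the slower
// promise still runs, but its result is ignored.
async function firstSignal(): Promise<string> {
  return Promise.race([delay(10, "readable"), delay(100, "end")]);
}
```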
Conclusion
Hopefully you now have a better idea of how AsyncGenerators can be used with streams. AsyncGenerators give us the same functionality as a standard Generator, but we can call an async function from within the Generator function. As you can see, this works pretty well for converting a traditional stream into an AsyncGenerator.
One other neat use case for AsyncGenerators is unioning multiple async results together in a single pipeline. In this regard you can use an async call to load and yield the contents of source A, then make another async call to yield the contents of source B. I've used this technique to yield values for different message types in our Lightning Network implementation.
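As a hypothetical sketch of that pattern (`loadA` and `loadB` stand in for real async sources; they are not from the Lightning Network code), a single AsyncGenerator can await each source in turn and yield its contents into one unified iteration:

```typescript
// Stand-ins for two independent async data sources.
async function loadA(): Promise<number[]> {
  return [1, 2];
}

async function loadB(): Promise<number[]> {
  return [3, 4];
}

// Yields everything from source A, then everything from source B,
// presenting both sources as one async pipeline to the consumer.
async function* unionSources(): AsyncGenerator<number> {
  for (const val of await loadA()) yield val;
  for (const val of await loadB()) yield val;
}
```

The consumer iterates with a single for await...of loop and never needs to know there are two sources behind the generator.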