In a previous article I discussed using Generators for streaming data back to the caller. A Generator function is a synchronous function that allows lazy iteration over a collection of items as the caller requests them. Even though the standard Generator is synchronous, as the previous article shows, a Generator can be returned from a Promise. This allows you to perform some asynchronous action first and then use the Generator to yield those items one at a time. This is extremely useful for things like performing a database query and then using a streaming API to yield each record.

We run into limitations when we need to perform some asynchronous action inside of the Generator function. We saw an example of how this fails when converting a file stream into a Generator. Fortunately, AsyncGenerators come to the rescue!

This article will discuss the basics of AsyncGenerators and then dig into where the previous article left off: converting a Stream into an AsyncGenerator.

Meet AsyncGenerator

Let's start with a simple example. AsyncGenerator functions are actually very similar to synchronous Generator functions. Recall that a basic Generator looks something like:

function* numGen() {
    for(let i = 1; i <= 10; i++) {
        yield i;
    }
}

We can use it most easily from a for...of loop:

function run() {
    let gen = numGen();

    for(let num of gen) {
        console.log(num);
    }
}

Working with AsyncGenerators is actually pretty straightforward. There are two differences:

  1. The Generator function is marked as async
  2. We use the for await...of iterator syntax

Our AsyncGenerator then becomes something like:

async function* numGen() {
    for(let i = 1; i <= 10; i++) {
        yield i;
    }
}

Now this example doesn't do anything asynchronous in the Generator function, but if we wanted to, we could await the completion of some async processing between each yield, or anywhere else in the Generator function.

And we now consume the AsyncGenerator with:

async function run() {
    let gen = numGen();
    for await (let num of gen) {
        console.log(num);
    }
}

So let's add some async code into the mix and see what happens. Instead of numGen simply iterating the values 1..10, we will have it retrieve a number from an async function.

function getNum() {
    return new Promise(resolve => {
        setTimeout(() => {
            resolve(Math.random());
        }, 100);
    });
}

getNum is a simple async function that resolves with a random number after a short delay.

We change the AsyncGenerator function to now call getNum at each step in the loop!

async function* numGen() {
    for(let i = 1; i <= 10; i++) {
        let val = await getNum();
        yield val;
    }
}

Because our AsyncGenerator function is an async function, we can use await to pause execution and retrieve a value from our async function getNum. Next we yield that value by calling yield val. This again pauses execution of the function until a consumer reads a value.

We consume the data in the exact same manner as before using for await...of syntax:

async function run() {
    let gen = numGen();
    for await (let num of gen) {
        console.log(num);
    }
}

Now that you have an understanding of the mechanics of AsyncGenerators, we will revisit where we left off in the prior article: converting a file stream into a byte Generator. AsyncGenerators will allow us to get past the 65k limit we encountered with a standard Generator.

Streams to AsyncGenerator

A quick refresher on Node.js streams. A Node.js stream can operate in two modes: flowing and paused. In paused mode, we read data directly off the stream using the read() method. This gives us finer-grained control over the reading process but comes with more complexity. When we call read() and data is not available on the stream, it returns null and we have to wait for the readable event before we can read more data.

For instance, below is a method that uses paused mode to construct a single Buffer from the contents of the stream.

import * as stream from "stream";

// Reads the data from the stream in chunks and returns a Buffer with
// the concatenated chunks of data. We wrap the eventing results in a
// Promise to signal completion once the reading is finished.
function readToEnd(stream: stream.Readable, chunkSize: number): Promise<Buffer> {
    return new Promise((resolve, reject) => {
        // Capture the chunks and we will combine them at the end.
        let buffers: Buffer[] = [];

        // Read function expression that will read chunks until the
        // Stream is no longer readable. This method gets called each
        // time there is a readable event.
        const read = () => {
            while (stream.readable) {
                // First try to read the chunk size, but if that fails
                // then try reading the remainder of the stream.
                let buf = stream.read(chunkSize) || stream.read();

                // Either add the contents or there was no data and we
                // are no longer readable and need to break.
                if (buf) buffers.push(buf);
                else break;
            }
        };

        // Every readable event we trigger the read method.
        stream.on("readable", read);

        // Reject our promise if we have failures.
        stream.on("error", e => reject(e));

        // Resolve our promise by concatenating all of our chunks into
        // a single Buffer
        stream.on("end", () => resolve(Buffer.concat(buffers)));
    });
}

This code uses the readable event to trigger reads. These reads consume all of the data on the stream until the stream is no longer readable. When the stream has delivered all of the data it will ever have, the end event fires and we combine the chunks into a single Buffer.

Looking at this code, you can see that we trigger processing when readable fires. We can abstract the firing of this event into a single method that waits until the readable event fires.

Let's consider how we would create a signal to wait for the stream to enter the readable state. The stream is initially not readable. It then transitions to readable when data is available. We can then read chunks of data until we have consumed everything currently on the stream. The stream will then receive more data and fire the readable event again.

The cool thing is we can simply wrap this event in a Promise that resolves when the event has fired. We can then await this function and it will pause execution until the event fires.

// Resolves when the stream fires its next `readable` event. We use the
// `once` method so that the promise only resolves on the next
// `readable` event.
async function signalReadable(reader: stream.Readable) {
    return new Promise(resolve => {
        reader.once("readable", resolve);
    });
}

This method will allow us to await signalReadable(stream) and pause the current loop until the stream is readable again.
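As a rough sketch of that pattern (using the signalReadable helper above; drainNextChunk is just an illustrative name), we can wait for the signal and then read whatever data arrived:

// Sketch only: wait for the next `readable` event, then drain the
// buffered data with read() until it returns null.
async function drainNextChunk(reader: stream.Readable) {
    await signalReadable(reader);

    let chunk;
    while ((chunk = reader.read()) !== null) {
        console.log("read", chunk.length, "bytes");
    }
}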

We will create one other helper that waits for the end event in the same manner.

// Resolves when the stream fires the `end` event. We use the `once`
// method so that the promise only resolves once.
async function signalEnd(reader: stream.Readable) {
    return new Promise(resolve => {
        reader.once("end", resolve);
    });
}

We now have all the pieces we need to convert a stream into an AsyncGenerator. The function below will return an AsyncGenerator that allows us to read from the stream in chunks of data. For example, if we wanted to read 1 byte at a time:

async function runner() {
    // `reader` is the Readable stream we want to consume.
    const byteGen = streamToAsyncGenerator(reader, 1);
    let i = 0;
    for await (const byte of byteGen) {
        console.log(++i, byte);
    }
}
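
For completeness, here is a rough sketch of how reader might be set up for the example above; the file path is purely a placeholder:

import * as fs from "fs";

// Hypothetical setup: any Readable works here, for example a file
// stream like the one from the previous article.
const reader = fs.createReadStream("./some-file.bin");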

The function below is responsible for delivering this functionality.

// Converts a stream into an AsyncGenerator that allows reading bytes
// of data from the stream in the chunk size specified. This function
// has some similarities to the `streamToGenerator` function.
function streamToAsyncGenerator<T>(
    reader: stream.Readable,
    chunkSize?: number,
): AsyncGenerator<T, void, unknown> {

    // Immediately invoke the AsyncGenerator function, which closes over
    // the stream and returns the AsyncGenerator instance.
    return (async function* genFn() {
    
        // Construct a promise that will resolve when the Stream has
        // ended. We use it below as a conditional resolution of the
        // readable and end events.
        const endPromise = signalEnd(reader);

        // Loop until the readable stream stops being readable! This
        // property is available in Node 12.9. We could also check the
        // status of the endPromise to see if it is resolved yet.
        while (!reader.readableEnded) {
        
            // Next, similar to readToEnd function, we loop on the
            // Stream until we have read all of the data that we
            // can from the stream.
            while (reader.readable) {
            
                // First try to read the chunk size, but if that fails
                // then try reading the remainder of the stream.
                let val = reader.read(chunkSize) || reader.read();

                // Either yield the contents to our generator or there
                // was no data and we are no longer readable and need
                // to wait for more info
                if (val) yield val;
                else break;
            }

            // We are no longer readable and one of two things will
            // happen now: `readable` or `end` will fire. We construct
            // a new `readable` signal to wait for the next signal.
            const readablePromise = signalReadable(reader);

            // We wait for either the `end` or `readable` event to fire
            await Promise.race([endPromise, readablePromise]);
        }
    })();
}

This function immediately invokes our AsyncGenerator function and returns the AsyncGenerator. This AsyncGenerator simply yields the contents of the stream as data becomes available.

The function periodically uses our stream signal functions signalReadable and signalEnd to check the state of the stream when there is no more data to read. This is where the async nature of the AsyncGenerator comes into play. We use the neat Promise feature Promise.race to wait on either one of these conditions.

Conclusion

Hopefully you have a better idea of how AsyncGenerators can be used with streams. AsyncGenerators give us the same functionality as a standard Generator, but we can call async functions from within the Generator function. As you can see, this works pretty well for converting a traditional stream into an AsyncGenerator.

One other neat use case for AsyncGenerators is combining multiple async results into a single pipeline. In this regard, you can use an async call to load, iterate, and yield the contents of source A, then make another async call to yield the contents of source B. I've used this quite well for yielding values for different message types in our Lightning Network implementation.
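
As a rough sketch (fetchSourceA and fetchSourceB are hypothetical async functions that each resolve with an array of items), that kind of pipeline could look like:

// Sketch only: fetchSourceA and fetchSourceB stand in for any async
// data sources (a database query, an API call, etc.).
async function fetchSourceA(): Promise<number[]> {
    return [1, 2, 3];
}

async function fetchSourceB(): Promise<number[]> {
    return [4, 5, 6];
}

// Combine both sources into a single AsyncGenerator pipeline.
async function* combined() {
    // Load and yield everything from source A first...
    for (const item of await fetchSourceA()) {
        yield item;
    }

    // ...then make a second async call and yield the contents of source B.
    for (const item of await fetchSourceB()) {
        yield item;
    }
}

async function runCombined() {
    for await (const item of combined()) {
        console.log(item);
    }
}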