Iterators and Generators have been part of JavaScript for a while now. This article discusses how you can use a Generator to efficiently stream data to a consumer.

Generators are a mechanism for creating and returning enumerable data (e.g. an array of numbers), but doing so lazily (one value at a time).

// A Generator function that returns a Generator object
// that yields the numbers 0 to 9 one at a time
function* createGenerator() {
    for (let i = 0; i < 10; i++) {
        yield i;
    }
}

A Generator function is designated in JavaScript by using the function* keyword. This function implicitly returns a Generator object, and within its body you can use the yield keyword. The yield keyword suspends execution until the consumer wants to read the yielded item.
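
To see that suspension in action, here is a small sketch (the log statements are purely illustrative) showing that the Generator body does not run until the consumer calls next():

// A Generator function whose body logs each time it
// produces a value
function* logged() {
    console.log("producing 1");
    yield 1;
    console.log("producing 2");
    yield 2;
}

const it = logged();
console.log("nothing produced yet");
it.next(); // logs "producing 1"
it.next(); // logs "producing 2"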

For example, we can use our Generator function to create a Generator and read our items. The Generator object has a next() method that returns an object of the form { done?: boolean, value: T }.

// creates a Generator that we can call until there is 
// nothing left on the Generator
const gen1 = createGenerator();

// loop until we are done
while (true) {
    // obtain the next value from the Generator
    const result = gen1.next();

    // exit the loop if the Generator is complete
    if (result.done) break;

    // otherwise print the value yielded by the 
    // Generator
    console.log(result.value);
}

Generators and Iterators are intimately linked; in fact, every Generator is also an Iterable. We can use Iterator syntax to read the values from the Generator instead of directly reading the next() result.

// creates a Generator that we can use Iterator syntax 
// to read all of the values from
const gen2 = createGenerator();

// read the values as if it were a standard Iterator
for (const i of gen2) {
    console.log(i);
}
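
Because a Generator implements the Iterable protocol, any Iterable-consuming syntax works, not just for...of. A couple of illustrative examples:

// Spread the Generator's values into an array
const all = [...createGenerator()]; // [0, 1, 2, ..., 9]

// Destructure the first couple of values
const [first, second] = createGenerator(); // 0 and 1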

Now that we have gone through an overview of how Generators work, let's talk about streaming data using them.

Streaming Data with Generators

I think one of the most powerful uses of Generators is for keeping memory usage low by streaming results one record at a time. This is something I've used to great success in C# using yield return.

Conceptually, you may be thinking this is similar to streams, and you would be right. A Generator behaves like a stream in that data is written to and read from it while keeping the number of items in memory low.

There are a few differences, though. Streams allow for multiple data subscribers (or consumers), but a Generator is imperative code that only allows a single receiver of data.
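
A quick sketch of that single-receiver behavior: once one consumer has drained the Generator, a second consumer gets nothing.

const gen = createGenerator();

// The first consumer drains all of the values
for (const i of gen) {
    console.log("first consumer", i);
}

// A second loop over the same Generator yields nothing
// because the Generator is already done
for (const i of gen) {
    console.log("never runs", i);
}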

Generators are also a simpler mechanism: there is no need to manage backpressure or concern yourself with the flowing or paused mode of a stream. Basically, with streams it's turtles all the way down (and by turtles I mean piped streams), or you need to understand the finer details of streams to prevent a memory blow-out.

Lastly, with Generators the producer is blocked until the consumer wants data.

Streams to Generators

The first thing we need to understand is how we can combine streams and Generators. The two actually fit together really well. We just need to ensure that our stream is in paused mode and that we read data using the read method. More on this in a second.

So let's look at an example where we create a Readable object stream.

import stream from "stream";

function createReadStream(): stream.Readable {
    // read is a no-op because we push all the data and
    // close the stream manually below
    const reader = new stream.Readable({ objectMode: true, read() {} });
    reader.push({ id: 1 });
    reader.push({ id: 2 });
    reader.push({ id: 3 });
    reader.push(null);
    return reader;
}

This function just constructs a new Readable stream, pushes three items into the stream, and closes it by pushing null. We can now use the reader and pluck out the three values from a consumer by either attaching a data event handler, piping the stream, or calling the read method.

Streams start in paused mode by default and switch to flowing mode when a data event handler is attached or the stream is piped. Since we want the stream to stay in paused mode, we should not attach a data event handler.

When the stream is in paused mode, the readable event fires when there is data available, and the read method will return results.
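
For example, a paused-mode consumer might look like the following sketch, which drains the internal buffer each time the readable event fires:

const reader = createReadStream();

// In paused mode, wait for the readable event and then
// call read() until the internal buffer is drained
reader.on("readable", () => {
    let val;
    while ((val = reader.read()) !== null) {
        console.log(val);
    }
});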

Knowing this, we can create a helper method that uses the read method on the stream to convert the items into a Generator.

// Converts a Readable stream into a Generator that 
// yields the type specified. The Generator will read
// from the stream until there is no more data left
function streamToGenerator<T>(
    reader: stream.Readable,
    chunkSize?: number,
): Generator<T, void, unknown> {

    // Immediately invoke the Generator function, which
    // closes over the stream and returns the
    // Generator instance
    return (function* genFn() {

        // Loop until there is nothing left on the
        // stream
        while (true) {

            // Try to read data from the stream. An 
            // optional chunkSize can be provided but
            // for object streams, only a single result
            // will be returned.
            const val = reader.read(chunkSize);

            // The stream will be out of data when it 
            // returns a null value. We break the loop.
            if (val === null) break;

            // Otherwise, yield the value that was read
            // from the stream.
            else yield val;
        }
    })();
}

The above function creates and executes a Generator function and returns the resulting Generator. The Generator reads from the stream using the stream's read method until no more data can be read; the read method returns null when the stream is out of data.

We can then combine the above:

const reader = createReadStream();
const generator = streamToGenerator(reader);

for (const item of generator) {
    console.log(item);
}
This prints:

{ id: 1 }
{ id: 2 }
{ id: 3 }

There is a problem, however. In this contrived example the stream was ready to rock and roll: when we created the stream and used push to add our first item, the stream was immediately ready to use.

In the real world, the stream may not yet be ready. To work around this we're going to combine Promises with Generators.

Returning a Generator from an Async Function

As we've shown, you can return a Generator from a function. This means you can also resolve a Promise with a Generator and return a Generator from an async function.

Consider the example below. We perform an async task, then return a Generator using an immediately invoked Generator function (just as we did above).

// Resolve the promise after `ms` timeout
const wait = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms));

// Performs an async action, then after the async 
// action has completed, return a Generator that yields 
// some values.
async function asyncNums(ms: number): Promise<Generator<number, void, unknown>> {
    await wait(ms);
    
    return (function* gen() {
        for (let i = 0; i < 10; i++) {
            yield i;
        }
    })();
}

Consuming the data from this isn't much different. The only difference is that we must first await the Promise that resolves to our Generator.

// Consuming it is not really different, we just need to
// await on the Promise to resolve our Generator. Then
// we use the Generator as we normally would.
async function run() {
    const numGen = await asyncNums(100);
    for (const i of numGen) {
        console.log("num", i);
    }
}
run().catch(console.error);

Combining Things

Let's consider a real-world stream. What if we wanted to use a Generator to stream bytes from a file? We need to wait until the fs.ReadStream object is ready to be consumed. We can use a Promise that resolves once the stream is ready to go.

import fs from "fs";

// Resolve the Promise once the stream has readable
// data. Note that this is not production-worthy code,
// as a file stream reads in 65k chunks and will fire
// the readable event multiple times.
function asyncReadStream(path: string): Promise<fs.ReadStream> {
    return new Promise((resolve, reject) => {
        const sr = fs.createReadStream(path);
        sr.once("readable", () => resolve(sr));
        sr.once("err", err => reject(err));
    });
}

We can then combine this with our streamToGenerator function from above!

async function run(path: string) {
    // Waits for the promise to resolve with the 
    // Readable stream
    const reader = await asyncReadStream(path);

    // Convert the stream into a Generator
    const byteGen = streamToGenerator(reader, 1);

    // Read each byte!
    let i = 0;
    for (const byte of byteGen) {
        console.log(++i, byte);
    }
}

run("/Users/bmancini/test").catch(console.error);

The above pattern will work well when the stream fires a single readable event. This is fantastic for streaming database records or other data through your system.
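
For instance, if your database driver exposed query results as a Readable object stream (the asyncQueryStream helper below is hypothetical), you could process rows one at a time without holding the full result set in memory:

async function processRows() {
    // Hypothetical helper that resolves with a Readable
    // object stream of row objects once data is available
    const rowStream = await asyncQueryStream("SELECT * FROM users");

    // Convert the stream into a Generator and handle one
    // row at a time
    for (const row of streamToGenerator<{ id: number }>(rowStream)) {
        console.log(row.id);
    }
}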

Unfortunately, for streams that fire multiple readable events (files larger than 65k), you will need to use another technique... perhaps an async Generator!
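
As a teaser, a minimal sketch of that async Generator approach might look like this. It awaits the next readable (or end) event whenever read() returns null, so it can survive streams that fire readable multiple times. Treat it as a starting point rather than production code (it does not handle error events, for example):

async function* streamToAsyncGenerator<T>(
    reader: stream.Readable,
    chunkSize?: number,
): AsyncGenerator<T, void, unknown> {
    while (true) {
        // Drain whatever is currently buffered
        const val = reader.read(chunkSize);
        if (val !== null) {
            yield val;
            continue;
        }

        // Stop once the stream has fully ended
        if (reader.readableEnded) break;

        // Otherwise wait for more data (or the end of the
        // stream) before trying to read again
        await new Promise<void>(resolve => {
            const onceReady = () => {
                reader.off("readable", onceReady);
                reader.off("end", onceReady);
                resolve();
            };
            reader.once("readable", onceReady);
            reader.once("end", onceReady);
        });
    }
}

Consuming it is the same as before, except you use for await...of instead of for...of.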