What's the proper way to handle back-pressure in a node.js Transform stream?

Scott Saad · Dec 25, 2013 · Viewed 9.4k times

Intro

These are my first adventures writing server-side node.js. It's been fun so far, but I'm having some difficulty understanding the proper way to implement something related to node.js streams.

Problem

For testing and learning purposes I'm working with large files whose content is zlib compressed. The compressed content is binary data, with each packet being 38 bytes in length. I'm trying to create a resulting file that looks almost identical to the original, except that there is an uncompressed 31-byte header for every 1024 38-byte packets.

original file content (decompressed)

+----------+----------+----------+----------+
| packet 1 | packet 2 |  ......  | packet N |
| 38 bytes | 38 bytes |  ......  | 38 bytes |
+----------+----------+----------+----------+

resulting file content

+----------+--------------------------------+----------+--------------------------------+
| header 1 |    1024 38 byte packets        | header 2 |    1024 38 byte packets        |
| 31 bytes |       zlib compressed          | 31 bytes |       zlib compressed          |
+----------+--------------------------------+----------+--------------------------------+

As you can see, it's somewhat of a translation problem: I'm taking some source stream as input and slightly transforming it into some output stream. Therefore, it felt natural to implement a Transform stream.

The class simply attempts to accomplish the following:

  1. Takes stream as input
  2. zlib-inflates the chunks of data in order to count the packets, gathers 1024 of them, zlib-deflates them, and prepends a header.
  3. Passes the new resulting chunk on through the pipeline via this.push(chunk).

A use case would be something like:

var fs = require('fs');
var me = require('./me'); // Where my Transform stream code sits
var inp = fs.createReadStream('depth_1000000');
var out = fs.createWriteStream('depth_1000000.out');
inp.pipe(me.createMyTranslate()).pipe(out);
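
For context, the translate class itself looks roughly like this. This is only a simplified sketch: the zlib inflate/deflate steps and the real 31-byte header contents are left out, and the names are just the ones I use above.

var stream = require('stream');
var util = require('util');

var PACKET_SIZE = 38;
var PACKETS_PER_BLOCK = 1024;
var HEADER_SIZE = 31;

function MyTranslate(options) {
    stream.Transform.call(this, options);
    this._buffered = new Buffer(0); // packet bytes carried over between chunks
}
util.inherits(MyTranslate, stream.Transform);

exports.createMyTranslate = function (options) {
    return new MyTranslate(options);
};

MyTranslate.prototype._transform = function (chunk, encoding, callback) {
    // Accumulate raw packet data until a full block of 1024 packets is available.
    this._buffered = Buffer.concat([this._buffered, chunk]);
    var blockSize = PACKET_SIZE * PACKETS_PER_BLOCK;

    while (this._buffered.length >= blockSize) {
        var block = this._buffered.slice(0, blockSize);
        this._buffered = this._buffered.slice(blockSize);

        var header = new Buffer(HEADER_SIZE); // real header contents omitted
        header.fill(0);

        // In the real code the block is zlib deflated before being pushed.
        // This is the push() that keeps returning false.
        this.push(Buffer.concat([header, block]));
    }

    callback();
};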

Question(s)

Assuming Transform is a good choice for this use case, I seem to be running into a possible back-pressure issue. My call to this.push(chunk) within _transform keeps returning false. Why does this happen, and how should I handle it?

Answer

Mike Lippert · May 5, 2017

This question from 2013 is all I was able to find on how to deal with "back pressure" when creating node Transform streams.

From the node 7.10.0 Transform stream and Readable stream documentation, what I gathered was that once push returns false, nothing else should be pushed until _read is called.

The Transform documentation doesn't mention _read except to mention that the base Transform class implements it (and _write). I found the information about push returning false and _read being called in the Readable stream documentation.

The only other authoritative discussion of Transform back pressure I found only mentions it as an issue, and it's in a comment at the top of the node source file _stream_transform.js.

Here's the section about back pressure from that comment:

// This way, back-pressure is actually determined by the reading side,
// since _read has to be called to start processing a new chunk.  However,
// a pathological inflate type of transform can cause excessive buffering
// here.  For example, imagine a stream where every byte of input is
// interpreted as an integer from 0-255, and then results in that many
// bytes of output.  Writing the 4 bytes {ff,ff,ff,ff} would result in
// 1kb of data being output.  In this case, you could write a very small
// amount of input, and end up with a very large amount of output.  In
// such a pathological inflating mechanism, there'd be no way to tell
// the system to stop doing the transform.  A single 4MB write could
// cause the system to run out of memory.
//
// However, even in such a pathological case, only a single written chunk
// would be consumed, and then the rest would wait (un-transformed) until
// the results of the previous transformed chunk were consumed.
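
To make that concrete, a pathological inflating transform along those lines would look something like this (my own sketch for illustration, not code from the node source):

const stream = require('stream');

// Every input byte N produces N output bytes, so a small write can balloon
// into a huge amount of buffered output if the reading side isn't keeping up.
class PathologicalInflate extends stream.Transform
{
    _transform(chunk, encoding, callback)
    {
        for (const byte of chunk)
        {
            if (byte > 0)
                this.push(Buffer.alloc(byte)); // push's return value is ignored entirely
        }
        callback();
    }
}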

Solution example

Here's the solution I pieced together to handle back pressure in a Transform stream, which I'm pretty sure works. (I haven't written any real tests; that would require writing a Writable stream to control the back pressure.)

This is a rudimentary line transform that needs more work to be a proper line transform, but it does demonstrate handling the "back pressure".

const stream = require('stream');

class LineTransform extends stream.Transform
{
    constructor(options)
    {
        super(options);

        this._lastLine = "";
        this._continueTransform = null;
        this._transforming = false;
        this._debugTransformCallCount = 0;
    }

    _transform(chunk, encoding, callback)
    {
        if (encoding === "buffer")
            return callback(new Error("Buffer chunks not supported"));

        if (this._continueTransform !== null)
            return callback(new Error("_transform called before previous transform has completed."));

        // DEBUG: Uncomment for debugging help to see what's going on
        //console.error(`${++this._debugTransformCallCount} _transform called:`);

        // Guard (so we don't call _continueTransform from _read while it is being
        // invoked from _transform)
        this._transforming = true;

        // Do our transforming (in this case splitting the big chunk into lines)
        let lines = (this._lastLine + chunk).split(/\r\n|\n/);
        this._lastLine = lines.pop();

        // In order to respond to "back pressure" create a function
        // that will push all of the lines stopping when push returns false,
        // and then resume where it left off when called again, only calling
        // the "callback" once all lines from this transform have been pushed.
        // Resuming (until done) will be done by _read().
        let nextLine = 0;
        this._continueTransform = () =>
            {
                while (nextLine < lines.length)
                {
                    // push() returns false when back pressure kicks in. The line we
                    // just pushed was still buffered, so advance past it, then stop
                    // and wait for _read() to call us again to resume from here.
                    if (!this.push(lines[nextLine++] + "\n"))
                        return;
                }

                // DEBUG: Uncomment for debugging help to see what's going on
                //console.error(`_continueTransform ${this._debugTransformCallCount} finished\n`);

                // All lines are pushed, remove this function from the LineTransform instance
                this._continueTransform = null;
                return callback();
            };

        // Start pushing the lines
        this._continueTransform();

        // Turn off guard allowing _read to continue the transform pushes if needed.
        this._transforming = false;
    }

    _flush(callback)
    {
        if (this._lastLine.length > 0)
        {
            this.push(this._lastLine);
            this._lastLine = "";
        }

        return callback();
    }

    _read(size)
    {
        // DEBUG: Uncomment for debugging help to see what's going on
        //if (this._transforming)
        //    console.error(`_read called during _transform ${this._debugTransformCallCount}`);

        // If a transform has not pushed every line yet, continue that transform
        // otherwise just let the base class implementation do its thing.
        if (!this._transforming && this._continueTransform !== null)
            this._continueTransform();
        else
            super._read(size);
    }
}

I tested the above by running it with the DEBUG lines uncommented on a ~10000 line, ~200KB file. Redirect stdout or stderr to a file (or both) to separate the debugging statements from the expected output (node test.js > out.log 2> err.log).

const fs = require('fs');
let inStrm = fs.createReadStream("testdata/largefile.txt", { encoding: "utf8" });
let lineStrm = new LineTransform({ encoding: "utf8", decodeStrings: false });
inStrm.pipe(lineStrm).pipe(process.stdout);
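
To put more deliberate back pressure on the Transform than process.stdout does, a slow Writable can be used as the sink instead. Something like this sketch should work, though as noted above I haven't turned it into a real test:

class SlowSink extends stream.Writable
{
    constructor()
    {
        // A tiny buffer plus a delayed callback makes the upstream Transform's
        // push() start returning false almost immediately.
        super({ decodeStrings: false, highWaterMark: 16 });
    }

    _write(chunk, encoding, callback)
    {
        setTimeout(callback, 10); // pretend each chunk takes 10ms to consume
    }
}

// inStrm.pipe(lineStrm).pipe(new SlowSink());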

Helpful debugging hint

While writing this initially, I didn't realize that _read could be called before _transform returned, so I hadn't implemented the this._transforming guard and I was getting the following error:

Error: no writecb in Transform class
    at afterTransform (_stream_transform.js:71:33)
    at TransformState.afterTransform (_stream_transform.js:54:12)
    at LineTransform._continueTransform (/userdata/mjl/Projects/personal/srt-shift/dist/textfilelines.js:44:13)
    at LineTransform._transform (/userdata/mjl/Projects/personal/srt-shift/dist/textfilelines.js:46:21)
    at LineTransform.Transform._read (_stream_transform.js:167:10)
    at LineTransform._read (/userdata/mjl/Projects/personal/srt-shift/dist/textfilelines.js:56:15)
    at LineTransform.Transform._write (_stream_transform.js:155:12)
    at doWrite (_stream_writable.js:331:12)
    at writeOrBuffer (_stream_writable.js:317:5)
    at LineTransform.Writable.write (_stream_writable.js:243:11)

Looking at the node implementation, I realized that this error means the callback given to _transform was called more than once. There wasn't much information to be found about this error either, so I thought I'd include what I figured out here.
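
For anyone else who runs into it, the error is easy to reproduce by simply calling the _transform callback twice. This is a contrived sketch; the exact message depends on the node version (newer versions report the double call differently):

const stream = require('stream');

class DoubleCallback extends stream.Transform
{
    _transform(chunk, encoding, callback)
    {
        this.push(chunk);
        callback();
        callback(); // second call: there is no pending writecb anymore, hence the error
    }
}

const dc = new DoubleCallback();
dc.on('error', err => console.error(err.message)); // "no writecb in Transform class"
dc.write("double callback");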