Proper way to consume NodeJS stream into buffer and write stream

andykais · Jul 17, 2018

I need to pipe a readable stream into both a buffer (to be converted into a string) and a file. The stream comes from node-fetch.

NodeJS streams have two states: paused and flowing. From what I understand, as soon as a 'data' listener is attached, the stream will change to flowing mode. I want to make sure the way I am reading a stream will not lose any bytes.
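As a minimal sketch of that mode switch, the snippet below inspects `readableFlowing` before and after a 'data' listener is attached. The in-memory `demoSource` is an assumption made for illustration; it stands in for `response.body` so the example runs without a network request.

```javascript
const { Readable } = require('stream')

// Hypothetical in-memory source standing in for response.body.
const demoSource = Readable.from([Buffer.from('hello '), Buffer.from('world')])

// No consumer is attached yet, so the stream has not picked a mode.
const before = demoSource.readableFlowing

const chunks = []
demoSource.on('data', chunk => chunks.push(chunk))

// Attaching the 'data' listener switched the stream to flowing mode.
const after = demoSource.readableFlowing

demoSource.on('end', () => {
  // prints: null true hello world
  console.log(before, after, Buffer.concat(chunks).toString())
})
```

Chunks are only emitted on a later tick, so listeners attached synchronously after the first one still see every chunk.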

Method 1: piping and reading from 'data':

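const fs = require('fs')
const fetch = require('node-fetch')

fetch(url).then(
  response =>
    new Promise(resolve => {
      const buffers = []
      const dest = fs.createWriteStream(filename)
      // Write every chunk to the file...
      response.body.pipe(dest)
      // ...and also collect it in memory.
      response.body.on('data', chunk => buffers.push(chunk))
      // Resolve with the full body once the file has been closed.
      dest.on('close', () => resolve(Buffer.concat(buffers).toString()))
    })
)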

Method 2: using passthrough streams:

const fs = require('fs')
const fetch = require('node-fetch')
const { PassThrough } = require('stream')

fetch(url).then(
  response =>
    new Promise(resolve => {
      const buffers = []
      const dest = fs.createWriteStream(filename)
      // One intermediate stream per consumer.
      const forFile = new PassThrough()
      const forBuffer = new PassThrough()
      response.body.pipe(forFile).pipe(dest)
      response.body.pipe(forBuffer)
      forBuffer.on('data', chunk => buffers.push(chunk))
      // Resolve with the full body once the file has been closed.
      dest.on('close', () => resolve(Buffer.concat(buffers).toString()))
    })
)

Is the second method required so there is no lost data? Is the second method wasteful since two more streams could be buffered? Or, is there another way to fill a buffer and write stream simultaneously?

Answer

Marcos Casagrande · Jul 17, 2018

You won't miss any data, because .pipe internally calls src.on('data') and writes each chunk to the target stream.

So any chunk written to your dest stream will also be emitted to your response.body.on('data') listener, where you're buffering the chunks. In any case, you should listen for 'error' events and reject if an error occurs.
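A minimal sketch of Method 1 with that error handling added might look like the following. The helper name `saveAndCollect`, the temporary file name, and the in-memory `body` stream (standing in for `response.body`) are assumptions made so the example runs on its own.

```javascript
const fs = require('fs')
const os = require('os')
const path = require('path')
const { Readable } = require('stream')

// Hypothetical helper: writes `source` to `filename` and resolves with
// the same bytes decoded as a string. Rejects on a read or write error.
function saveAndCollect(source, filename) {
  return new Promise((resolve, reject) => {
    const buffers = []
    const dest = fs.createWriteStream(filename)
    source.on('error', reject)
    dest.on('error', reject)
    source.on('data', chunk => buffers.push(chunk))
    dest.on('close', () => resolve(Buffer.concat(buffers).toString()))
    source.pipe(dest)
  })
}

// Usage with an in-memory stream in place of response.body.
const demoFile = path.join(os.tmpdir(), 'save-and-collect-demo.txt')
const body = Readable.from([Buffer.from('hello '), Buffer.from('world')])
const result = saveAndCollect(body, demoFile)

result.then(text => {
  // The string and the file hold the same content; prints: true
  console.log(text === fs.readFileSync(demoFile, 'utf8'))
})
```

Note that .pipe does not close dest when the source emits an error, so a real implementation may also want to destroy dest in that case.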

While your second method will work, you don't need it.

This is a chunk of code from the .pipe function:

  src.on('data', ondata);
  function ondata(chunk) {
    debug('ondata');
    var ret = dest.write(chunk);
    debug('dest.write', ret);
    if (ret === false) {
      // If the user unpiped during `dest.write()`, it is possible
      // to get stuck in a permanently paused state if that write
      // also returned false.
      // => Check whether `dest` is still a piping destination.
      if (((state.pipesCount === 1 && state.pipes === dest) ||
           (state.pipesCount > 1 && state.pipes.indexOf(dest) !== -1)) &&
          !cleanedUp) {
        debug('false write response, pause', state.awaitDrain);
        state.awaitDrain++;
      }
      src.pause();
    }
  }
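To check that the `src.pause()` branch above does not cause an extra 'data' listener to miss chunks, here is a small sketch under stated assumptions: `slowDest` is a made-up writable with a 1-byte `highWaterMark`, so every `write()` returns false and the source is paused after each chunk. The in-memory `src` again stands in for `response.body`.

```javascript
const { Readable, Writable } = require('stream')

// Hypothetical source: four one-byte chunks.
const src = Readable.from(['a', 'b', 'c', 'd'].map(s => Buffer.from(s)))

// Hypothetical slow destination; the tiny highWaterMark forces
// write() to return false, so pipe pauses the source each time.
const written = []
const slowDest = new Writable({
  highWaterMark: 1,
  write(chunk, encoding, callback) {
    written.push(chunk)
    setTimeout(callback, 10)
  }
})

// Extra listener, like the buffering listener in Method 1.
const seen = []
src.on('data', chunk => seen.push(chunk))
src.pipe(slowDest)

slowDest.on('finish', () => {
  // Both consumers received every chunk; prints: abcd abcd
  console.log(Buffer.concat(seen).toString(), Buffer.concat(written).toString())
})
```

Because pausing stops the source itself, the extra listener simply receives chunks at the pace the destination allows.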