Using `pipeline` from `node:stream/promises` with multiple writable destinations

439 Views Asked by At

I have a Readable stream in object mode that I push data into, like this:

const getReadStream = () => {
    const stream = new Readable({ objectMode: true, read: () => {} });

    const get = async (page = 1) => {
       const { data } = await client
           .request(...)
           .catch((error) => {
               stream.emit('error', error);
               return { data: undefined };
            });
    
       const results = parseFn(data);
    
       if (results.length > 0) {
           results.forEach((row) => stream.push(row));
           get(page + 1);
       } else {
           stream.push(null);
       }
    };
    
    get();

    return stream;
};

I would want to consume it into some Writable stream, like this

/**
 * Consumes the stream from `getReadStream()` through one pipeline, or through
 * two pipelines when `certainCondition` is truthy.
 *
 * @returns a promise for the first pipeline, or for both pipelines
 *          (via `Promise.all`) when the second one is started
 */
const consumeStream = async () => {
    const readableStream = getReadStream();

    const pipeline1 = pipeline(
        readableStream,
        transform1,
        transform2,
        someWritableStream,
    );

    if (!certainCondition) {
        return pipeline1;
    }

    const writeStream2 = new Writable({
        objectMode: true,
        write: (rows: any[], _, callback) => {
            // Forward a rejection to the callback; otherwise the stream never
            // learns the write failed and the pipeline hangs.
            somePromise(rows).then(() => callback(), callback);
        },
    });

    // Started with no await since pipeline1, so both pipelines are attached
    // to the same readable in the same tick.
    const pipeline2 = pipeline(
        readableStream,
        transform3,
        transform4,
        writeStream2,
    );

    return Promise.all([pipeline1, pipeline2]);
};

My question is: in the case where consumeStream proceeds with pipeline2, is this the correct way to feed multiple writable streams from a single readable stream (so that the number of objects processed by pipeline1 equals the number of objects processed by pipeline2)? And if it is not, what is the correct way to implement this?

1

There is 1 best solution below

0
leitning On BEST ANSWER

You're going to have issues with the original stream data being already in progress or potentially ended by the time you start your second pipeline.

If you use a PassThrough stream to duplicate data coming off of the original, then your method will work.

Here's a little script that demonstrates

#!/usr/bin/node

const { promises: fs,
        createReadStream,
        createWriteStream } = require('node:fs'),
             { setTimeout } = require('node:timers/promises'),
 { PassThrough,
   promises: { pipeline } } = require('node:stream'),
                     assert = require('node:assert');

// Scratch files used by the demo: one source and two sinks to compare.
const srcFile = '/tmp/foobar';
const sinkFile1 = '/tmp/foobarSink1';
const sinkFile2 = '/tmp/foobarSink2';

/**
 * (Re)creates the source file with 1000 lines of `foobar`.
 *
 * The content is written in a single call, replacing any previous content,
 * instead of one truncating write followed by 999 awaited appends.
 *
 * @returns {Promise<void>} resolves once the file has been written
 */
async function mkSrc() {
  const lineCount = 1000;
  await fs.writeFile(srcFile, 'foobar\n'.repeat(lineCount));
}

/**
 * Opens a new read stream over the source file.
 *
 * @returns the stream returned by `createReadStream(srcFile)`
 */
function getReadableStream() {
  const source = createReadStream(srcFile);
  return source;
}

/**
 * Opens a write stream for the given path.
 *
 * @param file - path handed to `createWriteStream` (the script passes the
 *               sink file paths)
 * @returns the stream returned by `createWriteStream(file)`
 */
function getWritableStream(file) {
  const sink = createWriteStream(file);
  return sink;
}

/**
 * Reads both sink files as UTF-8 text and asserts that their contents match.
 *
 * @returns {Promise<void>} rejects if a file cannot be read or the contents
 *          differ
 */
async function chkIt() {
  const firstSinkText = await fs.readFile(sinkFile1, 'utf8');
  const secondSinkText = await fs.readFile(sinkFile2, 'utf8');
  assert.equal(firstSinkText, secondSinkText);
}

/**
 * Demo without a PassThrough: the same source stream is handed to a second
 * pipeline one second after the first pipeline was started, then the two
 * sink files are compared and the outcome is printed.
 */
async function test1() {
  await mkSrc();
  const source = getReadableStream();
  const firstCopy = pipeline(source, getWritableStream(sinkFile1));
  // Delay so the second pipeline is attached to `source` a full second after
  // the first one started consuming it.
  await setTimeout(1000);
  const secondCopy = pipeline(source, getWritableStream(sinkFile2));
  await Promise.all([firstCopy, secondCopy]);

  let sinksMatch = true;
  try {
    await chkIt();
  } catch (err) {
    // Any failure of the comparison is reported as "different".
    sinksMatch = false;
  }

  if (sinksMatch) {
    console.log('Not using passthrough results in same stream data');
  } else {
    console.error('Not using passthrough results in different stream data');
  }
}

/**
 * Demo with a PassThrough: the source is piped into a PassThrough before the
 * first pipeline starts, and the second pipeline (started one second later)
 * reads from that PassThrough. The two sink files are then compared and the
 * outcome is printed.
 */
async function test2() {
  await mkSrc();
  const source = getReadableStream();
  const duplicate = new PassThrough();
  // Attached before the first pipeline and before the delay below.
  source.pipe(duplicate);
  const firstCopy = pipeline(source, getWritableStream(sinkFile1));
  await setTimeout(1000);
  const secondCopy = pipeline(duplicate, getWritableStream(sinkFile2));
  await Promise.all([firstCopy, secondCopy]);

  let sinksMatch = true;
  try {
    await chkIt();
  } catch (err) {
    // Any failure of the comparison is reported as "different".
    sinksMatch = false;
  }

  if (sinksMatch) {
    console.log('Using passthrough results in same stream data');
  } else {
    console.error('Using passthrough results in different stream data');
  }
}


/**
 * Runs the two demos one after the other: first without, then with a
 * PassThrough.
 */
async function main() {
  await test1();
  await test2();
}

// Do not leave the promise floating: report any failure and exit non-zero.
main().catch((err) => {
  console.error(err);
  process.exitCode = 1;
});

The results printed are

Not using passthrough results in different stream data
Using passthrough results in same stream data

So in your case you would do something like

let stream = getStreamSomehow(),
    passthrough;
if(conditionChk) {
  passthrough = new Passthrough();
  stream.pipe(passthrough);
}
let pipeline1 = pipeline(stream,...);
if(!conditionChk)
  return pipeline1;
let pipeline2 = pipeline(passthrough,...);
doOtherStuff();
return Promise.all([pipeline1,pipeline2]);