Processing nested streams


I am trying to generate an output file by joining two CSV input streams: for each record in CSV 1, I want to generate one output line for each record in CSV 2.

I came across Highland while browsing Stack Overflow for similar problems, and found this question:

Nested stream operations in Highland.js

I have attempted to adjust this to my own problem and so far have this:

    const debug = require('debug')('csvparse');
    const csv = require('fast-csv');
    const fs = require('fs');
    const args = process.argv;
    const h = require('highland');

    const typestream = h(fs.createReadStream(args[2]).pipe(csv({ headers: true, ignoreEmpty: true })));
    const postcodestream = h(fs.createReadStream(args[3]).pipe(csv({ headers: true, ignoreEmpty: true })));

    const pipeline = typestream.flatMap((type) => {
        debug(type);

        return postcodestream.flatMap((postcode) => {
            debug(postcode);

            return h([`${type.type}-${postcode.postcode}\n`]);
        });
    });

    pipeline.pipe(process.stdout);

With the following example inputs csv1:

type,
STREET,
ROAD,

csv2:

postcode,
3456
3446
1234

I'd expect this output:

STREET-3456
STREET-3446
STREET-1234
ROAD-3456
ROAD-3446
ROAD-1234

But I'm only getting:

STREET-3456
STREET-3446
STREET-1234

I can see from the debug statements that I get the output of ROAD once, and then it stops.

Answered by Mike Bissett

OK, I figured out my problem. I should have been using through for the CSV parsing instead of wrapping the pipe, and I should also have been creating the fs.createReadStream inside the initial flatMap rather than referencing it from a variable (the stream will have finished after the first iteration).
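To illustrate that last point, here is a minimal sketch. It uses Node's core `stream.Readable` rather than Highland or fast-csv, and `makePostcodes` and `collect` are hypothetical helpers standing in for the CSV read stream and its consumer. The idea it tries to show: a stream that has been fully consumed yields nothing on a second read, whereas a freshly created one starts over.

```javascript
const { Readable } = require('stream');

// Collect every item a readable stream emits into an array.
async function collect(stream) {
    const items = [];
    for await (const item of stream) {
        items.push(item);
    }
    return items;
}

// Hypothetical factory standing in for fs.createReadStream(args[3]) plus CSV parsing.
function makePostcodes() {
    return Readable.from(['3456', '3446', '1234']);
}

async function demo() {
    const shared = makePostcodes();
    const firstPass = await collect(shared);          // drains the shared stream
    const secondPass = await collect(shared);         // already ended: nothing left
    const freshPass = await collect(makePostcodes()); // new stream, full data again
    return { firstPass, secondPass, freshPass };
}

demo().then((result) => console.log(result));
```

This mirrors the original bug: postcodestream was drained while processing STREET, so nothing was left by the time ROAD arrived.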

The corrected code is now:

    #!/usr/bin/node
    const debug = require('debug')('csvparse');
    const csv = require('fast-csv');
    const fs = require('fs');
    const args = process.argv;
    const h = require('highland');

    // Outer stream: one record per row of the first CSV (args[2]).
    const pipeline = h(fs.createReadStream(args[2]))
        .through(csv({ headers: true, ignoreEmpty: true }))
        .flatMap((type) => {
            // Create a fresh read stream of the second CSV (args[3]) for every
            // outer record; a stream that has already ended cannot be reused.
            return h(fs.createReadStream(args[3]))
                .through(csv({ headers: true, ignoreEmpty: true }))
                .map((postcode) => {
                    return `${type.type}-${postcode.postcode}\n`;
                });
        });

    pipeline.pipe(process.stdout);
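Note that the code above re-reads the second file once per record of the first file. If the second CSV is small enough to hold in memory, one alternative would be to parse it once into an array and reuse that. The sketch below is only an illustration under that assumption: `crossJoin` is a hypothetical helper, and the hard-coded arrays stand in for rows that would come from the CSV parser.

```javascript
// Pair every type record with every postcode record (a cross join).
function crossJoin(types, postcodes) {
    return types.flatMap((type) =>
        postcodes.map((postcode) => `${type.type}-${postcode.postcode}`));
}

// Stand-ins for the parsed rows of csv1 and csv2.
const types = [{ type: 'STREET' }, { type: 'ROAD' }];
const postcodes = [{ postcode: '3456' }, { postcode: '3446' }, { postcode: '1234' }];

console.log(crossJoin(types, postcodes).join('\n'));
```

The trade-off is memory for I/O: the streaming version keeps memory use flat but opens the second file repeatedly, while this version reads it once but holds all of its rows at the same time.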