I am trying to generate an output file by joining 2 csv input streams, for each record in csv 1 I want to generate an output for each record in csv 2.
I came across highland while browsing stack overflow for any similar solutions and came across:
Nested stream operations in Highland.js
I have attempted to adjust this to my own problem and so far have this:
const debug = require('debug')('csvparse');
const csv = require('fast-csv');
const fs = require('fs');
const args = process.argv;
const h = require('highland');
const typestream = h(fs.createReadStream(args[2]).pipe(csv({ headers: true, ignoreEmpty: true })));
const postcodestream = h(fs.createReadStream(args[3]).pipe(csv({ headers: true, ignoreEmpty: true })));
const pipeline = typestream.flatMap((type) => {
debug(type);
return postcodestream.flatMap((postcode) => {
debug(postcode);
return h([`${type.type}-${postcode.postcode}\n`]);
});
});
pipeline.pipe(process.stdout);
With the following example inputs csv1:
type,
STREET,
ROAD,
csv2:
postcode,
3456
3446
1234
Id expect output of
STREET-3456
STREET-3446
STREET-1234
ROAD-3456
ROAD-3446
ROAD-1234
But Im just getting:
STREET-3456
STREET-3446
STREET-1234
I can see from the debug statements that i get the out of ROAD once and then it stops.
Ok I figured out my problem, basically I should have been using through for the csv parsing instead of wrapping the pipe and I shoudl have also been creating the fs.createReadStream within the initial flatMap rather then referencing it from a variable (as the stream will have finished after the initial iteration).
Code is now: