How to write batches of data in a Node.js stream pipeline?


I have a function that reads a CSV file as a readable stream using the "pipeline" method, splits it into rows, transforms the data of each row, and pushes the result into an array. When the pipeline finishes, I insert all of the collected data into a database.

This is the relevant part of the code:

pipeline(storageStream as Readable, split(), this.FilterPipe(), this.MapData(result));

public MapData(result: Array<string>): MapStream {
    return mapSync((filteredData: string) => {
      const trimmed: string = filteredData.trim();
      if (trimmed.length !== 0) {
        result.push(trimmed);
      }
    });
}

We have sometimes hit memory limits because we upload a large number of very large CSV files, so we decided to split the logic into insertion batches so that we don't hold so much data in memory at once.

So I thought of handling the read data in batches: for every batch (say, 100 rows of the file), trigger the "MapData" function and insert the resulting array into the DB.

Is there any way to add a condition so that MapData is triggered every X rows? Or is there another solution that meets this requirement?

Thanks in advance!

1 Answer

Answer by Heiko Theißen (accepted):

The following code shows a transform stream that buffers incoming objects (or arrays of objects) until it has 100 of them and then pushes them onwards as an array:

var stream = require("stream");

var t = new stream.Transform({
  objectMode: true,
  transform(chunk, encoding, callback) {
    // Collect incoming chunks until a full batch of 100 is reached.
    this.buffer = (this.buffer || []).concat(chunk);
    if (this.buffer.length >= 100) {
      this.push(this.buffer); // emit the batch as a single array
      this.buffer = [];
    }
    callback();
  },
  flush(callback) {
    // Emit whatever is left over when the input ends.
    if (this.buffer && this.buffer.length > 0) this.push(this.buffer);
    callback();
  }
}).on("data", console.log);
// Quick self-test: write 250 values, expect two batches of 100 and one of 50.
for (var i = 0; i < 250; i++) t.write(i);
t.end();

You can include such a transform stream in your pipeline.

And here is the same transform in TypeScript. It can probably be done more elegantly, but I am no TypeScript expert.

import { Transform, TransformCallback } from "stream";

class MyTransform extends Transform {
  buffer: Array<any> = [];
}

var t = new MyTransform({
  objectMode: true,
  transform(chunk, encoding, callback: TransformCallback) {
    // "this" is typed as Transform here, so cast it to reach "buffer".
    var that = this as MyTransform;
    that.buffer = that.buffer.concat(chunk);
    if (that.buffer.length >= 100) {
      this.push(that.buffer); // emit a full batch as one array
      that.buffer = [];
    }
    callback();
  },
  flush(callback: TransformCallback) {
    var that = this as MyTransform;
    // Emit the final, possibly smaller batch when the input ends.
    if (that.buffer.length > 0) this.push(that.buffer);
    callback();
  }
}).on("data", console.log);
// Quick self-test, as above.
for (var i = 0; i < 250; i++) t.write(i);
t.end();