node.js how to sync final callback after import into db from a stream

573 views

It sounds like a pretty typical use case for libraries like Q or async, but I could not really figure out what's the best way to do it.

I want to import a CSV file with 150 lines (using node-csv), and create a mongo document for each line. However, the stream parsing seems to finish faster than the 'db inserts', and so I run into the problem that the callback gets called too early.

// importtest.mocha.js
[...]
// Mocha test: expects the callback to see all 150 imported rows.
// Fix: the original snippet never closed the loadFromCsv(...) call —
// the trailing `);` was missing, which is a syntax error.
importer.loadFromCsv (url, function(result) {
 result.length.should.be.equal (150); // nope, it's always around 41
});


// importer.js 
// Streams a CSV from `url` and saves one Row document per record.
// BUG (the subject of this question): the 'end' event signals only that
// the *parser* has consumed the stream — each Row.save() issued from the
// 'record' handler may still be in flight, so `callback` fires before
// all inserts have completed and the query below sees a partial result.
function loadFromCsv (url, callback){
  csv().from.stream(url)
    .on ('record', function(record, index){ 
       // Fire-and-forget save: nothing here tracks when the insert finishes.
       new Row({data: record}).save(function() {
         console.log ('saved a row to db');
       }); 
    })
    .on ('end', function() {
      callback (Row.find({})); // E parser finished, but probably not all Row.save()
    });
}

So, could anybody please give me a hint how I could fix this with async/promises, so that while the stream parsing and db inserts remain async, the final callback is invoked only after all inserts have finished?

1

There is 1 solution below

0
On BEST ANSWER

As you are inserting many records, you should track each of them individually. Here is an untested code snippet you could try and adapt. Essentially, you create a list of promises; when all of them are resolved, the function passed to then(fn) is fired. As mentioned in the code comments, you should also handle records that produce errors. Be aware that the function passed to then(fn) is executed only when ALL promises are resolved (successfully). To mark a record's promise as failed, call defer.reject() instead of defer.resolve(), and also pass a function for the onErrorFn placeholder. It is similar in spirit to an SQL transaction.

Here goes the code containing comments for you:

var q = require('q');

// Streams a CSV from `url`, saves one Row per record, and invokes
// `callback(records)` only after EVERY save has completed — fixing the
// race where 'end' fires before the inserts finish.
//
// Fixes over the original answer snippet:
//  - the `.on('end', …)` handler and the chained call were never closed
//    (missing `});`), which was a syntax error;
//  - the save callback now checks its `err` argument and rejects the
//    row's promise on failure (the original resolved unconditionally);
//  - the callback receives the resolved records from q.all, not the raw
//    promise array (same `.length`, but actually usable values);
//  - a `.fail` handler ensures the callback is not silently skipped when
//    any insert fails.
function loadFromCsv (url, callback){
  // One promise per CSV record; q.all() below waits on all of them.
  var csv_promises = [];

  csv().from.stream(url)
    .on ('record', function(record, index){
      // Per-row deferred: settled when this row's save() completes.
      var row_defer = q.defer();

      // Mongoose-style save callback receives (err, doc).
      new Row({data: record}).save(function(err) {
        if (err) {
          // Propagate the failure so q.all() rejects instead of hanging.
          row_defer.reject(err);
          return;
        }
        console.log ('saved a row to db');
        row_defer.resolve(record);
      });

      // Registered synchronously inside the 'record' handler, so the
      // list is complete by the time 'end' fires.
      csv_promises.push(row_defer.promise);
    })
    .on ('end', function() {
      // q.all resolves only after ALL row promises have resolved,
      // i.e. after every insert has finished.
      q.all(csv_promises)
        .then(function(records) {
          // `records` is the array of resolved values, one per saved row.
          callback(records);
        })
        .fail(function(err) {
          // At least one save failed; surface the error so the caller's
          // callback still fires instead of waiting forever.
          callback(err);
        });
    });
}

// Report how many rows were treated once the import completes.
function reportRowCount(savedRows) {
    console.log("Treated rows: ", savedRows.length);
}

loadFromCsv( "URL", reportRowCount);