How does the Node.js async eachLimit work in this situation?

gremo · May 8, 2013 · Viewed 8.1k times

I wrote a little async script to batch-insert a lot of JSON files into a MongoDB sharded cluster. This is my first time using this module (and I'm still learning Node.js), so I don't know if I'm doing it right.

  • The code is the last part of a waterfall (1): previous functions end up with an object with db, coll and files properties.
  • The files array contains hundreds of file paths, and the function applied to each element of the array is, again, a waterfall (2).
  • Waterfall (2) is made of the following steps: read, parse, insert. When this waterfall ends (3), I call complete to finalize the processing of a single item in the array, passing the error (if any); the sketch right after this list shows how I expect that error to propagate.
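
To be explicit about what I expect from waterfall (2): an error in any step should skip the remaining steps and land in the final callback (3). A minimal standalone sketch of that behaviour (assuming async ~0.2.x, separate from my real code):

var async = require('async');

async.waterfall([
    function (next) {
        next(null, 'first result');   // pass a value on to the next step
    },
    function (result, next) {
        next(new Error('boom'));      // fail here
    },
    function (result, next) {
        next(null, 'never reached');  // skipped because of the error above
    }
], function (err, result) {
    console.log(err && err.message);  // 'boom'
    console.log(result);              // undefined, the waterfall stopped at the failing step
});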

So far so good, correct?

What I can't understand is what happens inside the async.eachLimit callback (4). From the documentation:

A callback which is called after all the iterator functions have finished, or an error has occurred.

That is, when all iterator functions have finished, the next() call (5) ends the script. But according to the documentation, the same callback (4) is also invoked as soon as a single error occurs, which means my script stops when just one file fails.
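
A minimal standalone sketch of what I understand happens (assuming async ~0.2.x, separate from my script): as soon as one iterator reports an error, the final callback fires with that error and no further items are started.

var async = require('async');

async.eachLimit([1, 2, 3, 4, 5], 1, function (n, done) {
    console.log('processing item', n);
    setImmediate(function () {
        done(n === 3 ? new Error('failed on item ' + n) : null);
    });
}, function (err) {
    // Fires as soon as item 3 reports its error; items 4 and 5 are never started.
    console.log('finished with:', err && err.message);
});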

How can I avoid this?

async.waterfall([ // 1
    // ...
    function (obj, next) {
        async.eachLimit(obj.files, 1000,
            function (file, complete) {
                async.waterfall([ // 2
                    function (next) {
                        fs.readFile(file, {}, function (err, data) {
                            next(err, data);
                        });
                    },
                    function (data, next) { // Parse (assuming all well formed)
                        next(null, JSON.parse(data));
                    },
                    function (doc, next) { // Insert
                        obj.coll.insert(doc, {w: 1}, function (err, doc) {
                            next(err);
                        });
                    }
                ], function (err, result) { // 3
                    complete(err);
                });
            },
            function (err) { // 4
                if (err) console.error(err);
                next(null, obj); // 5
            }
        );
    }
], function (err, obj) { // Waterfall end
    if (err) console.error(err);
    obj.db.close(); // Always close the connection
});

Answer

Alberto Zaccagni · May 8, 2013

If you don't want it to break when an error occurs, just invoke the callback with a falsy first parameter, like so (look after // 3). Is this OK with you / did I understand correctly?

async.waterfall([ // 1
    // ...
    function (obj, next) {
        async.eachLimit(obj.files, 1000,
            function (file, complete) {
                async.waterfall([ // 2
                    function (next) {
                        fs.readFile(file, {}, function (err, data) {
                            next(err, data);
                        });
                    },
                    function (data, next) { // Parse (assuming all well formed)
                        next(null, JSON.parse(data));
                    },
                    function (doc, next) { // Insert
                        obj.coll.insert(doc, {w: 1}, function (err, doc) {
                            next(err);
                        });
                    }
                ], function (err, result) { // 3
                    if (err) {
                        console.log(file + ' threw an error');
                        console.log(err);
                        console.log('proceeding with execution');
                    }
                    complete();
                });
            },
            function (err) { // 4 (err is now always undefined, because complete() is never given one)
                next(null, obj); // 5
            }
        );
    }
], function (err, obj) { // Waterfall end
    if (err) console.error(err);
    obj.db.close(); // Always close the connection
});
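
If you also care about which files failed, a possible variant (an untested sketch with hypothetical processOne and failures names, not part of the original script) is to collect the errors instead of only logging them, and hand the list back once everything has run:

var async = require('async');

// processOne(file, cb) stands in for the read/parse/insert waterfall above.
function processAll(files, processOne, done) {
    var failures = [];
    async.eachLimit(files, 1000, function (file, complete) {
        processOne(file, function (err) {
            if (err) failures.push({file: file, error: err});
            complete(); // never forward the error, so eachLimit keeps going
        });
    }, function () {
        done(null, failures); // report every failure once all files have been processed
    });
}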