Iterando sobre un cursor mongodb en serie (esperando las devoluciones de llamada antes de pasar al siguiente documento)

Usando mongoskin, puedo hacer una consulta como esta, que devolverá un cursor:

myCollection.find({}, function(err, resultCursor) { resultCursor.each(function(err, result) { } } 

Sin embargo, me gustaría llamar a algunas funciones asíncronas para cada documento, y solo pasar al siguiente elemento en el cursor después de que esto haya devuelto la llamada (similar a la estructura de cada Serie en el módulo async.js). P.ej:

 myCollection.find({}, function(err, resultCursor) { resultCursor.each(function(err, result) { externalAsyncFunction(result, function(err) { //externalAsyncFunction completed - now want to move to next doc }); } } 

¿Cómo podría hacer esto?

Gracias

ACTUALIZAR:

No quiero usar toArray() ya que se trata de una gran operación por lotes y es posible que los resultados no se ajusten a la memoria de una sola vez.

Si no desea cargar todos los resultados en la memoria usando toArray, puede iterar usando el cursor con algo como lo siguiente.

 myCollection.find({}, function(err, resultCursor) { function processItem(err, item) { if(item === null) { return; // All done! } externalAsyncFunction(item, function(err) { resultCursor.nextObject(processItem); }); } resultCursor.nextObject(processItem); } 

Un enfoque más moderno que utiliza async / await :

 const cursor = db.collection("foo").find({}); while(await cursor.hasNext()) { const doc = await cursor.next(); // process doc here } 

Notas:

  • Esto puede ser incluso más sencillo cuando llegan los iteradores asíncronos .
  • Probablemente querrá agregar try / catch para la comprobación de errores.
  • La función que contiene debe ser async o el código debe estar envuelto en (async function() { ... })() ya que utiliza await .
  • Si lo desea, agregue a la await new Promise(resolve => setTimeout(resolve, 1000)); (pausa durante 1 segundo) al final del ciclo while para mostrar que sí procesa los documentos uno tras otro.

Esto funciona con un conjunto de datos grande utilizando setImmediate:

 var cursor = collection.find({filter...}).cursor(); cursor.nextObject(function fn(err, item) { if (err || !item) return; setImmediate(fnAction, item, arg1, arg2, function() { cursor.nextObject(fn); }); }); function fnAction(item, arg1, arg2, callback) { // Here you can do whatever you want to do with your item. return callback(); } 

Si alguien está buscando una forma Prometida de hacer esto (en lugar de usar devoluciones de llamada de nextObject), aquí está. Estoy usando Node v4.2.2 y mongo driver v2.1.7. Esta es una especie de versión Cursor.forEach() de Cursor.forEach() :

 function forEachSeries(cursor, iterator) { return new Promise(function(resolve, reject) { var count = 0; function processDoc(doc) { if (doc != null) { count++; return iterator(doc).then(function() { return cursor.next().then(processDoc); }); } else { resolve(count); } } cursor.next().then(processDoc); }); } 

Para usar esto, pase el cursor y un iterador que opere en cada documento de forma asíncrona (como lo haría para Cursor.forEach). El iterador debe devolver una promesa, como hacen la mayoría de las funciones del controlador nativo de mongodb.

Digamos que quieres actualizar todos los documentos en la test colección. Así es como lo harías:

 var theDb; MongoClient.connect(dbUrl).then(function(db) { theDb = db; // save it, we'll need to close the connection when done. var cur = db.collection('test').find(); return forEachSeries(cur, function(doc) { // this is the iterator return db.collection('test').updateOne( {_id: doc._id}, {$set: {updated: true}} // or whatever else you need to change ); // updateOne returns a promise, if not supplied a callback. Just return it. }); }) .then(function(count) { console.log("All Done. Processed", count, "records"); theDb.close(); }) 

Puedes hacer algo como esto usando el lib de async. El punto clave aquí es verificar si el documento actual es nulo. Si lo es, significa que has terminado.

 async.series([ function (cb) { cursor.each(function (err, doc) { if (err) { cb(err); } else if (doc === null) { cb(); } else { console.log(doc); array.push(doc); } }); } ], function (err) { callback(err, array); }); 

Podrías usar un futuro:

 myCollection.find({}, function(err, resultCursor) { resultCursor.count(Meteor.bindEnvironment(function(err,count){ for(var i=0;i 

Puede obtener el resultado en una Array e iterar usando una función recursiva, algo como esto.

 myCollection.find({}).toArray(function (err, items) { var count = items.length; var fn = function () { externalAsyncFuntion(items[count], function () { count -= 1; if (count) fn(); }) } fn(); }); 

Editar:

Esto solo es aplicable para conjuntos de datos pequeños, para los más grandes debe usar los cursores como se menciona en otras respuestas.

Podrías usar setTimeOut’s simples. Este es un ejemplo de escritura de tipos en nodejs (estoy usando promesas a través del módulo ‘cuándo’, pero también se puede hacer sin ellas):

  import mongodb = require("mongodb"); var dbServer = new mongodb.Server('localhost', 27017, {auto_reconnect: true}, {}); var db = new mongodb.Db('myDb', dbServer); var util = require('util'); var when = require('when'); //npm install when var dbDefer = when.defer(); db.open(function() { console.log('db opened...'); dbDefer.resolve(db); }); dbDefer.promise.then(function(db : mongodb.Db){ db.collection('myCollection', function (error, dataCol){ if(error) { console.error(error); return; } var doneReading = when.defer(); var processOneRecordAsync = function(record) : When.Promise{ var result = when.defer(); setTimeout (function() { //simulate a variable-length operation console.log(util.inspect(record)); result.resolve('record processed'); }, Math.random()*5); return result.promise; } var runCursor = function (cursor : MongoCursor){ cursor.next(function(error : any, record : any){ if (error){ console.log('an error occurred: ' + error); return; } if (record){ processOneRecordAsync(record).then(function(r){ setTimeout(function() {runCursor(cursor)}, 1); }); } else{ //cursor up doneReading.resolve('done reading data.'); } }); } dataCol.find({}, function(error, cursor : MongoCursor){ if (!error) { setTimeout(function() {runCursor(cursor)}, 1); } }); doneReading.promise.then(function(message : string){ //message='done reading data' console.log(message); }); }); });