Importar CSV usando el esquema de Mangosta

Actualmente necesito insertar un archivo CSV grande en una base de datos de Mongo y el orden de los valores debe determinar la clave para la entrada de la base de datos:

Ejemplo de archivo CSV:

9,1557,358,286,Mutantville,4368,2358026,,M,0,0,0,1,0 9,1557,359,147,Wroogny,4853,2356061,,D,0,0,0,1,0 

Código para analizarlo en matrices:

 var fs = require("fs"); var csv = require("fast-csv"); fs.createReadStream("rank.txt") .pipe(csv()) .on("data", function(data){ console.log(data); }) .on("end", function(data){ console.log("Read Finished"); }); 

Salida de código:

 [ '9', '1557', '358', '286', 'Mutantville', '4368', '2358026', '', 'M', '0', '0', '0', '1', '0' ] [ '9', '1557', '359', '147', 'Wroogny', '4853', '2356061', '', 'D', '0', '0', '0', '1', '0' ] 

¿Cómo inserto las matrices en mi esquema de mongoose para entrar en mongo db?

Esquema:

 var mongoose = require("mongoose"); var rankSchema = new mongoose.Schema({ serverid: Number, resetid: Number, rank: Number, number: Number, name: String, land: Number, networth: Number, tag: String, gov: String, gdi: Number, protection: Number, vacation: Number, alive: Number, deleted: Number }); module.exports = mongoose.model("Rank", rankSchema); 

El orden de la matriz debe coincidir con el orden del esquema, por ejemplo, en la matriz, el primer número 9 debe guardarse siempre, ya que clave “serverid” y así sucesivamente. Estoy usando Node.JS

Puede hacerlo con fast-csv obteniendo los headers de la definición del esquema que devolverán las líneas analizadas como “objetos”. Realmente tienes algunos desajustes, así que los he marcado con correcciones:

 const fs = require('mz/fs'); const csv = require('fast-csv'); const { Schema } = mongoose = require('mongoose'); const uri = 'mongodb://localhost/test'; mongoose.Promise = global.Promise; mongoose.set('debug', true); const rankSchema = new Schema({ serverid: Number, resetid: Number, rank: Number, name: String, land: String, // <-- You have this as Number but it's a string networth: Number, tag: String, stuff: String, // the empty field in the csv gov: String, gdi: Number, protection: Number, vacation: Number, alive: Number, deleted: Number }); const Rank = mongoose.model('Rank', rankSchema); const log = data => console.log(JSON.stringify(data, undefined, 2)); (async function() { try { const conn = await mongoose.connect(uri); await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove())); let headers = Object.keys(Rank.schema.paths) .filter(k => ['_id','__v'].indexOf(k) === -1); console.log(headers); await new Promise((resolve,reject) => { let buffer = [], counter = 0; let stream = fs.createReadStream('input.csv') .pipe(csv({ headers })) .on("error", reject) .on("data", async doc => { stream.pause(); buffer.push(doc); counter++; log(doc); try { if ( counter > 10000 ) { await Rank.insertMany(buffer); buffer = []; counter = 0; } } catch(e) { stream.destroy(e); } stream.resume(); }) .on("end", async () => { try { if ( counter > 0 ) { await Rank.insertMany(buffer); buffer = []; counter = 0; resolve(); } } catch(e) { stream.destroy(e); } }); }); } catch(e) { console.error(e) } finally { process.exit() } })() 

Siempre que el esquema se alinee con el CSV proporcionado, entonces está bien. Estas son las correcciones que puedo ver, pero si necesita que los nombres de los campos se alineen de manera diferente, entonces debe ajustar. Pero básicamente había un Number en la posición donde hay una String y esencialmente un campo extra, que supongo es el que está en blanco en el CSV.

Las cosas generales son obtener la matriz de nombres de campo del esquema y pasarla a las opciones al crear la instancia del analizador csv:

 let headers = Object.keys(Rank.schema.paths) .filter(k => ['_id','__v'].indexOf(k) === -1); let stream = fs.createReadStream('input.csv') .pipe(csv({ headers })) 

Una vez que realmente haces eso, obtienes un “Objeto” en lugar de una matriz:

 { "serverid": "9", "resetid": "1557", "rank": "358", "name": "286", "land": "Mutantville", "networth": "4368", "tag": "2358026", "stuff": "", "gov": "M", "gdi": "0", "protection": "0", "vacation": "0", "alive": "1", "deleted": "0" } 

No se preocupe por los “tipos” porque Mongoose emitirá los valores de acuerdo con el esquema.

El rest ocurre dentro del controlador para el evento de data . Para lograr la máxima eficiencia, estamos usando insertMany() para escribir solo en la base de datos una vez cada 10,000 líneas. La forma en que el servidor y los procesos realmente dependen de la versión de MongoDB, pero 10,000 deberían ser bastante razonables en función del número promedio de campos que importaría para una única colección en términos de “compensación” por el uso de memoria y la escritura de un solicitud de red razonable. Haga el número más pequeño si es necesario.

Las partes importantes son marcar estas llamadas como funciones async y await el resultado de insertMany() antes de continuar. También debemos pause() la transmisión y resume() en cada elemento, de lo contrario, corremos el riesgo de sobrescribir el buffer de documentos para insertar antes de que realmente se envíen. La pause() y resume() son necesarias para poner “contrapresión” en la tubería, de lo contrario los elementos simplemente “salen” y activan el evento de data .

Naturalmente, el control de las 10,000 entradas requiere que verifiquemos que tanto en cada iteración como en la finalización de la transmisión para vaciar el búfer y enviar los documentos restantes al servidor.

Eso es realmente lo que quiere hacer, ya que ciertamente no desea disparar una solicitud asíncrona al servidor en “cada” iteración a través del evento de data o esencialmente sin esperar a que se complete cada solicitud. Se saldrá con la no comprobación de “archivos muy pequeños”, pero para cualquier carga del mundo real, seguramente superará la stack de llamadas debido a llamadas asíncronas “en vuelo” que aún no se han completado.


FYI – un package.json utilizado. El mz es opcional, ya que es solo una biblioteca modernizada habilitada para Promise de nodos estándar “incorporados” que estoy acostumbrado a usar. El código es, por supuesto, completamente intercambiable con el módulo fs .

 { "description": "", "main": "index.js", "dependencies": { "fast-csv": "^2.4.1", "mongoose": "^5.1.1", "mz": "^2.7.0" }, "keywords": [], "author": "", "license": "ISC" } 

En realidad, con el Nodo v8.9.x y superior, incluso podemos simplificar esto con una implementación de AsyncIterator través del módulo de stream-to-iterator . Todavía está en el modo Iterator> , pero debería hacerlo hasta que el Nodo v10.x se vuelva estable LTS:

 const fs = require('mz/fs'); const csv = require('fast-csv'); const streamToIterator = require('stream-to-iterator'); const { Schema } = mongoose = require('mongoose'); const uri = 'mongodb://localhost/test'; mongoose.Promise = global.Promise; mongoose.set('debug', true); const rankSchema = new Schema({ serverid: Number, resetid: Number, rank: Number, name: String, land: String, networth: Number, tag: String, stuff: String, // the empty field gov: String, gdi: Number, protection: Number, vacation: Number, alive: Number, deleted: Number }); const Rank = mongoose.model('Rank', rankSchema); const log = data => console.log(JSON.stringify(data, undefined, 2)); (async function() { try { const conn = await mongoose.connect(uri); await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove())); let headers = Object.keys(Rank.schema.paths) .filter(k => ['_id','__v'].indexOf(k) === -1); //console.log(headers); let stream = fs.createReadStream('input.csv') .pipe(csv({ headers })); const iterator = await streamToIterator(stream).init(); let buffer = [], counter = 0; for ( let docPromise of iterator ) { let doc = await docPromise; buffer.push(doc); counter++; if ( counter > 10000 ) { await Rank.insertMany(buffer); buffer = []; counter = 0; } } if ( counter > 0 ) { await Rank.insertMany(buffer); buffer = []; counter = 0; } } catch(e) { console.error(e) } finally { process.exit() } })() 

Básicamente, todo el manejo de “eventos” de la secuencia, la pausa y la reanudación se reemplazan por un simple bucle for :

 const iterator = await streamToIterator(stream).init(); for ( let docPromise of iterator ) { let doc = await docPromise; // ... The things in the loop } 

¡Fácil! Esto se limpia en la implementación posterior del nodo con for..await..of cuando se vuelve más estable. Pero lo anterior funciona bien en la versión especificada y superior.

Al decir @Neil Lunn se necesita encabezado dentro del propio CSV.

Ejemplo utilizando el módulo csvtojson .

 const csv = require('csvtojson'); const csvArray = []; csv() .fromFile(file-path) .on('json', (jsonObj) => { csvArray.push({ name: jsonObj.name, id: jsonObj.id }); }) .on('done', (error) => { if (error) { return res.status(500).json({ error}); } Model.create(csvArray) .then((result) => { return res.status(200).json({result}); }).catch((err) => { return res.status(500).json({ error}); }); }); });