evento que no se dispara en el flujo legible

Estoy tratando de implementar una cascada asíncrona, estoy usando dos funciones para crear la primera función y las otras funciones para crear la matriz de funciones en cascada. mi método de servicio está esperando un objeto de flujo desde la base de datos, que estoy pasando de otro archivo, luego lo estoy procesando en mi caída de agua, básicamente estoy transformando el resultado en función de las funciones que estoy obteniendo de mi db, entonces, ¿qué sucede? es que cuando solicito mi servicio, la cascada se activa, mi primera función es rans, pero cuando se trata de la segunda función, no se activa ningún evento (lectura, final) para mi función non_transformer, que estoy devolviendo del método returnnontransformer.

No estoy seguro de poder explicar el problema exactamente porque implica mucho código y no puedo pegarlo todo aquí.

en eso() {

this.arr = [{ "Date": "2015-05-04 09:52:55.000", "ID": null } , { "Date": "2016-05-09 07:03:38.000", "ID": null } , { "Date": "2009-09-28 12:30:59.000", "ID": null } , { "Date": "2015-06-13 08:32:09.000", "ID": null } , { "Date": "2012-12-14 14:36:19.000", "ID": null } , { "Date": "2016-04-27 12:42:21.000", "ID": null } , { "Date": "2016-10-02 22:48:08.000", "ID": null } , { "Date": "2015-06-13 08:32:06.000", "ID": null } , { "Date": "2015-08-26 21:44:11.000", "ID": null } , { "Date": "2015-05-22 09:36:24.000", "ID": null } , { "Date": "2013-09-14 08:57:14.000", "ID": null } , { "Date": "2014-05-03 08:09:33.000", "ID": null } , { "Date": "2007-11-05 10:18:14.000", "ID": null } , { "Date": "2011-06-21 22:48:10.000", "ID": null } , { "Date": "2017-08-12 07:54:14.000", "ID": null } , { "Date": "2015-05-16 08:36:52.000", "ID": null } , { "Date": "2016-10-06 16:21:58.000", "ID": null } , { "Date": "2015-11-10 13:44:02.000", "ID": null } , { "Date": "2015-11-20 11:41:52.000", "ID": null } , { "Date": "2017-06-27 10:01:31.000", "ID": null } , { "Date": "2017-06-09 14:06:18.000", "ID": null } , { "Date": "2015-05-16 12:02:17.000", "ID": null } , { "Date": "2006-08-02 13:44:09.000", "ID": 11309146 } , { "Date": "2015-02-23 18:25:14.000", "ID": null } , { "Date": "2003-08-21 16:03:56.000", "ID": null } , { "Date": "2013-12-14 10:35:34.000", "ID": null } , { "Date": "2015-11-13 15:37:45.000", "ID": null } , { "Date": "2013-06-19 12:44:10.000", "ID": null } , { "Date": "2009-03-12 13:35:08.000", "ID": null } , { "Date": "2017-10-06 08:16:03.000", "ID": null } , { "Date": "2017-09-08 07:53:26.000", "ID": null } , { "Date": "2016-04-27 12:49:47.000", "ID": null } , { "Date": "2013-12-14 10:39:39.000", "ID": null } , { "Date": "2014-12-16 09:38:29.000", "ID": null } , { "Date": "2013-11-14 11:46:11.000", "ID": null } , { "Date": "2011-04-30 17:46:41.000", "ID": null } , { "Date": "2013-03-27 10:51:21.000", "ID": null } , { "Date": "2015-03-23 11:00:17.000", "ID": null } , { "Date": "2013-01-31 12:42:51.000", "ID": null } , { "Date": "2017-07-31 08:07:28.000", "ID": null } , { "Date": "2015-05-28 14:08:00.000", "ID": null } , { "Date": "2016-04-27 12:43:42.000", "ID": null } , { "Date": "2017-01-27 14:23:36.000", "ID": null } , { "Date": "2015-12-18 13:15:50.000", "ID": null } , { "Date": "2014-02-22 10:22:12.000", "ID": null } , { "Date": "2011-05-07 01:49:42.000", "ID": null } , { "Date": "2017-01-27 11:51:54.000", "ID": null } , { "Date": "2003-06-02 16:21:46.000", "ID": null } , { "Date": "2015-05-01 17:39:33.000", "ID": null } , { "Date": "2003-07-24 17:07:29.000", "ID": null } , { "Date": "2017-04-07 07:52:21.000", "ID": null } , { "Date": "2005-11-23 16:25:46.000", "ID": null } , { "Date": "2016-05-06 08:40:53.000", "ID": null } , { "Date": "2014-06-21 07:48:04.000", "ID": null } , { "Date": "2013-09-09 10:28:22.000", "ID": null } , { "Date": "2016-10-28 07:51:13.000", "ID": null } , { "Date": "2014-09-02 15:21:37.000", "ID": null }] const convertNostreamToStreamPromise = self.convertNostreamToStream(resultObject); convertNostreamToStreamPromise.then(function (promiseStreamObject) { this.streamResponse = promiseStreamObject; }) log.info("started transformer chaning"); } 

así que en el código anterior estoy convirtiendo un objeto de matriz en el objeto de flujo. este es el método de servicio que esperará el objeto de flujo.

  service(req, streamObject) { let tarr=["name:"version","name:version"] return new Promise(function (fulfill, reject) { const transformResultPromise = self.transformResult(transformenrArray, req, streamObject); transformResultPromise.then(function (resultStream) { fulfill(resultStream); }, function (err) { reject(err); }) }) } createFirstFunction(index, transformenrArray, stream) { console.log("inside createFirstFunction"); const self = this; console.log("stream+++ " + stream); return function (callback) { const streamObject = stream; const transformerId = transformenrArray[index].split(':')[0]; const transformerVersion = transformenrArray[index].split(':')[1]; var transformResponse = self.tesobj.service(transformerId, transformerVersion, streamObject, (transformerFunction, transformerType) => { transformerFunction = self.returnStreamingTransformerFunction(); if(transformerFunction instanceof Error){ } else { if (transformerType === 'streaming') { let streamParser = JSONStream.parse('*'); streamParser.on('data', transformerFunction); let toPassStreamObject = streamObject.pipe(JSONStream.stringify()) .pipe(streamParser) toPassStreamObject.on('data', function (data) { console.log("first function " + data); }) if (index === transformenrArray.length - 1) { callback(null, toPassStreamObject); } else { callback(null, index + 1, transformenrArray, toPassStreamObject); } } else { console.log("Non streaming +++ "); transformerFunction(streamObject, function (resultObject) { const convertNostreamToStreamPromise = self.convertNostreamToStream(resultObject); convertNostreamToStreamPromise.then(function (promiseStreamObject) { if (index === transformenrArray.length - 1) { console.log("calling second function "); callback(null, promiseStreamObject); } else { console.log("calling second function "); callback(null, index + 1, transformenrArray, promiseStreamObject); } }, function (err) { callback(null, err); }) }); } } }) } } createSecondFunction(index, transformenrArray, stream) { const self = this; return function (index, transformenrArray, stream, callback) { const streamObject = stream; const transformerId = transformenrArray[index].split(':')[0]; const transformerVersion = transformenrArray[index].split(':')[1]; var transformResponse = self.tesobj.service(transformerId, transformerVersion, streamObject, (transformerFunction, transformerType) => { transformerFunction = self.returnNonStreamingTransformerFunction(); transformerType = 'non_streaming'; console.log(transformerFunction.toString()); if (transformerFunction instanceof Error) { } else { if (transformerType === 'streaming') { streamObject.on('data', function (data) { console.log() }) let streamParser = JSONStream.parse('*'); console.log("streaming +++ " + streamObject); console.log(transformerFunction); streamParser.on('data', transformerFunction); let toPassStreamObject = streamObject.pipe(JSONStream.stringify()) .pipe(streamParser) toPassStreamObject.on('data', function (data) { console.log("first function " + data); }) if (index === transformenrArray.length - 1) { callback(null, toPassStreamObject); } else { callback(null, index + 1, transformenrArray, toPassStreamObject); } } else { console.log("streaming in second " + streamObject); streamObject.on('data', function (data) { console.log("insde second " + data); }) transformerFunction(streamObject, function (resultObject) { console.log("inside transformer function callback "); const convertNostreamToStreamPromise = self.convertNostreamToStream(resultObject); convertNostreamToStreamPromise.then(function (promiseStreamObject) { if (index === transformenrArray.length - 1) { log.info("transformation complete"); callback(null, promiseStreamObject); } else { callback(null, index + 1, transformenrArray, promiseStreamObject); } }, function (err) { callback(null, err); }) }); } } }) } } returnStreamingTransformerFunction() { return function (data) { data['Date'] = data["date"]; delete data['date']; } } returnNonStreamingTransformerFunction() { return function (streamObject, cb) { var result = []; console.log(streamObject); let dataObject = []; console.log("TEST+++++ "); streamObject.on('end', function () { console.log("Inside End"); result = _.chain(dataObject) .groupBy('date') .map((group, key) => ({ key, val: _.sumBy(group, 'id') })) .value(); result.forEach(function (data) { data['Dateee'] = data['key']; data['sum'] = data['val']; delete data['key']; delete data['val']; }); console.log("TEST++++ 2 "); cb(result); }); } } 

He intentado replicar mi transformador en dos funciones y devolverlas, cuando llamo a mi servicio por primera vez, no obtengo nada, este espera el evento final del transformador sin transmisión al que se llama en crear una segunda función.

 streamObject.on('end', function () 

cuando llamo al servicio por segunda vez sin reiniciarlo, el evento se activa y se envía la respuesta, no sé por qué no ocurre por primera vez.

No estoy seguro de poder explicar el problema a través de este código ficticio.