El mensaje con letra muerta no se consume en RabbitMQ y en el nodo mediante AMQP.Node

Quiero recibir un mensaje después de cierto tiempo en uno de mis trabajadores. Decidí ir con Node y RabbitMQ después de descubrir los llamados intercambios de letras muertas.

El mensaje parece ser enviado a la cola en DeadExchange, pero el consumidor nunca recibe el mensaje después del tiempo transcurrido en el WorkQueue en el WorkExchange. ¿O el bindQueue está desactivado, o la letra muerta no funciona?

He probado muchos valores diferentes ahora. ¿Alguien por favor puede señalar lo que me estoy perdiendo?

var amqp = require('amqplib'); var url = 'amqp://dev.rabbitmq.com'; amqp.connect(url).then(function(conn) { //Subscribe to the WorkQueue in WorkExchange to which the "delayed" messages get dead-letter'ed (is that a verb?) to. return conn.createChannel().then(function(ch) { return ch.assertExchange('WorkExchange', 'direct').then(function() { return ch.assertQueue('WorkQueue', { autoDelete: false, durable: true }) }).then(function() { return ch.bindQueue('WorkQueue', 'WorkExchange', ''); }).then(function() { console.log('Waiting for consume.'); return ch.consume('WorkQueue', function(msg) { console.log('Received message.'); console.log(msg.content.toString()); ch.ack(msg); }); }); }) }).then(function() { //Now send a test message to DeadExchange to a random (unique) queue. return amqp.connect(url).then(function(conn) { return conn.createChannel(); }).then(function(ch) { return ch.assertExchange('DeadExchange', 'direct').then(function() { return ch.assertQueue('', { arguments: { 'x-dead-letter-exchange': 'WorkExchange', 'x-message-ttl': 2000, 'x-expires': 10000 } }) }).then(function(ok) { console.log('Sending delayed message'); return ch.sendToQueue(ok.queue, new Buffer(':)')); }); }) }).then(null, function(error) { console.log('error\'ed') console.log(error); console.log(error.stack); }); 

Estoy usando amqp.node ( https://github.com/squaremo/amqp.node ) que es amqplib en npm. Aunque node-amqp ( https://github.com/postwait/node-amqp ) parece ser mucho más popular, no implementa el protocolo completo y hay algunos problemas pendientes relacionados con la reconexión.

dev.rabbitmq.com está ejecutando RabbitMQ 3.1.3.

Este es un código de trabajo. Cuando un mensaje gasta más que ttl en DeadExchange, se envía a WorkExchange. La clave del éxito es definir la clave de enrutamiento correcta. La cola de intercambio a la que desea enviar la publicación ttl, debe estar delimitada por una clave de enrutamiento (nota: no predeterminada), y el valor de los atributos de “x-dead-letter-letter-key-key” debe coincidir con esa clave de ruta.

 var amqp = require('amqplib'); var url = 'amqp://localhost'; amqp.connect(url).then(function(conn) { //Subscribe to the WorkQueue in WorkExchange to which the "delayed" messages get dead-letter'ed (is that a verb?) to. return conn.createChannel().then(function(ch) { return ch.assertExchange('WorkExchange', 'direct').then(function() { return ch.assertQueue('WorkQueue', { autoDelete: false, durable: true }) }).then(function() { return ch.bindQueue('WorkQueue', 'WorkExchange', 'rk1'); }).then(function() { console.log('Waiting for consume.'); return ch.consume('WorkQueue', function(msg) { console.log('Received message.'); console.log(msg.content.toString()); ch.ack(msg); }); }); }) }).then(function() { //Now send a test message to DeadExchange to DEQ queue. return amqp.connect(url).then(function(conn) { return conn.createChannel(); }).then(function(ch) { return ch.assertExchange('DeadExchange', 'direct').then(function() { return ch.assertQueue('DEQ', { arguments: { 'x-dead-letter-exchange': 'WorkExchange', 'x-dead-letter-routing-key': 'rk1', 'x-message-ttl': 15000, 'x-expires': 100000 } }) }).then(function() { return ch.bindQueue('DEQ', 'DeadExchange', ''); }).then(function() { console.log('Sending delayed message'); return ch.publish('DeadExchange', '', new Buffer("Over the Hills and Far Away!")); }); }) }).then(null, function(error) { console.log('error\'ed') console.log(error); console.log(error.stack); }); 

Hubo un error en Channel # assertQueue en AMQP.Node que se solucionó, consulte https://github.com/squaremo/amqp.node/commit/3749c66b448875d2df374e6a89946c0bdd0cb918 . La solución está en GitHub pero no en npm todavía.

Intereting Posts