domingo, 16 de noviembre de 2014

Mensajes con retraso en RabbitMQ

Introducción

Desde la llegada de los servicios de mensajería basados en AMQP (Advanced Message Queuing Protocol o Protocolo avanzado de colas de mensajería), la comunicación entre procesos se ha simplificado y ha permitido que ejecutar acciones asíncronas sea mucho más fácil y sencillo. En este caso vamos a tratar con RabbitMQ.

Este no es un tutorial de iniciación, pues consideraremos que el lector tiene un mínimo conocimiento del sistema de mensajería y lo tiene instalado y listo para usar.

Entrando en materia

Uno de los problemas que nos podemos encontrar con RabbitMQ es que no permite elegir cuando consumir un mensaje o si este debe ser consumido a partir de cierto momento, por lo que tenemos que emular ese comportamiento a través de los DLX (Dead Letter Exchanges). Los DLX permiten que cuando un mensaje caduque, este sea reenviardo a otro exchange. Un mensaje caduca cuando no es consumido en un TTL (tiempo de vida) especificado en el propio mensaje.

Conociendo estos procesos, podríamos crear una estructura tal y como podemos ver en la siguiente figura:
En el diagrama, podemos ver como configuramos dos exchanges. El primero de ellos Consumer Exchange, nos servirá como exchange para consumir los mensajes que queramos, por lo que ahí será donde pondremos las distintas colas junto con nuestos consumers para procesar los mensajes. Por otra parte tenemos el Delayer Exchange, donde tenemos una sola cola sin consumer, que no hará nada y servirá simplemente como repositorio donde se almacenarán los mensajes a la espera de que caduquen y ser enviados al Consumer Exchange.

Teniendo en cuenta esta estructura, podemos enviar mensajes al Delayer Exchange con un TTL para indicar en que momento será este mensaje reenviado al Consumer Exchange y así ser procesado.

Este es el código PHP para probar el sistema:

Productor

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Message\AMQPMessage;

/**
 * Configura tu conexión con RabbitMQ.
 */
$host  = '127.0.0.1';
$port  = 5672;
$user  = 'guest';
$password = 'guest';

$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// Declaramos el exchange y las colas de consumo.
$channel->exchange_declare('consumer_exchange', 'fanout', false, true, false);
$channel->queue_declare('consumer_queue', false, false, false, false);
$channel->queue_bind('consumer_queue', 'consumer_exchange');

// Declaramos el exchange y las colas de retardo.
$channel->exchange_declare('delayer_exchange, 'fanout', false, true, false);
$channel->queue_declare('delayer_queue', false, false, false, false, array("x-dead-letter-exchange" => array("S", "consumer_exchange")));
$channel->queue_bind('delayer_queue', 'delayer_exchange);

// Enviamos un mensaje con un TTL de 5 segundos (5000 milisegundos).
$msg = new AMQPMessage('Fecha de creación: ' . date("Y-m-d H:i:s"), array("x-expires" => 5000)));
$channel->basic_publish($msg, 'delayer_exchange');
echo "Mensaje enviado";

$channel->close();
$connection->close();
?>

Consumidor

<?php
/**
 * Consumidor de mensajes.
 */
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPConnection;

/**
 * Configura tu conexión con RabbitMQ.
 */
$host  = '127.0.0.1';
$port  = 5672;
$user  = 'guest';
$password = 'guest';

$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// Declaramos el exchange y la cola de consumo.
$channel->exchange_declare('consumer_exchange', 'fanout', false, true, false);
$channel->queue_declare('consumer_queue', false, false, false, false);
$channel->queue_bind('consumer_queue', 'consumer_exchange');

// Función procesadora de mensajes.
$callback = function($msg) {
 echo "La diferencia entre " . date("Y-m-d H:i:s") . " y " . $msg->body . " no debería ser nunca inferior a 5 segundos.\n";

 // Liberamos el mensaje
 $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
$channel->basic_consume('consumer_queue', '', false, true, false, false, $callback);

while(count($channel->callbacks)) {
 $channel->wait();
}

$channel->close();
$connection->close();
Este sistema se puede utilizar también para hacer reintentos espaciados en el tiempo, haciendo que el mensaje se reenvíe al Delayer Exchange en caso de que en ese momento no se pueda procesar el mensaje y se quiera hacer en un futuro.

Conclusión

Con este sistema tenemos la ventaja de olvidarnos de procesos activos de espera y evitar malgastar recursos del sistema, lo cual puede llegar a bloquear otros mensajes. Por otra parte, este sistema no asegura que el mensaje sea consumido en un tiempo exacto, y solo permite saber a partir de cuando será consumido (TTL).

Para quien necesite realizar esto con diversas colas y no quiera tener infinidad de exchanges, puede modificar el Delayer Exchange para que sea de tipo topic o headers y así definir los distintos DLX a nivel de cola.