RabbitMQ Despatcher Process
February 21, 2017
I wanted to have a C# process that would listen for entries on a queue, and then reliably despatch them out to other exchanges - one for external consumption, plus a couple of reliable ones for internal usage.
Here's the code I came up with to do this. The intention is to change this into a windows service so that it is always running in my deployed environment.
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace DespatchController { class Program { // Define the queues that will receive reliable messages public static readonly string[] MODULE_QUEUES_OUT = {"DocProd", "CashBook"}; // Define the exchanges that each message will be passed to public static readonly string EXCHANGE_INTERNAL_OUT = "InternalOut"; public static readonly string EXCHANGE_EXTERNAL_OUT = "ExternalOut"; public static readonly string QUEUE_GLOBAL_IN = "Global"; public static readonly string EXCHANGE_MESSAGE_IN = "MessageIn"; /* * Listen on the input queue, and then send out on each of the module queues, * plus the customer external exchange if anyone is listening. */ static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost", VirtualHost = "EnvName", UserName="drumcoder", Password="sekrit" }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { // Customer messages out channel.ExchangeDeclare(exchange: EXCHANGE_EXTERNAL_OUT, type: ExchangeType.Topic); // Internal messages out channel.ExchangeDeclare(exchange: EXCHANGE_INTERNAL_OUT, type: ExchangeType.Fanout); // For each of the output queues, declare them and then bind to the internal exchange for (int i = 0; i < MODULE_QUEUES_OUT.Length; i++) { String lQueueName = MODULE_QUEUES_OUT[i]; channel.QueueDeclare(queue: lQueueName, durable: true, exclusive: false, autoDelete: false, arguments: null); channel.QueueBind(queue: lQueueName, exchange: EXCHANGE_INTERNAL_OUT, routingKey: ""); } // Internal messages in channel.QueueDeclare(queue: QUEUE_GLOBAL_IN, durable: true, exclusive: false, autoDelete: false, arguments: null); channel.ExchangeDeclare(exchange: EXCHANGE_MESSAGE_IN, type: ExchangeType.Fanout); channel.QueueBind(queue: QUEUE_GLOBAL_IN, exchange: EXCHANGE_MESSAGE_IN, routingKey: ""); // Define a quality of service setting so that we only read one message off the queue at once. This means that // if we restart this process, no messages are lost channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); var props = channel.CreateBasicProperties(); props.Persistent = true; // Publish to the two output exchanges channel.BasicPublish(exchange: EXCHANGE_INTERNAL_OUT, routingKey:ea.RoutingKey, basicProperties:props, body:body); channel.BasicPublish(exchange: EXCHANGE_EXTERNAL_OUT, routingKey:ea.RoutingKey, basicProperties:props, body:body); // Once we've published successfully, acknowledge the message so that it is marked as done. // Failing to do this will cause the message to retry channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; // consume the input queue channel.BasicConsume(queue: QUEUE_GLOBAL_IN, noAck: false, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } } } }