RabbitMQ Despatcher Process

February 21, 2017

I wanted to have a C# process that would listen for entries on a queue, and then reliably despatch them out to other exchanges - one for external consumption, plus a couple of reliable ones for internal usage.

Here's the code I came up with to do this. The intention is to turn this into a Windows service so that it is always running in my deployed environment.

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace DespatchController
{
    /// <summary>
    /// Console host that consumes messages from the global input queue and
    /// republishes each one to the internal fanout exchange (feeding the durable
    /// module queues) and to the external topic exchange.
    /// </summary>
    class Program
    {
        // Define the queues that will receive reliable messages
        public static readonly string[] MODULE_QUEUES_OUT = {"DocProd", "CashBook"};

        // Define the exchanges that each message will be passed to
        public static readonly string EXCHANGE_INTERNAL_OUT = "InternalOut";
        public static readonly string EXCHANGE_EXTERNAL_OUT = "ExternalOut";

        // Queue this process consumes from, and the exchange that feeds it
        public static readonly string QUEUE_GLOBAL_IN = "Global";
        public static readonly string EXCHANGE_MESSAGE_IN = "MessageIn";

        /// <summary>
        /// Declares the exchange/queue topology, then listens on the input queue and
        /// forwards every delivery to both output exchanges. A delivery is only
        /// acknowledged once the broker has confirmed the outgoing publishes.
        /// Runs until the user presses enter.
        /// </summary>
        /// <param name="args">Command line arguments (unused).</param>
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory()
            {
                HostName = "localhost",
                VirtualHost = "EnvName",
                UserName = "drumcoder",
                Password = "sekrit"
            };

            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                // NOTE(review): these exchanges are declared without durable: true, while the
                // queues below are durable - confirm whether the exchanges (and bindings) are
                // expected to survive a broker restart.

                // Customer messages out
                channel.ExchangeDeclare(exchange: EXCHANGE_EXTERNAL_OUT, type: ExchangeType.Topic);

                // Internal messages out
                channel.ExchangeDeclare(exchange: EXCHANGE_INTERNAL_OUT, type: ExchangeType.Fanout);

                // For each of the output queues, declare them and then bind to the internal exchange
                foreach (string queueName in MODULE_QUEUES_OUT)
                {
                    channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
                    channel.QueueBind(queue: queueName, exchange: EXCHANGE_INTERNAL_OUT, routingKey: "");
                }

                // Internal messages in
                channel.QueueDeclare(queue: QUEUE_GLOBAL_IN, durable: true, exclusive: false, autoDelete: false, arguments: null);
                channel.ExchangeDeclare(exchange: EXCHANGE_MESSAGE_IN, type: ExchangeType.Fanout);
                channel.QueueBind(queue: QUEUE_GLOBAL_IN, exchange: EXCHANGE_MESSAGE_IN, routingKey: "");

                // Only take one unacknowledged message off the queue at a time, so messages are
                // despatched one by one. Protection against loss on restart comes from the manual
                // acknowledgement below (noAck: false), not from the prefetch limit itself.
                channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

                // Put the channel into publisher-confirm mode so that we can wait for the broker
                // to accept the outgoing messages before acknowledging the incoming one.
                channel.ConfirmSelect();

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body;

                    var props = channel.CreateBasicProperties();
                    props.Persistent = true;

                    // Publish to the two output exchanges
                    channel.BasicPublish(exchange: EXCHANGE_INTERNAL_OUT, routingKey: ea.RoutingKey, basicProperties: props, body: body);
                    channel.BasicPublish(exchange: EXCHANGE_EXTERNAL_OUT, routingKey: ea.RoutingKey, basicProperties: props, body: body);

                    // BasicPublish is asynchronous, so wait for the broker to confirm both publishes.
                    // NOTE(review): blocking here assumes the client dispatches consumer callbacks
                    // off the connection's I/O thread - confirm for the client version in use.
                    if (channel.WaitForConfirms())
                    {
                        // The broker has taken responsibility for the outgoing copies, so the
                        // incoming message can now be marked as done.
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                    }
                    else
                    {
                        // At least one publish was rejected: put the message back on the input
                        // queue so it is retried. Downstream consumers may therefore see duplicates.
                        channel.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: true);
                    }
                };

                // consume the input queue
                channel.BasicConsume(queue: QUEUE_GLOBAL_IN, noAck: false, consumer: consumer);

                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}