Durable Queue in RabbitMQ with C#
February 16, 2017
This post outlines an approach to durable messaging using RabbitMQ as the transport. If a receiver crashes while processing a message, RabbitMQ delivers that message again, either to another receiver or to the same one once it restarts.
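This is a one-sender, many-receivers setup. Each message is handed to one receiver at a time, and the messages are load balanced across the receivers. A message is normally processed once, but it can be processed a second time if a receiver dies before acknowledging it.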
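As a rough illustration of how that load balancing plays out when each receiver takes one message at a time (the receiver in this post sets a prefetch count of 1), here is a small simulation. It does not talk to RabbitMQ. `DispatchSketch` and `AssignToFirstFreeWorker` are hypothetical names, and the sketch assumes that all messages are already queued and that each job's duration is known up front.

```csharp
using System;

// Hypothetical helper for illustration only; it does not use RabbitMQ.
public static class DispatchSketch
{
    // Gives each job (duration in seconds) to whichever worker becomes free first.
    // This approximates dispatch with a prefetch count of 1 and manual acks.
    // Returns the index of the worker chosen for each job, in queue order.
    public static int[] AssignToFirstFreeWorker(int[] jobSeconds, int workerCount)
    {
        var busyUntil = new int[workerCount]; // time at which each worker is next idle
        var assignment = new int[jobSeconds.Length];

        for (int i = 0; i < jobSeconds.Length; i++)
        {
            // Pick the worker that is idle soonest (lowest index wins a tie).
            int worker = 0;
            for (int w = 1; w < workerCount; w++)
            {
                if (busyUntil[w] < busyUntil[worker])
                {
                    worker = w;
                }
            }

            assignment[i] = worker;
            busyUntil[worker] += jobSeconds[i];
        }

        return assignment;
    }
}
```

For jobs of 15, 5, 5 and 5 seconds shared between two workers, `DispatchSketch.AssignToFirstFreeWorker(new[] { 15, 5, 5, 5 }, 2)` returns 0, 1, 1, 1: the second worker picks up the three short jobs while the first is still busy. Strict round-robin would instead give 0, 1, 0, 1.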
The sender code is as follows.
using RabbitMQ.Client;
using System;
using System.Text;

namespace RabbitMqDurableSender
{
    class Program
    {
        // Use the command-line arguments as the message, or a default if none are given.
        private static string GetMessage(string[] args)
        {
            return (args.Length > 0) ? string.Join(" ", args) : "Hello World!";
        }

        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    // durable: true means the queue definition survives a broker restart.
                    channel.QueueDeclare(queue: "DurableQueue",
                                         durable: true,
                                         exclusive: false,
                                         autoDelete: false,
                                         arguments: null);

                    var message = GetMessage(args);
                    var body = Encoding.UTF8.GetBytes(message);

                    // Persistent = true asks RabbitMQ to write the message to disk.
                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;

                    // The default exchange ("") routes by queue name.
                    channel.BasicPublish(exchange: "",
                                         routingKey: "DurableQueue",
                                         basicProperties: properties,
                                         body: body);
                }
            }
        }
    }
}
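To see several receivers share the work, it can help to queue a handful of messages in one go. The following is a minimal sketch that uses only the client calls from the sender above and assumes the same client version. The namespace and the message texts are made up for illustration; the number of dots in each message controls how long a receiver spends on it.

```csharp
using System.Text;
using RabbitMQ.Client;

namespace RabbitMqDurableBatchSender // hypothetical project name
{
    class Program
    {
        static void Main(string[] args)
        {
            // Hypothetical test messages with varying numbers of dots.
            var messages = new[] { "First...", "Second.", "Third..", "Fourth." };

            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                // Same durable queue declaration as the single-message sender.
                channel.QueueDeclare(queue: "DurableQueue",
                                     durable: true,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);

                // Mark every message as persistent.
                var properties = channel.CreateBasicProperties();
                properties.Persistent = true;

                foreach (var message in messages)
                {
                    var body = Encoding.UTF8.GetBytes(message);
                    channel.BasicPublish(exchange: "",
                                         routingKey: "DurableQueue",
                                         basicProperties: properties,
                                         body: body);
                }
            }
        }
    }
}
```

Running this with two or more receivers already started should show the messages being split between them.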
And here's the receiver code, broken up with some commentary.
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;

namespace RabbitMqDurableReceiver
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
First we declare the queue. The sender declares the same queue with the same arguments; the declaration is idempotent, so whichever program starts first creates the queue.
                    channel.QueueDeclare(queue: "DurableQueue",
                                         durable: true,
                                         exclusive: false,
                                         autoDelete: false,
                                         arguments: null);
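We then tell RabbitMQ to send this consumer only one unacknowledged message at a time (a prefetch count of 1). A busy receiver is not handed further messages until it has acknowledged the current one, so the remaining messages stay on the queue for whichever receiver is free. Redelivery after a failure comes from the manual acknowledgement further down, not from the prefetch setting.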
                    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
We then define the handler that runs for each message. To simulate work, it sleeps for five seconds for each dot in the message.
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        Console.WriteLine(" [x] Received {0}", message);

                        // Count the dots and sleep five seconds for each one.
                        int dots = message.Split('.').Length - 1;
                        Thread.Sleep(dots * 5000);

                        Console.WriteLine(" [x] Done");
Once we're done processing, we ack the message so that RabbitMQ removes it from the queue and does not deliver it again.
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                    };
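Finally, we start consuming and wait for messages to arrive on the queue. Passing noAck: false turns on manual acknowledgements, which is what allows an unacknowledged message to be redelivered (newer versions of the client call this parameter autoAck).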
                    channel.BasicConsume(queue: "DurableQueue", noAck: false, consumer: consumer);

                    Console.WriteLine(" Press [enter] to exit.");
                    Console.ReadLine();
                }
            }
        }
    }
}
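The receiver above covers the case where the whole process dies. If the processing code throws an exception instead, the message may simply stay unacknowledged until the channel closes. One possible way to handle that, sketched here on the assumption that the same client version is in use, is to negatively acknowledge the message so that RabbitMQ requeues it. `SafeConsumerSketch`, `Create` and the `process` callback are hypothetical names, not part of the original code.

```csharp
using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

// Hypothetical helper, for illustration only.
public static class SafeConsumerSketch
{
    // Builds a consumer that acks a message when process succeeds
    // and nacks it with requeue when process throws.
    public static EventingBasicConsumer Create(IModel channel, Action<string> process)
    {
        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            var message = Encoding.UTF8.GetString(ea.Body);
            try
            {
                process(message);
            }
            catch (Exception ex)
            {
                Console.WriteLine(" [!] Failed: {0}", ex.Message);
                // requeue: true puts the message back on the queue for redelivery.
                channel.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: true);
                return;
            }

            // Only reached when processing completed without an exception.
            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
        };
        return consumer;
    }
}
```

The returned consumer would be passed to channel.BasicConsume in place of the one built inline. Note that a message which fails every time would be requeued over and over, so a real system would probably want a retry limit or a dead-letter queue.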