AMQP with RabbitMQ and py-amqplib

June 4, 2010

RabbitMQ is an open source messaging solution written in Erlang. It's available on the majority of linux distributions, and we're going to use a python client here, called py-amqplib.

Simple Test

This first example is largely based on the code at the "Rabbits and Warrens" link below. This is a simple server and client. Start the client first and it will listen for a message. Then run the server, and this will send a single message to the client. The client will print it. Run the server code multiple times and the client will print each message sent.

client.py

Here's the code for a simple client.

from amqplib import client_0_8 as amqp

# connect to server
lConnection = amqp.Connection(host="localhost:5672", userid="guest", password="guest", virtual_host="/", insist=False)
lChannel = lConnection.channel()

# Create queue.  Queues receive messages.
# Durable means it'll be recreated at reboot.  Auto delete of false means that it will hang
# around when all of the clients disconnect from it.  If it wasn't durable, then it would be removed when the last
# client disconnected.  If exclusive was true, then only this client would be able to see the queue.  We want
# the server to be able to put stuff into this queue, so we've set that to false.
lChannel.queue_declare(queue="myClientQueue", durable=True, exclusive=False, auto_delete=False)

# Create an exchange.  Exchanges public messages to queues  
# durable and auto_delete are the same as for a queue.
# type indicates the type of exchange we want - valid values are fanout, direct, topic
lChannel.exchange_declare(exchange="myExchange", type="direct", durable=True, auto_delete=False)

# Tie the queue to the exchange.  Any messages arriving at the specified exchange 
# are routed to the specified queue, but only if they arrive with the routing key specified
lChannel.queue_bind(queue="myClientQueue", exchange="myExchange", routing_key="Test")

# Define a function that is called when something is received on the queue
def data_receieved(msg):
     print 'Received: ' + msg.body

# Connect the queue to the callback function
# no_ack defaults to false.  Setting this to true means that the client will acknowledge receipt
# of the message to the server.  The message will be sent again if it isn't acknowledged.
lChannel.basic_consume(queue='myClientQueue', no_ack=True, callback=data_receieved, consumer_tag="TestTag")

# Wait for things to arrive on the queue
while True:
     lChannel.wait()

# unregister the message notification callback
# never called in this example, but this is how you do it.
lChannel.basic_cancel("TestTag")

# Close connection 
lChannel.close()
lConnection.close()

server.py

Here's the code for a simple server.

from amqplib import client_0_8 as amqp

# connect to server
lConnection = amqp.Connection(host="localhost:5672", userid="guest", password="guest", virtual_host="/", insist=False)
lChannel = lConnection.channel()

# Create a message
lMessage = amqp.Message("Test message!")

# Send message as persistant.  This means it will survive a reboot.
lMessage.properties["delivery_mode"] = 2

# publish the message on the exchange
lChannel.basic_publish(lMessage, exchange="myExchange", routing_key="Test")

# Close connection 
lChannel.close()
lConnection.close()

Fanout Exchanges

For the requirement I have at hand, I need to have a distribution mechanism. The server sends a message and I want multiple clients to receive the same message. All messages need to go to all clients. For this, we'll use a fanout exchange, and not use a routing key.

client.py

This client code is very similar to that listed earlier. The changes are to have the first command line parameter attached to the queue name, and to declare the exchange as being of type fanout. We don't use the routing key as these are not needed with a fanout exchange.

To run this code, run multiple clients using python client.py 1 and python client.py 2 etc and then run the server. The message will arrive at all clients.

from amqplib import client_0_8 as amqp
import sys

# connect to server
lConnection = amqp.Connection(host="localhost:5672", userid="guest", password="guest", virtual_host="/", insist=False)
lChannel = lConnection.channel()

lQueueName = "myClientQueue" + sys.argv[1]
print lQueueName

# Create queue.  Queues receive messages
lChannel.queue_declare(queue=lQueueName, durable=True, exclusive=False, auto_delete=False)

# Create an exchange.  Exchanges public messages to queues.
lChannel.exchange_declare(exchange="fanOutExchange", type="fanout", durable=True, auto_delete=False)

# Tie the queue to the exchange.  Any messages arriving at the specified exchange 
# are routed to the specified queue.
# we don't need a routing key, as we have a fanout exchange
lChannel.queue_bind(queue=lQueueName, exchange="fanOutExchange")

# Define a function that is called when something is received on the queue
def data_receieved(msg):
     print 'Received: ' + msg.body

# Connect the queue to the callback function
lChannel.basic_consume(queue=lQueueName, no_ack=True, callback=data_receieved, consumer_tag="TestTag")

# Wait for things to arrive on the queue
while True:
     lChannel.wait()

# unregister the message notification callback
# never called in this example, but this is how you do it.
lChannel.basic_cancel("TestTag")

lChannel.close()
lConnection.close()

server.py

The only change here is to remove the routing key and change the exchange name

from amqplib import client_0_8 as amqp

# connect to server
lConnection = amqp.Connection(host="localhost:5672", userid="guest", password="guest", virtual_host="/", insist=False)
lChannel = lConnection.channel()

# Create a message
lMessage = amqp.Message("Test message!")

# Send message as persistant
lMessage.properties["delivery_mode"] = 2

# Publish the message on the exchange.  
# We don't need a routing key because we're using a fanout exchange.
lChannel.basic_publish(lMessage, exchange="fanOutExchange")

lChannel.close()
lConnection.close()

Acknowlegements

A couple of modifications are required to the client to get it to use reliable messaging and acknowledge each message.

In the client.py, modify the basic_consume call to specify that no_ack is false (this is the default, so you could remove the parameter instead)

lChannel.basic_consume(queue=lQueueName, no_ack=False, callback=data_receieved, consumer_tag="TestTag")

Next, modify the data_received callback to acknowledge the message:

def data_receieved(msg):
     print 'Received: ' + msg.body
     lChannel.basic_ack(msg.delivery_tag)

References