AMQP with RabbitMQ and Java Client

June 4, 2010

Here's some example code, similar in functionality to the fanout example at http://www.drumcoder.co.uk/blog/2010/jun/04/amqp-rabbitmq/, but using the standard RabbitMQ Java client instead of Python.

This code will interoperate with the Python code in the other blog entry.

Client Side Code

Here's the equivalent code to receive messages and print them on the console:

package com.company.rabbitmq.test;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConnectionParameters;
import com.rabbitmq.client.QueueingConsumer;

/**
 * Console consumer: binds a durable queue to the "fanOutExchange" fanout
 * exchange and prints every message it receives, acknowledging each one
 * after it has been printed.
 */
public class RabbitClient
{
  /**
   * Connects to the broker on localhost:5672 and consumes messages until the
   * thread is interrupted or an error occurs.
   *
   * @param args args[0] is a suffix appended to "JavaQueueTest" to form the queue name
   * @throws IllegalArgumentException if no queue name suffix is supplied
   * @throws RuntimeException wrapping any failure while talking to the broker
   */
  public static void main(String[] args)
  {
    // Fail with a clear message instead of an ArrayIndexOutOfBoundsException
    if (args == null || args.length < 1)
    {
      throw new IllegalArgumentException("Usage: RabbitClient <queueNameSuffix>");
    }

    ConnectionParameters lConnectionParameters = new ConnectionParameters();
    lConnectionParameters.setUsername("guest");
    lConnectionParameters.setPassword("guest");
    lConnectionParameters.setVirtualHost("/");

    ConnectionFactory lFactory = new ConnectionFactory(lConnectionParameters);

    // Declared outside the try so the finally block can release it on failure
    Connection lConnection = null;
    try
    {
      lConnection = lFactory.newConnection("localhost", 5672);
      Channel lChannel = lConnection.createChannel();

      String lQueueName = "JavaQueueTest" + args[0];
      System.out.println(lQueueName);

      // Parameters are queue name, passive, durable, exclusive, autoDelete, arguments
      // See http://www.rabbitmq.com/releases/rabbitmq-java-client/v1.7.2/rabbitmq-java-client-javadoc-1.7.2/
      lChannel.queueDeclare(lQueueName, false, true, false, false, null);
      // Parameters are exchange name, type, passive, durable, autoDelete, arguments
      lChannel.exchangeDeclare("fanOutExchange", "fanout", false, true, false, null);
      lChannel.queueBind(lQueueName, "fanOutExchange", "");

      // Explicit acknowledgements: a message is only acked once it has been printed
      boolean lNoAck = false;
      QueueingConsumer lConsumer = new QueueingConsumer(lChannel);
      lChannel.basicConsume(lQueueName, lNoAck, lConsumer);
      while (true)
      {
        QueueingConsumer.Delivery lDelivery;
        try
        {
          lDelivery = lConsumer.nextDelivery();
        }
        catch (InterruptedException lInterrupted)
        {
          // Restore the interrupt flag and stop consuming; retrying here would
          // make the consumer impossible to shut down via interruption.
          Thread.currentThread().interrupt();
          break;
        }
        // Decode with an explicit charset rather than the platform default
        System.out.println(new String(lDelivery.getBody(), "UTF-8"));
        lChannel.basicAck(lDelivery.getEnvelope().getDeliveryTag(), false);
      }

      lChannel.close();
      lConnection.close();
      // Already closed cleanly; nothing left for the finally block to do
      lConnection = null;
    }
    catch (Exception lException)
    {
      throw new RuntimeException(lException);
    }
    finally
    {
      closeQuietly(lConnection);
    }
  }

  /**
   * Best-effort close used on failure paths; reports but does not rethrow
   * close errors so that the original exception is not masked.
   */
  private static void closeQuietly(Connection pConnection)
  {
    if (pConnection == null)
    {
      return;
    }
    try
    {
      pConnection.close();
    }
    catch (Exception lCloseException)
    {
      System.err.println("Failed to close RabbitMQ connection: " + lCloseException);
    }
  }
}

Server Side Code

Here's the code to send a single message to an exchange from the Java client:

package com.company.rabbitmq.test;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConnectionParameters;

/**
 * Publishes a single persistent test message to the "fanOutExchange"
 * exchange and then disconnects.
 */
public class RabbitTest
{
  /**
   * Connects to the broker on localhost:5672, publishes one message and
   * closes the channel and connection.
   *
   * @param args unused
   * @throws RuntimeException wrapping any failure while talking to the broker
   */
  public static void main(String[] args)
  {
    ConnectionParameters lConnectionParameters = new ConnectionParameters();
    lConnectionParameters.setUsername("guest");
    lConnectionParameters.setPassword("guest");
    lConnectionParameters.setVirtualHost("/");

    ConnectionFactory lFactory = new ConnectionFactory(lConnectionParameters);

    // Declared outside the try so the finally block can release it on failure
    Connection lConnection = null;
    try
    {
      lConnection = lFactory.newConnection("localhost", 5672);

      Channel lChannel = lConnection.createChannel();

      // Encode with an explicit charset rather than the platform default
      byte[] lMessageBodyBytes = "Java Test message!".getBytes("UTF-8");

      // Parameters to constructor for new AMQP.BasicProperties are:
      // (contentType, contentEncoding, headers, deliveryMode, priority, correlationId, replyTo,
      // expiration, messageId, timestamp, type, userId, appId, clusterId)
      // Here we're just specifying that the message is persistent (deliveryMode 2)
      AMQP.BasicProperties lMessageProperties
       = new AMQP.BasicProperties(null, null, null, Integer.valueOf(2), null, null, null, null, null, null, null, null, null, null);
      lChannel.basicPublish("fanOutExchange", "", lMessageProperties, lMessageBodyBytes);
      lChannel.close();
      lConnection.close();
      // Already closed cleanly; nothing left for the finally block to do
      lConnection = null;
    }
    catch (Exception lException)
    {
      throw new RuntimeException(lException);
    }
    finally
    {
      closeQuietly(lConnection);
    }
  }

  /**
   * Best-effort close used on failure paths; reports but does not rethrow
   * close errors so that the original exception is not masked.
   */
  private static void closeQuietly(Connection pConnection)
  {
    if (pConnection == null)
    {
      return;
    }
    try
    {
      pConnection.close();
    }
    catch (Exception lCloseException)
    {
      System.err.println("Failed to close RabbitMQ connection: " + lCloseException);
    }
  }
}

Tags: amqp java rabbitmq mq