AMQP with RabbitMQ and Java Client
June 4, 2010
Here's some example code, similar in functionality to the fanout example at http://www.drumcoder.co.uk/blog/2010/jun/04/amqp-rabbitmq/, but using the standard RabbitMQ Java client instead of python.
This code will interoperate with the python code at the other blog entry.
Client Side Code
Here's the equivalent code to receive messages and print them on the console:
package com.company.rabbitmq.test; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.ConnectionParameters; import com.rabbitmq.client.QueueingConsumer; public class RabbitClient { public static void main(String[] args) { ConnectionParameters lConnectionParameters = new ConnectionParameters(); lConnectionParameters.setUsername("guest"); lConnectionParameters.setPassword("guest"); lConnectionParameters.setVirtualHost("/"); ConnectionFactory lFactory = new ConnectionFactory(lConnectionParameters); try { Connection lConnection = lFactory.newConnection("localhost", 5672); Channel lChannel = lConnection.createChannel(); String lQueueName = "JavaQueueTest" + args[0]; System.out.println(lQueueName); // Parameters are queue name, passive, durable, exclusive, autoDelete, arguments // See http://www.rabbitmq.com/releases/rabbitmq-java-client/v1.7.2/rabbitmq-java-client-javadoc-1.7.2/ lChannel.queueDeclare(lQueueName, false, true, false, false, null); // Parameters are exchange name, type, passive, durable, autoDelete, arguments lChannel.exchangeDeclare("fanOutExchange", "fanout", false, true, false, null); lChannel.queueBind(lQueueName, "fanOutExchange", ""); boolean lNoAck = false; QueueingConsumer lConsumer = new QueueingConsumer(lChannel); lChannel.basicConsume(lQueueName, lNoAck, lConsumer); while (true) { QueueingConsumer.Delivery lDelivery; try { lDelivery = lConsumer.nextDelivery(); } catch (InterruptedException lIeEx) { continue; } System.out.println(new String(lDelivery.getBody())); lChannel.basicAck(lDelivery.getEnvelope().getDeliveryTag(), false); } } catch (Exception lIoException) { throw new RuntimeException(lIoException); } } }
Server Side Code
Here's the code to send a single message to an exchange from the Java client:
package com.company.rabbitmq.test; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.ConnectionParameters; public class RabbitTest { public static void main(String[] args) { ConnectionParameters lConnectionParameters = new ConnectionParameters(); lConnectionParameters.setUsername("guest"); lConnectionParameters.setPassword("guest"); lConnectionParameters.setVirtualHost("/"); ConnectionFactory lFactory = new ConnectionFactory(lConnectionParameters); try { Connection lConnection = lFactory.newConnection("localhost", 5672); Channel lChannel = lConnection.createChannel(); byte[] lMessageBodyBytes = "Java Test message!".getBytes(); // Parameters to constructor for new AMQP.BasicProperties are: // (contentType, contentEncoding, headers, deliveryMode, priority, correlationId, replyTo, // expiration, messageId, timestamp, type, userId, appId, clusterId) // Here we're just specifying that the message is persistant AMQP.BasicProperties lMessageProperties = new AMQP.BasicProperties(null, null, null, new Integer(2), null, null, null, null, null, null, null, null, null, null); lChannel.basicPublish("fanOutExchange", "", lMessageProperties, lMessageBodyBytes); lChannel.close(); lConnection.close(); } catch (Exception lIoException) { throw new RuntimeException(lIoException); } } }