AMQP with RabbitMQ and ActionScript

June 14, 2010

A requirement in a current project is to be able to receive messages from the RabbitMQ server in a Flex client. I managed to do this using as3-amqp library available on github, but the documentation is very sparse and most of this was worked out from the source. The old blog entry about this (http://hopper.squarespace.com/blog/2008/3/24/as3-amqp-client-first-cut.html) is woefully out of date and doesn't work with the latest version of the library.

The as3-amqp library is available from http://github.com/0x6e6562/as3-amqp/.

I'm not an ActionScript developer, I'm sure some of this code could be improved.

Installation

Get hold of the code using git, and then run ant in the directory with a build.xml. This will build the required amqp.swc library file.

Code

Here's some basic code for receiving messages from the RabbitMQ server, and displaying an alert with the message contents.

First you'll need to create a PushSerializer.as class:

package
{
    import flash.utils.IDataInput;
    import flash.utils.IDataOutput;

    import mx.controls.Alert;

    import org.amqp.patterns.Serializer;

    public class PushSerializer implements Serializer
    {
        public function serialize(o:*, stream:IDataOutput):void 
        {
            // Don't need to implement this - we're not pushing messages from Flex to the server
        }

        public function deserialize(stream:IDataInput):* 
        {
            var lReturn : String = "";
            try
            {
                while(true)
                {
                    lReturn += stream.readUTFBytes(1);
                }   
            }
            catch(lError:Error)
            {
                // do nothing
            }

            return lReturn;
        }
    }
}

Next, we need to write code to deal with the message returned - here we're just going to print an alert with the contents of the message.

import org.amqp.Connection;
import org.amqp.ConnectionParameters;
import org.amqp.SessionManager;
import org.amqp.patterns.SubscribeClient;
import org.amqp.patterns.impl.SubscribeClientImpl;
import org.amqp.patterns.CorrelatedMessageEvent;
import org.amqp.patterns.Serializer;

private var sessionManager : SessionManager;
private var serializer:Serializer = new PushSerializer();
private var amqpConnection:Connection = null;

private function onConsumeAmqpMessage(event:CorrelatedMessageEvent):void
{
  var lMessage:String = event.result;
  Alert.show(lMessage);
}

onPreinitialize is called from the preinitialize attribute of <mx:Application>

private function onPreinitialize():void
{
  // Create AMQP Queue and connect to exchange
  var lState:ConnectionParameters = new ConnectionParameters();
  lState.username = "guest";
  lState.password = "guest";
  lState.vhostpath = "/";
  lState.serverhost = "localhost";             
  this.amqpConnection = new Connection(lState);
  var lSubscribeClient:SubscribeClientImpl = new SubscribeClientImpl(this.amqpConnection); 
  lSubscribeClient.serializer = this.serializer;  
  lSubscribeClient.exchange="fanOutExchange";
  lSubscribeClient.exchangeType="fanout";
  lSubscribeClient.subscribe("", onConsumeAmqpMessage);       
}

subscribe() normally takes a routing key as the first parameter, but as we're using a fanout exchange here we don't need to specify one.

Security Sandbox Violation

If you run this from within Flex Builder, it should work fine. If you then deploy your application and try and access a remote RabbitMQ server, it'll probably fail with:

Error #2044: Unhandled securityError:. text=Error #2048: Security sandbox violation: 
http://localhost:8080/application.swf cannot load data from amqp-server:5672.

To get around this, you need to serve a crossdomain.xml file on port 843 from the amqp-server machine. This can be done easily using the code from http://github.com/dansimpson/amqp-js/tree/master/policy-server/. Download the three source files from here, and then run:

python server.py --port=843 --file=/path/to/crossdomain.xml