
Tuesday, 25 February 2014

AMQP support for WSO2 ESB 4.8

Introduction

WSO2 ESB is a high-performance, lightweight, open source Enterprise Service Bus. It has built-in support for integrating different technologies that use different transport protocols. Some of the well-known transports that WSO2 ESB supports are HTTP, HTTPS, POP, IMAP, SMTP, JMS, FIX, TCP, UDP, FTPS, SFTP, CIFS, MLLP and SMS.

AMQP is an application-layer messaging protocol for message-oriented architectures. Like HTTP, FTP and SMTP, it lets systems interoperate with each other. It addresses the problems faced by systems where interoperability is achieved through well-defined APIs (e.g. JMS). For example, if your system wants to talk to another system over JMS, you have to implement the JMS API. AMQP, in contrast, is a wire-level protocol which describes the format of the data that is sent across the network. So, irrespective of the implementation language, any systems or tools that can create, send, consume and read messages which adhere to the AMQP data format can interoperate with each other.
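To make the "wire-level" idea a bit more concrete, here is a minimal sketch of the very first bytes an AMQP 0-9-1 client (the protocol version RabbitMQ speaks) sends when it opens a connection: the literal "AMQP", a zero byte, and the version 0.9.1. The class and method names are my own, for illustration only. Any client, in any language, that writes these same bytes is starting the same conversation with the broker.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class AmqpProtocolHeader {

    // Builds the 8-byte AMQP 0-9-1 protocol header: 'A' 'M' 'Q' 'P' 0 0 9 1.
    public static byte[] protocolHeader() {
        byte[] name = "AMQP".getBytes(StandardCharsets.US_ASCII);
        byte[] header = Arrays.copyOf(name, 8); // bytes 4..7 start as zero
        header[4] = 0; // protocol id
        header[5] = 0; // major version
        header[6] = 9; // minor version
        header[7] = 1; // revision
        return header;
    }

    public static void main(String[] args) {
        // prints [65, 77, 81, 80, 0, 0, 9, 1]
        System.out.println(Arrays.toString(protocolHeader()));
    }
}
```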

The RabbitMQ Java Client is one such tool, which allows you to send and receive AMQP messages. RabbitMQ itself is also an AMQP broker implementation, so it can be used as the AMQP broker too. In this post I will explain the new AMQP transport for WSO2 ESB, which is implemented using the




The above scenario is considered here. It demonstrates the ability of WSO2 ESB to consume AMQP messages from, and publish them to, an AMQP broker. The “Sender” here can be anything that is capable of publishing messages to an AMQP queue. Similarly, the “Receiver” can be anything that can consume messages from an AMQP queue. In this post I will be using the RabbitMQ Java Client library to send and receive AMQP messages in both the Sender and Receiver implementations.

A proxy service in the ESB will be listening to Q1. When there is a message available in Q1, the ESB will consume it. If the proxy is defined in such a way that the messages should be sent to an AMQP destination, say Q2, then the consumed messages will be sent to Q2. The proxy service configuration for this scenario is explained later in this post.

Installing RabbitMQ AMQP transport feature into WSO2 ESB

The RabbitMQ AMQP transport is developed as a separate module of the transports project. It is also available as an installable p2 feature, which can be installed via the WSO2 Feature Manager in WSO2 ESB.
The steps to install this feature are as follows.

1. Start the ESB server.

2. Download and unzip the p2-repo.zip file to some location. Copy the path (say /home/esb/p2-repo).

3. Go to Configure > Features from the management console view and add a new local repository by giving the path copied above.

4. Select the added repository and tick “Show only the latest versions” and click on Find features. You will see the “Axis2 Transport RabbitMQ AMQP” feature listed. Select that and install it.

5. After successful installation, shut down the server.

Alternatively, you can just copy both the jars found in the plugins directory to the {ESB_HOME}/repository/components/dropins/ directory.

The next step is to configure the transport (listener and sender) in axis2.xml.

Configuring RabbitMQ AMQP Transport in WSO2 ESB

1. Add the following configuration items to axis2.xml found in {ESB_HOME}/repository/conf/axis2/axis2.xml

(i) Under transport listeners section add the following RabbitMQ transport listener.

<transportReceiver name="rabbitmq" class="org.apache.axis2.transport.rabbitmq.RabbitMQListener">
   <parameter name="AMQPConnectionFactory" locked="false">
      <parameter name="rabbitmq.server.host.name" locked="false">192.168.0.3</parameter>
      <parameter name="rabbitmq.server.port" locked="false">5672</parameter>
      <parameter name="rabbitmq.server.user.name" locked="false">user</parameter>
      <parameter name="rabbitmq.server.password" locked="false">abc123</parameter>
   </parameter>
</transportReceiver>

The parameters are self-explanatory; they are used to create the connection to the AMQP broker. You can define any number of connection factories under the <transportReceiver/> definition.

(ii) Under the transport senders section, add the following RabbitMQ transport sender.

<transportSender name="rabbitmq"
class="org.apache.axis2.transport.rabbitmq.RabbitMQSender"/>

This is the transport sender, which is used for sending AMQP messages out to a queue.

2. Start the ESB server.

Creating a proxy service which works with RabbitMQ AMQP transport

The following sample proxy service consumes AMQP messages from, and sends them to, a RabbitMQ AMQP broker.

<proxy xmlns="http://ws.apache.org/ns/synapse" name="AMQPProxy"
transports="rabbitmq" statistics="disable" trace="disable"
startOnLoad="true">
   <target>
      <inSequence>
         <log level="full"/>
         <property name="OUT_ONLY" value="true"/>
         <property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"/>
      </inSequence>
      <endpoint>
         <address
         uri="rabbitmq:/AMQPProxy?rabbitmq.server.host.name=192.168.0.3&amp;rabbitmq.server.port=5672&amp;rabbitmq.server.user.name=user&amp;rabbitmq.server.password=abc123&amp;rabbitmq.queue.name=queue2&amp;rabbitmq.exchange.name=exchange2"/>
      </endpoint>
   </target>
   <parameter name="rabbitmq.queue.name">queue1</parameter>
   <parameter name="rabbitmq.exchange.name">exchange1</parameter>
   <parameter name="rabbitmq.connection.factory">AMQPConnectionFactory</parameter>
   <description></description>
</proxy>

1. Transport configuration (transports="rabbitmq")
The proxy is exposed over the rabbitmq transport and is defined as an OUT_ONLY proxy service, meaning it does not expect a response from the endpoint.

2. RabbitMQ AMQP connection factory configuration (AMQPConnectionFactory)
Here we specify, as a parameter, the name of the connection factory which is used to listen to the queue and consume messages. This connection factory is defined in the AMQP transport listener configuration in axis2.xml. The proxy reads messages from the specified queue and sends them to the queue defined in the endpoint address.

3. RabbitMQ queue name to listen for messages (rabbitmq.queue.name)
This is the name of the queue that the listener configured with AMQPConnectionFactory will be listening on. If no name is specified, the name of the proxy service is used as the queue name.

4. RabbitMQ Endpoint configuration
 <address
         uri="rabbitmq:/AMQPProxy?rabbitmq.server.host.name=192.168.0.3&amp;rabbitmq.server.port=5672&amp;rabbitmq.server.user.name=user&amp;rabbitmq.server.password=abc123&amp;rabbitmq.queue.name=queue2&amp;rabbitmq.exchange.name=exchange2"/>

The endpoint address should follow the format shown above. All the parameters needed for the proxy service to send messages to the endpoint should be given as query parameters in the address URI, separated by &amp;. If the exchange name is not specified, an empty value is used as the exchange name by default.
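Since the endpoint URI is long and easy to mistype, here is a small sketch of how it can be assembled from its parts. The class and its methods are hypothetical helpers of mine and are not part of the transport. I am also assuming the values contain no characters that need URL encoding. The parameter names are the ones used in this post.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RabbitMqEndpointUri {

    // Joins the parameters into the form rabbitmq:/<name>?k1=v1&k2=v2
    public static String build(String serviceName, Map<String, String> params) {
        StringBuilder uri = new StringBuilder("rabbitmq:/").append(serviceName);
        char separator = '?';
        for (Map.Entry<String, String> entry : params.entrySet()) {
            uri.append(separator).append(entry.getKey()).append('=').append(entry.getValue());
            separator = '&';
        }
        return uri.toString();
    }

    // Escapes '&' so that the URI can be pasted into the uri="..." XML attribute.
    public static String forXmlAttribute(String uri) {
        return uri.replace("&", "&amp;");
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the parameters in insertion order
        Map<String, String> params = new LinkedHashMap<String, String>();
        params.put("rabbitmq.server.host.name", "192.168.0.3");
        params.put("rabbitmq.server.port", "5672");
        params.put("rabbitmq.server.user.name", "user");
        params.put("rabbitmq.server.password", "abc123");
        params.put("rabbitmq.queue.name", "queue2");
        params.put("rabbitmq.exchange.name", "exchange2");
        System.out.println(forXmlAttribute(build("AMQPProxy", params)));
    }
}
```

Note that the separator is a plain & in the URI itself; it only becomes &amp; because the URI sits inside an XML attribute in the proxy configuration.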

5. RabbitMQ AMQP Transport properties
rabbitmq.server.host.name – Host name of the machine the RabbitMQ server is running on.
rabbitmq.server.port – Port on which the RabbitMQ server is listening.
rabbitmq.server.user.name – User name used to connect to the RabbitMQ server.
rabbitmq.server.password – Password of the account used to connect to the RabbitMQ server.
rabbitmq.server.virtual.host – Virtual host name of the RabbitMQ server, if any.
rabbitmq.queue.name – Name of the queue to send messages to or consume messages from.
rabbitmq.exchange.name – Name of the exchange the queue is bound to.
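To show how these properties travel inside an endpoint address, here is a rough sketch that reads them back out of a rabbitmq:/ URI into a map. This is only an illustration with hypothetical names of my own; it is not the parsing code of the transport itself. The empty default for the exchange name follows the behaviour described for the endpoint configuration above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RabbitMqUriParams {

    // Splits the query part of rabbitmq:/<name>?k1=v1&k2=v2 into key/value pairs.
    public static Map<String, String> parse(String uri) {
        Map<String, String> params = new LinkedHashMap<String, String>();
        int queryStart = uri.indexOf('?');
        if (queryStart < 0) {
            return params; // no parameters given
        }
        for (String pair : uri.substring(queryStart + 1).split("&")) {
            int eq = pair.indexOf('=');
            if (eq > 0) {
                // trim() tolerates stray whitespace around a key or value
                params.put(pair.substring(0, eq).trim(), pair.substring(eq + 1).trim());
            }
        }
        return params;
    }

    // Returns the exchange name, or an empty string when it is not specified.
    public static String exchangeName(Map<String, String> params) {
        String name = params.get("rabbitmq.exchange.name");
        return name == null ? "" : name;
    }

    public static void main(String[] args) {
        Map<String, String> params = parse(
                "rabbitmq:/AMQPProxy?rabbitmq.server.port=5672&rabbitmq.queue.name=queue2");
        System.out.println(params.get("rabbitmq.queue.name"));
        System.out.println("[" + exchangeName(params) + "]");
    }
}
```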

There can be situations where you only want to send AMQP messages out, or only want to receive AMQP messages. The above proxy can be changed to suit either of these scenarios. You can also change it to work with a different transport. For example, a proxy can listen for messages over AMQP and send them out over HTTP, JMS, etc.

Sample Proxy Service in operation

Note: This feature has so far been tested with SOAP messages of content type “text/xml” which are sent to and consumed from AMQP broker queues.

1. When the proxy service is created by following the above steps and deployed in the ESB, it will listen to the queue specified in rabbitmq.queue.name, using the connection factory named in rabbitmq.connection.factory and defined under the RabbitMQ AMQP transport listener. When there is a message available in the queue, it will be consumed by the listener.

2. The consumed message will then be sent to the endpoint queue specified in the endpoint configuration of the proxy.

A sample Java client to send a SOAP XML message to an AMQP queue

In a previous post of mine, I explained how to use the RabbitMQ Java client to send and receive messages. Here I am using the same approach to send messages to and receive messages from a RabbitMQ queue.
Note: Give the correct queue name for publishing messages. This must be the same queue that the AMQP transport listener is listening on.

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.HashMap;
import java.util.Map;

// Connect to the broker
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
factory.setUsername(userName);
factory.setPassword(password);
factory.setPort(port);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

// Declare the queue and a durable direct exchange, then bind them
channel.queueDeclare(queueName, false, false, false, null);
channel.exchangeDeclare(exchangeName, "direct", true);
channel.queueBind(queueName, exchangeName, routingKey);

// The message to be sent
String message = "<soapenv:Envelope " +
                 "xmlns:soapenv=\"http://schemas.xmlsoap.org/soap/envelope/\">\n" +
                 "<soapenv:Header/>\n" +
                 "<soapenv:Body>\n" +
                 "  <p:greet xmlns:p=\"http://greet.service.kishanthan.org\">\n" +
                 "     <in>" + name + "</in>\n" +
                 "  </p:greet>\n" +
                 "</soapenv:Body>\n" +
                 "</soapenv:Envelope>";

// Populate the AMQP message properties
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.messageId(messageID);
builder.contentType("text/xml");
builder.replyTo(replyToAddress);
builder.correlationId(correlationId);
builder.contentEncoding(contentEncoding);

// Custom user properties
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("SOAP_ACTION", "greet");
builder.headers(headers);

// Publish the message to the exchange, using the routing key the queue was bound with
channel.basicPublish(exchangeName, routingKey, builder.build(), message.getBytes());

// Release the channel and the connection
channel.close();
connection.close();

A sample Java client to consume the message from an AMQP queue

Note: Give the correct queue name for consuming messages. Here the queue name is the one configured in the endpoint configuration of the proxy service.

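import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

// Connect to the broker
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(hostName);
factory.setUsername(userName);
factory.setPassword(password);
factory.setPort(port);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

// Declare the queue and a durable direct exchange, then bind them
channel.queueDeclare(queueName, false, false, false, null);
channel.exchangeDeclare(exchangeName, "direct", true);
channel.queueBind(queueName, exchangeName, routingKey);

// Create the consumer (the second argument enables automatic acknowledgement)
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);

// Start consuming messages; nextDelivery() blocks until a message arrives
while (true) {
   QueueingConsumer.Delivery delivery = consumer.nextDelivery();
   String message = new String(delivery.getBody());
}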

The above two Java clients can be used to test the scenario where you publish a message to a RabbitMQ AMQP queue, which is consumed by the ESB and published to another queue, which in turn is consumed by a Java client. When running both of the above clients, the Receiver should get the messages sent by the Sender client. If this works as expected, then you have configured the RabbitMQ AMQP transport correctly in WSO2 ESB.
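One simple way to confirm on the Receiver side that what arrived is really the SOAP message the Sender published is to parse the body and read the value of the <in> element back. The following is a minimal sketch using only the JDK's XML parser. The class and method names are my own, and it assumes the payload has the same shape as the greet message used in this post.

```java
import java.io.ByteArrayInputStream;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.NodeList;

public class GreetPayloadCheck {

    // Returns the text of the first <in> element, or null if there is none.
    public static String extractIn(byte[] body) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(true);
            Document doc = factory.newDocumentBuilder().parse(new ByteArrayInputStream(body));
            NodeList nodes = doc.getElementsByTagName("in");
            return nodes.getLength() == 0 ? null : nodes.item(0).getTextContent();
        } catch (Exception e) {
            // The body was not well-formed XML
            throw new IllegalArgumentException("Could not parse message body", e);
        }
    }

    public static void main(String[] args) {
        String message =
                "<soapenv:Envelope xmlns:soapenv=\"http://schemas.xmlsoap.org/soap/envelope/\">"
              + "<soapenv:Header/><soapenv:Body>"
              + "<p:greet xmlns:p=\"http://greet.service.kishanthan.org\"><in>Alice</in></p:greet>"
              + "</soapenv:Body></soapenv:Envelope>";
        System.out.println(extractIn(message.getBytes()));
    }
}
```

In the Receiver loop, the byte array returned by delivery.getBody() could be passed to extractIn in the same way.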

Conclusion

This new RabbitMQ AMQP transport implementation lets WSO2 ESB talk to an AMQP broker, such as RabbitMQ, directly, without the need to go through a different transport mechanism such as JMS. The underlying protocol used by this transport is AMQP.

