RabbitMQ

Info

I love it...

After years of working with CORBA and seeing little on the market to replace it, I was forced to look at an alternative in the form of web services, using the Jersey ReSTful framework. It was an absolute nightmare to work with.  It turns the simple task of marshalling complex objects across the wire into an Eton Mess: a mishmash of code that just doesn't work properly and is hard to debug and test.

I've been familiar with MOM-based technologies since the 1990s (DDE, JMS, MS-MQ, MQ-Series etc).  I had heard about RabbitMQ some time ago but had never actually used it.  So when I decided to get my teeth into it, I was shocked at how easy it was to deploy.  It's elegant and easy to work with.

I'm working with the pub/sub model.  The code that follows is pretty much the example given in the RabbitMQ documentation.

Publisher
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class RabbitMqLogsSender
{
    private static final String EXCHANGE_NAME = "logs"; // specify as ReSTful URI param
    private static final String HOST_ID = "192.168.99.100"; // put in a config file

    public static void main(String[] argv) throws Exception
    {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST_ID);   // put this in a config file
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

        String message = argv.length > 0 ? argv[0] : "Hello World!"; // default avoids a crash when no argument is given

        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
}

LINE 18 - channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
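
The comments in the publisher say the host (and eventually the exchange name) should come from a config file. Here is a minimal sketch of one way to do that with java.util.Properties. The class name BrokerConfig and the keys rabbitmq.host and rabbitmq.exchange are my own assumptions for illustration. They are not part of the RabbitMQ client.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper, not part of the RabbitMQ client: reads broker settings
// from properties text instead of hard-coding them in each class.
class BrokerConfig
{
    final String host;
    final String exchangeName;

    BrokerConfig(String host, String exchangeName)
    {
        this.host = host;
        this.exchangeName = exchangeName;
    }

    // Loads settings from any Reader (a FileReader in real use).
    // The keys "rabbitmq.host" and "rabbitmq.exchange" are assumed names.
    static BrokerConfig load(Reader source)
    {
        Properties props = new Properties();
        try
        {
            props.load(source);
        }
        catch (IOException e)
        {
            throw new UncheckedIOException(e);
        }
        // Fall back to the values used in this article when a key is absent.
        String host = props.getProperty("rabbitmq.host", "192.168.99.100");
        String exchange = props.getProperty("rabbitmq.exchange", "logs");
        return new BrokerConfig(host, exchange);
    }

    public static void main(String[] argv)
    {
        String text = "rabbitmq.host=localhost\nrabbitmq.exchange=logs\n";
        BrokerConfig config = load(new StringReader(text));
        System.out.println(config.host + " / " + config.exchangeName);
    }
}
```

With something like this in place, the publisher and subscriber could call factory.setHost(config.host) rather than each carrying its own HOST_ID constant.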

Subscriber
import com.rabbitmq.client.*;

import java.io.IOException;

public class RabbitMqLogsReceiver
{
    private static final String EXCHANGE_NAME = "logs";
    private static final String HOST_ID = "192.168.99.100";

    public static void main(String[] argv) throws Exception
    {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST_ID);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "");
        
        // Once a channel has been created, you can send messages to yourself on it...
        String message = "Sending a message to myself to start";
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));

        System.out.println(" [*] Waiting for messages on: " + queueName + ". To exit press CTRL+C");

        Consumer consumer = new DefaultConsumer(channel)
        {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    AMQP.BasicProperties properties, byte[] body) throws IOException
            {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}

LINE 17 - channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

Declaring Exchanges

Calls to declare queues or exchanges in RabbitMQ are idempotent.  Each time exchangeDeclare() is called with the same exchange name and the same type and attributes, it has no further effect: the exchange created by the first call is simply reused.  If you attempt to perform an operation on an exchange that has not been created yet, RabbitMQ closes the channel with an error, which the Java client reports as an exception.
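
The declare semantics described above can be modelled in a few lines of plain Java. This is only a toy in-memory sketch to make the behaviour concrete. ExchangeRegistry and its methods are hypothetical names, not the RabbitMQ client API, and a real broker reports errors on the channel rather than by throwing from a local method.

```java
import java.util.HashMap;
import java.util.Map;

// Toy in-memory model of the declare semantics described above.
// It is NOT the RabbitMQ client API; class and method names are hypothetical.
class ExchangeRegistry
{
    private final Map<String, String> typesByName = new HashMap<>();

    // Idempotent: declaring an existing exchange with the same type is a no-op.
    // Redeclaring it with a different type is rejected.
    void declare(String name, String type)
    {
        String existing = typesByName.putIfAbsent(name, type);
        if (existing != null && !existing.equals(type))
        {
            throw new IllegalStateException(
                "exchange '" + name + "' already declared as " + existing);
        }
    }

    // Any operation on an undeclared exchange fails.
    void publish(String name, String message)
    {
        if (!typesByName.containsKey(name))
        {
            throw new IllegalStateException("no exchange '" + name + "'");
        }
        // A real broker would route the message to the bound queues here.
    }

    int size()
    {
        return typesByName.size();
    }

    public static void main(String[] argv)
    {
        ExchangeRegistry broker = new ExchangeRegistry();
        broker.declare("logs", "fanout");
        broker.declare("logs", "fanout");   // second call changes nothing
        System.out.println("exchanges: " + broker.size());
        try
        {
            broker.publish("missing", "hello");
        }
        catch (IllegalStateException e)
        {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Running main prints "exchanges: 1" followed by "rejected: no exchange 'missing'", which mirrors the two behaviours: repeated declares are harmless, and using an undeclared exchange is an error.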

Design Decision

I want to design the applications so that if publishers or subscribers are used before the exchange is created, RabbitMQ will throw an exception.  I have achieved this by removing line 18 of the publisher and line 17 of the subscriber.  I have then created a new application whose sole purpose is to create the exchange.

Exchange Creator
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 *
 * @author Selvyn
 * 
 * I wrote this to test the idea of how exchanges get created and persisted
 * 
 * Once this code runs, if you run rabbitmqctl list_exchanges, you should see the
 * exchange name that was passed in argv[0]
 */
public class RabbitMqLogMgr
{
    private static final String EXCHANGE_NAME = "logs"; // specify as ReSTful URI param
    private static final String HOST_ID = "192.168.99.100"; // put in a config file

    public static void main(String[] argv) throws Exception
    {
        if( argv.length < 1)
        {
            System.err.println("You must specify an exchange name to the application");
            return;
        }
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST_ID);   // put this in a config file
        try (Connection connection = factory.newConnection(); 
                Channel channel = connection.createChannel())
        {
            channel.exchangeDeclare(argv[0], BuiltinExchangeType.FANOUT);
        }
    }
}

Works well.  You have to run this application before the Publisher and Subscriber, passing it the same exchange name they use (logs in the code above).

You don't really have to architect your applications like this, but I wanted to give my students the impression that by running this application they were creating the channel through which publishers and subscribers would communicate.