...
Code Block |
---|
language | java |
---|
title | Publisher |
---|
linenumbers | true |
---|
|
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
public class RabbitMqLogsSender
{
private static String EXCHANGE_NAME = "logs"; // specify as ReSTful URI param
private static final String HOST_ID = "192.168.99.100"; // put in a config file
public static void main(String[] argv) throws Exception
{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(HOST_ID); // put this in a config file
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
String message = argv[0];
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
|
LINE 18 - channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
Code Block |
---|
language | java |
---|
title | Subscriber |
---|
linenumbers | true |
---|
|
import com.rabbitmq.client.*;
import java.io.IOException;
public class RabbitMqLogsReceiver
{
private static String EXCHANGE_NAME = "logs";
private static final String HOST_ID = "192.168.99.100";
public static void main(String[] argv) throws Exception
{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(HOST_ID);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
// Once a channel has been created, yu can send messages to yourself on it...
String message = "Sending a message to myself to start";
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
System.out.println(" [*] Waiting for messages on: " + queueName + ". To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel)
{
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException
{
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}
|
LINE XX - channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);