RabbitMQ is a well-known message broker. It provides a common platform for sending and receiving messages, and it has good official tutorials in five programming languages. This post shows a small example of a message receiver that acts as a bridge between two message queues. It is very simple: the receiver takes messages from the first queue, processes them, and then sends them to the second queue.
The skeleton of our example is:
Producer#1 -> Queue#1 -> Worker (from another perspective, Producer#2) -> Queue#2 -> Receiver
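To make the data flow concrete before any broker is involved, here is a minimal in-memory sketch of the same chain. It uses `java.util.concurrent.BlockingQueue` as a stand-in for the two RabbitMQ queues; the class name `BridgeSketch`, the `bridge` method and the `process` step are hypothetical names chosen for illustration, not part of RabbitMQ or of the code in this post.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// In-memory stand-in for the topology above; no RabbitMQ broker is involved.
class BridgeSketch {

    // Hypothetical processing step; the real worker could do anything here.
    static String process(String message) {
        return message + " (processed)";
    }

    // Takes `count` messages from the first queue and forwards each one,
    // after processing, to the second queue.
    static void bridge(BlockingQueue<String> first, BlockingQueue<String> second, int count)
            throws InterruptedException {
        for (int i = 0; i < count; i++) {
            String message = first.take();      // blocks until a message is available
            second.put(process(message));
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue1 = new LinkedBlockingQueue<>();
        BlockingQueue<String> queue2 = new LinkedBlockingQueue<>();

        queue1.put("Hello from Queue#1");       // Producer#1
        bridge(queue1, queue2, 1);              // Worker, acting as Producer#2
        System.out.println(queue2.take());      // Receiver; prints "Hello from Queue#1 (processed)"
    }
}
```

The worker is the only component that touches both queues, which is exactly the role the RabbitMQ worker plays in the rest of this post.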
Producer#1 -> Queue#1
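The first part sets up the first message queue; the producer code sends messages to our message receiver (let's call it the "worker"). This part is almost the same as the official tutorial.
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class Producer {
    private static final String TASK_QUEUE_NAME = "first_task_queue";

    // Note: newer client versions also declare java.util.concurrent.TimeoutException
    // on newConnection(); add it to the throws clause if the compiler asks for it.
    public static void main(String[] argv) throws java.io.IOException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // Durable, non-exclusive, non-auto-delete queue.
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

        String message = "Hello from Queue#1";
        // Publish through the default exchange, using the queue name as routing key.
        channel.basicPublish(
                "", TASK_QUEUE_NAME,
                MessageProperties.PERSISTENT_TEXT_PLAIN,
                message.getBytes());
        System.out.println(" Sent '" + message + "' to Queue#1");

        channel.close();
        connection.close();
    }
}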
Queue#1 -> Worker (from another perspective, Producer#2) -> Queue#2
The second part is the heart of our example: it receives messages from the first queue and sends them to the second queue. The worker code has two different queue definitions.
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import com.rabbitmq.client.QueueingConsumer;

public class Worker {
    private static final String FIRST_TASK_QUEUE_NAME = "first_task_queue";
    private static final String SECOND_TASK_QUEUE_NAME = "second_task_queue";

    public static void main(String[] argv)
            throws java.io.IOException, java.lang.InterruptedException {
The first queue definition.
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(FIRST_TASK_QUEUE_NAME, true, false, false, null);
        System.out.println(" Waiting for messages from Queue#1.");

        // Deliver at most one unacknowledged message to this worker at a time.
        channel.basicQos(1);
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // autoAck = false: messages are acknowledged manually.
        channel.basicConsume(FIRST_TASK_QUEUE_NAME, false, consumer);
The second queue definition.
        ConnectionFactory factory2 = new ConnectionFactory();
        factory2.setHost("localhost");
        Connection connection2 = factory2.newConnection();
        Channel channel2 = connection2.createChannel();
        channel2.queueDeclare(SECOND_TASK_QUEUE_NAME, true, false, false, null);
Receiving from Queue#1 and sending to Queue#2
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" Received '" + message + "' from Queue#1");

            // Forward to Queue#2 first and acknowledge on Queue#1 afterwards, so that
            // a publish that throws leaves the message unacknowledged instead of lost.
            channel2.basicPublish(
                    "", SECOND_TASK_QUEUE_NAME,
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes());
            System.out.println(" Sent '" + message + "' to Queue#2");

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}
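The worker loop both acknowledges a message on Queue#1 and publishes it to Queue#2, and the order of those two steps decides what happens when forwarding fails. Below is a minimal in-memory sketch of the "forward first, acknowledge afterwards" ordering. It does not use RabbitMQ at all: the `Deque` stands in for Queue#1, the `List` for Queue#2, and `AckOrderSketch`, `forwardAll` and `failuresToInject` are hypothetical names introduced only for this illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// In-memory illustration of "publish, then ack"; no broker is involved.
class AckOrderSketch {

    // Forwards every message from `source` to `target` and returns the number
    // of delivery attempts. A message is removed from `source` (the "ack")
    // only after it has been added to `target` (the "publish"), so a failed
    // publish leaves it at the head of `source` to be retried.
    static int forwardAll(Deque<String> source, List<String> target, int failuresToInject) {
        int attempts = 0;
        while (!source.isEmpty()) {
            String message = source.peek();   // delivered, but not yet acknowledged
            attempts++;
            if (failuresToInject > 0) {
                failuresToInject--;           // simulated publish failure
                continue;                     // message stays in source and is retried
            }
            target.add(message);              // "publish" to the second queue
            source.poll();                    // "ack": only now remove it from the first queue
        }
        return attempts;
    }

    public static void main(String[] args) {
        Deque<String> queue1 = new ArrayDeque<>(Arrays.asList("m1", "m2"));
        List<String> queue2 = new ArrayList<>();
        int attempts = forwardAll(queue1, queue2, 1);
        System.out.println(queue2 + " after " + attempts + " attempts");
    }
}
```

With one injected failure, both messages still reach the second collection, at the cost of one extra attempt. Acknowledging before publishing would instead have dropped the message whose publish failed.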
The final part is an ordinary receiver. The official tutorials are very helpful and simple, so in my opinion RabbitMQ is a simple tool for sending and receiving messages. I run it across several machines: one producer machine, five worker machines and one saver machine.