rabbitMQ是一種高性能的消息隊列,支持或者說它實現了AMQP協議(advanced message queue protocol高級消息隊列協議)。
下面簡單講一講一個小例子。我們首先要部署好rabbitMQ,然后實現一個生產者—消費者,生產者向rabbit中發布一個消息,消費者去rabbit取這個消息,在正確收到這個消息后,消費者會通過返回隊列回寫通知生產者自己收到了消息。
windows下部署rabbit非常簡單,先安裝erlang運行時,然后安裝rabbitMQ安裝文件即可,都是exe的,很簡單。然后找到rabbit的sbin目錄里的bat即可啟動rabbitMQ。
下面是producer—consumer代碼:
package com.hzfi.rabbitmq; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.QueueingConsumer.Delivery; import com.rabbitmq.client.ShutdownSignalException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.ConsumerCancelledException; import com.rabbitmq.client.QueueingConsumer; public class Producer { private final static String QUEUE_NAME = "myQueue"; //上送隊列 public static void main(String[] args) throws IOException, TimeoutException{ String replyQueueName = null; //返回隊列名 ConnectionFactory connFactory = null; Connection conn = null; Channel channel = null; try{ connFactory = new ConnectionFactory(); connFactory.setHost("localhost"); conn = connFactory.newConnection(); channel = conn.createChannel(); //返回隊列 replyQueueName = channel.queueDeclare().getQueue(); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(replyQueueName, true, consumer); String corrId = java.util.UUID.randomUUID().toString(); //用來表示返回隊列結果的id,唯一 BasicProperties props = new BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build(); String msg = "linyang@hzfi.cn"; channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicPublish("", QUEUE_NAME, props, msg.getBytes()); System.out.println("producer has published: \"" + msg + "\""); while(true){ Thread.sleep(1000); Delivery delivery = consumer.nextDelivery(); System.out.println("from server reply:" + new String(delivery.getBody())); } }catch(IOException ioe){ ioe.printStackTrace(); }catch(TimeoutException toe){ toe.printStackTrace(); } catch (ShutdownSignalException e) { e.printStackTrace(); } catch (ConsumerCancelledException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); }finally{ if(channel!=null) channel.close(); if(conn!=null) conn.close(); } } }
package com.hzfi.rabbitmq; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.QueueingConsumer.Delivery; public class Consumer { private final static String QUEUE_NAME = "myQueue"; public static void main(String[] args) throws IOException, TimeoutException{ ConnectionFactory connFactory = null; Connection conn = null; Channel channel = null; try{ connFactory = new ConnectionFactory(); connFactory.setHost("localhost"); conn = connFactory.newConnection(); channel = conn.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println("listening for event message..."); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, true, consumer); while(true){ Thread.sleep(1000); Delivery delivery = consumer.nextDelivery(); BasicProperties props = delivery.getProperties(); BasicProperties reply_props = new BasicProperties.Builder().correlationId(props.getCorrelationId()).build(); String msg = new String(delivery.getBody(),"utf-8"); System.out.println("receive msg:" + msg); String retMsg = "ok, give you reply:" + new String(msg.getBytes(),"utf-8"); System.out.println("Consumer中的返回隊列名" + props.getReplyTo()); channel.basicPublish( "", props.getReplyTo(), reply_props, retMsg.getBytes()); } }catch(IOException ioe){ ioe.printStackTrace(); }catch(TimeoutException toe){ toe.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); }finally{ if(channel!=null) channel.close(); if(conn!=null) conn.close(); } } }
開啟RabbitMQ的后台管理服務(是個web頁面)
\sbin>rabbitmq-plugins enable rabbitmq_management
訪問地址 http://localhost:15672/ id/psw: guest/guest
可以對隊列,用戶,權限等進行管理,例如,默認情況下密碼是任意,如上代碼所示,ConnectionFactory僅僅設置了主機名,並未設置用戶名和密碼。
我們可以新建或修改一個用戶名和密碼,如下圖:
這樣,我們上面的代碼也要做相應的調整:
ConnectionFactory connFactory = new ConnectionFactory(); connFactory.setHost("localhost"); connFactory.setUsername("guest"); connFactory.setPassword("123");