轉載請注明出處
0.目錄
1.簡介
本篇博文介紹了在windows平台下安裝RabbitMQ Server端,並用JAVA代碼實現收發消息
2.安裝RabbitMQ
Windows平台安裝完成后如圖
3.啟用RabbitMQ Web控制台
RabbitMQ提供一個控制台,用於管理和監控RabbitMQ,默認是不啟動的,需要運行以下命令進行啟動
- 點擊上圖的Rabbit Command Prompt,打開rabbitMQ控制台
- 在官方介紹管理控制台的頁面,可以看到,輸入以下命令啟動后台控制插件
rabbitmq-plugins enable rabbitmq_management
- 登錄后台頁面:http://localhost:15672/ 密碼和用戶名都是 guest ,界面如下
目前可以先不用理會此界面,后面使用到時會詳細介紹,也可以到這里查看官方文檔。
4.編寫MessageSender
Spring對RabbitMQ已經進行了封裝,正常使用中,會使用Spring集成,第一個項目中,我們先不考慮那么多
在IDE中新建一個Maven項目,並在pom.xml中貼入如下依賴,RabbitMQ的最新版本依賴可以在這里找到
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.1.0</version> </dependency>
等待Maven下載完成后,就可以在Maven Dependencies中看到RabbitMQ的JAR
在這里,我們發現,RabbitMQ的日志依賴了slf4j-api這個包,slf4j-api並不是一個日志實現,這樣子是打不出日志的,所以,我們給pom加上一個日志實現,這里用了logback
<dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.2.1</version> </dependency>
之后maven依賴如下,可以放心寫代碼了
新建一個MessageSender類,代碼如下
1 import java.io.IOException; 2 import java.util.concurrent.TimeoutException; 3 4 import org.slf4j.Logger; 5 import org.slf4j.LoggerFactory; 6 7 import com.rabbitmq.client.Channel; 8 import com.rabbitmq.client.Connection; 9 import com.rabbitmq.client.ConnectionFactory; 10 11 public class MessageSender { 12 13 private Logger logger = LoggerFactory.getLogger(MessageSender.class); 14 15 //聲明一個隊列名字 16 private final static String QUEUE_NAME = "hello"; 17 18 public boolean sendMessage(String message){ 19 //new一個RabbitMQ的連接工廠 20 ConnectionFactory factory = new ConnectionFactory(); 21 //設置需要連接的RabbitMQ地址,這里指向本機 22 factory.setHost("127.0.0.1"); 23 Connection connection = null; 24 Channel channel = null; 25 try { 26 //嘗試獲取一個連接 27 connection = factory.newConnection(); 28 //嘗試創建一個channel 29 channel = connection.createChannel(); 30 //這里的參數在后面詳解 31 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 32 //注意這里調用了getBytes(),發送的其實是byte數組,接收方收到消息后,需要重新組裝成String 33 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); 34 logger.info("Sent '" + message + "'"); 35 //關閉channel和連接 36 channel.close(); 37 connection.close(); 38 } catch (IOException | TimeoutException e) { 39 //失敗后記錄日志,返回false,代表發送失敗 40 logger.error("send message faild!",e); 41 return false; 42 } 43 return true; 44 } 45 }
然后在App類的main方法中調用sendMessage
1 public class App { 2 public static void main( String[] args ){ 3 MessageSender sender = new MessageSender(); 4 sender.sendMessage("hello RabbitMQ!"); 5 } 6 }
打印日志如下
打開RabbitMQ的控制台,可以看到消息已經進到了RabbitMQ中
點進去,用控制台自帶的getMessage功能,可以看到消息已經成功由RabbitMQ管理了
至此,MessageSender已經寫好了,在該類的31和33行,我們分別調用了隊列聲明和消息發送
channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
queueDeclare,有很多參數,我們可以看一下他的源碼,注釋上有詳細的解釋,我簡單翻譯了一下
1 /** 2 * Declare a queue 聲明一個隊列 3 * @see com.rabbitmq.client.AMQP.Queue.Declare 4 * @see com.rabbitmq.client.AMQP.Queue.DeclareOk 5 * @param queue the name of the queue隊列的名字 6 * @param durable true if we are declaring a durable queue (the queue will survive a server restart)是否持久化,為true則在rabbitMQ重啟后生存 7 * @param exclusive true if we are declaring an exclusive queue (restricted to this connection)是否是排他性隊列(別人看不到),只對當前連接有效,當前連接斷開后,隊列刪除(設置了持久化也刪除) 8 * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)自動刪除,在最后一個連接斷開后刪除隊列 9 * @param arguments other properties (construction arguments) for the queue 其他參數 10 * @return a declaration-confirm method to indicate the queue was successfully declared 11 * @throws java.io.IOException if an error is encountered 12 */ 13 Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, 14 Map<String, Object> arguments) throws IOException;
前面4個都非常好理解,最后一個“其他參數”,到底是什么其他參數,這個東西真的很難找,用到再解釋吧,官方文檔如下
- TTL Time To Live 存活時間
- Dead Lettering 遺言,當消息死亡時,做些什么
- Length Limit 長度限制
- Priority Queues 優先級
basicPublish的翻譯如下
1 /** 2 * Publish a message.發送一條消息 3 * 4 * Publishing to a non-existent exchange will result in a channel-level 5 * protocol exception, which closes the channel. 6 * 7 * Invocations of <code>Channel#basicPublish</code> will eventually block if a 8 * <a href="http://www.rabbitmq.com/alarms.html">resource-driven alarm</a> is in effect. 9 * 10 * @see com.rabbitmq.client.AMQP.Basic.Publish 11 * @see <a href="http://www.rabbitmq.com/alarms.html">Resource-driven alarms</a> 12 * @param exchange the exchange to publish the message to 交換模式,會在后面講,官方文檔在這里 13 * @param routingKey the routing key 控制消息發送到哪個隊列 14 * @param props other properties for the message - routing headers etc 其他參數 15 * @param body the message body 消息,byte數組 16 * @throws java.io.IOException if an error is encountered 17 */ 18 void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
這里又有個其他參數,它的類型是這樣的,設置消息的一些詳細屬性
5.編寫MessageConsumer
為了和Sender區分開,新建一個Maven項目MessageConsumer
1 package com.liyang.ticktock.rabbitmq; 2 3 import java.io.IOException; 4 import java.util.concurrent.TimeoutException; 5 6 import org.slf4j.Logger; 7 import org.slf4j.LoggerFactory; 8 9 import com.rabbitmq.client.AMQP; 10 import com.rabbitmq.client.Channel; 11 import com.rabbitmq.client.Connection; 12 import com.rabbitmq.client.ConnectionFactory; 13 import com.rabbitmq.client.Consumer; 14 import com.rabbitmq.client.DefaultConsumer; 15 import com.rabbitmq.client.Envelope; 16 17 public class MessageConsumer { 18 19 private Logger logger = LoggerFactory.getLogger(MessageConsumer.class); 20 21 public boolean consume(String queueName){ 22 //連接RabbitMQ 23 ConnectionFactory factory = new ConnectionFactory(); 24 factory.setHost("127.0.0.1"); 25 Connection connection = null; 26 Channel channel = null; 27 try { 28 connection = factory.newConnection(); 29 channel = connection.createChannel(); 30 //這里聲明queue是為了取消息的時候,queue肯定會存在 31 //注意,queueDeclare是冪等的,也就是說,消費者和生產者,不論誰先聲明,都只會有一個queue 32 channel.queueDeclare(queueName, false, false, false, null); 33 34 //這里重寫了DefaultConsumer的handleDelivery方法,因為發送的時候對消息進行了getByte(),在這里要重新組裝成String 35 Consumer consumer = new DefaultConsumer(channel){ 36 @Override 37 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) 38 throws IOException { 39 String message = new String(body, "UTF-8"); 40 logger.info("Received '" + message + "'"); 41 } 42 }; 43 //上面是聲明消費者,這里用聲明的消費者消費掉隊列中的消息 44 channel.basicConsume(queueName, true, consumer); 45 46 //這里不能關閉連接,調用了消費方法后,消費者會一直連接着rabbitMQ等待消費 47 48 } catch (IOException | TimeoutException e) { 49 //失敗后記錄日志,返回false,代表消費失敗 50 logger.error("send message faild!",e); 51 return false; 52 } 53 54 55 return true; 56 } 57 }
然后在App的main方法中調用Cunsumer進行消費
1 public class App 2 { 3 //這個隊列名字要和生產者中的名字一樣,否則找不到隊列 4 private final static String QUEUE_NAME = "hello"; 5 6 public static void main( String[] args ) 7 { 8 MessageConsumer consumer = new MessageConsumer(); 9 consumer.consume(QUEUE_NAME); 10 } 11 }
結果如下,消費者一直在等待消息,每次有消息進來,就會立刻消費掉
6.多個消費者同時消費一個隊列
改造一下Consumer
在App中new多個消費者
改造Sender,使它不停的往RabbitMQ中發送消息
啟動Sender
啟動Consumer,發現消息很平均的發給四個客戶端,一人一個,誰也不插隊
如果我們把速度加快呢?把Sender的休息時間去掉,發現消費開始變得沒有規律了,其實呢,它還是有規律的,這個是RabbitMQ的特性,稱作“Round-robin dispatching”,消息會平均的發送給每一個消費者,可以看第一第二行,消息分別是56981和56985,相應的82、82、84都被分給了其他線程,只是在當前線程的時間片內,可以處理這么多任務,所以就一次打印出來了
7.結束語
這一章介紹了從安裝到用JAVA語言編寫生產者與消費者,在這里只是簡單的消費消息並打印日志,如果一個消息需要處理的時間很長,而處理的過程中,這個消費者掛掉了,那消息會不會丟失呢?答案是肯定的,而且已經分配給這個消費者,但還沒來得及處理的消息也會一並丟失掉,這個問題,RabbitMQ早就考慮到了,並且提供了解決方案,下一篇博文將進行詳細介紹