前言
本篇博客已被收錄GitHub:https://zhouwenxing.github.io/
文中所涉及的源碼也已被收錄GitHub:https://github.com/zhouwenxing/lonely-wolf-note (message-queue模塊)
使用消息隊列必須要保證生產者發送的消息能被消費者所接收,那么生產者如何接收消息呢?下圖是 RabbitMQ
的工作模型:
上圖中生產者會將消息發送到交換機 Exchange
上,再由 Exchange
發送給不同的 Queue
,而 Queue
是用來存儲消息隊列,那么假如有多個生產者,那么消息發送到交換機 Exchange
之后,應該如何和 Queue
之間建立綁定關系呢?
如何使用 RabbitMQ 發送消息
RabbitMQ
中提供了3種發送消息的路由方式。
直連 Direct 模式
通過指定一個精確的綁定鍵來實現 Exchange
(交換機) 和 Queue
(消息隊列) 之間的綁定,也就是說,當創建了一個直連類型的交換機時,生產者在發送消息時攜帶的路由鍵(routing key),必須與某個綁定鍵(binding key)完全匹配時,這條消息才會從交換機路由到滿足路由關系消息隊列上,然后消費者根據各自監聽的隊列就可以獲取到消息(如下圖所示,Queue1
綁定了 order
,那么這時候發送消息的路由鍵必須為 order
才能分配到 Queue1
上):
主題 Topic 模式
Direct
模式會存在一定的局限性,有時候我們需要按類型划分,比如訂單類路由到一個隊列,產品類路由到另一個隊列,所以在 RabbitMQ 中,提供了主題模式來實現模糊匹配。使用主題類型連接方式支持兩種通配符:
直連方式只能精確匹配,有時候我們需要實現模糊匹配,那么這時候就需要主題類型的連接方式,在 RabbitMQ
中,使用主題類型連接方式支持兩種通配符:
- #:表示
0
個或者多個單詞 - *:表示
1
個單詞
PS:使用通配符時,單詞指的是用英文符號的小數點 .
隔開的字符,如:abc.def
就表示有 abc
和 def
兩個單詞。
下圖所示中,因為 Queue1
綁定了 order.#
,所以當發送消息的路由鍵為 order
或者 order.xxx
時都可以使得消息分配到 Queue1
上:
廣播 Fanout 模式
當我們定義了一個廣播類型的交換機時就不需要指定綁定鍵,而且生產者發送消息到交換機上時,也不需要攜帶路由鍵,此時當消息到達交換機時,所有與其綁定的隊列都會收到消息,這種模式的消息發送適用於消息通知類需求。
如下如所示,Queue1
,Queue2
,Queue3
三個隊列都綁定到了一個 Fanout
交換機上,那么當 Fanout Exchange
收到消息時,會同時將消息發送給三個隊列:
在 RabbitMQ
提供的后台管理系統中也能查詢到創建的交換機和隊列等信息,並且可以通過管理后台直接創建隊列和交換機:
消息發送實戰
下面通過一個 SpringBoot
例子來體會一下三種發送消息的方式。
- 1、
application.yml
文件中添加如下配置:
spring:
rabbitmq:
host: ip
port: 5672
username: admin
password: 123456
- 2、新增一個
RabbitConfig
配置類(為了節省篇幅省略了包名和導入 ),此類中聲明了三個交換機和三個隊列,並分別進行綁定:
@Configuration
public class RabbitConfig {
//直連交換機
@Bean("directExchange")
public DirectExchange directExchange(){
return new DirectExchange("LONGLY_WOLF_DIRECT_EXCHANGE");
}
//主題交換機
@Bean("topicExchange")
public TopicExchange topicExchange(){
return new TopicExchange("LONGLY_WOLF_TOPIC_EXCHANGE");
}
//廣播交換機
@Bean("fanoutExchange")
public FanoutExchange fanoutExchange(){
return new FanoutExchange("LONGLY_WOLF_FANOUT_EXCHANGE");
}
@Bean("orderQueue")
public Queue orderQueue(){
return new Queue("LONGLY_WOLF_ORDER_QUEUE");
}
@Bean("userQueue")
public Queue userQueue(){
return new Queue("LONGLY_WOLF_USER_QUEUE");
}
@Bean("productQueue")
public Queue productQueue(){
return new Queue("LONGLY_WOLF_PRODUCT_QUEUE");
}
//Direct交換機和orderQueue綁定,綁定鍵為:order.detail
@Bean
public Binding bindDirectExchange(@Qualifier("orderQueue") Queue queue, @Qualifier("directExchange") DirectExchange directExchange){
return BindingBuilder.bind(queue).to(directExchange).with("order.detail");
}
//Topic交換機和userQueue綁定,綁定鍵為:user.#
@Bean
public Binding bindTopicExchange(@Qualifier("userQueue") Queue queue, @Qualifier("topicExchange") TopicExchange topicExchange){
return BindingBuilder.bind(queue).to(topicExchange).with("user.#");
}
//Fanout交換機和productQueue綁定
@Bean
public Binding bindFanoutExchange(@Qualifier("productQueue") Queue queue, @Qualifier("fanoutExchange") FanoutExchange fanoutExchange){
return BindingBuilder.bind(queue).to(fanoutExchange);
}
}
- 3、新建一個消費者
ExchangeConsumer
類,不同的方法實現分別監聽不同的隊列:
@Component
public class ExchangeConsumer {
/**
* 監聽綁定了direct交換機的的消息隊列
*/
@RabbitHandler
@RabbitListener(queues = "LONGLY_WOLF_ORDER_QUEUE")
public void directConsumer(String msg){
System.out.println("direct交換機收到消息:" + msg);
}
/**
* 監聽綁定了topic交換機的的消息隊列
*/
@RabbitHandler
@RabbitListener(queues = "LONGLY_WOLF_USER_QUEUE")
public void topicConsumer(String msg){
System.out.println("topic交換機收到消息:" + msg);
}
/**
* 監聽綁定了fanout交換機的的消息隊列
*/
@RabbitHandler
@RabbitListener(queues = "LONGLY_WOLF_PRODUCT_QUEUE")
public void fanoutConsumer(String msg){
System.out.println("fanout交換機收到消息:" + msg);
}
}
- 4、新增一個
RabbitExchangeController
類來作為生產者,進行消息發送:
@RestController
@RequestMapping("/exchange")
public class RabbitExchangeController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping(value="/send/direct")
public String sendDirect(String routingKey,@RequestParam(value = "msg",defaultValue = "no direct message") String msg){
rabbitTemplate.convertAndSend("LONGLY_WOLF_DIRECT_EXCHANGE",routingKey,msg);
return "succ";
}
@GetMapping(value="/send/topic")
public String sendTopic(String routingKey,@RequestParam(value = "msg",defaultValue = "no topic message") String msg){
rabbitTemplate.convertAndSend("LONGLY_WOLF_TOPIC_EXCHANGE",routingKey,msg);
return "succ";
}
@GetMapping(value="/send/fanout")
public String sendFaout(String routingKey,@RequestParam(value = "msg",defaultValue = "no faout message") String msg){
rabbitTemplate.convertAndSend("LONGLY_WOLF_FANOUT_EXCHANGE",routingKey,msg);
return "succ";
}
}
- 5、啟動服務,當我們調用第一個接口時候,路由鍵和綁定鍵
order.detail
精確匹配時,directConsumer
就會收到消息,同樣的,調用第二接口時,路由鍵滿足user.#
時,topicConsumer
就會收到消息,而只要調用第三個接口,不論是否指定路由鍵,fanoutConsumer
都會收到消息。
消息過期了怎么辦
簡單的發送消息我們學會了,難道這就能讓我們就此止步了嗎?顯然是不能的,要玩就要玩高級點,所以接下來讓我們給消息加點佐料。
TTL(Time-To-Live)
TTL
即 一條消息在隊列中的最大存活時間。在一條在隊列中超過配置的 TTL
的消息稱為已死消息。但是需要注意的是,已死消息並不能保證會立即從隊列中刪除,但是能保證已死的消息不會被投遞出去。
設置 TTL
的方式有兩種:
-
1、給隊列設置
x-message-ttl
,此時所有被投遞到隊列中的消息,都會在到達TTL
時成為已死消息。這種情況就會出現當一條消息同時路由到
N
個帶有TTL
時間的隊列,而由於每個隊列的TTL
不一定相同,所以同一條消息在不同的隊列中可能會在不同時間死亡或者不會死亡(未設置TTL
),所以一個隊列中的消息死亡不會影響到其他隊列中的消息。 -
2、單獨給某一條消息設置過期時間。
此時需要注意的時,當消息達到
TTL
時,可能不會馬上被丟棄,因為只有處於隊列頭部消息過期后才會被丟棄,假如隊列頭部的消息沒有設置TTL
,而第2
條消息設置了TTL
,那么即使第2
條消息成為了已死消息,也必須要等到隊列頭部的消息被消費之后才會被丟棄,而已死消息在被丟棄之前也會被計入統計數據(比如隊列中的消息總數)。所以為了更好的利用TTL
特性,建議讓消費者在線消費消息,這樣才能確保消息更快的被丟棄,防止消息堆積。
PS:消息過期和消費者傳遞之間可能存在自然的競爭條件。例如,消息可能在發送途中(未到達消費者)過期。
隊列的生存
和 TTL
針對消息不同的是,我們可以通過設置過期時間屬性 `x-expires`` 來處理隊列,當在指定過期時間內內未使用隊列時,服務器保證將刪除隊列(但是無法保證在過期時間過后隊列將以多快的速度被刪除)。
TTL 和過期時間實戰
- 1、在上面定義的
RabbitConfig
類中,再新增一個TTL
隊列並將其綁定到direct
交換機上:
@Bean("ttlQueue")
public Queue ttlQueue(){
Map<String, Object> map = new HashMap<String, Object>();
map.put("x-message-ttl", 5000);//隊列中所有消息5秒后過期
map.put("x-expires", 100000);//隊列閑置10秒后被刪除
//參數1-name:隊列名稱
//參數2-durable:是否持久化
//參數3-exclusive:是否排他。設置為true時,則該隊列只對聲明當前隊列的連接(Connection)可用,一旦連接斷開,隊列自動被刪除
//參數4-autoDelete:是否自動刪除。前提是必須要至少有一個消費者先連上當前隊列,然后當所有消費者都斷開連接之后,隊列自動被刪除
return new Queue("LONGLY_WOLF_TTL_QUEUE",false,false,false,map);
}
//ttl隊列綁定到direct交換機(交換機和隊列可以多對多)
@Bean
public Binding ttlBindFanoutExchange(@Qualifier("ttlQueue") Queue queue, @Qualifier("directExchange") DirectExchange directExchange){
return BindingBuilder.bind(queue).to(directExchange).with("test.ttl");
}
- 2、在
ExchangeConsumer
消費者類上監聽TTL
隊列(和其他消費者不同的時候,這里為了打印出隊列屬性,改成了通過Message
對象來接收消息 ):
/**
* 監聽ttl消息隊列
*/
@RabbitHandler
@RabbitListener(queues = "LONGLY_WOLF_TTL_QUEUE")
public void ttlConsumer(Message message){
System.out.println("ttl隊列收到消息:" + new String(message.getBody()));
System.out.println("ttl隊列收到消息:" + JSONObject.toJSONString(message.getMessageProperties()));
}
- 3、在生產者類
RabbitExchangeController
上新增一個接口用來測試發送過期消息,這里通過MessageProperties
設置的expiration
屬性就相當於是給單條消息設置了一個TTL
:
@GetMapping(value="/send/ttl")
public String sendTtl(String routingKey,@RequestParam(value = "msg",defaultValue = "no ttl message") String msg){
MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration("5000");//5秒后被刪除,即TTL屬性(針對單條消息)
Message message = new Message(msg.getBytes(), messageProperties);
rabbitTemplate.convertAndSend("LONGLY_WOLF_DIRECT_EXCHANGE",routingKey,message);
return "succ";
}
- 4、此時如果我們把消費者的監聽去掉之后再發送消息,在管理后台就可以看到
5
秒之后消息會被刪除,10
秒之后隊列會被刪除。
PS:如果同時給隊列和單條消息都設置了 TTL
,則會以時間短的為主。
其他屬性
隊列中還有其他一些屬性可以設置,在這里我們就不一一舉例了:
- x-message-ttl:隊列中消息的存活時間(毫秒),達到TTL的消息可能會被刪除。
- x-expires:隊列在多長時間(毫秒)沒有被訪問以后會被刪除。
- x-max-length:隊列中的最大消息數。
- x-max-length-bytes:隊列的最大容量(bytes)。
- overflow:隊列溢出之后的策略。主要可以配置如下參數:
reject-publish
- 直接丟棄最近發布的消息,如若啟用了publisher confirm
(發布者確認),發布者將通過發送basic.nack
消息通知拒絕,如果當前隊列綁定有多個消費者,則消息在收到basic.nack
拒絕通知后,仍然會被發布到其他隊列;drop-head
- 丟棄隊列頭部消息(集群模式下只支持這種策略)reject-publish-dlx
- 最近發布的消息會進入死信隊列。 - x-dead-letter-exchange:隊列的死信交換機。
- x-dead-letter-routing-key:死信交換機的路由鍵。
- x-single-active-consumer:true/false。表示是否最多只允許一個消費者消費,如果有多個消費者同時綁定,則只會激活第一個,除非第一個消費者被取消或者死亡,才會自動轉到下一個消費者。
- x-max-priority:隊列中消息的最大優先級, 消息的優先級不能超過它。
- x-queue-mode:
3.6.0
版本引入的,主要是為了實現惰性加載。隊列將收到的消息盡可能快的進行持久化操作到磁盤上,然后只有在用戶請求的時候才會加載到RAM
內存。這個參數支持兩個值:default
和lazy
。當不進行設置的時候,就是默認為default
,不做任何改變;當設置為lazy
就會進行懶加載。 - x-queue-master-locator:為了保證消息的
FIFO
,所以在高可用集群模式下需要選擇一個節點作為主節點。這個參數主要有三種模式:min-masters
- 托管最小數量的綁定主機的節點;client-local
- 選擇聲明的隊列已經連接到客戶端的節點;random
- 隨機選擇一個節點。
神奇的死信隊列(Dead Letter)
上面的參數介紹中,提到了死信隊列,這又是什么新鮮的東西呢?其實從名字上來看很好理解,就是指的已死的消息,或者說無家可歸的消息。一個消息進入死信隊列,主要有以下三種條件:
-
1、消息被消費者拒絕並且未設置重回隊列。
-
2、消息過期(即設置了
TTL
)。 -
3、隊列達到最大長度,超過了
Max length
或Max length bytes
,則隊列頭部的消息會被發送到死信隊列。
死信隊列實戰
- 1、在上面定義的
RabbitConfig
類中,定義一個死信交換機,並將之前的ttl
隊列新增一個屬性x-dead-letter-exchange
,最后再將死信隊列和死信交換機進行綁定:
//直連死信交換機(也可以用topic或者fanout類型交換機)
@Bean("deatLetterExchange")
public DirectExchange deatLetterExchange(){
return new DirectExchange("LONGLY_WOLF_DEAD_LETTER_DIRECT_EXCHANGE");
}
@Bean("ttlQueue")
public Queue ttlQueue(){
Map<String, Object> map = new HashMap<String, Object>();
map.put("x-message-ttl", 5000);//隊列中所有消息5秒后過期
map.put("x-dead-letter-exchange", "LONGLY_WOLF_DEAD_LETTER_DIRECT_EXCHANGE");//已死消息會進入死信交換機
return new Queue("LONGLY_WOLF_TTL_QUEUE",false,false,false,map);
}
//死信隊列
@Bean("deadLetterQueue")
public Queue deadLetterQueue(){
return new Queue("LONGLY_WOLF_DEAD_LETTER_QUEUE");
}
- 2、在
ExchangeConsumer
消費者類上將監聽TTL
隊列的監聽取消,注釋掉監聽:
/**
* 監聽ttl消息隊列
*/
@RabbitHandler
// @RabbitListener(queues = "LONGLY_WOLF_TTL_QUEUE")
public void ttlConsumer(Message message){
System.out.println("ttl隊列收到消息:" + new String(message.getBody()));
System.out.println("ttl隊列收到消息:" + JSONObject.toJSONString(message.getMessageProperties()));
}
- 3、此時
TTL
隊列無消費者,並且設置了消息的TTL
為5
秒,所以5
秒之后就會進入死信隊列。 - 5、訪問接口:
http://localhost:8080/exchange/send/ttl?routingKey=test&msg=測試死信隊列
,發送消息之后,等待5
秒就查看消息,進入死信隊列:
消息真的發送成功了嗎
了解了消息的基本發送功能之后,就可以高枕無憂了嗎?消息發出去之后,消費者真的收到消息了嗎?消息發送之后如何知道消息發送成功了?假如發送消息路由錯了導致無法路由到隊列怎么辦?大家是不是都有這些疑問呢?別着急,接下來就讓我們來一一來分析一下。
一條消息從生產者開始發送消息到消費者消費完消息主要可以分為以下 4
個階段:
- 1、生產者將消息發送到
Broker
(即:RabbitMQ
的交換機)。 - 2、交換機將消息路由到隊列。
- 3、隊列收到消息后存儲消息。
- 4、消費者從隊列獲取消息進行消費。
接下來我們就從這 4
個步驟上來逐步分析 RabbitMQ
如何保證消息發送的可靠性。
消息真的到達交換機了嗎
當我們發送一條消息之后,如何知道對方收到消息了?這就和我們寫信一樣,寫一封信出去,如何知道對方收到我們寄出去的信?最簡單的方式就是對方也給我們回一封信,我們收到對方的回信之后就可以知道自己的信已經成功寄達。
在 RabbitMQ
中服務端也提供了 2
種方式來告訴客戶端(生產者)是否收到消息:Transaction
(事務)模式和 Confirm
(確認)模式。
Transaction(事務) 模式
在 Java API
編程中開啟事務只需要增加以下代碼即可:
try {
channel.txSelect();//開啟事務
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
channel.txCommit();//提交事務
}catch (Exception e){
channel.txRollback();//消息回滾
}
在 Spring Boot
中需要對 RabbitTemplate
進行事務設置:
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
rabbitTemplate.setChannelTransacted(true);//開啟事務
return rabbitTemplate;
}
為了了解 RabbitMQ
當中事務機制的原理,我們在 Wireshark
中輸入 ip.addr==192.168.1.1
對本地 ip
進行抓包,發送一條消息之后,抓到如下數據包:
通過數據包,可以得出開啟事務之后,除了原本的發送消息之外,多出了開啟事務和事務提交的通信:
開啟事務之后,有一個致命的缺點就是發送消息流程會被阻塞。也就是說必須一條消息發送成功之后,才會允許發送另一條消息。正因為事務模式有這個缺點,所以一般情況下並不建議在生產環境開啟事務,那么有沒有更好的方式來實現消息的送達確認呢?那么就讓我們再看看Confirm
(確認)模式。
Confirm(確認)模式
消息確認模式又可以分為三種(事務模式和確認模式無法同時開啟):
- 單條確認模式:發送一條消息,確認一條消息。此種確認模式的效率也不高。
- 批量確認模式:發送一批消息,然后同時確認。批量發送有一個缺點就是同一批消息一旦有一條消息發送失敗,就會收到失敗的通知,需要將這一批消息全部重發。
- 異步確認模式:一邊發送一邊確認,消息可能被單條確認也可能會被批量確認。
Java API 實現確認模式
- 單條消息確認模式
channel.confirmSelect();//開啟確認模式
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
if (channel.waitForConfirms()){//wait.ForConfirms(long time)方法可以指定等待時間
System.out.println("消息確認發送成功");
}
- 批量確認模式
channel.confirmSelect();//開啟確認模式
//批量發送
for (int i=0;i<10;i++){
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
}
try{
channel.waitForConfirmsOrDie();
}catch (IOException e){//只要有1條消息未被確認,就會拋出異常
System.out.println("有消息發送失敗了");
}
- 異步確認模式
channel.addConfirmListener(new ConfirmListener() {
/**
* 已確認消息,即發送成功后回調
* @param deliveryTag -唯一標識id(即發送消息時獲取到的nextPublishSeqNo)
* @param multiple - 是否批量確認,當multiple=true,表示<=deliveryTag的消息被批量確認,multiple=false,表示只確認了單條
*/
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {//成功回調
System.out.println("收到確認消息了");
//TODO 可以做一些想做的事
}
/**
* 發送失敗消息后回調
* @param deliveryTag -唯一標識id(即發送消息時獲取到的nextPublishSeqNo)
* @param multiple - 是否批量確認,當multiple=true,表示<=deliveryTag的消息被批量確認,multiple=false,表示只確認了單條
*/
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {//失敗回調
if (multiple) {//批量確認,<deliveryTag的消息都發送失敗
//TODO 消息重發?
} else {//非批量,=deliveryTag的消息發送失敗
//TODO 消息重發?
}
}
});
channel.confirmSelect();//開啟確認模式
for (int i=0;i<10;i++){//批量發送
long nextSeqNo = channel.getNextPublishSeqNo();//獲取發送消息的唯一標識(從1開始遞增)
//TODO 可以考慮把消息id存起來
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
}
SpringBoot 實現確認模式
通過配置文件 spring.rabbitmq.publisher-confirm-type
參數進行配置確認(舊版本是 spring.rabbitmq.publisher-confirms
參數)。
- 1、新增配置文件屬性配置
spring:
rabbitmq:
publisher-confirm-type: correlated # none-表示禁用回調(默認) simple- 參考RabbitExchangeController#sendWithSimpleConfirm()方法
- 2、
RabbitConfig
配置文件中修改如下:
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
// rabbitTemplate.setChannelTransacted(true);//開啟事務
//消息是否成功發送到Exchange
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack){//消息發送失敗
System.out.println("消息發送失敗,原因為:" + cause);
return;
}
//消息發送成功
System.out.println("消息發送成功");
}
});
return rabbitTemplate;
}
這樣當我們發送消息成功之后,就會收到回調。
- 3、當上面的參數配置修改為
simple
,則需要在發送消息的時候使用invoke
調用waitForConfirms
或者waitForConfirmsOrDie
方法來確認是否發送成功:
@GetMapping(value="/send/confirm")
public String sendWithSimpleConfirm(String routingKey,@RequestParam(value = "msg",defaultValue = "no direct message") String msg){
//使用waitForConfirms方法確認
boolean sendFlag = rabbitTemplate.invoke(operations -> {
rabbitTemplate.convertAndSend(
"LONGLY_WOLF_DIRECT_EXCHANGE",
"routingKey",
msg
);
return rabbitTemplate.waitForConfirms(5000);
});
//也可以使用waitForConfirmsOrDie方法確認
boolean sendFlag2 = rabbitTemplate.invoke(operations -> {
rabbitTemplate.convertAndSend(
"LONGLY_WOLF_DIRECT_EXCHANGE",
"routingKey",
msg
);
try {
rabbitTemplate.waitForConfirmsOrDie(5000);
}catch (Exception e){
return false;
}
return true;
});
System.out.println(sendFlag);
System.out.println(sendFlag2);
return "succ";
}
消息無法從交換機路由到正確的隊列怎么辦
上面通過事務或者確認機制確保了消息成功發送到交換機,那么接下來交換機會負責將消息路由到隊列,這時候假如隊列不存在或者路由錯誤就會導致消息路由失敗,這又該如何保證呢?
同樣的,RabbitMQ
中也提供了 2
種方式來確保消息可以正確路由到隊列:開啟監聽模式或者通過新增備份交換機模式來備份數據。
監聽回調
上面介紹的是消息是否發送到交換機的回調,而從交換機路由到隊列,同樣可以開啟確認模式。
Java API 方式開啟監聽模式
下面就是開啟監聽主要代碼,為了節省篇幅,省略了其余不相干代碼(完成代碼已上傳至 GitHub
)
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("收到未路由到隊列的回調消息:" + new String(body));
}
});
//注意這里的第三個參數,mandatory需要設置為true(發送一個錯誤的路由,即可收到回調)
channel.basicPublish(EXCHANGE_NAME,"ERROR_ROUTING_KEY",true,null,msg.getBytes());
Spring Boot 開啟監聽模式
在 RabitConfig
類中添加如下配置:
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
rabbitTemplate.setMandatory(true);//開啟監聽回調
//消息是否成功被路由到隊列,沒有路由到隊列時會收到回調(原setReturnCallback在2.0版本已過期)
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
System.out.println("收到未路由到隊列的回調消息:" + new String(returnedMessage.getMessage().getBody()));
}
});
return rabbitTemplate;
}
備份交換機
除了開啟監聽的方式,還可以通過定義備份交換機的方式來實現,當原交換機無法正確路由到隊列時,則會進入備份交換機,再由備份交換機路由到正確隊列(要注意區分備份交換機和死信交換機的區別)。
Java API 實現備份交換機
下面就是一個實現備份交換機的例子,因為這里備份交換機定義的是 Topic
類型,所有路由必須滿足定義好的路由,實際使用中一般會設置會 Fanout
,因為無法預測錯誤的路由到底是多少:
//聲明交換機且指定備份交換機
Map<String,Object> argMap = new HashMap<String,Object>();
argMap.put("alternate-exchange","TEST_ALTERNATE_EXCHANGE");
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT,false,false,argMap);
//隊列和交換機進行綁定
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTEING_KEY);
//聲明備份交換機和備份隊列,並綁定(為了防止收不到消息,備份交換機一般建議設置為Fanout類型)
channel.queueDeclare("BAK_QUEUE", false, false, false, null);
channel.exchangeDeclare("TEST_ALTERNATE_EXCHANGE", BuiltinExchangeType.TOPIC);
channel.queueBind("BAK_QUEUE","TEST_ALTERNATE_EXCHANGE","ERROR.#");
String msg = "I'm a bak exchange msg";
channel.basicPublish(EXCHANGE_NAME,"ERROR.ROUTING_KEY",null,msg.getBytes());
Spring Boot 實現備份交換機
Spring Boot
實現備份交換機原理和 Java API
實現相同:
- 1、首先在
RabbiConfig
中新增兩個交換機,一個是原始交換機,一個是備份交換機,同時新增一個備份隊列和備份交換機進行綁定,這里的備份交換機是一個Fanout
類型,注意因為這里主要是演示備份交換機,所以這里的原始交換機沒有和任何隊列綁定,也就無法路由到隊列,從而使得消息進入備份交換機:
//用於測試備份交換機的原直連交換機
@Bean("bakDirectEchange")
public DirectExchange bakDirectEchange(){
Map argMap = new HashMap<>();
argMap.put("alternate-exchange", "LONGLY_WOLF_BAK_FANOUT_EXCHANGE");
return new DirectExchange("LONGLY_WOLF_BAK_ORIGIN_DIRECT_EXCHANGE",false,false,argMap);
}
//備份廣播交換機
@Bean("bakFanoutExchange")
public FanoutExchange bakFanoutExchange(){
return new FanoutExchange("LONGLY_WOLF_BAK_FANOUT_EXCHANGE");
}
//備份隊列
@Bean("bakQueue")
public Queue bakQueue(){
return new Queue("LONELY_WOLF_BAK_QUEUE");
}
//備份交換機和備份隊列進行綁定
@Bean
public Binding BindExchange(@Qualifier("bakQueue") Queue queue, @Qualifier("bakFanoutExchange") FanoutExchange fanoutExchange){
return BindingBuilder.bind(queue).to(fanoutExchange);
}
2、在消費者類 ExchangeConsumer
中監聽備份隊列:
/**
* 監聽備份消息隊列
*/
@RabbitHandler
@RabbitListener(queues = "LONELY_WOLF_BAK_QUEUE")
public void bakQueueConsumer(Message message){
System.out.println("備份隊列收到消息:" + new String(message.getBody()));
}
- 3、最后在生產者類
RabbitExchangeController
中新增一個消息發送的方法進行消息發送:
@GetMapping(value="/send/bak")
public String sendBak(String routingKey,@RequestParam(value = "msg",defaultValue = "no bak message") String msg){
rabbitTemplate.convertAndSend("LONGLY_WOLF_BAK_ORIGIN_DIRECT_EXCHANGE",routingKey,msg);
return "succ";
}
調用之后可以看到,備份隊列會收到消息,從而說明了消息在無法路由到隊列時會進入到備份隊列。
隊列存儲消息后發生異常怎么辦
在保證了前面兩個階段的可靠性之后,消息終於安全抵達了隊列,那么這時候就絕對安全了嗎?
當我們的消費者的消費速度跟不上生產者的生產速度時,就會導致消息堆積在隊列中,而默認消息是沒有持久化的,存在於內存之中,所以假如服務器宕機等故障發生,就會導致隊列中的數據丟失。
這里的解決方案也很簡單,就是將消息進行持久化,在 RabbitMQ
當中,持久化也可以分為 3
種:交換機持久化,隊列持久化和消息持久化。
雖然說持久化能一定程度上保證消息的可靠性,然而當出現了服務器的磁盤損壞,依然可能出現消息丟失,所以為了更加完美,RabbitMQ
集群可能是必須的,當然,本文不會涉及到集群的知識,集群的知識以及搭建會放到下次再來分析。
交換機持久化
聲明交換機時,durable
參數設置為 true
。
隊列持久化
聲明隊列時,durable
參數設置為 true
。
消息持久化
發送消息時可以將消息設置為持久化。
Java API 消息持久化
在 Java API
中,可以通過如下方式設置消息持久化:
//deliveryMode=2表示消息持久化
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().deliveryMode(2).build();
channel.basicPublish("exchangeName","routingKey",properties,msg.getBytes());
Spring Boot 消息持久化
在 Spring Boot
中可以通過如下方式將消息設置為持久化:
MessageProperties messageProperties = new MessageProperties();
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);//消息持久化
Message message = new Message(msg.getBytes(), messageProperties);
rabbitTemplate.convertAndSend("exchangeName","routingKey",message);
消費者消費消息失敗了怎么辦
踏遍千山萬水,經過 3
層地獄模式,消息終於被消費者拿到手了,然而悲劇的事情又發生了,消費者消費消息的時候可能因為消費者本身的問題或者其他意外導致了消費者消費消息失敗了,這時候消息還是沒能被正確處理,這時候難道眼睜睜看着最后關頭了束手無策了嗎?
非也,作為一款如此優秀的消息隊列,怎么可能沒考慮到這種場景呢。還記不記得上面我們提到的確認模式,實際上,上面的兩種確認模式都屬於服務端的確認,在 RabbitMQ
中為消費者也提供了確認模式,這就是消費者的確認。
消費者確認(ack)
隊列當中會把消息刪除的前提就是這條消息被消費者消費掉了,但是服務器如何知道消息被消費了呢?這就是需要通過消費者確認之后才會刪除,而我們前面在介紹消息發送的時候貌似並沒有看到消費者確認流程,這是因為消費者默認在收到消息后會給服務器一個應答,服務端收到消費者的應答之后,就會刪除消息。
Java API 實現消費者應答
在 Java API
中應答方式有兩種,自動應答和手動應答,當自動應答時,則只要消費者收到消息就會給服務端確認,不在乎消息是否消費成功。
- 1、新建一個消費者
AckConsumer
類(省略了包名和導入),這里為了實現方便,通過生產者的頭部標記來決定采用何種應答策略:
public class AckConsumer {
private static String QUEUE_NAME = "ACK_QUEUE";
public static void main(String[] args) throws Exception{
//1.聲明連接
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://username:password@ip:port");
//2.建立連接
Connection conn = factory.newConnection();
//3.創建消息通道
Channel channel = conn.createChannel();
//4.聲明隊列(默認交換機AMQP default,Direct)
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" 等待接收消息...");
// 創建消費者
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println("收到消息: " + new String(body, "UTF-8"));
Map<String,Object> map = properties.getHeaders();//獲取頭部消息
String ackType = map.get("ackType").toString();
if (ackType.equals("ack")){//手動應答
channel.basicAck(envelope.getDeliveryTag(),true);
}else if(ackType.equals("reject-single")){//拒絕單條消息
//拒絕消息。requeue參數表示消息是否重新入隊
channel.basicReject(envelope.getDeliveryTag(),false);
// channel.basicNack(envelope.getDeliveryTag(),false,false);
}else if (ackType.equals("reject-multiple")){//拒絕多條消息
//拒絕消息。multiple參數表示是否批量拒絕,為true則表示<deliveryTag的消息都被拒絕
channel.basicNack(envelope.getDeliveryTag(),true,false);
}
}
};
//開始獲取消息,第二個參數 autoAck表示是否開啟自動應答
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
- 2、新建一個生產者
AckProducer
類(省略了包名和導入):
public class AckProducter {
private static String QUEUE_NAME = "ACK_QUEUE";//隊列
private static String EXCHANGE_NAME = "ACK_EXCHANGE";//交換機
private static String ROUTEING_KEY = "test";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://admin:123456@47.107.155.197:5672");
// 建立連接
Connection conn = factory.newConnection();
// 創建消息通道
Channel channel = conn.createChannel();
Map<String, Object> headers = new HashMap<String, Object>(1);
headers.put("ackType", "ack");//請應答
// headers.put("ackType", "reject-single");//請單條拒絕
// headers.put("ackType", "reject-multiple");//請多條拒絕
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.contentEncoding("UTF-8") // 編碼
.headers(headers) // 自定義屬性
.messageId(String.valueOf(UUID.randomUUID()))
.build();
String msg = "I'm a ack message";
//聲明隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//聲明交換機
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT,false);
//隊列和交換機進行綁定
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTEING_KEY);
// 發送消息
channel.basicPublish(EXCHANGE_NAME, ROUTEING_KEY, properties, msg.getBytes());
channel.close();
conn.close();
}
}
Spring Boot 實現消費者應答
在 Spring Boot
中消費者給服務端的確認方式分為 3
種:
-
NONE
:自動應答(ack
)。 -
MANUAL
:手動應答(ack
)。如果設置為手動應答,而消費者又遲遲不給服務器應答,那么消息就會一直存在隊列,可能會造成消息堆積和重復消費現象。 -
AUTO
:當沒有拋出異常時會自動應答(ack
)。除此外,當發生異常時,分為以下三種情況:- 1、當拋出
AmqpRejectAndDontRequeueException
異常時,消息會被拒絕,也不會重新入隊。 - 2、當拋出
ImmediateAcknowledgeAmqpException
異常時,消費者會自動發送應答給服務端。 - 3、當拋出其他異常時,消息會被拒絕,且會重新入隊。當出現這種情況且消費者只有一個時,非常容易造成死循環,所以應該極力避免這種情況的發生。
- 1、當拋出
-
1、
Spring Boot
中可以通過參數控制應答類型:
spring:
rabbitmq:
listener:
type: simple # direct類型是2.0之后才有的
simple:
acknowledge-mode: manual
- 2、在消費者類
ExchangeConsumer
中新建一個方法來監聽隊列,其中第一個注釋掉的方法是原本存在的,第二個方法是新增的,主要新增了幾個參數,注意Channel
是com.rabbitmq.client.Channel
包下的:
/**
* 監聽綁定了direct交換機的的消息隊列
*/
// @RabbitHandler
// @RabbitListener(queues = "LONGLY_WOLF_ORDER_QUEUE")
// public void directConsumer(String msg){
// System.out.println("direct交換機收到消息:" + msg);
// }
/**
* 監聽綁定了direct交換機的的消息隊列,並進行手動應答
*/
@RabbitHandler
@RabbitListener(queues = "LONGLY_WOLF_ORDER_QUEUE")
public void manualDirectConsumer(String msg, Channel channel,Message message) throws IOException {
System.out.println("direct交換機收到消息:" + msg + "。此消息需要手動應答");
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);//手動應答
}
- 3、或者也可以通過
SimpleMessageListenerContainer
類實現監聽,新建一個RabbitAckConfig
類(省略了包名和導入):
@Configuration
public class RabbitAckConfig {
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames("LONGLY_WOLF_ORDER_QUEUE");//設置監聽隊列名
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);//手動確認
container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {//消息處理
System.out.println("收到消息:" + new String(message.getBody()) + "。此消息需要手動應答");
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
});
return container;
}
}
PS:需要注意的是,這兩種方式不要同時使用,否則無法保證消息會被哪個監聽到。
僅靠 RabbitMQ 自身可靠性能實現業務需求嗎
上面介紹的兩種確認模式,服務端確認和消費者確認。其中服務端確認是會回調給生產者的,所以生產者可以知道消息是否已經到達服務器且是否正確路由到隊列,然而,對於消費者的確認,生產者是不知道的,這是因為消息隊列的作用之一就是為了實現生產者和消費者的解耦,換言之,消費者知道消息成功發送到隊列,但是無法知道消息是否被消費者消費。
所以為了知道消息是否被成功消費,主要有兩種思路:
- 1、消費者在消費成功之后需要回調生產者提供的API來告知消息已經被消費
- 2、服務端在收到消費者確認后給生產者一個回執通知
然而假如生產者遲遲沒有收到消費者是否消費成功的信息,那么可能就需要補償,比如微信支付等都會有補償機制,間隔一定時間就將消息重發一次。
補償機制同時也會帶來一個問題,假如說消費者消費成功了,但是在告訴生產者的時候失敗了,那么這時候消息如果再次補償就會造成重復消費,所以消費者需要支持冪等(即無論一條消息被消費多少次,都不會改變結果)。當然,同時還有其他場景需要考慮,比如消息之間的依賴性等等問題都需要結合具體業務場景來具體處理。
總結
本文主要講述了 RabbitMQ
的消息發送方式,介紹了 3
種不同交換機的方式,同時最后也從發送消息的主要 4
個步驟分析了每一個步驟如何保證消息的可靠性,並分別通過 Java API
和 Spring Boot
提供了示例,中間還提到了死信隊列,死信隊列本質也是一個隊列,只不過存儲的消息比較特殊,相信通過本文,大家對 RabbitMQ
會有一個更深層次的了解。