人生終將是場單人旅途,孤獨之前是迷茫,孤獨過后是成長。
楔子
本篇是消息隊列RabbitMQ
的第四彈。
RabbitMQ
我已經寫了三篇了,基礎的收發消息和基礎的概念我都已經寫了,學任何東西都是這樣,先基礎的上手能用,然后遇到問題再去解決,無法理解就去深入源碼,隨着時間的積累對這一門技術的理解也會隨之提高。
基礎操作已經熟練后,相信大家不可避免的會生出向那更高處攀登的心來,今天我就羅列一些RabbitMQ
比較高級的用法,有些用得到有些用不上,但是一定要有所了解,因為大部分情況我們都是面向面試學習~
- 如何保證消息的可靠性?
- 消息隊列如何進行限流?
- 如何設置延時隊列進行延時消費?
1. 📖如何保證消息的可靠性?
先來看看我們的萬年老圖,從圖上我們大概可以看出來一個消息會經歷四個節點,只有保證這四個節點的可靠性才能保證整個系統的可靠性。
- 生產者發出后保證到達了MQ。
- MQ收到消息保證分發到了消息對應的Exchange。
- Exchange分發消息入隊之后保證消息的持久性。
- 消費者收到消息之后保證消息的正確消費。
經歷了這四個保證,我們才能保證消息的可靠性,從而保證消息不會丟失。
2. 🔍生產者發送消息到MQ失敗
我們的生產者發送消息之后可能由於網絡閃斷等各種原因導致我們的消息並沒有發送到MQ之中,但是這個時候我們生產端又不知道我們的消息沒有發出去,這就會造成消息的丟失。
為了解決這個問題,RabbitMQ
引入了事務機制和發送方確認機制(publisher confirm),由於事務機制過於耗費性能所以一般不用,這里我着重講述發送方確認機制。
這個機制很好理解,就是消息發送到MQ那端之后,MQ會回一個確認收到的消息給我們。
打開此功能需要配置,接下來我來演示一下配置:
spring:
rabbitmq:
addresses: 127.0.0.1
host: 5672
username: guest
password: guest
virtual-host: /
# 打開消息確認機制
publisher-confirm-type: correlated
我們只需要在配置里面打開消息確認即可(true是返回客戶端,false是自動刪除)。
生產者:
public void sendAndConfirm() {
User user = new User();
log.info("Message content : " + user);
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(Producer.QUEUE_NAME,user,correlationData);
log.info("消息發送完畢。");
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback(){
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("CorrelationData content : " + correlationData);
log.info("Ack status : " + ack);
log.info("Cause content : " + cause);
if(ack){
log.info("消息成功發送,訂單入庫,更改訂單狀態");
}else{
log.info("消息發送失敗:"+correlationData+", 出現異常:"+cause);
}
}
});
}
生產者代碼里我們看到又多了一個參數:CorrelationData
,這個參數是用來做消息的唯一標識,同時我們打開消息確認之后需要對rabbitTemplate
多設置一個setConfirmCallback
,參數是一個匿名類,我們消息確認成功or失敗之后的處理就是寫在這個匿名類里面。
比如一條訂單消息,當消息確認到達MQ確認之后再行入庫或者修改訂單的節點狀態,如果消息沒有成功到達MQ可以進行一次記錄或者將訂單狀態修改。
Tip:消息確認失敗不只有消息沒發過去會觸發,消息發過去但是找不到對應的Exchange,也會觸發。
3. 📔MQ接收失敗或者路由失敗
生產者的發送消息處理好了之后,我們就可以來看看MQ端的處理,MQ可能出現兩個問題:
- 消息找不到對應的Exchange。
- 找到了Exchange但是找不到對應的Queue。
這兩種情況都可以用RabbitMQ
提供的mandatory
參數來解決,它會設置消息投遞失敗的策略,有兩種策略:自動刪除或返回到客戶端。
我們既然要做可靠性,當然是設置為返回到客戶端。
配置:
spring:
rabbitmq:
addresses: 127.0.0.1
host: 5672
username: guest
password: guest
virtual-host: /
# 打開消息確認機制
publisher-confirm-type: correlated
# 打開消息返回
publisher-returns: true
template:
mandatory: true
我們只需要在配置里面打開消息返回即可,template.mandatory: true
這一步不要少~
生產者:
public void sendAndReturn() {
User user = new User();
log.info("Message content : " + user);
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.info("被退回的消息為:{}", message);
log.info("replyCode:{}", replyCode);
log.info("replyText:{}", replyText);
log.info("exchange:{}", exchange);
log.info("routingKey:{}", routingKey);
});
rabbitTemplate.convertAndSend("fail",user);
log.info("消息發送完畢。");
}
這里我們可以拿到被退回消息的所有信息,然后再進行處理,比如放到一個新的隊列單獨處理,路由失敗一般都是配置問題了。
4. 📑消息入隊之后MQ宕機
到這一步基本都是一些很小概率的問題了,比如MQ突然宕機了或者被關閉了,這種問題就必須要對消息做持久化,以便MQ重新啟動之后消息還能重新恢復過來。
消息的持久化要做,但是不能只做消息的持久化,還要做隊列的持久化和Exchange的持久化。
@Bean
public DirectExchange directExchange() {
// 三個構造參數:name durable autoDelete
return new DirectExchange("directExchange", false, false);
}
@Bean
public Queue erduo() {
// 其三個參數:durable exclusive autoDelete
// 一般只設置一下持久化即可
return new Queue("erduo",true);
}
創建Exchange和隊列時只要設置好持久化,發送的消息默認就是持久化消息。
設置持久化時一定要將Exchange和隊列都設置上持久化:
單單只設置Exchange持久化,重啟之后隊列會丟失。單單只設置隊列的持久化,重啟之后Exchange會消失,既而消息也丟失,所以如果不兩個一塊設置持久化將毫無意義。
Tip: 這些都是MQ宕機引起的問題,如果出現服務器宕機或者磁盤損壞則上面的手段統統無效,必須引入鏡像隊列,做異地多活來抵御這種不可抗因素。
5. 📌消費者無法正常消費
最后一步會出問題的地方就在消費者端了,不過這個解決問題的方法我們之前的文章已經說過了,就是消費者的消息確認。
spring:
rabbitmq:
addresses: 127.0.0.1
host: 5672
username: guest
password: guest
virtual-host: /
# 手動確認消息
listener:
simple:
acknowledge-mode: manual
打開手動消息確認之后,只要我們這條消息沒有成功消費,無論中間是出現消費者宕機還是代碼異常,只要連接斷開之后這條信息還沒有被消費那么這條消息就會被重新放入隊列再次被消費。
當然這也可能會出現重復消費的情況,不過在分布式系統中冪等性是一定要做的,所以一般重復消費都會被接口的冪等給攔掉。
所謂冪等性就是:一個操作多次執行產生的結果與一次執行產生的結果一致。
冪等性相關內容不在本章討論范圍~所以我就不多做闡述了。
6. 💡消息可靠性案例
這個圖是我很早之前畫的,是為了記錄當時使用RabbitMQ
做消息可靠性的具體做法,這里我正好拿出來做個例子給大家看一看。
這個例子中的消息是先入庫的,然后生產者從DB里面拿到數據包裝成消息發給MQ,經過消費者消費之后對DB數據的狀態進行更改,然后重新入庫。
這中間有任何步驟失敗,數據的狀態都是沒有更新的,這時通過一個定時任務不停的去刷庫,找到有問題的數據將它重新扔到生產者那里進行重新投遞。
這個方案其實和網上的很多方案大同小異,基礎的可靠性保證之后,定時任務做一個兜底進行不斷的掃描,力圖100%可靠性。
后記
越寫越長,因為篇幅緣故限流和延時隊列放到下一篇了,我會盡快發出來供大家閱讀,講真,我真的不是故意多水一篇的!!!
最后再給優狐打個廣告,最近掘金在GitHub上面建立了一個開源計划 - open-source,旨在收錄各種好玩的好用的開源庫,如果大家有想要自薦或者分享的開源庫都可以參與進去,為這個開源計划做一份貢獻,同時這個開源庫的Start
也在穩步增長中,參與進去也可以增加自己項目的曝光度,一舉兩得。
同時這個開源庫還有一個兄弟項目 - open-source-translation,旨在招募技術文章翻譯志願者進行技術文章的翻譯工作,
爭做最棒開源翻譯,翻譯業界高質量文稿,為技術人的成長獻一份力。
最近這段時間事情挺多,優狐令我八月底之前升級到三級,所以各位讀者的贊對我很重要,希望大家能夠高抬貴手,幫我一哈~
好了,以上就是本期的全部內容,感謝你能看到這里,歡迎對本文點贊收藏與評論,👍你們的每個點贊都是我創作的最大動力。
我是耳朵,一個一直想做知識輸出的偽文藝程序員,我們下期見。