Rocketmq如何保證消息不丟失


1、消息整體處理過程

 
 

這里我們將消息的整體處理階段分為3個階段進行分析:

Producer發送消息階段。

Broker處理消息階段。

Consumer消費消息階段。

Producer發送消息階段

發送消息階段涉及到Producer到broker的網絡通信,因此丟失消息的幾率一定會有,那RocketMQ在此階段用了哪些手段保證消息不丟失了(或者說降低丟失的可能性)。

手段一:提供SYNC的發送消息方式,等待broker處理結果。

RocketMQ提供了3種發送消息方式,分別是:

同步發送:Producer 向 broker 發送消息,阻塞當前線程等待 broker 響應 發送結果。

異步發送:Producer 首先構建一個向 broker 發送消息的任務,把該任務提交給線程池,等執行完該任務時,回調用戶自定義的回調函數,執行處理結果。

Oneway發送:Oneway 方式只負責發送請求,不等待應答,Producer只負責把請求發出去,而不處理響應結果。

我們在調用producer.send方法時,不指定回調方法,則默認采用同步發送消息的方式,這也是丟失幾率最小的一種發送方式。

手段二:發送消息如果失敗或者超時,則重新發送。

發送重試源碼如下,本質其實就是一個for循環,當發送消息發生異常的時候重新循環發送。默認重試3次,重試次數可以通過producer指定。

手段三:broker提供多master模式,即使某台broker宕機了,保證消息可以投遞到另外一台正常的broker上。

如果broker只有一個節點,則broker宕機了,即使producer有重試機制,也沒用,因此利用多主模式,當某台broker宕機了,換一台broker進行投遞。

總結

producer消息發送方式雖然有3種,但為了減小丟失消息的可能性盡量采用同步的發送方式,同步等待發送結果,利用同步發送+重試機制+多個master節點,盡可能減小消息丟失的可能性。

Broker處理消息階段

手段四:提供同步刷盤的策略

public enum FlushDiskType { SYNC_FLUSH, //同步刷盤 ASYNC_FLUSH//異步刷盤(默認) }

我們知道,當消息投遞到broker之后,會先存到page cache,然后根據broker設置的刷盤策略是否立即刷盤,也就是如果刷盤策略為異步,broker並不會等待消息落盤就會返回producer成功,也就是說當broker所在的服務器突然宕機,則會丟失部分頁的消息。

手段五:提供主從模式,同時主從支持同步雙寫

即使broker設置了同步刷盤,如果主broker磁盤損壞,也是會導致消息丟失。 因此可以給broker指定slave,同時設置master為SYNC_MASTER,然后將slave設置為同步刷盤策略。

此模式下,producer每發送一條消息,都會等消息投遞到master和slave都落盤成功了,broker才會當作消息投遞成功,保證休息不丟失。

總結

在broker端,消息丟失的可能性主要在於刷盤策略和同步機制。
RocketMQ默認broker的刷盤策略為異步刷盤,如果有主從,同步策略也默認的是異步同步,這樣子可以提高broker處理消息的效率,但是會有丟失的可能性。因此可以通過同步刷盤策略+同步slave策略+主從的方式解決丟失消息的可能。

Consumer消費消息階段

手段六:consumer默認提供的是At least Once機制

從producer投遞消息到broker,即使前面這些過程保證了消息正常持久化,但如果consumer消費消息沒有消費到也不能理解為消息絕對的可靠。因此RockerMQ默認提供了At least Once機制保證消息可靠消費。

何為At least Once?

Consumer先pull 消息到本地,消費完成后,才向服務器返回ack。

通常消費消息的ack機制一般分為兩種思路:

1、先提交后消費;

2、先消費,消費成功后再提交;

思路一可以解決重復消費的問題但是會丟失消息,因此Rocketmq默認實現的是思路二,由各自consumer業務方保證冪等來解決重復消費問題。

手段七:消費消息重試機制

當消費消息失敗了,如果不提供重試消息的能力,則也不能算完全的可靠消費,因此RocketMQ本身提供了重新消費消息的能力。

總結

consumer端要保證消費消息的可靠性,主要通過At least Once+消費重試機制保證。

2、如何保證消息不被重復消費

回答這個問題,首先你別聽到重復消息這個事兒,就一無所知吧,你先大概說一說可能會有哪些重復消費的問題。

首先,比如 RabbitMQ、RocketMQ、Kafka,都有可能會出現消息重復消費的問題,正常。因為這問題通常不是 MQ 自己保證的,是由我們開發來保證的。挑一個 Kafka 來舉個例子,說說怎么重復消費吧。

Kafka 實際上有個 offset 的概念,就是每個消息寫進去,都有一個 offset,代表消息的序號,然后 consumer 消費了數據之后,每隔一段時間(定時定期),會把自己消費過的消息的 offset 提交一下,表示“我已經消費過了,下次我要是重啟啥的,你就讓我繼續從上次消費到的 offset 來繼續消費吧”。

但是凡事總有意外,比如我們之前生產經常遇到的,就是你有時候重啟系統,看你怎么重啟了,如果碰到點着急的,直接 kill 進程了,再重啟。這會導致 consumer 有些消息處理了,但是沒來得及提交 offset,尷尬了。重啟之后,少數消息會再次消費一次。

有這么個場景。數據 1/2/3 依次進入 kafka,kafka 會給這三條數據每條分配一個 offset,代表這條數據的序號,我們就假設分配的 offset 依次是 152/153/154。消費者從 kafka 去消費的時候,也是按照這個順序去消費。假如當消費者消費了 offset=153 的這條數據,剛准備去提交 offset 到 zookeeper,此時消費者進程被重啟了。那么此時消費過的數據 1/2 的 offset 並沒有提交,kafka 也就不知道你已經消費了 offset=153 這條數據。那么重啟之后,消費者會找 kafka 說,嘿,哥兒們,你給我接着把上次我消費到的那個地方后面的數據繼續給我傳遞過來。由於之前的 offset 沒有提交成功,那么數據 1/2 會再次傳過來,如果此時消費者沒有去重的話,那么就會導致重復消費。

 
 

如果消費者干的事兒是拿一條數據就往數據庫里寫一條,會導致說,你可能就把數據 1/2 在數據庫里插入了 2 次,那么數據就錯啦。

其實重復消費不可怕,可怕的是你沒考慮到重復消費之后,怎么保證冪等性

舉個例子吧。假設你有個系統,消費一條消息就往數據庫里插入一條數據,要是你一個消息重復兩次,你不就插入了兩條,這數據不就錯了?但是你要是消費到第二次的時候,自己判斷一下是否已經消費過了,若是就直接扔了,這樣不就保留了一條數據,從而保證了數據的正確性。

一條數據重復出現兩次,數據庫里就只有一條數據,這就保證了系統的冪等性。

冪等性,通俗點說,就一個數據,或者一個請求,給你重復來多次,你得確保對應的數據是不會改變的,不能出錯。

所以第二個問題來了,怎么保證消息隊列消費的冪等性?

其實還是得結合業務來思考,我這里給幾個思路:

比如你拿個數據要寫庫,你先根據主鍵查一下,如果這數據都有了,你就別插入了,update 一下好吧。

比如你是寫 Redis,那沒問題了,反正每次都是 set,天然冪等性。

比如你不是上面兩個場景,那做的稍微復雜一點,你需要讓生產者發送每條數據的時候,里面加一個全局唯一的 id,類似訂單 id 之類的東西,然后你這里消費到了之后,先根據這個 id 去比如 Redis 里查一下,之前消費過嗎?如果沒有消費過,你就處理,然后這個 id 寫 Redis。如果消費過了,那你就別處理了,保證別重復處理相同的消息即可。

比如基於數據庫的唯一鍵來保證重復數據不會重復插入多條。因為有唯一鍵約束了,重復數據插入只會報錯,不會導致數據庫中出現臟數據。

 
 

作者:我心如初007
鏈接:https://juejin.im/post/5f1a70d86fb9a07ea01a2b9f
來源:掘金

著作權歸作者所有。商業轉載請聯系作者獲得授權,非商業轉載請注明出處。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM