0x00. 消息的發送流程
一條消息從生產到被消費,將會經歷三個階段:

生產階段,Producer 新建消息,然后通過網絡將消息投遞給 MQ Broker存儲階段,消息將會存儲在 Broker 端磁盤中消息階段, Consumer 將會從 Broker 拉取消息以上任一階段都可能會丟失消息,我們只要找到這三個階段丟失消息原因,采用合理的辦法避免丟失,就可以徹底解決消息丟失的問題。
0x01. 生產階段
生產者(Producer) 通過網絡發送消息給 Broker,當 Broker 收到之后,將會返回確認響應信息給 Producer。所以生產者只要接收到返回的確認響應,就代表消息在生產階段未丟失。
RocketMQ 發送消息示例代碼如下:

send 方法是一個同步操作,只要這個方法不拋出任何異常,就代表消息已經發送成功。
消息發送成功僅代表消息已經到了 Broker 端,Broker 在不同配置下,可能會返回不同響應狀態:
SendStatus.SEND_OKSendStatus.FLUSH_DISK_TIMEOUTSendStatus.FLUSH_SLAVE_TIMEOUTSendStatus.SLAVE_NOT_AVAILABLE引用官方狀態說明:

上圖中不同 broker 端配置將會在下文詳細解釋
另外 RocketMQ 還提供異步的發送的方式,適合於鏈路耗時較長,對響應時間較為敏感的業務場景。

異步發送消息一定要注意重寫回調方法,在回調方法中檢查發送結果。
不管是同步還是異步的方式,都會碰到網絡問題導致發送失敗的情況。針對這種情況,我們可以設置合理的重試次數,當出現網絡問題,可以自動重試。設置方式如下:

0x02. Broker 存儲階段
默認情況下,消息只要到了 Broker 端,將會優先保存到內存中,然后立刻返回確認響應給生產者。隨后 Broker 定期批量的將一組消息從內存異步刷入磁盤。
這種方式減少 I/O 次數,可以取得更好的性能,但是如果發生機器掉電,異常宕機等情況,消息還未及時刷入磁盤,就會出現丟失消息的情況。
若想保證 Broker 端不丟消息,保證消息的可靠性,我們需要將消息保存機制修改為同步刷盤方式,即消息存儲磁盤成功,才會返回響應。
修改 Broker 端配置如下:
## 默認情況為 ASYNC_FLUSH flushDiskType = SYNC_FLUSH
若 Broker 未在同步刷盤時間內(默認為 5s)完成刷盤,將會返回 SendStatus.FLUSH_DISK_TIMEOUT 狀態給生產者。
集群部署
為了保證可用性,Broker 通常采用一主(master)多從(slave)部署方式。為了保證消息不丟失,消息還需要復制到 slave 節點。
默認方式下,消息寫入 master成功,就可以返回確認響應給生產者,接着消息將會異步復制到 slave 節點。
注:master 配置:flushDiskType = SYNC_FLUSH
此時若 master 突然宕機且不可恢復,那么還未復制到 slave的消息將會丟失。
為了進一步提高消息的可靠性,我們可以采用同步的復制方式,master節點將會同步等待 slave 節點復制完成,才會返回確認響應。
異步復制與同步復制區別如下圖:

注: 大家不要被上圖誤導,broker master 只能配置一種復制方式,上圖只為解釋同步復制的與異步復制的概念。
Broker master 節點 同步復制配置如下:
## 默認為 ASYNC_MASTER brokerRole=SYNC_MASTER
如果 slave節點未在指定時間內同步返回響應,生產者將會收到SendStatus.FLUSH_SLAVE_TIMEOUT 返回狀態。
小結
結合生產階段與存儲階段,若需要嚴格保證消息不丟失,broker 需要采用如下配置:

同時這個過程我們還需要生產者配合,判斷返回狀態是否是 SendStatus.SEND_OK。若是其他狀態,就需要考慮補償重試。
雖然上述配置提高消息的高可靠性,但是會降低性能,生產實踐中需要綜合選擇。
0x03. 消費階段
消費者從 broker 拉取消息,然后執行相應的業務邏輯。一旦執行成功,將會返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS 狀態給 Broker。
如果 Broker 未收到消費確認響應或收到其他狀態,消費者下次還會再次拉取到該條消息,進行重試。這樣的方式有效避免了消費者消費過程發生異常,或者消息在網絡傳輸中丟失的情況。
消息消費的代碼如下:

以上消費消息過程的,我們需要注意返回消息狀態。只有當業務邏輯真正執行成功,我們才能返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS。否則我們需要返回 ConsumeConcurrentlyStatus.RECONSUME_LATER,稍后再重試。
0x04. 總結
看完 RocketMQ 不丟消息處理辦法,回頭再看這篇 kafka,有沒有發現,兩者解決思路是一樣的,區別就是參數配置不一樣而已。
所以下一次,面試官再問你 XX 消息隊列如何保證不丟消息?如果你沒用過這個消息隊列,也不要哭,微笑面對他,從容給他分析那幾步會丟失,然后大致解決思路。
最后我們還可以說出我們的思考,雖然提高消息可靠性,但是可能導致消息重發,重復消費。所以對於消費客戶端,需要注意保證冪等性。
zz:https://baijiahao.baidu.com/s?id=1662095398693413299&wfr=spider&for=pc