對於剛剛接觸消息隊列的同學,最常遇到的問題,也是最頭痛的問題就是丟消息了。對於大部分業務系統來說,丟消息意味着數據丟失,是完全無法接受的。
其實,現在主流的消息隊列產品都提供了非常完善的消息可靠性保證機制,完全可以做到在消息傳遞過程中,即使發生網絡中斷或者硬件故障,也能確保消息的可靠傳遞,不丟消息。
絕大部分丟消息的原因都是由於開發者不熟悉消息隊列,沒有正確使用和配置消息隊列導致的。雖然不同的消息隊列提供的 API 不一樣,相關的配置項也不同,但是在保證消息可靠傳遞這塊兒,它們的實現原理是一樣的。
這節課我們就來講一下,消息隊列是怎么保證消息可靠傳遞的,這里面的實現原理是怎么樣的。當你熟知原理以后,無論你使用任何一種消息隊列,再簡單看一下它的 API 和相關配置項,就能很快知道該如何配置消息隊列,寫出可靠的代碼,避免消息丟失。
檢測消息丟失的方法
我們說,用消息隊列最尷尬的情況不是丟消息,而是消息丟了還不知道。一般而言,一個新的系統剛剛上線,各方面都不太穩定,需要一個磨合期,這個時候,特別需要監控到你的系統中是否有消息丟失的情況。
如果是 IT 基礎設施比較完善的公司,一般都有分布式鏈路追蹤系統,使用類似的追蹤系統可以很方便地追蹤每一條消息。如果沒有這樣的追蹤系統,這里我提供一個比較簡單的方法,來檢查是否有消息丟失的情況。
我們可以利用消息隊列的有序性來驗證是否有消息丟失。原理非常簡單,在 Producer 端,我們給每個發出的消息附加一個連續遞增的序號,然后在 Consumer 端來檢查這個序號的連續性。
如果沒有消息丟失,Consumer 收到消息的序號必然是連續遞增的,或者說收到的消息,其中的序號必然是上一條消息的序號 +1。如果檢測到序號不連續,那就是丟消息了。還可以通過缺失的序號來確定丟失的是哪條消息,方便進一步排查原因。
大多數消息隊列的客戶端都支持攔截器機制,你可以利用這個攔截器機制,在 Producer 發送消息之前的攔截器中將序號注入到消息中,在 Consumer 收到消息的攔截器中檢測序號的連續性,這樣實現的好處是消息檢測的代碼不會侵入到你的業務代碼中,待你的系統穩定后,也方便將這部分檢測的邏輯關閉或者刪除。
如果是在一個分布式系統中實現這個檢測方法,有幾個問題需要你注意。
首先,像 Kafka 和 RocketMQ 這樣的消息隊列,它是不保證在 Topic 上的嚴格順序的,只能保證分區上的消息是有序的,所以我們在發消息的時候必須要指定分區,並且,在每個分區單獨檢測消息序號的連續性。
如果你的系統中 Producer 是多實例的,由於並不好協調多個 Producer 之間的發送順序,所以也需要每個 Producer 分別生成各自的消息序號,並且需要附加上 Producer 的標識,在 Consumer 端按照每個 Producer 分別來檢測序號的連續性。
Consumer 實例的數量最好和分區數量一致,做到 Consumer 和分區一一對應,這樣會比較方便地在 Consumer 內檢測消息序號的連續性。
確保消息可靠傳遞
講完了檢測消息丟失的方法,接下來我們一起來看一下,整個消息從生產到消費的過程中,哪些地方可能會導致丟消息,以及應該如何避免消息丟失。
你可以看下這個圖,一條消息從生產到消費完成這個過程,可以划分三個階段,為了方便描述,我給每個階段分別起了個名字。
- 生產階段: 在這個階段,從消息在 Producer 創建出來,經過網絡傳輸發送到 Broker 端。
- 存儲階段: 在這個階段,消息在 Broker 端存儲,如果是集群,消息會在這個階段被復制到其他的副本上。
- 消費階段: 在這個階段,Consumer 從 Broker 上拉取消息,經過網絡傳輸發送到 Consumer 上。
1. 生產階段
在生產階段,消息隊列通過最常用的請求確認機制,來保證消息的可靠傳遞:當你的代碼調用發消息方法時,消息隊列的客戶端會把消息發送到 Broker,Broker 收到消息后,會給客戶端返回一個確認響應,表明消息已經收到了。客戶端收到響應后,完成了一次正常消息的發送。
只要 Producer 收到了 Broker 的確認響應,就可以保證消息在生產階段不會丟失。有些消息隊列在長時間沒收到發送確認響應后,會自動重試,如果重試再失敗,就會以返回值或者異常的方式告知用戶。
你在編寫發送消息代碼時,需要注意,正確處理返回值或者捕獲異常,就可以保證這個階段的消息不會丟失。以 Kafka 為例,我們看一下如何可靠地發送消息:
同步發送時,只要注意捕獲異常即可。
try { RecordMetadata metadata = producer.send(record).get(); System.out.println(" 消息發送成功。"); } catch (Throwable e) { System.out.println(" 消息發送失敗!"); System.out.println(e); }
異步發送時,則需要在回調方法里進行檢查。這個地方是需要特別注意的,很多丟消息的原因就是,我們使用了異步發送,卻沒有在回調中檢查發送結果。
producer.send(record, (metadata, exception) -> { if (metadata != null) { System.out.println(" 消息發送成功。"); } else { System.out.println(" 消息發送失敗!"); System.out.println(exception); } });
2. 存儲階段
在存儲階段正常情況下,只要 Broker 在正常運行,就不會出現丟失消息的問題,但是如果 Broker 出現了故障,比如進程死掉了或者服務器宕機了,還是可能會丟失消息的。
如果對消息的可靠性要求非常高,可以通過配置 Broker 參數來避免因為宕機丟消息。
對於單個節點的 Broker,需要配置 Broker 參數,在收到消息后,將消息寫入磁盤后再給 Producer 返回確認響應,這樣即使發生宕機,由於消息已經被寫入磁盤,就不會丟失消息,恢復后還可以繼續消費。例如,在 RocketMQ 中,需要將刷盤方式 flushDiskType 配置為 SYNC_FLUSH 同步刷盤。
如果是 Broker 是由多個節點組成的集群,需要將 Broker 集群配置成:至少將消息發送到 2 個以上的節點,再給客戶端回復發送確認響應。這樣當某個 Broker 宕機時,其他的 Broker 可以替代宕機的 Broker,也不會發生消息丟失。后面我會專門安排一節課,來講解在集群模式下,消息隊列是如何通過消息復制來確保消息的可靠性的。
3. 消費階段
消費階段采用和生產階段類似的確認機制來保證消息的可靠傳遞,客戶端從 Broker 拉取消息后,執行用戶的消費業務邏輯,成功后,才會給 Broker 發送消費確認響應。如果 Broker 沒有收到消費確認響應,下次拉消息的時候還會返回同一條消息,確保消息不會在網絡傳輸過程中丟失,也不會因為客戶端在執行消費邏輯中出錯導致丟失。
你在編寫消費代碼時需要注意的是,不要在收到消息后就立即發送消費確認,而是應該在執行完所有消費業務邏輯之后,再發送消費確認。
同樣,我們以用 Python 語言消費 RabbitMQ 消息為例,來看一下如何實現一段可靠的消費代碼:
def callback(ch, method, properties, body): print(" [x] 收到消息 %r" % body) # 在這兒處理收到的消息 database.save(body) print(" [x] 消費完成 ") # 完成消費業務邏輯后發送消費確認響應 ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(queue='hello', on_message_callback=callback)
你可以看到,在消費的回調方法 callback 中,正確的順序是,先是把消息保存到數據庫中,然后再發送消費確認響應。這樣如果保存消息到數據庫失敗了,就不會執行消費確認的代碼,下次拉到的還是這條消息,直到消費成功。
小結
這節課我帶大家分析了一條消息從發送到消費整個流程中,消息隊列是如何確保消息的可靠性,不會丟失的。這個過程可以分為分三個階段,每個階段都需要正確的編寫代碼並且設置正確的配置項,才能配合消息隊列的可靠性機制,確保消息不會丟失。
- 在生產階段,你需要捕獲消息發送的錯誤,並重發消息。
- 在存儲階段,你可以通過配置刷盤和復制相關的參數,讓消息寫入到多個副本的磁盤上,來確保消息不會因為某個 Broker 宕機或者磁盤損壞而丟失。
- 在消費階段,你需要在處理完全部消費業務邏輯之后,再發送消費確認。
你在理解了這幾個階段的原理后,如果再出現丟消息的情況,應該可以通過在代碼中加一些日志的方式,很快定位到是哪個階段出了問題,然后再進一步深入分析,快速找到問題原因。