消息中間件-消息的可靠性傳遞


消息中間件-消息的可靠性傳遞

前言

消息中間件的可靠性消息傳遞,是消息中間件領域非常重要的方案落實問題(在這之前的MQ理論,MQ選型是抽象層次更高的問題,這里不談)。

並且這個問題與日常開發是存在較大的關聯的。可以這么說,凡是使用了MQ的,機會都要考慮這個問題。當然也有一些原始數據采集,日志數據收集等應用場景對此沒有過高要求。但是大多數的業務場景,對此還是有着較高要求的。比如訂單系統,支付系統,消息系統等,你弄丟一條消息,嘿嘿。

網上對於這方面的博客,大多從單一MQ,或者干脆就是在論述MQ。我不喜歡這樣的論述,這樣的論述太過局限,也過於拖沓。

這次,主要從理論方面論證消息的可靠性傳遞的落實。具體技術,都是依據這些理論的,具體實現都差不多。不過為了便於大家理解,我在文中會以RabbitMq,Kafka這兩個主流MQ稍作舉例。

在日常開發中,我更傾向於在具體開發前,先整理思路,走通理論,再開始編碼。畢竟,如果連理論都走不同,還談什么編碼。

另外,我按照消息可靠性層次逐步推進,形成相應的目錄,希望大家喜歡(因為我認為,相較網上這方面現有博客的目錄,這樣的目錄更合理,更人性化)。

概述

這里簡單談一些有關消息可靠性傳遞的理論。

消息傳遞次數

消息在消息系統(生產者+MQ+消費者),其消費的次數,無非一下三種情況:

  • 最多一次
  • 最少一次
  • 不多不少一次

消息可靠性層次

這也代表着消息系統的消息可靠性的三個層次:

  • 最多一次:上游服務的消息發出了,至於下游能不能收到服務,就不管了。結果就是下游服務,可能根本就沒有接收到消息。
  • 最少一次:上游服務的消息發出了,並通過某些機制,確保下游服務一定收到了該消息。但是收到了幾次,就不管了。結果就是下游服務,可能多次收到同一條消息。
  • 不多不少一次:上游服務的消息發出了,並確保下游服務一定收到了消息。下游服務通過某些機制,確保多次收到該消息與單次收到該消息,對其系統狀態的影響是相同的。

方案落實

實現上述三個層次,需要逐步從三個方面考慮:

  • 最多一次:會用消息隊列即可,只要確保消息的連通性即可
  • 最少一次:通過MQ提供的確認機制,確保消息的傳遞
  • 不多不少一次:通過外部應用程序,確保消息的單次消費與多次消費對系統狀態影響是一致的

上述三個層次,對系統的性能損耗,系統復雜度等都是逐步上升的。

當然,我們首先,需要了解這三個層次分別如何實現。
再在實際開發中,根據需要,靈活選取合適方案。

最多一次的消息傳遞

這個方案是最簡單的,只要確保消息系統的正確運作,以及系統的連通性即可。在正常情況下,可以保證絕大部分數據的可靠性傳遞。但是仍舊存在極小數據的丟失,並且數據的丟失會因為消息隊列的選擇,以及消息並發量,而受到影響。

優點

  • 實現簡單。只要搭建對應的MQ服務器,寫出對應的生產者與消費者,以及相應配置,即可正常工作。

缺點

  • 無法保證數據的可靠性,會存在一定的數據丟失情況,尤其是在並發量較大時

實際應用

可以應用於日志上傳這樣對消息可靠性要求低的應用場景。

總結

如果數據量不大的情況下,推薦使用RabbitMQ,其消息可靠性在地數據量下,是最可靠的。但是在達到萬級並發時,會存在消息丟失,丟失的比例可以達到千分之一。

如果數據量較大的情況下,要么采用集群。要么就采用Kafk(Kafka可支持十萬級並發)

一般來說,這種消息可靠性多見於項目初建,或類似日志采集,原始數據采集這樣的特定場景。

最少一次的消息傳遞

這個方案開始利用MQ提供的特定機制,來提高消息傳遞的可靠性。

優點

  • 不錯的消息可靠性。確保不會出現消息丟失的情況
  • 實現並不復雜。只需要合理使用MQ的API,設置合理參數(如重試次數)即可

缺點

  • 會出現消息重復消費的情況
  • 參數的設置需要合理。如重試次數,一般設置為5次,也可根據情況,進行調整
  • 資源占用的提升。如帶寬(每次消息成功生產,消費都需要返回一條數據進行確認)等

方案落實

該方案的實現組成,由以下三個方面構成:

  • 消息的可靠生產
  • 消息的可靠存儲
  • 消息的可靠消費

通過以上三個方面的落實,確保可消息一定被下游服務消費。

消息的可靠生產

消息的可靠生產,是通過回調確認機制,確保消息一定被消息服務器接收。

消息生產,發送給消息服務器后,消息服務器會返回一個確認信息,表示數據正常接收。

如果生產者在一定時間內沒有接收到確認信息,就會觸發重試機制,進行消息的重發。

如RabbitMq的comfirm機制,Kafka的acks機制等。

RabbitMq的confirm機制存在三個模式:

  • 普通模式:channel.waitForConfirms()
  • 批量模式:channel.waitForConfirmsOrDie()
  • 異步模式:channel.addConfirmListener()

這三個模式,看名稱就可以知道具體作用了。如果希望了解具體代碼落實,詳見RabbitMQ事務和Confirm發送方消息確認——深入解讀,其中確認機制寫得較為簡潔。

至於Kafka的acks機制,同樣存在三個模式:

  • acks = 0 :不需要Kafka的任何Partition確認,即確認發送成功(這個之確保消息發送出去了,並不保證消息服務器是否成功接收)
  • acks = 1 :(默認)需要Kafka的Partition Leader確認,即被Kafka的一個Partition(Leader)接收。但是這樣依舊存在極小概率的消息丟失,即Partition Leader獲取了對應消息,並給了acks確認回復。但是在其他Partition同步前,Partition Leader宕機,數據丟失。那么這就造成了消息丟失。
  • acks = all :需要Kafka對應ISR中的全部Partition確認,才確認消息發送成功(當然,這里假定Kafka是多節點集群,如果只有一個分區,那就毫無意義了)。

說到這里,簡單說一下,上述的操作可能造成消息的重復生產。

最簡單的例子,消息成功發送,但是對應的消息確認信息由於網絡波動而丟失。那么生產者就會重復發送該消息,所以消息服務器接收到了兩條相同消息,故產生了消息的重復生產。

另外,上述的重試,都是存在響應時長判斷(超出1min,就認為數據丟失),以及重試次數限制(超過5次,就不進行重試。否則,大量重試數據可能會拖垮整個服務)。

消息的可靠存儲

消息的可靠存儲,是確保消息在消息服務器經過,或者說堆積時不會因為宕機,網絡等狀況,丟失消息。

網上很多博客在論述消息的可靠性傳遞時,常常把這點遺漏。因為他們理所當然地認為消息隊列已經通過集群等實現了消息隊列服務的可用性,故消息的可靠性存儲也就實現了。

但是這里存在兩個問題。第一,可靠性不等於可用性。第二,消息的可靠存儲,作為消息可靠性傳遞的一部分,是不可缺失的。

可用性:確保服務的可用。即對應的服務,可以提供服務。

可靠性:確保服務的正確。即對應的服務,提供的是正確的服務。

區別:我瀏覽淘寶,淘寶頁面打不開,這就涉及了可用性問題(可用性計算公式:可用時間/全部時長*100%)。而我瀏覽淘寶,查詢訂單,給我顯示的是別人的訂單,這就涉及了可靠性問題。

另外這里再糾正一點,可靠性並不依賴於可用性。即使我打不開淘寶頁面,我也不能說淘寶提供訂單查詢就有問題(只是如果沒有了可用性,談論可靠性是非常沒有意義的。畢竟都用不了了,誰還關心其內容是否正確呢,都看不到)

消息隊列的可用性,是通過多個節點構成集群,避免單點故障,從而提升可用性。

消息隊列的可靠存儲,是通過備份實現(這里不糾結備份如何確保正確)的。如RabbitMq集群的MemNode與DiskNode,又或者Kafka的replication機制等。

消息的可靠消費

消息的可靠消費,就是確保消息被消費者獲取,並被成功消費。避免由於消息丟失,或者消費者宕機而造成消息消費不成功,最終造成消息的丟失(因為RabbitMq服務器在認為消息被成功消費后,將對應數據刪除或標記為“已消費”)。

至於消息的可靠消費,核心理念還是重試,重試,再重試。不過具體的實現就八仙過海,各顯神通了。

這里分別說一下RabbitMq,Kafka,Rocket三者對於可靠消費的處理:

RabbitMq

提供ack機制。默認是auto,直接在拿到消息時,直接ack。確保了消息到達了消費者,但是無法解決消費者消費失敗這樣的問題。

實際開發中,為了確保消息的可靠消費,一般會設置為munal,只有在程序正確運行后,才會調用對應api,表示消息正確消費。

Kafka

由於Kafka的消息是落地到硬盤文件的,而且Kafka的消息分發方式是pull的,所以消息的拉取是通過offset機制去確認對應位置消息的。

當然,Kafka的offset默認是自動提交的(可通過nable_auto_commit與auto_commit_interval_ms控制)。

所以消費者調用服務失敗等原因,可以通過手動offset提交,來實現對數據的重復消費(甚至是歷史數據的消費),也就可以在消費失敗時對同一消息進行再消費。

如果是消費者宕機等原因,由於Kafka服務器沒有收到對應的offset提交,所以認為那條消息沒有被消費成功,故返回的依舊是那條消息。

RocketMq

其實RocketMq的處理有些類似Kafka確認機制+RabbitMq死信隊列的感覺。

首先,消費者從RocketMq拉取消息,如果成功消費,就返回確認消息。

如果未成功消費,就嘗試重新消費。

嘗試消費一定次數后(如5次),就會將該消息發送之RocketMq中的重試隊列。

如果遇到消費者宕機的情況,RocketMq會認為該消息未成功消費,會被其他消費者繼續消費。

其實在RabbitMq的可靠性消費時,我們也會將多次消費失敗的數據保存下來,便於后期修復等。不過保存的方式由很多種,日志,數據庫,消息隊列等。而RocketMq則給出了具體的落實方案。

上述的操作,可能造成消息的重復消費。

最簡單的例子,消息成功被消費者消費,但是消費者還沒來得及發送確認信息,就宕機了。

消息隊列由於沒有收到確認消息,認為該條消息尚未被消息,就將該消息交由其他消費者繼續消費。

不多不少一次的消息傳遞

這個方案,就是通過MQ以外的應用程序,來進行擴展,最終達到消息准確消費的目的。

那么為什么不將這個功能,囊括在MQ中呢?

個人認為有四個方面的考慮:

  • 消息中間件,應該明確其功能域,而消息生產與消息消費往往涉及業務,所以避免與業務的耦合。所以消息中間件只完善了可靠存儲。
  • 准確消費,往往涉及MQ以外的部分,需要其他部分的配合。就類似與XA接口一樣。這樣會帶來編碼的約束,系統的耦合性等。
  • 准確消費的實現可以通過一個工具,模塊去實現,但是不該硬編碼。畢竟現有的處理方案並不一定就是最優解(尤其是在調控中心,TCC框架展現的現在)。
  • 性能影響。為了一個不通用的功能,會帶來消息中間件的性能大幅下降

優勢

  • 確保消息被准確消費(不多不少一次)

缺點

  • 實現復雜(生產者與消費者都需要建立對應數據庫)
  • 需要建立對應規范(但是通用規范確定后,實現就會變得快速)
  • 資源占用的提升。如帶寬(每次消息成功生產,消費都需要返回一條數據進行確認)等

存在的問題

消息存儲部分的准確存儲,不該我們來操心,所以只闡述消息生產與消息消費兩個部分。

消息的重復生產

  • 消息發給了消息隊列服務器,消息隊列服務器的確認信息由於網絡波動等,沒有及時到達生產者
  • 消息發送給了消息隊列服務器,生產者在接收消息前,宕機
  • 消息發送給了消息隊列服務器,生產者在接收消息后,還沒來得及進行確認邏輯,宕機

綜上來看,就是消息發出后,到生產者消息確認信息的處理之間,出現各種意外,導致重復生產。

消息的重復消費

  • 消息已經被消費,消費者還沒來得及發送確認信息,就宕機了
  • 消息已經被消費,消費者發出確認信息,確認信息由於網絡波動等,沒有及時到達消息隊列服務器
  • 消息已經被消費,消費者發出確認信息,消息隊列服務器對應實例在接收到確認信息前,宕機
  • 消息已經被消費,消費者發出確認信息,消息隊列服務器接受到了確認信息,還沒來得及進行確認邏輯,宕機

綜上來看,就是消息已經被消費后,到消息隊列服務器進行確認消息處理之間,出現各種意外,導致重復消費。

解決方案

解決方案:messageId+冪等

准確來說,解決方案的核心是冪等,而messageId是作為輔助手段的。

冪等

冪等,簡單說明一下,就是多次操作與單次操作對系統狀態的影響是一致的。

i = 1; 

就是冪等操作,因為無論進行幾次,i的值都沒有變化。

i++;

則不是冪等操作,因為i的值與執行次數息息相關。

故通過冪等操作來確保同一條消息,不被執行多次。

messageId

但是,消費者如何確定是否為同一條消息呢?

有的消息體存在唯一性字段,如orderId等。但有的消息並沒有這樣的唯一性字段。

所以需要一個專門的字段,來表示唯一性,並且與業務消息解耦。這就是messageId。

既可以采用消息體的唯一性字段(可以是單一字段,也可以是組合字段),也可以通過特定方式生成對應標識(分布式系統,需要注意不同實例生產者產生相同標識的可能,詳見分布式全局唯一ID的實現)。

具體的生成情況,就不在這里贅述了。

方案落實

先來一張大圖(這種事情,圖片展示最直觀了),展示一下流程:

在這里插入圖片描述
(圖片是絕對清晰的。看不清圖片的朋友,請將圖片在新頁面打開,或下載。說實話,來到新公司,首先提升的就是畫圖能力。囧)

簡單說一下流程,大家可以對照着上圖,看一下:

生產者到消息中間件服務器

  1. 生產者根據需要發送的消息,生成對應messageId。並封裝對應message至生產者數據庫(該操作應該利用事務性,確保生產者事件處理與message保存至數據庫的原子性),同時標注message狀態為sending(發送中狀態)
  2. 將對應message發送給消息隊列服務器
  3. 如果沒有收到生產確認信息,則重新發送message(如果這個時候遇到生產者實例宕機,也不用擔心。因為后續會有補償程序,進行補償重發操作)
  4. 當收到消息中間件服務器的消息生產確認消息(即確定消息已經達到消息中間件服務器),將數據庫中對應message的狀態修改為sended(已發送狀態)

上述中提到的補償機制,其實是類似事務中的一個操作。通過一個定時任務,定時巡檢數據庫處於sending狀態的message,並通過生產者極性發送(所以message一般都保存source,target等信息)。

之所以會有sending狀態的message,就是因為存在生產者消息發送出去了,還沒收到生產確認信息,結果生產者實例自己宕機的情況。

至於補償機制的定時任務,是一個非常簡單的實現,這里就不再贅述了。

消息中間件到消費者

這里進行的操作是針對非冪等的操作。

如果是冪等操作,則可以直接進行。畢竟多次執行與單次執行對數據庫的影響是一致的。

但是注意冪等操作在部分場景下無效的問題(時間影響上),如“余額 = 1k”的操作對於數據庫而言是冪等的,但是在兩次“余額 = 1k”操作間,有一個“余額 = 2k”的操作,則會發生問題(丟失了“余額 = 2k”操作)。當然,這種類似ABA問題,完全可以引入版本號,來進行解決。

綜上,還是推薦采用以下解決方法,流程較為簡單:

  1. 消費者獲取數據
  2. 消費者判斷數據庫是否有對應message
  3. 如果存在對應message,則放棄執行(因為這是一個重復操作)
  4. 如果不存在,則進行相關消息處理。並通過事務控制,在消費者數據庫中添加message(確保消息的處理與數據庫添加message是原子操作)

至此,消息的准確傳遞就完成了。

總結

消息可靠性傳遞的發展過程,也體現了人們對消息中間件功能的一步步追求,更是體現了工程師們解決問題的思路。

很多時候,我們會遇到很多問題,甚至令人感到雜亂不堪,無從下手。這個時候,最好的辦法就是靜下心來,對它們進行划分(按照重要程度,緊迫度,實現難度),再進行一個長期規划,一步步來解決。往往這個時候,動動筆,在筆記上列下清單,會是一個不錯的辦法。

其中消息的准確傳遞,涉及一些事務相關的內容。也許有人已經聯想到,消息隊列是否可以作為分布式事務的一種手段呢?我會在之后的博客中,來闡述分布式事務這一重要主題。

如果有什么問題或想法,可以私信或@我。

願與諸君共進步。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM