RabbitMQ 和 Kafka 的消息可靠性對比


RabbitMQ和Kafka都提供持久的消息保證。兩者都提供至少一次和至多一次的保證,另外,Kafka在某些限定情況下可以提供精確的一次(exactly-once)保證。

讓我們首先理解一下上述術語的含義:

至多一次投遞:消息絕對不會被重復投遞,但是消息可能丟失

至少一次投遞:消息絕對不會被丟失,但是有可能重復被消費

精確的一次投遞:消息系統的聖杯。所有的消息精確的被投遞一次。

“投遞”貌似不是准確的語言描述,“處理”才是。無論怎么描述,我們關心的是,消費者能否處理消息,以及處理的次數。但是使用“處理”會使問題變得復雜。比如說,消息必須投遞兩次才能被處理一次。再比如,如果消費者在處理的過程中宕機,消息必須被第二次投遞(給另一個消費者)。

其次,使用“處理”來表達會使得部分失敗(partial failure)變得頭疼。處理消息一般包括多個步驟。處理的開始到結束包括應用的邏輯以及應用與消息系統的通信。應用邏輯的部分失敗由應用來處理。如果應用處理的邏輯是事務的,結果是all or nothing, 那么應用邏輯可以避免部分失敗。但是實際上,多個步驟往往涉及不同的系統,使得事務性變得不可能。如果我們考慮到通信,應用,緩存,數據庫,我們無法達到精確的一次處理(exactly-once processing).

所以,精確地一次只出現在如下情況中:消息的處理只包括消息系統本身,並且消息系統本身的處理是事務的。在該限定場景下,我們可以處理消息,寫消息,發送消息被處理的ACK, 一切都在事務中。而這正是Kafka流能提供的。

但是,如果消息處理是冪等(idempotent)的,我們就可以繞過基於事務的精確一次保證。如果消息處理是冪等的,我們可以安全的處理重復的消息。當然,並不是所有的消息處理都是冪等的。

 

責任鏈

本質上講,生產者不能知道消息是否被消費。他們能知道的是,消息系統是否接收了消息,是否把消息安全的存儲起來以便投遞。這里存在一條責任鏈,開始於生產者,移動到消息系統,最后到達消費者。每個環節都要正確執行,環節間的交接也要正確執行。這意味着,作為一個應用開發者,你要正確的寫程序,防止丟失消息,或者濫用消息。

消息順序

這篇文章主要關注RabbitMQ和Kafka如何提供至少一次和至多一次的投遞。但是,也包括消息的順序。簡單來講,兩者都支持FIFO順序。RabbitMQ在隊列這個層次,Kafka在話題的分區層次。

RabbitMQ

投遞保證依賴於:

消息的持久性——一旦存儲下來,就不會丟失

消息的ACK——RabbitMQ與生產者、消費者之間的信號

隊列鏡像

隊列可以在節點間被鏡像(復制)。對於每個隊列,存在一個主隊列,在單獨一個節點上。假設我們有3個節點,10 個隊列,每個隊列2個鏡像。那么10個主隊列和20個鏡像將分布在3個節點間。主隊列如何分布是可以被配置的。當一個節點宕機后,

在宕機的節點上的每一個主隊列,在另一個節點上的鏡像隊列會被提升為主隊列

在其他節點上的鏡像隊列會被創建出來,以代替宕機的節點上的鏡像隊列,從而維護復制因子(replication factor)

持久的隊列

RabbitMQ有兩種隊列:持久的和非持久的。持久的隊列會被存儲在磁盤上,節點重啟后會重新構建出來。

持久的消息

持久的隊列不能保證消息可以在宕機時被保留下來。只有被設定為持久的消息才會在宕機重啟后恢復。

對於RabbitMQ,越多的消息是持久的,隊列的吞吐率就越差。因此如果你有實時流,而且輕微的丟數據不會有大問題,那么你不該考慮隊列鏡像,並且消息應該設定為非持久的。然而,如果你必須不能在節點宕機時丟失數據,那么應該使用隊列表鏡像,持久的隊列和持久的消息。

消息的ACK

消息發布

消息發布時,可能會被丟失或重復。這取決於生產者的行為。

Fire and Forget 發布者可以選擇不使用生產者ACK,簡單的發動消息棄之不顧。消息不會被復制,但是可能被丟失(至多一次投遞)

發布確認:當發布者與中間人(broker)建立頻道后,可以 設置該頻道使用確認消息。則中間人會回復發布者的消息如下:

basic.ack:正ACK.消息已經收到,現在消息在RabbitMQ這邊了。

basic.nack:負ACK.發生錯誤,消息未被處理。責任還在發布者。發布者可能需要重發。

除了以上兩種,還有一種回復basic.return。有時發布者不僅需要知道中間人收到了消息,而且需要知道消息已經在若干隊列中持久化了。比如,有時發布者發布了一條消息給交換機,但是交換機上沒有綁定任何匹配的隊列,那么中間人會簡單的丟棄消息。大多數情況下,這沒有問題,但是有時,發布者需要知道消息是被丟棄了還是被處理了。可以對每個消息設定mandatory標記,如此一來,如果消息沒有被處理而是被丟棄,那么會返回一個basic.return

發布者可以選擇發送每一條消息都等待ACK,但是會嚴重影響吞吐率。所以,發布者一般發布消息流,但是會限制未ACK的消息的數目。一旦達到了message in flight 的數目限制,發布者會暫停,等待ACK的到來。

現在,我們有了多條在途中的消息(在發布者與RabbitMQ之間),為了提高吞吐率,RabbitMQ使用multiple標記位來將ACK組成一組。如此一來,所有的消息會被分配一個單調遞增的序列號(Sequence Number)。消息的ACK中會包含對應的序列號。當組合使用Multiple標記位時,發布者需要維護發送出去消息的序列號,以便它知道哪些消息被ACK。

所以,利用ACK,我們可以通過以下方法避免消息丟失:

當收到nack,重新發布消息。

當收到nack或者basic.return,將消息持久化到某地。

 

事務:在RabbitMQ中,並不常用事務。因為

不明確的保證:如果消息被路由到多個隊列,或者起用了mandatory標記,那么事務的原子性是不可靠的。

性能比較差。

坦率的講,我從未使用過事務,它增加了額外的保證,提高了不確定性。

連接/頻道異常:除了消息的ACK外,發布者還需要考慮連接斷開或者中間人出錯,兩者都會導致頻道丟失。頻道丟失會導致無法接收消息的ACK.在這個問題上,發布者可以考慮妥協,一種是冒消息丟失的風險一種是冒消息重復的風險。

如果中間人宕機,可能此時消息還在OS的buffer中,或者正在被解析,因此被丟失。又或者,這條消息已經持久化,正當中間人發送ACK時,宕機了,在這種情況下,其實消息已經成功投遞了。

連接斷開同樣如此。我們無法得知宕機的具體時機,所以只能選擇:

不重新發布,冒消息丟失的風險

重新發布,冒消息重復的風險

如果發布者有很多在途的消息,問題會惡化。一種方式是發布者提供提示,告訴消費者消息是重發的,讓消費者嘗試去重。

 

 

消費者

對於ACK,消費者有兩種選擇

無ACK模式

手動ACK模式。

無ACK模式:或者稱為自動ACK模式,是危險的。首先,只要消息投遞給應用層,就會被從隊列中刪除。這會導致消息丟失:

消息還在內部buffer中,但是應用層宕機

消息處理失敗

其次,我們無法控制消息傳遞的速度。使用手動ACK,我們可以設定預取(QoS)值,來限制應用獲得的未ACK的消息的數目。如果沒有這個功能,RabbitMQ會很快的傳遞消息,超出消費者可以處理的訥訥管理,導致內部buffer溢出或內存問題。

手動ACK模式:消費者必須手動給出消息的ACK.消費者可以設定預取值大於一,便可以並行的處理多條數據。消費者可以選擇單條消息的發送ACK,也可以設定multiple標記位,一次ACK多條消息。批處理會提高性能。

當消費者打開一個頻道,被投遞的消息會收到一個單調上升的整數值Delivery Tag。這個信息會包括在ACK當中作為消息的標識。

ACK有如下幾種:

basic.ack.RabbitMQ會從隊列中刪除該條消息。可以使用multiple標記。

basic.nack。消費者需要告訴RabbitMQ是否需要重新將消息壓入隊列。重入隊列意味着消息會被放在隊列頭,再次投遞給消費者。也支持multiple標志位。

basic.reject.與basic.nack類似,但是不支持multiple標記位。

所以從語義上級講,basic.ack與(basic.nack&requeue==false)是等價的。都會導致消息從隊列中刪除。

下一個問題是,什么時候發送ACK?如果消息處理很快,可以選擇消息處理完再發送ACK.但是,如果消息處理需要幾分鍾,那么處理完再發送ACK是有問題的。如果頻道宕機,所有未ACK的消息會重入隊列,導致消息重復。

通信/頻道 故障

如果通信故障,或者中間人故障導致頻道宕機,那么所有的未ACK的消息都會重新入隊列再次投遞,這不會導致消息丟失,但是會導致消息重復。

消費者保持未ACK的消息越久,消息被重新投遞的風險越高。當消息是被重投遞時,消息會設置redelivered標志位。所以最壞情況下,至少消費者是可以知道消息是一條重發的消息。

冪等性

如果你需要冪等並且保證消息不會丟失,那么意味着你需要實現消息去重或其他冪等模式。如果消息去重非常耗時,那么你可以讓發布者對重發的消息添加頭數據,讓消費者檢查頭數據和redelivered 標志位。

結論
RabbitMQ提供提供強大的,可靠地,持久的消息保證,但是,你有很多辦法把它弄糟。

以下是一些注意事項

如果想要保證至少一次投遞,使用隊列鏡像,持久的隊列,持久的消息,發布者ACK,mandatory標志位,手動消費者ACK;

使用最少一次投遞,你或許需要增加去重邏輯或者使用冪等范式

如果你不關心消息丟失,而更關注低延時和高度可擴展,那么你不需要使用隊列鏡像,持久的消息和發布者ACK.當然,我自己會保留使用手動消費者ACK,通過設定預取2值來控制消息投遞的速度,當然,你需要設定multiple標志位並批量ACK.

 

Kafka

Kafka的投遞通過如下保證:

消息持久性:一旦存入話題,消息不會丟失

消息ACK:kafka(或者包括Zookeeper)與生產者、消費者信號

關於批處理

Kaka和RabbitMQ有在消息批量發送、消費方面不同。RabbitMQ可以實現如下:

每發送x條消息就暫停,直到所有消息的ACK被收到。RabbitMQ通常將多條ACK組成一組,使用multiple標志位

消費者設定一個預取值,將消息的ACK組成一組

但是消息本身不是批量發送的,它更多的是指允許一組消息在途,使用multiple 標志位。這一點跟TCP很像。

而Kafka則有明確的消息批量處理。批處理可以提高性能,同時也需要權衡,正如RabbitMQ權衡在途的未ACK消息一樣。越多的在途消息,會導致越嚴重的消息重復(當故障發生時)。

Kafka可以更高效的在消費者端進行批處理,因為kafka有分區的概念。每個分區對應一個消費者,所以及時一個很大的批處理也不會營子昂負載的分布。然而,對於RabbitMQ而言,如果使用已經被廢棄的拉取API拉取批量的消息,會導致非常嚴重的負載不均衡。以及很長的處理延時。RabbitMQ在設計時就不適合批處理。

持久性

日志復制

為了容錯,Kafka在分區層面有一個主從架構,主分區成為master,復制分區成為slave或者follower.每個master可以有很多follower.當主分區的服務器宕機后,follower中會有一份被提升為主分區,所以只會導致短暫的服務停止,但是不會導致數據丟失。

Kafka有一個概念,叫做In Sync Replicas(同步的復制)。每一個復制都可以是同步的,或是非同步的。同步意味着跟主分區相比,擁有相同的消息。復制可能會變成非同步的,如果它落后了。這可能是因為網絡延遲,宿主機故障等。消息丟失只會發生在如下情況:主分區服務器宕機,所有的復制都是非同步的。

消息ACK與偏移追蹤

取決於Kafka如何存儲消息以及消費者如何消費消息,Kafka依賴於消息ACK來進行偏移追蹤。

生產者的消息ACK

當生產者發送消息時,會告訴中間人何種期待ACK:

不需要ACK:fire and forget, 對應於acks = 0

主分區已經將消息持久化。 對應於acks=1

主分區以及所有同步的復制都將消息持久化, 對應於acks=ALL

消息可以在發布時被復制,正如RabbitMQ一樣。如果中間人宕機或者網絡故障,發布者會把沒有收到ACK的消息重發。當然,大多數情況下,消息應該是被主分區持久化並復制了。

然而,Kafka有一個很好的去重的特性,但是必須如下設置:

enable.idempotence 設置成true

max.in.flight.requests.per.connection 低於5

retries設置1或更高

acks設置成ALL

在這種配置下,如果你為了吞吐率,批處理的單位設置成6或者acks設置成0/1,那么你就沒辦法獲得去重。

消費者偏移追蹤

消費者需要存儲他們的偏移以備宕機,讓另一個消費者接替。偏移存儲在zookeeper上或者kafka的話題中。

一旦消費者從分區中讀取一批量的消息,它有多種選擇去更新偏移:

立即更新:在開始處理消息前。這對應於最多一次投遞。無論消費者是否宕機,消息都不會被重復。比如10條正在被處理,此時消費者在第五條消息處理時宕機,那么只有前4條消息被處理,其余被跳過,接替的消費者從下一個批次開始。

最后更新。當所有消息都被處理后。這對應於至少一次投遞。無論消費者是否宕機,沒有消息會被丟失,盡管消息會被處理兩次。比如10條消息正在被處理,當消費者在消費第五條消息時宕機,則整個10條消息會被接替的消費者再次處理。

精確地一次語義只有在使用Java Library Kafka Stream時被保證。如果你使用Java,我強烈推薦使用。精確一次語義的只要問題在於消息的處理和偏移的更新需要哎事務中完成。例如,如果消息處理是發送一條郵件的話,那么我們就無法完成精確的一次。例如我們發送玩郵件后,消費者宕機,我們可以更新偏移,但是會導致郵件再次被發送。

Kafka Stream 的Java 應用,將消息處理后生成新的消息不同的話題,那么這個應用將是滿足精確一次語義的。因為我們可以使用Kafka的事務功能與寫消息並更新偏移。

關於事務和隔離層次

Kafka中事務的應用主要是讀-處理-寫模式。事務可以跨越多個話題和分區。一個生產者打開一個事務,寫一個批量的消息,然后提交事務。

當消費者使用默認的read uncommited 隔離級別時,消費者可以看到所有的消息,無論是提交的,未提交的,還是終止的。當消費者使用read committed隔離級別時,消費者不會看到未提交的或者終止的消息。

你可能比較疑惑,隔離級別如何影響消息順序。答案是,不影響。消費者依舊按序讀取消息。Last Stable Offset(LSO)之前的消息都會被讀取。

總結

RabbitMQ和Kafka都提供可靠的,持久的消息系統,所以如果可靠性對你來說很重要,那么你大可放心,兩者都是可靠的。當時,Kafka更勝一籌,因為提供冪等的發布,並且,及時錯誤的操作偏移,消息也不會丟失。

顯然,沒有十全十美的產品,但是只要應用正確的使用ACK,管理員正確的配置復制,並且你的數據中心沒有轟然倒塌,你就可以放心,消息不會丟失。至於容錯和可用性,也需要另外討論。

下面是一些簡單結論:

兩者都提供至多一次和至少一次語義

兩者都提供復制

兩者對消息重復和吞吐率有相同的取舍。盡管kafka提供冪等的發布,但是僅限於一定的體量。

兩者都可以控制在途的未ACK消息數量

兩者都保證順序

Kafka提供真正的事務操作,主要用於讀-處理-寫。盡管你需要注意吞吐率。

使用Kafka,及時消費者錯誤處理,但是可以使用偏移進行回退。RabbitMQ則不行。

Kafka基於分區的概念,可以使用批處理提高性能。而RabbitMQ不適合批處理,因為它基於推送模型,並且使用競爭的消費者。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM