Kafka 如何保證消息的消費順序一致性


Kafka 如何保證消息的消費順序?

在Kafka中Partition(分區)是真正保存消息的地方,發送的消息都存放在這里。Partition(分區)又存在於Topic(主題)中,並且一個Topic(主題)可以指定多個Partition(分區)。

在Kafka中,只保證Partition(分區)內有序,不保證Topic所有分區都是有序的。

所以 Kafka 要保證消息的消費順序,可以有2種方法:

一、1個Topic(主題)只創建1個Partition(分區),這樣生產者的所有數據都發送到了一個Partition(分區),保證了消息的消費順序。

二、生產者在發送消息的時候指定要發送到哪個Partition(分區)。

我們需要將 producer 發送的數據封裝成一個 ProducerRecord 對象。

(1)指明 partition 的情況下,直接將指明的值直接作為 partiton 值;

(2)沒有指明 partition 值但有 key 的情況下,將 key 的 hash 值與 topic 的 partition數進行取余得到 partition 值; 

 在Producer往Kafka插入數據時,控制同一Key分發到同一Partition,並且設置參數max.in.flight.requests.per.connection=1,也即同一個鏈接只能發送一條消息,如此便可嚴格保證Kafka消息的順序

(3)既沒有 partition 值又沒有 key 值的情況下,第一次調用時隨機生成一個整數(后

面每次調用在這個整數上自增),將這個值與 topic 可用的 partition 總數取余得到 partition

值,也就是常說的 round-robin 算法。

 

三、以下所有的分析都是基於同一個partition下的場景細化,
多partition下無法保障消息的順序性,單一partition若碰到如下場景還是需要調整參數。

場景一:設置了retries>0,並且max.in.flight.requests.per.connection>1

1、retries

生產者從服務器收到的錯誤有可能是臨時性的錯誤(比如分區找不到首領)。在這種情況下,retries參數的值決定了生產者可以重發消息的次數,
如果達到這個次數,生產者會放棄重試並返回錯誤。默認情況下,生產者會在每次重試之間等待 100ms,不過可以通過 retry.backoff.ms 參數來改變這個時間間隔。建議在設置重試次數和重試時間間隔之前,先測試一下恢復一個崩潰節點需要多少時間(比如所有分區選舉出首領需要多長時間),讓總的重試時間比 Kafka 集群從崩潰中恢復的時間長,否則生產者會過早地放棄重試。不過有些錯誤不是臨時性錯誤,沒辦法通過重試來解決(比如“消息太大”錯誤)。

2、max.in.flight.requests.per.connection

該參數指定了生產者在收到服務器響應之前可以發送多少個消息。它的值越高,就會占用越多的內存,不過也會提升吞吐量。
把它設為 1 可以保證消息是按照發送的順序寫入服務器的,即使發生了重試。這種場景下無法保障單一partition的有序,一般來說要保障消息的有序性,對於消息的可靠性也是有要求的,
所以一般retries可以設置為大於0,但是max.in.flight.requests.per.connection設置為1即可,不過這樣就有一個問題,導致了消息的吞吐量大大降低。


場景二:需要提升吞吐量max.in.flight.requests.per.connection設置大於1

此場景下業務要保障消息的吞吐量,那么max.in.flight.requests.per.connection必然就會選擇更大的一個閾值,但是此場景還能保障消息有序性嗎?答案是肯定的,
可以設置enable.idempotence=true,開啟生產者的冪等生產,可以解決順序性問題,並且允許max.in.flight.requests.per.connection設置大於1

 

四、參考:Kafka如何保證單partition有序?

1.producer發消息到隊列時,通過加鎖保證有序現在假設兩個問題broker leader在給producer發送ack時,因網絡原因超時,那么Producer將重試,造成消息重復。

先后兩條消息發送。t1時刻msg1發送失敗,msg2發送成功,t2時刻msg1重試后發送成功。造成亂序。

2.解決重試機制引起的消息亂序為實現Producer的冪等性,Kafka引入了Producer ID(即PID)和Sequence Number。
對於每個PID,該Producer發送消息的每個<Topic, Partition>都對應一個單調遞增的Sequence Number。
同樣,Broker端也會為每個<PID, Topic, Partition>維護一個序號,並且每Commit一條消息時將其對應序號遞增。對於接收的每條消息,如果其序號比Broker維護的序號)大一,
則Broker會接受它,否則將其丟棄:如果消息序號比Broker維護的序號差值比一大,說明中間有數據尚未寫入,即亂序,此時Broker拒絕該消息,
Producer拋出InvalidSequenceNumber如果消息序號小於等於Broker維護的序號,說明該消息已被保存,即為重復消息,Broker直接丟棄該消息,
Producer拋出DuplicateSequenceNumberSender發送失敗后會重試,這樣可以保證每個消息都被發送到broker

 

參考:

https://www.zhihu.com/question/266390197

https://zhuanlan.zhihu.com/p/262156222


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM