Kafka 是如何做到消息不丟或不重復的


 

相信大家在工作中都用過消息隊列,特別是 Kafka 使用得更是普遍,業務工程師在使用 Kafka 的時候除了擔憂 Kafka 服務端宕機外,其實最怕如下這樣兩件事。

 
  • 消息丟失。下游系統沒收到上游系統發送的消息,造成系統間數據不一致。比如,訂單系統沒有把成功狀態的訂單消息成功發送到消息隊列里,造成下游的統計系統沒有收到下單成功訂單的消息,於是造成系統間數據的不一致,從而引起用戶查看個人訂單列表時跟實際不相符的問題。
  • 消息重復。相同的消息重復發送會造成消費者消費兩次同樣的消息,這同樣會造成系統間數據的不一致。比如,訂單支付成功后會通過消息隊列給支付系統發送需要扣款的金額,如果消息發送兩次一樣的扣款消息,而訂單只支付了一次,就會給用戶帶來余額多扣款的問題。
總結來說,這兩個問題直接影響到業務系統間的數據一致性。
 
那到底該如何避免這兩個問題的發生呢?
 
Kafka 針對這兩個問題有系統的解決方案,需要服務端、客戶端做相應的配置以及采取一些補償方案。
 
下面我會從生產端、服務端、消費端三個角度講解 Kafka 是如何做到消息不丟失或消息不重復的
 

先介紹下三種消息語義及場景

介紹一下“消息語義”的概念,這是理論基礎,會有利於你更好地抓住下面解決方案的要點。
 
消息語義有三種,分別是:消息最多傳遞一次、消息最少傳遞一次、消息有且僅有一次傳遞,這三種語義分別對應:消息不重復、消息不丟失、消息既不丟失也不重復。
 
這里的“消息傳遞一次”是指生產者生產消息成功,Broker 接收和保存消息成功,消費者消費消息成功。對一個消息來說,這三個要同時滿足才算是“消息傳遞一次”。上面所說的那三種消息語義可梳理為如下。
 
  • 最多一次(At most once):對應消息不重復。消息最多傳遞一次,消息有可能會丟,但不會重復。一般運用於高並發量、高吞吐,但是對於消息的丟失不是很敏感的場景。
  • 最少一次(At least once):對應消息不丟失。消息最少傳遞一次,消息不會丟,但有可能重復。一般用於並發量一般,對於消息重復傳遞不敏感的場景。
  • 有且僅有一次(Exactly once):每條消息只會被傳遞一次,消息不會丟失,也不會重復。 用於對消息可靠性要求高,且對吞吐量要求不高的場景。

 

Kafka 如何做到消息不丟失?

我們先來討論一下 Kafka 是如何做到消息不丟失的,也就是:生產者不少生產消息,服務端不丟失消息,消費者也不能少消費消息。
 
那具體要怎么來實現呢?下面我們就來詳細講解下。
 
生產端:不少生產消息
 
以下是為了保證消息不丟失,生產端需要配置的參數和相關使用方法。
 
  • 第一個,要使用帶回調方法的 API,具體 API 方法如下:
 
  Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)
使用帶有回調方法的 API 時,我們可以根據回調函數得知消息是否發送成功,如果發送失敗了,我們要進行異常處理,比如把失敗消息存儲到本地硬盤或遠程數據庫,等應用正常了再發送,這樣才能保證消息不丟失。
  • 第二個,設置參數 acks=-1。acks 這個參數是指有多少分區副本收到消息后,生產者才認為消息發送成功了,可選的參數值有 0、1 和 -1。
    acks=0,表示生產者不等待任何服務器節點的響應,只要發送消息就認為成功。
    acks=1,表示生產者收到 leader 分區的響應就認為發送成功。
    acks=-1,表示只有當 ISR(ISR 的含義后面我會詳細介紹)中的副本全部收到消息時,生產者才會認為消息生產成功了。這種配置是最安全的,因為如果 leader 副本掛了,當 follower 副本被選為 leader 副本時,消息也不會丟失。但是系統吞吐量會降低,因為生產者要等待所有副本都收到消息后才能再次發送消息。
 
  • 第三個,設置參數 retries=3。參數 retries 表示生產者生產消息的重試次數。這里 retries=3 是一個建議值,一般情況下能滿足足夠的重試次數就能重試成功。但是如果重試失敗了,對異常處理時就可以把消息保存到其他可靠的地方,如磁盤、數據庫、遠程緩存等,然后等到服務正常了再繼續發送消息。
  • 第四個,設置參數 retry.backoff.ms=300。retry.backoff.ms 指消息生產超時或失敗后重試的間隔時間,單位是毫秒。如果重試時間太短,會出現系統還沒恢復就開始重試的情況,進而導致再次失敗。結合我個人經驗來說,300 毫秒還是比較合適的。
 
只要上面這四個要點配置對了,就可以保證生產端的生產者不少生產消息了。

 

服務端:不丟失消息
 

以下是為了保證服務端不丟消息,服務端需要配置的參數。

  • 第一個,設置 replication.factor >1。replication.factor 這個參數表示分區副本的個數,這里我們要將其設置為大於 1 的數,這樣當 leader 副本掛了,follower 副本還能被選為 leader 副本繼續接收消息。
  • 第二個,設置 min.insync.replicas >1。min.insync.replicas 指的是 ISR 最少的副本數量,原理同上,也需要大於 1 的副本數量來保證消息不丟失。

    這里我簡單介紹下 ISR。ISR 是一個分區副本的集合,每個分區都有自己的一個 ISR 集合。但不是所有的副本都會在這個集合里,首先 leader 副本是在 ISR 集合里的,如果一個 follower 副本的消息沒落后 leader 副本太長時間,這個 follower 副本也在 ISR 集合里;可是如果有一個 follower 副本落后 leader 副本太長時間,就會從 ISR 集合里被淘汰出去。也就是說,ISR 里的副本數量是小於或等於分區的副本數量的。

  • 第三個,設置 unclean.leader.election.enable = false。unclean.leader.election.enable 指是否能把非 ISR 集合中的副本選舉為 leader 副本。unclean.leader.election.enable = true,也就是說允許非 ISR 集合中的 follower 副本成為 leader 副本。

 

消費端:不能少消費消息
 
為了保證不丟失消息,消費者就不能少消費消息,該如何去實現呢?消費端需要做好如下的配置。
 
第一個,設置 enable.auto.commit=false。enable.auto.commit 這個參數表示是否自動提交,如果是自動提交會導致什么問題出現呢?
 
消費者消費消息是有兩個步驟的,首先拉取消息,然后再處理消息。向服務端提交消息偏移量可以手動提交也可以自動提交。
 
如果把參數 enable.auto.commit 設置為 true 就表示消息偏移量是由消費端自動提交,由異步線程去完成的,業務線程無法控制。如果剛拉取了消息之后,業務處理還沒進行完,這時提交了消息偏移量但是消費者卻掛了,這就造成還沒進行完業務處理的消息的位移被提交了,下次再消費就消費不到這些消息,造成消息的丟失。因此,一定要設置 enable.auto.commit=false,也就是手動提交消息偏移量。
 
第二個,要有手動提交偏移量的正確步驟。enable.auto.commit=false 並不能完全滿足消費端消息不丟的條件,還要有正確的手動提交偏移量的過程。具體如何操作呢?
 
業務邏輯先對消息進行處理,再提交 offset,這樣是能夠保證不少消費消息的。但是你可以想象這樣一個場景:如果消費者在處理完消息后、提交 offset 前出現宕機,待消費者再上線時,還會處理未提交的那部分消息,但是這部分已經被消費者處理過了,也就是說這樣做雖然避免了丟消息,但是會有重復消費的情況出現。(這種情況比較少,一般特殊情況特殊處理就好)
 
具體代碼需要這么寫:
List<String> messages = consumer.poll(); processMsg(messages); consumer.commitOffset();
 

Kafka 如何做到消息不重復?

 
生產端不重復生產消息,服務端不重復存儲消息,消費端也不能重復消費消息。
 
相較上面“消息不丟失”的場景,“消息不重復”的服務端無須做特別的配置,因為服務端不會重復存儲消息,如果有重復消息也應該是由生產端重復發送造成的。也就是說,下面我們只需要分析生產端和消費端就行。
 
 
生產端:不重復生產消息
 
生產端發送消息后,服務端已經收到消息了,但是假如遇到網絡問題,無法獲得響應,生產端就無法判斷該消息是否成功提交到了 Kafka,而我們一般會配置重試次數,但這樣會引發生產端重新發送同一條消息,從而造成消息重復的發送。
 
對於這個問題,Kafka 0.11.0 的版本之前並沒有什么解決方案,不過從 0.11.0 的版本開始,Kafka 給每個生產端生成一個唯一的 ID,並且在每條消息中生成一個 sequence num,sequence num 是遞增且唯一的,這樣就能對消息去重,達到一個生產端不重復發送一條消息的目的。
 
但是這個方法是有局限性的,只對在一個生產端內生產的消息有效,如果一個消息分別在兩個生產端發送就不行了,還是會造成消息的重復發送。好在這種可能性比較小,因為消息的重試一般會在一個生產端內進行。當然,對應一個消息分別在兩個生產端發送的請求我們也有方案,只是要多做一些補償的工作,比如,我們可以為每一個消息分配一個全局 ID,並把全局 ID 存放在遠程緩存或關系型數據庫里,這樣在發送前可以判斷一下是否已經發送過了。
 
 
消費端:不能重復消費消息
 
為了保證消息不重復,消費端就不能重復消費消息,該如何去實現呢?消費端需要做好如下配置。
 
第一步,設置 enable.auto.commit=false。跟前面一樣,這里同樣要避免自動提交偏移量。你可以想象這樣一種情況,消費端拉取消息和處理消息都完成了,但是自動提交偏移量還沒提交消費端卻掛了,這時候 Kafka 消費組開始重新平衡並把分區分給另一個消費者,由於偏移量沒提交新的消費者會重復拉取消息,這就最終造成重復消費消息。
 
第二步,單純配成手動提交同樣不能避免重復消費,還需要消費端使用正確的消費“姿勢”。
 
消費者拉取消息后,先提交 offset 后再處理消息,這樣就不會出現重復消費消息的可能。但是你可以想象這樣一個場景:在提交 offset 之后、業務邏輯處理消息之前出現了宕機,待消費者重新上線時,就無法讀到剛剛已經提交而未處理的這部分消息,還是會有少消費消息的情況。這種情況也是少數,可以根據業務做補償
 
具體代碼如下:
 
List messages = consumer.poll(); 
consumer.commitOffset(); 
processMsg(messages);

 

總結一下:

 

Kafka 中消息不丟失、不重復很重要,就我個人經驗來講,業務人員除了擔憂消息隊列服務端宕機外,對消息的丟失和消息的重復會非常敏感,因為這直接影響到了業務本身。
 
總體來講,要保證消息不丟失和不重復,你要從生產端、服務端和消費端三個部分全盤考慮才可行,只是單獨考慮某一端是遠遠不夠的。同時,我也希望你搞懂消息語義的含義,因為所有的消息隊列都會有相應的涉及。
 
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM