我們暫且不考慮寫磁盤的具體過程,先大致看看下面的圖,這代表了 Kafka 的核心架構原理。

Kafka 分布式存儲架構
那么現在問題來了,如果每天產生幾十 TB 的數據,難道都寫一台機器的磁盤上嗎?這明顯是不靠譜的啊!所以說,這里就得考慮數據的分布式存儲了,我們結合 Kafka 的具體情況來說說。在 Kafka 里面,有一個核心的概念叫做“Topic”,這個 Topic 你就姑且認為是一個數據集合吧。舉個例子,如果你現在有一份網站的用戶行為數據要寫入 Kafka,你可以搞一個 Topic 叫做“user_access_log_topic”,這里寫入的都是用戶行為數據。然后如果你要把電商網站的訂單數據的增刪改變更記錄寫 Kafka,那可以搞一個 Topic 叫做“order_tb_topic”,這里寫入的都是訂單表的變更記錄。然后假如說咱們舉個例子,就說這個用戶行為 Topic 吧,里面如果每天寫入幾十 TB 的數據,你覺得都放一台機器上靠譜嗎?明顯不太靠譜,所以 Kafka 有一個概念叫做 Partition,就是把一個 Topic 數據集合拆分為多個數據分區,你可以認為是多個數據分片,每個 Partition 可以在不同的機器上,儲存部分數據。這樣,不就可以把一個超大的數據集合分布式存儲在多台機器上了嗎?大家看下圖,一起來體會一下。

Kafka 高可用架構
但是這個時候,我們又會遇到一個問題,就是萬一某台機器宕機了,這台機器上的那個 Partition 管理的數據不就丟失了嗎?
所以說,我們還得做多副本冗余,每個 Partition 都可以搞一個副本放在別的機器上,這樣某台機器宕機,只不過是 Partition 其中一個副本丟失。如果某個 Partition 有多副本的話,Kafka 會選舉其中一個 Parititon 副本作為 Leader,然后其他的 Partition 副本是 Follower。只有 Leader Partition 是對外提供讀寫操作的,Follower Partition 就是從 Leader Partition 同步數據。一旦 Leader Partition 宕機了,就會選舉其他的 Follower Partition 作為新的 Leader Partition 對外提供讀寫服務,這不就實現了高可用架構了?
大家看下面的圖,看看這個過程:

Kafka 寫入數據丟失問題
現在我們來看看,什么情況下 Kafka 中寫入數據會丟失呢?其實也很簡單,大家都知道寫入數據都是往某個 Partition 的 Leader 寫入的,然后那個 Partition 的 Follower 會從 Leader 同步數據。但是萬一 1 條數據剛寫入 Leader Partition,還沒來得及同步給 Follower,此時 Leader Partiton 所在機器突然就宕機了呢?
大家看下圖:

如上圖,這個時候有一條數據是沒同步到 Partition0 的 Follower 上去的,然后 Partition0 的 Leader 所在機器宕機了。此時就會選舉 Partition0 的 Follower 作為新的 Leader 對外提供服務,然后用戶是不是就讀不到剛才寫入的那條數據了?因為 Partition0 的 Follower 上是沒有同步到最新的一條數據的。這個時候就會造成數據丟失的問題。
Kafka 的 ISR 機制是什么?
現在我們先留着這個問題不說具體怎么解決,先回過頭來看一個 Kafka 的核心機制,就是 ISR 機制。這個機制簡單來說,就是會自動給每個 Partition 維護一個 ISR 列表,這個列表里一定會有 Leader,然后還會包含跟 Leader 保持同步的 Follower。也就是說,只要 Leader 的某個 Follower 一直跟他保持數據同步,那么就會存在於 ISR 列表里。但是如果 Follower 因為自身發生一些問題,導致不能及時的從 Leader 同步數據過去,那么這個 Follower 就會被認為是“out-of-sync”,被從 ISR 列表里踢出去。所以大家先得明白這個 ISR 是什么,說白了,就是 Kafka 自動維護和監控哪些 Follower 及時的跟上了 Leader 的數據同步。
Kafka 寫入的數據如何保證不丟失?
所以如果要讓寫入 Kafka 的數據不丟失,你需要保證如下幾點:
每個 Partition 都至少得有 1 個 Follower 在 ISR 列表里,跟上了 Leader 的數據同步。
每次寫入數據的時候,都要求至少寫入 Partition Leader 成功,同時還有至少一個 ISR 里的 Follower 也寫入成功,才算這個寫入是成功了。
如果不滿足上述兩個條件,那就一直寫入失敗,讓生產系統不停的嘗試重試,直到滿足上述兩個條件,然后才能認為寫入成功。
按照上述思路去配置相應的參數,才能保證寫入 Kafka 的數據不會丟失。
好!現在咱們來分析一下上面幾點要求。
第一條,必須要求至少一個 Follower 在 ISR 列表里。
那必須的啊,要是 Leader 沒有 Follower 了,或者是 Follower 都沒法及時同步 Leader 數據,那么這個事兒肯定就沒法弄下去了。
第二條,每次寫入數據的時候,要求 Leader 寫入成功以外,至少一個 ISR 里的 Follower 也寫成功。
大家看下面的圖,這個要求就是保證說,每次寫數據,必須是 Leader 和 Follower 都寫成功了,才能算是寫成功,保證一條數據必須有兩個以上的副本。這個時候萬一 Leader 宕機,就可以切換到那個 Follower 上去,那么 Follower 上是有剛寫入的數據的,此時數據就不會丟失了。

如上圖所示,假如現在 Leader 沒有 Follower 了,或者是剛寫入 Leader,Leader 立馬就宕機,還沒來得及同步給 Follower。在這種情況下,寫入就會失敗,然后你就讓生產者不停的重試,直到 Kafka 恢復正常滿足上述條件,才能繼續寫入。這樣就可以讓寫入 Kafka 的數據不丟失。
總結
最后總結一下,其實 Kafka 的數據丟失問題,涉及到方方面面。譬如生產端的緩存問題,包括消費端的問題,同時 Kafka 自己內部的底層算法和機制也可能導致數據丟失。但是平時寫入數據遇到比較大的一個問題,就是 Leader 切換時可能導致數據丟失。所以本文僅僅是針對這個問題說了一下生產環境解決這個問題的方案。
【消息隊列】kafka是如何保證消息不被重復消費的
一、kafka自帶的消費機制
kafka有個offset的概念,當每個消息被寫進去后,都有一個offset,代表他的序號,然后consumer消費該數據之后,隔一段時間,會把自己消費過的消息的offset提交一下,代表我已經消費過了。下次我要是重啟,就會繼續從上次消費到的offset來繼續消費。
但是當我們直接kill進程了,再重啟。這會導致consumer有些消息處理了,但是沒來得及提交offset。等重啟之后,少數消息就會再次消費一次。
其他MQ也會有這種重復消費的問題,那么針對這種問題,我們需要從業務角度,考慮它的冪等性。
二、通過保證消息隊列消費的冪等性來保證
舉個例子,當消費一條消息時就往數據庫插入一條數據。如何保證重復消費也插入一條數據呢?
那么我們就需要從冪等性角度考慮了。冪等性,我通俗點說,就一個數據,或者一個請求,無論來多次,對應的數據都不會改變的,不能出錯。
怎么保證消息隊列消費的冪等性?
我們需要結合業務來思考,比如下面的例子:
1.比如某個數據要寫庫,你先根據主鍵查一下,如果數據有了,就別插入了,update一下好吧
2.比如你是寫redis,那沒問題了,反正每次都是set,天然冪等性
3.對於消息,我們可以建個表(專門存儲消息消費記錄)
生產者,發送消息前判斷庫中是否有記錄(有記錄說明已發送),沒有記錄,先入庫,狀態為待消費,然后發送消息並把主鍵id帶上。
消費者,接收消息,通過主鍵ID查詢記錄表,判斷消息狀態是否已消費。若沒消費過,則處理消息,處理完后,更新消息記錄的狀態為已消費。
今天我們聊一個話題,這個話題大家可能在面試過程中,或者是工作當中經常遇到 :point_right: 如何保證 Kafka 消息不重復消費? 我們在做開發的時候為了程序的健壯性,在使用 Kafka 的時候一般都會設置重試的次數,但是因為網絡的一些原因,設置了重試就有可能導致有些消息重復發送了(當然導致消息重復也有可能是其他原因),那么怎么解決消息重復這個問題呢?
關於這個問題,我這兒提供了如下三種解決方案,供大家參考。
解決方案
方案一 / 保存並查詢
給每個消息都設置一個獨一無二的 key,消費的時候把 key 記錄下來,然后每次消費新的消息的時候都查詢一下,看當前消息的這個 key 是否消費過,如果沒有消費過才進行消費。(這種方式好想,但是其實實現起來一點也不簡單)
方案二 / 利用冪等
冪等(Idempotence) 在數學上是這樣定義的,如果一個函數 f(x) 滿足:f(f(x)) = f(x),則函數 f(x) 滿足冪等性。
這個概念被拓展到計算機領域,被用來描述一個操作、方法或者服務。一個冪等操作的特點是, 其任意多次執行所產生的影響均與一次執行的影響相同 。一個冪等的方法,使用同樣的參數,對它進行多次調用和一次調用,對系統產生的影響是一樣的。 所 以,對於冪等的方法,不用擔心重復執行會對系統造 成任何改變。
我們舉個例子:chestnut: 來說明一下。在不考慮並發的情況下,“將 X 老 師的 賬戶余額設置為 100 萬元”,執行一次后對系統的影響是,X 老師的賬戶余額變成了 100 萬元。 只要提供的參數 100萬元不變,那即使再執行多少次,X 老師的賬戶余額始終都是 100萬元,不會變化,這個操作就是一個冪 等的操作。
再舉一個例子:chestnut: ,“將 X 老師的余額加 100 萬元”,這個操作它就不是冪等的,每執行一次,賬戶余額就會增加 100 萬元,執行多次和執行一次對系統的影響(也就是賬戶的余額)是不一樣的。
所以,通過這兩個例子,我們可以想到如果系統消費消息的業務邏輯具備冪等性,那就不用擔心消息重復的問題了,因為同一條消息,消費一次和消費多次對系統的影響是完全一樣的。 也就可以認為,消費多次等於消費一次。
那么,如何實現冪等操作呢? 最好的方式就是,從業務邏輯設計上入手,將消費的業務邏輯設計成具備冪等性的操作 。 但是,不是所有的業務都能設計成天然冪等的,這里就需要一些方法和技巧來實現冪等。
下面我們介紹一種常用的方法: 利用數據庫的唯一約束實現冪等 。
例如,我們剛剛提到的那個不具備冪等特性的轉賬的例子: 將 X 老師的賬戶余額加 100 萬元。 在這個例子中,我們可以通過改造業務邏輯,讓它具備冪等性。
首先,我們可以限定,對於每個轉賬單每個賬戶只可以執行一次變更操作,在分布式系統中,這個限制實現的方法非常多,最簡單的是我們在數據庫中建一張轉賬流水表,這個表有三個字段: 轉賬單 ID、賬戶 ID 和變更金額,然后給轉賬單 ID 和賬戶 ID 這兩個字段聯合起來創建一個唯一約束,這樣對於相同的轉賬單 ID 和賬戶 ID,表里至多只能存在一條記錄。
這樣,我們消費消息的邏輯可以變為: “在轉賬流水表中增加一條轉賬記錄,然后再根據轉賬記錄,異步操作更新用戶余額即可。 ”在轉賬流水表增加一條轉賬記錄這個操作中,由於我們在這個表中預先定義了“賬戶 ID 轉賬單 ID”的唯一約束,對於同一個轉賬單同一個賬戶只能插入一條記錄,后續重復的插入操作都會失敗,這樣就實現了一個冪等的操作。
方案三 / 設置前置條件
為更新的數據設置前置條件另外一種實現冪等的思路是, 給數據變更設置一個前置條件 ,如果滿足條件就更新數據,否則拒絕更新數據,在更新數據的時候,同時變更前置條件中需要判斷的數據。
這樣,重復執行這個操作時,由於第一次更新數據的時候已經變更了前置條件中需要判斷 的數據,不滿足前置條件,則不會重復執行更新數據操作。
比如,剛剛我們說過,“將 X 老師的賬戶的余額增加 100 萬元”這個操作並不滿足冪等性,我們可以把這個操作加上一個前置條件,變為: “如果X老師的賬戶當前的余額為 500萬元,將余額加 100萬元”,這個操作就具備了冪等性。
對應到消息隊列中的使用時,可以在發消息時在消息體中帶上當前的余額,在消費的時候進行判斷數據庫中,當前余額是否與消息中的余額相等,只有相等才執行變更操作。
但是,如果我們要更新的數據不是數值,或者我們要做一個比較復雜的更新操作怎么辦? 用什么作為前置判斷條件呢? 更加通用的方法是,給你的數據增加一個版本號屬性,每次更數據前,比較當前數據的版本號是否和消息中的版本號一致 ,如果不一致就拒絕更新數據,更新數據的同時將版本號 +1,一樣可以實現冪等。
每天都會有更新看過的朋友可以點波關注,Java學習路線和優質資源評論或后台回復“Java”獲取。