圖解 Kafka 水印備份機制


高可用是很多分布式系統中必備的特征之一,Kafka 日志的高可用是通過基於 leader-follower 的多副本同步實現的,每個分區下有多個副本,其中只有一個是 leader 副本,提供發送和消費消息,其余都是 follower 副本,不斷地發送 fetch 請求給 leader 副本以同步消息,如果 leader 在整個集群運行過程中不發生故障,follower 副本不會起到任何作用,問題就在於任何系統都不能保證其穩定運行,當 leader 副本所在的 broker 崩潰之后,其中一個 follower 副本就會成為該分區下新的 leader 副本,那么問題來了,在選為新的 leader 副本時,會導致消息丟失或者離散嗎?Kafka 是如何解決 leader 副本變更時消息不會出錯?以及 leader 與 follower 副本之間的數據同步是如何進行的?帶着這幾個問題,我們接着往下看,一起揭開 Kafka 水印備份的神秘面紗。

水印相關概念

在講解水印備份之前,我們必須要先搞清楚幾個關鍵的術語以及它們的含義,下面我用一張圖來示意 Kafka 分區副本的位移信息:

如上圖所示,綠色部分表示已完全備份的消息,對消費者可見,紫色部分表示未完全備份的消息,對消費者不可見。

LEO(last end offset):日志末端位移,記錄了該副本對象底層日志文件中下一條消息的位移值,副本寫入消息的時候,會自動更新 LEO 值。

HW(high watermark):從名字可以知道,該值叫高水印值,HW 一定不會大於 LEO 值,小於 HW 值的消息被認為是“已提交”或“已備份”的消息,並對消費者可見。

leader 會保存兩個類型的 LEO 值,一個是自己的 LEO,另一個是 remote LEO 值,remote LEO 值就是 follower 副本的 LEO 值,意味着 follower 副本的 LEO 值會保存兩份,一份保存到 leader 副本中,一份保存到自己這里。

remote LEO 值有什么用呢?

它是決定 HW 值大小的關鍵,當 HW 要更新時,就會對比 LEO 值(也包括 leader LEO),取最小的那個做最新的 HW 值。

以下介紹 LEO 和 HW 值的更新機制:

LEO 更新機制:

  1. leader 副本自身的 LEO 值更新:在 Producer 消息發送過來時,即 leader 副本當前最新存儲的消息位移位置 +1;
  2. follower 副本自身的 LEO 值更新:從 leader 副本中 fetch 到消息並寫到本地日志文件時,即 follower 副本當前同步 leader 副本最新的消息位移位置 +1;
  3. leader 副本中的 remote LEO 值更新:每次 follower 副本發送 fetch 請求都會包含 follower 當前 LEO 值,leader 拿到該值就會嘗試更新 remote LEO 值。

leader HW 更新機制:

leader HW 更新分為故障時更新與正常時更新:

故障時更新:

  1. 副本被選為 leader 副本時:當某個 follower 副本被選為分區的 leader 副本時,kafka 就會嘗試更新 HW 值;
  2. 副本被踢出 ISR 時:如果某個副本追不上 leader 副本進度,或者所在 broker 崩潰了,導致被踢出 ISR,leader 也會檢查 HW 值是否需要更新,畢竟 HW 值更新只跟處於 ISR 的副本 LEO 有關系。

正常時更新:

  1. producer 向 leader 副本寫入消息時:在消息寫入時會更新 leader LEO 值,因此需要再檢查是否需要更新 HW 值;
  2. leader 處理 follower FETCH 請求時:follower 的 fetch 請求會攜帶 LEO 值,leader 會根據這個值更新對應的 remote LEO 值,同時也需要檢查是否需要更新 HW 值。

follower HW 更新機制:

  1. follower 更新 HW 發生在其更新 LEO 之后,每次 follower Fetch 響應體都會包含 leader 的 HW 值,然后比較當前 LEO 值,取最小的作為新的 HW 值。

圖解水印備份過程

在了解了 Kafka 水印備份機制的相關概念之后,下面我用圖來幫大家更好地理解 Kafka 的水印備份過程,假設某個分區有兩個副本,min.insync.replica=1:

Step 1:leader 和 follower 副本處於初始化值,follower 副本發送 fetch 請求,由於 leader 副本沒有數據,因此不會進行同步操作;

Step 2:生產者發送了消息 m1 到分區 leader 副本,寫入該條消息后 leader 更新 LEO = 1;

Step 3:follower 發送 fetch 請求,攜帶當前最新的 offset = 0,leader 處理 fetch 請求時,更新 remote LEO = 0,對比 LEO 值最小為 0,所以 HW = 0,leader 副本響應消息數據及 leader HW = 0 給 follower,follower 寫入消息后,更新 LEO 值,同時對比 leader HW 值,取最小的作為新的 HW 值,此時 follower HW = 0,這也意味着,follower HW 是不會超過 leader HW 值的。

Step 4:follower 發送第二輪 fetch 請求,攜帶當前最新的 offset = 1,leader 處理 fetch 請求時,更新 remote LEO = 1,對比 LEO 值最小為 1,所以 HW = 1,此時 leader 沒有新的消息數據,所以直接返回 leader HW = 1 給 follower,follower 對比當前最新的 LEO 值 與 leader HW 值,取最小的作為新的 HW 值,此時 follower HW = 1。

基於水印備份機制的一些缺陷

從以上步驟可看出,leader 中保存的 remote LEO 值的更新總是需要額外一輪 fetch RPC 請求才能完成,這意味着在 leader 切換過程中,會存在數據丟失以及數據不一致的問題,下面我用圖來說明存在的問題:

  • 數據丟失

前面也說過,leader 中的 HW 值是在 follower 下一輪 fetch RPC 請求中完成更新的,如上圖所示,有副本 A 和 B,其中 B 為 leader 副本,A 為 follower 副本,在 A 進行第二段 fetch 請求,並接收到響應之后,此時 B 已經將 HW 更新為 2,如果這是 A 還沒處理完響應就崩潰了,即 follower 沒有及時更新 HW 值,A 重啟時,會自動將 LEO 值調整到之前的 HW 值,即會進行日志截斷,接着會向 B 發送 fetch 請求,但很不幸的是此時 B 也發生宕機了,Kafka 會將 A 選舉為新的分區 Leader。當 B 重啟后,會從 向 A 發送 fetch 請求,收到 fetch 響應后,拿到 HW 值,並更新本地 HW 值,此時 HW 被調整為 1(之前是 2),這時 B 會做日志截斷,因此,offsets = 1 的消息被永久地刪除了。

可能你會問,follower 副本為什么要進行日志截斷?

這是由於消息會先記錄到 leader,follower 再從 leader 中拉取消息進行同步,這就導致 leader LEO 會比 follower 的要大(ollower之間的offset也不盡相同,雖然最終會一致,但過程中會有差異),假設此時出現 leader 切換,有可能選舉了一個 LEO 較小的 follower 成為新的 leader,這時該副本的 LEO 就會成為新的標准,這就會導致 follower LEO 值有可能會比 leader LEO 值要大的情況,因此 follower 在進行同步之前,需要從 leader 獲取 LastOffset 的值(該值后面會有解釋),如果 LastOffset 小於 當前 LEO,則需要進行日志截斷,然后再從 leader 拉取數據實現同步。

可能你還會問,日志截斷會不會造成數據丟失?

前面也說過,HW 值以上的消息是沒有“已提交”或“已備份”的,因此消息也是對消費者不可見,即這些消息不對用戶作承諾,也即是說從 HW 值截斷日志,並不會導致數據丟失(承諾用戶范圍內)。

  • 數據不一致/離散

以上情況,需要滿足以下其中一個條件才會發生:

  1. 宕機之前,B 已不在 ISR 列表中,unclean.leader.election.enable=true,即允許非 ISR 中副本成為 leader;
  2. B 消息寫入到 pagecache,但尚未 flush 到磁盤。

分區有兩個副本,其中 A 為 Leader 副本,B 為 follower 副本,A 已經寫入兩條消息,且 HW 更新到 2,B 只寫了 1條消息,HW 為 1,此時 A 和 B 同時宕機,B 先重啟,B 成為了 leader 副本,這時生產者發送了一條消息,保存到 B 中,由於此時分區只有 B,B 在寫入消息時把 HW 更新到 2,就在這時候 A 重新啟動,發現 leader HW 為 2,跟自己的 HW 一樣,因此沒有執行日志截斷,這就造成了 A 的 offset=1 的日志與 B 的 offset=1 的日志不一樣的現象。

leader epoch

為了解決 HW 更新時機是異步延遲的,而 HW 又是決定日志是否備份成功的標志,從而造成數據丟失和數據不一致的現象,Kafka 引入了 leader epoch 機制,在每個副本日志目錄下都創建一個 leader-epoch-checkpoint 文件,用於保存 leader 的 epoch 信息,如下,leader epoch 長這樣:

它的格式為 (epoch offset),epoch指的是 leader 版本,它是一個單調遞增的一個正整數值,每次 leader 變更,epoch 版本都會 +1,offset 是每一代 leader 寫入的第一條消息的位移值,比如:

(0, 0)
(1, 300)

以上第二個版本是從位移300開始寫入消息,意味着第一個版本寫入了 0-299 的消息。

leader epoch 具體的工作機制如下:

1)當副本成為 leader 時:

這時,如果此時生產者有新消息發送過來,會首先新的 leader epoch 以及 LEO 添加到 leader-epoch-checkpoint 文件中。

2)當副本變成 follower 時:

  1. 發送 LeaderEpochRequest 請求給 leader 副本,該請求包括了 follower 中最新的 epoch 版本;
  2. leader 返回給 follower 的相應中包含了一個 LastOffset,如果 follower last epoch = leader last epoch,則 LastOffset = leader LEO,否則取大於 follower last epoch 中最小的 leader epoch 的 start offset 值,舉個例子:假設 follower last epoch = 1,此時 leader 有 (1, 20) (2, 80) (3, 120),則 LastOffset = 80;
  3. follower 拿到 LastOffset 之后,會對比當前 LEO 值是否大於 LastOffset,如果當前 LEO 大於 LastOffset,則從 LastOffset 截斷日志;
  4. follower 開始發送 fetch 請求給 leader 保持消息同步。

基於 leader epoch 的工作機制,我們接下來看看它是如何解決水印備份缺陷的:

(1)解決數據丟失:

如上圖所示,A 重啟之后,發送 LeaderEpochRequest 請求給 B,由於 B 還沒追加消息,此時 epoch = request epoch = 0,因此返回 LastOffset = leader LEO = 2 給 A,A 拿到 LastOffset 之后,發現等於當前 LEO 值,故不用進行日志截斷。就在這時 B 宕機了,A 成為 leader,在 B 啟動回來后,會重復 A 的動作,同樣不需要進行日志截斷,數據沒有丟失。

(2)解決數據不一致/離散

如上圖所示,A 和 B 同時宕機后,B 先重啟回來成為分區 leader,這時候生產者發送了一條消息過來,leader epoch 更新到 1,此時 A 啟動回來后,發送 LeaderEpochRequest(follower epoch = 0) 給 B,B 判斷 follower epoch 不等於 最新的 epoch,於是找到大於 follower epoch 最小的 epoch = 1,即 LastOffset = epoch start offset = 1,A 拿到 LastOffset 后,判斷小於當前 LEO 值,於是從 LastOffset 位置進行日志截斷,接着開始發送 fetch 請求給 B 開始同步消息,避免了消息不一致/離散的問題。

更多精彩文章請關注作者維護的公眾號「后端進階」,這是一個專注后端相關技術的公眾號。
關注公眾號並回復「后端」免費領取后端相關電子書籍。
歡迎分享,轉載請保留出處。

公眾號「后端進階」,專注后端技術分享!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM