本文主要討論0.11版本之前Kafka的副本備份機制的設計問題以及0.11是如何解決的。簡單來說,0.11之前副本備份機制主要依賴水位(或水印)的概念,而0.11采用了leader epoch來標識備份進度。后面我們會詳細討論兩種機制的差異。不過首先先做一些基本的名詞含義解析。
水位或水印(watermark)一詞,也可稱為高水位(high watermark),通常被用在流式處理領域(比如Apache Flink、Apache Spark等),以表征元素或事件在基於時間層面上的進度。一個比較經典的表述為:流式系統保證在水位t時刻,創建時間(event time) = t'且t' ≤ t的所有事件都已經到達或被觀測到。在Kafka中,水位的概念反而與時間無關,而是與位置信息相關。嚴格來說,它表示的就是位置信息,即位移(offset)。網上有一些關於Kafka watermark的介紹,本不應再贅述,但鑒於本文想要重點強調的leader epoch與watermark息息相關,故這里再費些篇幅闡述一下watermark。注意:由於Kafka源碼中使用的名字是高水位,故本文將始終使用high watermaker或干脆簡稱為HW。
Kafka分區下有可能有很多個副本(replica)用於實現冗余,從而進一步實現高可用。副本根據角色的不同可分為3類:
- leader副本:響應clients端讀寫請求的副本
- follower副本:被動地備份leader副本中的數據,不能響應clients端讀寫請求。
- ISR副本:包含了leader副本和所有與leader副本保持同步的follower副本——如何判定是否與leader同步后面會提到
每個Kafka副本對象都有兩個重要的屬性:LEO和HW。注意是所有的副本,而不只是leader副本。
- LEO:即日志末端位移(log end offset),記錄了該副本底層日志(log)中下一條消息的位移值。注意是下一條消息!也就是說,如果LEO=10,那么表示該副本保存了10條消息,位移值范圍是[0, 9]。另外,leader LEO和follower LEO的更新是有區別的。我們后面會詳細說
- HW:即上面提到的水位值。對於同一個副本對象而言,其HW值不會大於LEO值。小於等於HW值的所有消息都被認為是“已備份”的(replicated)。同理,leader副本和follower副本的HW更新是有區別的,我們后面詳談。
我們使用下圖來形象化地說明兩者的關系:
上圖中,HW值是7,表示位移是0~7的所有消息都已經處於“已備份狀態”(committed),而LEO值是15,那么8~14的消息就是尚未完全備份(fully replicated)——為什么沒有15?因為剛才說過了,LEO指向的是下一條消息到來時的位移,故上圖使用虛線框表示。我們總說consumer無法消費未提交消息。這句話如果用以上名詞來解讀的話,應該表述為:consumer無法消費分區下leader副本中位移值大於分區HW的任何消息。這里需要特別注意分區HW就是leader副本的HW值。
既然副本分為leader副本和follower副本,而每個副本又都有HW和LEO,那么它們是怎么被更新的呢?它們更新的機制又有什么區別呢?我們一一來分析下:
一、follower副本何時更新LEO?
如前所述,follower副本只是被動地向leader副本請求數據,具體表現為follower副本不停地向leader副本所在的broker發送FETCH請求,一旦獲取消息后寫入自己的日志中進行備份。那么follower副本的LEO是何時更新的呢?首先我必須言明,Kafka有兩套follower副本LEO(明白這個是搞懂后面內容的關鍵,因此請多花一點時間來思考):1. 一套LEO保存在follower副本所在broker的副本管理機中;2. 另一套LEO保存在leader副本所在broker的副本管理機中——換句話說,leader副本機器上保存了所有的follower副本的LEO。
為什么要保存兩套?這是因為Kafka使用前者幫助follower副本更新其HW值;而利用后者幫助leader副本更新其HW使用。下面我們分別看下它們被更新的時機。
1. follower副本端的follower副本LEO何時更新?
follower副本端的LEO值就是其底層日志的LEO值,也就是說每當新寫入一條消息,其LEO值就會被更新(類似於LEO += 1)。當follower發送FETCH請求后,leader將數據返回給follower,此時follower開始向底層log寫數據,從而自動地更新LEO值
2. leader副本端的follower副本LEO何時更新?
leader副本端的follower副本LEO的更新發生在leader在處理follower FETCH請求時。一旦leader接收到follower發送的FETCH請求,它首先會從自己的log中讀取相應的數據,但是在給follower返回數據之前它先去更新follower的LEO(即上面所說的第二套LEO)
二、follower副本何時更新HW?
follower更新HW發生在其更新LEO之后,一旦follower向log寫完數據,它會嘗試更新它自己的HW值。具體算法就是比較當前LEO值與FETCH響應中leader的HW值,取兩者的小者作為新的HW值。這告訴我們一個事實:如果follower的LEO值超過了leader的HW值,那么follower HW值是不會越過leader HW值的。
三、leader副本何時更新LEO?
和follower更新LEO道理相同,leader寫log時就會自動地更新它自己的LEO值。
四、leader副本何時更新HW值?
前面說過了,leader的HW值就是分區HW值,因此何時更新這個值是我們最關心的,因為它直接影響了分區數據對於consumer的可見性 。以下4種情況下leader會嘗試去更新分區HW——切記是嘗試,有可能因為不滿足條件而不做任何更新:
- 副本成為leader副本時:當某個副本成為了分區的leader副本,Kafka會嘗試去更新分區HW。這是顯而易見的道理,畢竟分區leader發生了變更,這個副本的狀態是一定要檢查的!不過,本文討論的是當系統穩定后且正常工作時備份機制可能出現的問題,故這個條件不在我們的討論之列。
- broker出現崩潰導致副本被踢出ISR時:若有broker崩潰則必須查看下是否會波及此分區,因此檢查下分區HW值是否需要更新是有必要的。本文不對這種情況做深入討論
- producer向leader副本寫入消息時:因為寫入消息會更新leader的LEO,故有必要再查看下HW值是否也需要修改
- leader處理follower FETCH請求時:當leader處理follower的FETCH請求時首先會從底層的log讀取數據,之后會嘗試更新分區HW值
特別注意上面4個條件中的最后兩個。它揭示了一個事實——當Kafka broker都正常工作時,分區HW值的更新時機有兩個:leader處理PRODUCE請求時和leader處理FETCH請求時。另外,leader是如何更新它的HW值的呢?前面說過了,leader broker上保存了一套follower副本的LEO以及它自己的LEO。當嘗試確定分區HW時,它會選出所有滿足條件的副本,比較它們的LEO(當然也包括leader自己的LEO),並選擇最小的LEO值作為HW值。這里的滿足條件主要是指副本要滿足以下兩個條件之一:
- 處於ISR中
- 副本LEO落后於leader LEO的時長不大於replica.lag.time.max.ms參數值(默認是10s)
乍看上去好像這兩個條件說得是一回事,畢竟ISR的定義就是第二個條件描述的那樣。但某些情況下Kafka的確可能出現副本已經“追上”了leader的進度,但卻不在ISR中——比如某個從failure中恢復的副本。如果Kafka只判斷第一個條件的話,確定分區HW值時就不會考慮這些未在ISR中的副本,但這些副本已經具備了“立刻進入ISR”的資格,因此就可能出現分區HW值越過ISR中副本LEO的情況——這肯定是不允許的,因為分區HW實際上就是ISR中所有副本LEO的最小值。
好了,理論部分我覺得說的差不多了,下面舉個實際的例子。我們假設有一個topic,單分區,副本因子是2,即一個leader副本和一個follower副本。我們看下當producer發送一條消息時,broker端的副本到底會發生什么事情以及分區HW是如何被更新的。
下圖是初始狀態,我們稍微解釋一下:初始時leader和follower的HW和LEO都是0(嚴格來說源代碼會初始化LEO為-1,不過這不影響之后的討論)。leader中的remote LEO指的就是leader端保存的follower LEO,也被初始化成0。此時,producer沒有發送任何消息給leader,而follower已經開始不斷地給leader發送FETCH請求了,但因為沒有數據因此什么都不會發生。值得一提的是,follower發送過來的FETCH請求因為無數據而暫時會被寄存到leader端的purgatory中,待500ms(replica.fetch.wait.max.ms參數)超時后會強制完成。倘若在寄存期間producer端發送過來數據,那么會Kafka會自動喚醒該FETCH請求,讓leader繼續處理之。
雖然purgatory不是本文的重點,但FETCH請求發送和PRODUCE請求處理的時機會影響我們的討論。因此后續我們也將分兩種情況來討論分區HW的更新。
第一種情況:follower發送FETCH請求在leader處理完PRODUCE請求之后
producer給該topic分區發送了一條消息。此時的狀態如下圖所示:
如圖所示,leader接收到PRODUCE請求主要做兩件事情:
- 把消息寫入寫底層log(同時也就自動地更新了leader的LEO)
- 嘗試更新leader HW值(前面leader副本何時更新HW值一節中的第三個條件觸發)。我們已經假設此時follower尚未發送FETCH請求,那么leader端保存的remote LEO依然是0,因此leader會比較它自己的LEO值和remote LEO值,發現最小值是0,與當前HW值相同,故不會更新分區HW值
所以,PRODUCE請求處理完成后leader端的HW值依然是0,remote LEO也是0,而LEO是1。假設此時follower發送了FETCH請求(或者說follower早已發送了FETCH請求,只不過在broker的請求隊列中排隊),那么狀態變更如下圖所示:
本例中當follower發送FETCH請求時,leader端的處理依次是:
- 讀取底層log數據
- 更新remote LEO = 0(為什么是0? 因為此時follower還沒有寫入這條消息。leader如何確認follower還未寫入呢?這是通過follower發來的FETCH請求中的fetch offset來確定的)
- 嘗試更新分區HW——此時leader LEO = 1,remote LEO = 0,故分區HW值= min(leader LEO, follower remote LEO) = 0
- 把數據和當前分區HW值(依然是0)發送給follower副本
而follower副本接收到FETCH response后依次執行下列操作:
- 寫入本地log(同時更新follower LEO)
- 更新follower HW——比較本地LEO和當前leader HW取小者,故follower HW = 0
此時,第一輪FETCH RPC結束,我們會發現雖然leader和follower都已經在log中保存了這條消息,但分區HW值尚未被更新。實際上,它是在第二輪FETCH RPC中被更新的,如下圖所示:
上圖中,follower發來了第二輪FETCH請求,leader端接收到后仍然會依次執行下列操作:
- 讀取底層log數據
- 更新remote LEO = 1(這次為什么是1了? 因為這輪FETCH RPC攜帶的fetch offset是1,那么為什么這輪攜帶的就是1了呢,因為上一輪結束后follower LEO被更新為1了)
- 嘗試更新分區HW——此時leader LEO = 1,remote LEO = 1,故分區HW值= min(leader LEO, follower remote LEO) = 1。注意分區HW值此時被更新了!!!
- 把數據(實際上沒有數據)和當前分區HW值(已更新為1)發送給follower副本(上圖的response(leader HW = 0)錯了,應該是response(leader HW = 1))
同樣地,follower副本接收到FETCH response后依次執行下列操作:
- 寫入本地log,當然沒東西可寫,故follower LEO也不會變化,依然是1
- 更新follower HW——比較本地LEO和當前leader HW取小者。由於此時兩者都是1,故更新follower HW = 1 (注意:我特意用了兩種顏色來描述這兩步,后續會談到原因!)
Okay,producer端發送消息后broker端完整的處理流程就講完了。此時消息已經成功地被復制到leader和follower的log中且分區HW是1,表明consumer能夠消費offset = 0的這條消息。下面我們來分析下PRODUCE和FETCH請求交互的第二種情況。
第二種情況:FETCH請求保存在purgatory中PRODUCE請求到來
這種情況實際上和第一種情況差不多。前面說過了,當leader無法立即滿足FECTH返回要求的時候(比如沒有數據),那么該FETCH請求會被暫存到leader端的purgatory中,待時機成熟時會嘗試再次處理它。不過Kafka不會無限期地將其緩存着,默認有個超時時間(500ms),一旦超時時間已過,則這個請求會被強制完成。不過我們要討論的場景是在寄存期間,producer發送PRODUCE請求從而使之滿足了條件從而被喚醒。此時,leader端處理流程如下:
- leader寫入本地log(同時自動更新leader LEO)
- 嘗試喚醒在purgatory中寄存的FETCH請求
- 嘗試更新分區HW
至於喚醒后的FETCH請求的處理與第一種情況完全一致,故這里不做詳細展開了。
以上所有的東西其實就想說明一件事情:Kafka使用HW值來決定副本備份的進度,而HW值的更新通常需要額外一輪FETCH RPC才能完成,故而這種設計是有問題的。它們可能引起的問題包括:
- 備份數據丟失
- 備份數據不一致
我們一一分析下:
一、數據丟失
如前所述,使用HW值來確定備份進度時其值的更新是在下一輪RPC中完成的。現在翻到上面使用兩種不同顏色標記的步驟處思考下, 如果follower副本在藍色標記的第一步與紫色標記的第二步之間發生崩潰,那么就有可能造成數據的丟失。我們舉個例子來看下。
上圖中有兩個副本:A和B。開始狀態是A是leader。我們假設producer端min.insync.replicas設置為1,那么當producer發送兩條消息給A后,A寫入到底層log,此時Kafka會通知producer說這兩條消息寫入成功。
但是在broker端,leader和follower底層的log雖都寫入了2條消息且分區HW已經被更新到2,但follower HW尚未被更新(也就是上面紫色顏色標記的第二步尚未執行)。倘若此時副本B所在的broker宕機,那么重啟回來后B會自動把LEO調整到之前的HW值,故副本B會做日志截斷(log truncation),將offset = 1的那條消息從log中刪除,並調整LEO = 1,此時follower副本底層log中就只有一條消息,即offset = 0的消息。
B重啟之后需要給A發FETCH請求,但若A所在broker機器在此時宕機,那么Kafka會令B成為新的leader,而當A重啟回來后也會執行日志截斷,將HW調整回1。這樣,位移=1的消息就從兩個副本的log中被刪除,即永遠地丟失了。
這個場景丟失數據的前提是在min.insync.replicas=1時,一旦消息被寫入leader端log即被認為是“已提交”,而延遲一輪FETCH RPC更新HW值的設計使得follower HW值是異步延遲更新的,倘若在這個過程中leader發生變更,那么成為新leader的follower的HW值就有可能是過期的,使得clients端認為是成功提交的消息被刪除。
二、leader/follower數據離散
除了可能造成的數據丟失以外,這種設計還有一個潛在的問題,即造成leader端log和follower端log的數據不一致。比如leader端保存的記錄序列是r1,r2,r3,r4,r5,....;而follower端保存的序列可能是r1,r3,r4,r5,r6...。這也是非法的場景,因為顧名思義,follower必須追隨leader,完整地備份leader端的數據。
我們依然使用一張圖來說明這種場景是如何發生的:
這種情況的初始狀態與情況1有一些不同的:A依然是leader,A的log寫入了2條消息,但B的log只寫入了1條消息。分區HW更新到2,但B的HW還是1,同時producer端的min.insync.replicas = 1。
這次我們讓A和B所在機器同時掛掉,然后假設B先重啟回來,因此成為leader,分區HW = 1。假設此時producer發送了第3條消息(綠色框表示)給B,於是B的log中offset = 1的消息變成了綠色框表示的消息,同時分區HW更新到2(A還沒有回來,就B一個副本,故可以直接更新HW而不用理會A)之后A重啟回來,需要執行日志截斷,但發現此時分區HW=2而A之前的HW值也是2,故不做任何調整。此后A和B將以這種狀態繼續正常工作。
顯然,這種場景下,A和B底層log中保存在offset = 1的消息是不同的記錄,從而引發不一致的情形出現。
Kafka 0.11.0.0.版本解決方案
造成上述兩個問題的根本原因在於HW值被用於衡量副本備份的成功與否以及在出現failture時作為日志截斷的依據,但HW值的更新是異步延遲的,特別是需要額外的FETCH請求處理流程才能更新,故這中間發生的任何崩潰都可能導致HW值的過期。鑒於這些原因,Kafka 0.11引入了leader epoch來取代HW值。Leader端多開辟一段內存區域專門保存leader的epoch信息,這樣即使出現上面的兩個場景也能很好地規避這些問題。
所謂leader epoch實際上是一對值:(epoch,offset)。epoch表示leader的版本號,從0開始,當leader變更過1次時epoch就會+1,而offset則對應於該epoch版本的leader寫入第一條消息的位移。因此假設有兩對值:
(0, 0)
(1, 120)
則表示第一個leader從位移0開始寫入消息;共寫了120條[0, 119];而第二個leader版本號是1,從位移120處開始寫入消息。
leader broker中會保存這樣的一個緩存,並定期地寫入到一個checkpoint文件中。
當leader寫底層log時它會嘗試更新整個緩存——如果這個leader首次寫消息,則會在緩存中增加一個條目;否則就不做更新。而每次副本重新成為leader時會查詢這部分緩存,獲取出對應leader版本的位移,這就不會發生數據不一致和丟失的情況。
下面我們依然使用圖的方式來說明下利用leader epoch如何規避上述兩種情況
一、規避數據丟失
上圖左半邊已經給出了簡要的流程描述,這里不詳細展開具體的leader epoch實現細節(比如OffsetsForLeaderEpochRequest的實現),我們只需要知道每個副本都引入了新的狀態來保存自己當leader時開始寫入的第一條消息的offset以及leader版本。這樣在恢復的時候完全使用這些信息而非水位來判斷是否需要截斷日志。
二、規避數據不一致
同樣的道理,依靠leader epoch的信息可以有效地規避數據不一致的問題。
總結
0.11.0.0版本的Kafka通過引入leader epoch解決了原先依賴水位表示副本進度可能造成的數據丟失/數據不一致問題。有興趣的讀者可以閱讀源代碼進一步地了解其中的工作原理。
源代碼位置:kafka.server.epoch.LeaderEpochCache.scala (leader epoch數據結構)、kafka.server.checkpoints.LeaderEpochCheckpointFile(checkpoint檢查點文件操作類)還有分布在Log中的CRUD操作。
參考:
KIP-101 - Alter Replication Protocol to use Leader Epoch rather than High Watermark for Truncation