之前寫過一篇關於Kafka High watermark的文章,引起的討論不少:有贊揚之聲,但更多的是針對文中的內容被challenge,於是下定決心找個晚上熬夜再看了一遍,昨晚挑燈通讀了一遍確實發現不少錯誤。鑒於此我決定再寫一篇博客重新梳理一下最新版本中High watermark(下稱HW)的工作原理,也算是糾正之前文章中的錯誤。這次我不打算說leader epoch,而只是專門討論HW、log end offset(日志末端位移,下稱LEO)的事情。希望我能把整個流程交代清楚。也許這篇文章依然有很多問題,到時候就懇請各位多多批評指正了:)
和之前第一篇一樣,我首先給出與HW、LEO相關的副本角色定義:
- leader副本:分區leader所在broker上面的Replica副本對象,不斷處理follower副本發送的FETCH請求
- follower副本:分區follower所在broker上面的Replica副本對象,不斷地向leader副本發送FETCH請求
- ISR副本:這實際上是一個副本集合,包含leader副本和所有與leader副本保持同步的follower副本。如何判定保持同步:replica.lag.time.max.ms時間內follower副本未發送任何FETCH請求或未趕上leader副本LEO則判定為不同步
每個Replica對象都有很多屬性或字段,和本文相關的是LEO、remote LEO和HW。
- LEO:日志末端位移(log end offset),記錄了該Replica對象底層日志(log字段)中下一條消息的位移值。注意是下一條消息!也就是說,如果一個普通topic(非compact策略,即cleanup.policy != compact)的某個分區副本的LEO是10,倘若未發生任何消息刪除操作,那么該副本當前保存了10條消息,位移值范圍是[0, 9]。此時若有一個producer向該副本插入一條消息,則該條消息的位移值是10,而副本LEO值則更新成11。
- remote LEO:嚴格來說這是一個集合。leader副本所在broker的內存中維護了一個Partition對象來保存對應的分區信息,這個Partition中維護了一個Replica列表,保存了該分區所有的副本對象。除了leader Replica副本之外,該列表中其他Replica對象的LEO就被稱為remote LEO,這些LEO值也是要被更新的。
- HW:上一篇中我是這么描述HW值的——“水位值,對於同一個副本對象而言其HW值不會大於LEO值。小於等於HW值的所有消息都被認為是‘已備份’的”—— 嚴格來說,我這里說錯了。實際上HW值也是指向下一條消息,因此應該這樣說:小於HW或在HW以下的消息被認為是“已備份的”。另外上篇文章中的配圖也是錯誤的,如下所示:

之前文章中說HW是7時,位移介於[0, 7]的所有消息都是committed狀態。這種說法是有問題的,實際上,如果要讓[0, 7]的消息是committed狀態,那么HW值應該是8。當然關於LEO的表述是正確的,即:LEO=15表示這個副本當前有15條消息,最新一條消息的位移是14。另外我們總說consumer是無法消費未提交消息的。這句話如果用以上名詞來解讀的話,應該表述為:consumer無法消費分區leader副本中位移值大於等於分區HW值的任何消息。這里需要特別注意分區HW值就是leader副本的HW值。
說一些題外話~~~~~
在判斷能否消費某條消息時到底比較的是”小於HW“還是”小於等於HW",我個人傾向於認為是小於HW,即位移=HW值的消息是不能被消費的。我們可以從Log.scala的read方法簽名中證明這點:
/** * Read messages from the log. * * @param startOffset The offset to begin reading at * @param maxLength The maximum number of bytes to read * @param maxOffset The offset to read up to, exclusive. (i.e. this offset NOT included in the resulting message set) <==== 注意這行!!! * @param minOneMessage If this is true, the first message will be returned even if it exceeds `maxLength` (if one exists) * @param isolationLevel The isolation level of the fetcher. The READ_UNCOMMITTED isolation level has the traditional * read semantics (e.g. consumers are limited to fetching up to the high watermark). In * READ_COMMITTED, consumers are limited to fetching up to the last stable offset. Additionally, * in READ_COMMITTED, the transaction index is consulted after fetching to collect the list * of aborted transactions in the fetch range which the consumer uses to filter the fetched * records before they are returned to the user. Note that fetches from followers always use * READ_UNCOMMITTED. * @throws OffsetOutOfRangeException If startOffset is beyond the log end offset or before the log start offset * @return The fetch data information including fetch starting offset metadata and messages read. */ def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None, minOneMessage: Boolean = false, isolationLevel: IsolationLevel): FetchDataInfo = {
...
}
參數maxOffset是exclusive的,也就是說這個位移上的消息是不會被讀取的。下面這段代碼證明了在實際調用過程中read的maxOffset也是傳入的HW值:
def read(tp: TopicPartition, fetchInfo: PartitionData, limitBytes: Int, minOneMessage: Boolean): LogReadResult = {
......
val initialHighWatermark = localReplica.highWatermark.messageOffset // <==== 注意這行!
......
val maxOffsetOpt = if (readOnlyCommitted) // follower拉取消息一定拉取committed的
Some(lastStableOffset.getOrElse(initialHighWatermark))
else
None
......
val fetch = log.read(offset, adjustedFetchSize, maxOffsetOpt, minOneMessage, isolationLevel)
......
}
不過LogSegment.scala的read方法並未嚴格實現這一點,貌似又支持讀取offset=HW值的消息,如下所示:
def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int, maxPosition: Long = size,
minOneMessage: Boolean = false): FetchDataInfo = {
......
val fetchSize: Int = maxOffset match {
case None =>
// no max offset, just read until the max position
min((maxPosition - startPosition).toInt, adjustedMaxSize)
case Some(offset) =>
if (offset < startOffset) // 並未比較相等的情況,也就是說startOffset = HW值的話也可繼續讀取數據
return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY, firstEntryIncomplete = false)
val mapping = translateOffset(offset, startPosition)
......
}
......
}
雖然在后面代碼中加入了HW位移值對應物理文件位置是否真實存在的判斷邏輯,但畢竟做了一次物理文件位置的轉換操作(調用LogSegment#translateOffset),應該說是做了一些無用功的——后續我可能會提一個jira跟社區討論一下此事,不過目前我們還是先認定HW值處的消息是不能被消費的。
在討論HW、LEO工作原理的時候,下面這張圖能夠很好地解釋leader副本對象和follower副本對象的主要區別:

如前所述,所有的副本對象都保存在分區Partition對象的一個列表中。為了區別leader副本和follower副本,上圖中我還是把它們拆開分別表示,這樣會更加清晰一些。圖中灰色字段表示不會被更新,也就是說leader Replica對象是不會更新remote HW值的(這里的remote含義與remote LEO相同)。有了這些概念我們現在可以討論更新時機的問題了:
一、上圖右邊的follower Replica對象何時更新LEO?
Follower副本使用專屬線程不斷地向leader副本所在broker發送FETCH請求,然后leader發送FETCH response給follower。Follower拿到response之后取出里面的數據寫入到本地底層日志中,在該過程中其LEO值會被更新。
二、上圖左邊的leader Replica對象何時更新LEO?
和follower Replica更新LEO道理相同,leader寫底層日志時就會自動地更新它的LEO值。對於leader來說何時會寫底層日志呢?最容易想到的一個場景就是producer生產消息時。由此可見,不管是在leader端還是在 follower端,只有寫入本地底層日志時才會觸發對本地Replica對象上LEO值的更新。
三、上圖左下的Other Replicas何時更新LEO?
首先思考一下為什么leader Partition對象需要保存所有Replica副本的LEO?事實上,它們的主要作用是幫助leader Replica對象確定其HW值之用,而由於leader Replica的HW值就是整個分區的HW值,故這些other Replicas實際上是用來確定分區HW值的。Other Replicas LEO值是在leader端broker處理FETCH請求過程中被更新的。當follower發送一個FETCH請求時,它會告訴leader要從那個位移值開始讀取,即FetchRequest中的fetchOffset字段。leader端在更新Other Replicas的LEO時會將其更新成這個fetchOffset值。
四、上圖右上的follower Replica對象何時更新HW?
Follower Replica對象更新HW是在其更新本地LEO之后。一旦follower向本地日志寫完數據后它就會嘗試更新其HW值。具體算法是取本地LEO與FETCH response中HW值的較小值,因此follower Replica的HW值不會大於其本地LEO值。
五、上圖左上的leader Replica對象何時更新HW?
前面說過了, leader Replica的HW值實際上就是分區的HW值,因此何時更新該值才是我們最關心的,因為它將直接影響分區數據對於consumer的可見性。以下4種情況Kafka會嘗試去更新leader Replica對象的HW值:
- 該Replica成為leader Replica時:當某個Replica成為分區的leader副本后,Kafka會嘗試去更新其HW值
- Broker崩潰導致副本被踢出ISR時:此時Kafka會執行ISR的縮減操作,故必須要檢查下分區HW值是否需要更新
- Producer向leader Replica寫入消息時:寫入消息會更新leader Replica的LEO,故有必要檢查下其HW值是否需要修改
- Leader Replica處理FETCH請求時:Leader Replica處理FETCH請求時在更新完Other Replicas的LEO后會嘗試更新其HW值
上面的條件揭示了一個重要的事實:如果沒有出現broker failure或leader變更等情形,分區HW值更新時機只可能有兩個:1. leader broker處理PRODUCE請求;2. leader broker處理FETCH請求。Leader Replica HW值變更的算法很簡單:首先找出leader Partition對象保存的所有與leader Replica保持同步的Replica對象(leader Replica + other Replicas)的LEO值,然后選擇其中航最小的LEO值作為分區HW值。這里的同步判斷條件有兩個:
- 該副本在ISR中
- 該副本LEO落后於leader Replica LEO的時間≤ replica.lag.time.max.ms
乍看上去好像這兩個條件說的是一回事,畢竟ISR的定義就是第二個條件描述的那樣。但在某些情況下Kafka的確可能出現follower副本已經“追上”了leader的進度,但卻不在ISR中——比如某個從failure中恢復的副本。如果Kafka只判斷第一個條件的話,確定分區HW值時就不會考慮這些未在ISR中的副本,但這些副本已經具備了“立刻進入ISR”的資格,因此就可能出現分區HW值越過ISR中副本LEO的情況——這肯定是不允許的,因為超過ISR副本LEO的那些消息屬於未提交消息。
在舉實際例子之前,我們先確認一下這些更新步驟的順序。首先是處理PRODUCE請求的邏輯順序:

之后是leader端broker處理FETCH請求:

最后是follower端broker處理FETCH response:

下面舉個一個實際的例子,該例子中的topic是單分區,副本因子是2。我們首先看下當producer發送一條消息時,leader/follower端broker的副本對象到底會發生什么事情以及分區HW是如何被更新的。首先是初始狀態:

此時producer給該topic分區發送了一條消息。此時的狀態如下圖所示:

如上圖所見,producer發送消息成功后(假設acks=1, leader成功寫入即返回),follower發來了新的FECTH請求,依然請求fetchOffset = 0的數據。和上次不同的是,這次是有數據可以讀取的,因此整個處理流程如下圖:

顯然,現在leader和follower都保存了位移是0的這條消息,但兩邊的HW值都沒有被更新,它們需要在下一輪FETCH請求處理中被更新,如下圖所示:

簡單解釋一下, 第二輪FETCH請求中,follower發送fetchOffset = 1的FETCH請求——因為fetchOffset = 0的消息已經成功寫入follower本地日志了,所以這次請求fetchOffset = 1的數據了。Leader端broker接收到FETCH請求后首先會更新other replicas中的LEO值,即將remote LEO更新成1,然后更新分區HW值為1——具體的更新規則參見上面的解釋。做完這些之后將當前分區HW值(1)封裝進FETCH response發送給follower。Follower端broker接收到FETCH response之后從中提取出當前分區HW值1,然后與自己的LEO值比較,從而將自己的HW值更新成1,至此完整的HW、LEO更新周期結束。
由上面的分析可知,兩邊HW值的更新是在后面一輪(如果有多個follower副本,也許是多輪)FETCH請求處理中完成的,這種“時間”上的錯配也是導致出現各種“數據丟失”或“數據不一致”的原因。基於此社區才引入了leader epoch機制試圖規避因使用HW而帶來的這個問題。不過本文並不關注leader epoch,只是單純希望我能把HW、LEO這件事情講明白。
