關於Kafka high watermark的討論2


  之前寫過一篇關於Kafka High watermark的文章,引起的討論不少:有贊揚之聲,但更多的是針對文中的內容被challenge,於是下定決心找個晚上熬夜再看了一遍,昨晚挑燈通讀了一遍確實發現不少錯誤。鑒於此我決定再寫一篇博客重新梳理一下最新版本中High watermark(下稱HW)的工作原理,也算是糾正之前文章中的錯誤。這次我不打算說leader epoch,而只是專門討論HW、log end offset(日志末端位移,下稱LEO)的事情。希望我能把整個流程交代清楚。也許這篇文章依然有很多問題,到時候就懇請各位多多批評指正了:)


  和之前第一篇一樣,我首先給出與HW、LEO相關的副本角色定義:

  1. leader副本:分區leader所在broker上面的Replica副本對象,不斷處理follower副本發送的FETCH請求
  2. follower副本:分區follower所在broker上面的Replica副本對象,不斷地向leader副本發送FETCH請求
  3. ISR副本:這實際上是一個副本集合,包含leader副本和所有與leader副本保持同步的follower副本。如何判定保持同步:replica.lag.time.max.ms時間內follower副本未發送任何FETCH請求或未趕上leader副本LEO則判定為不同步

每個Replica對象都有很多屬性或字段,和本文相關的是LEO、remote LEO和HW。

  1. LEO:日志末端位移(log end offset),記錄了該Replica對象底層日志(log字段)中下一條消息的位移值。注意是下一條消息!也就是說,如果一個普通topic(非compact策略,即cleanup.policy != compact)的某個分區副本的LEO是10,倘若未發生任何消息刪除操作,那么該副本當前保存了10條消息,位移值范圍是[0, 9]。此時若有一個producer向該副本插入一條消息,則該條消息的位移值是10,而副本LEO值則更新成11。
  2. remote LEO:嚴格來說這是一個集合。leader副本所在broker的內存中維護了一個Partition對象來保存對應的分區信息,這個Partition中維護了一個Replica列表,保存了該分區所有的副本對象。除了leader Replica副本之外,該列表中其他Replica對象的LEO就被稱為remote LEO,這些LEO值也是要被更新的。
  3. HW:上一篇中我是這么描述HW值的——“水位值,對於同一個副本對象而言其HW值不會大於LEO值。小於等於HW值的所有消息都被認為是‘已備份’的”—— 嚴格來說,我這里說錯了。實際上HW值也是指向下一條消息,因此應該這樣說:小於HW或在HW以下的消息被認為是“已備份的”。另外上篇文章中的配圖也是錯誤的,如下所示

  之前文章中說HW是7時,位移介於[0, 7]的所有消息都是committed狀態。這種說法是有問題的,實際上,如果要讓[0, 7]的消息是committed狀態,那么HW值應該是8。當然關於LEO的表述是正確的,即:LEO=15表示這個副本當前有15條消息,最新一條消息的位移是14。另外我們總說consumer是無法消費未提交消息的。這句話如果用以上名詞來解讀的話,應該表述為:consumer無法消費分區leader副本中位移值大於等於分區HW值的任何消息。這里需要特別注意分區HW值就是leader副本的HW值


說一些題外話~~~~~

  在判斷能否消費某條消息時到底比較的是”小於HW“還是”小於等於HW",我個人傾向於認為是小於HW,即位移=HW值的消息是不能被消費的。我們可以從Log.scala的read方法簽名中證明這點:

/**
   * Read messages from the log.
   *
   * @param startOffset The offset to begin reading at
   * @param maxLength The maximum number of bytes to read
   * @param maxOffset The offset to read up to, exclusive. (i.e. this offset NOT included in the resulting message set) <==== 注意這行!!!
   * @param minOneMessage If this is true, the first message will be returned even if it exceeds `maxLength` (if one exists)
   * @param isolationLevel The isolation level of the fetcher. The READ_UNCOMMITTED isolation level has the traditional
   *                       read semantics (e.g. consumers are limited to fetching up to the high watermark). In
   *                       READ_COMMITTED, consumers are limited to fetching up to the last stable offset. Additionally,
   *                       in READ_COMMITTED, the transaction index is consulted after fetching to collect the list
   *                       of aborted transactions in the fetch range which the consumer uses to filter the fetched
   *                       records before they are returned to the user. Note that fetches from followers always use
   *                       READ_UNCOMMITTED.
   * @throws OffsetOutOfRangeException If startOffset is beyond the log end offset or before the log start offset
   * @return The fetch data information including fetch starting offset metadata and messages read.
   */
  def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None, minOneMessage: Boolean = false,
           isolationLevel: IsolationLevel): FetchDataInfo = {
...
}

  參數maxOffset是exclusive的,也就是說這個位移上的消息是不會被讀取的。下面這段代碼證明了在實際調用過程中read的maxOffset也是傳入的HW值:

def read(tp: TopicPartition, fetchInfo: PartitionData, limitBytes: Int, minOneMessage: Boolean): LogReadResult = {
......
		val initialHighWatermark = localReplica.highWatermark.messageOffset // <==== 注意這行!
......
        val maxOffsetOpt = if (readOnlyCommitted) // follower拉取消息一定拉取committed的
          Some(lastStableOffset.getOrElse(initialHighWatermark))
        else
          None
......
            val fetch = log.read(offset, adjustedFetchSize, maxOffsetOpt, minOneMessage, isolationLevel)
......
}  

  不過LogSegment.scala的read方法並未嚴格實現這一點,貌似又支持讀取offset=HW值的消息,如下所示:

def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int, maxPosition: Long = size,
           minOneMessage: Boolean = false): FetchDataInfo = {
......
    val fetchSize: Int = maxOffset match {
      case None =>
        // no max offset, just read until the max position
        min((maxPosition - startPosition).toInt, adjustedMaxSize)
      case Some(offset) =>
        if (offset < startOffset) // 並未比較相等的情況,也就是說startOffset = HW值的話也可繼續讀取數據
          return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY, firstEntryIncomplete = false)
        val mapping = translateOffset(offset, startPosition)
        ......
    }
......
}  

  雖然在后面代碼中加入了HW位移值對應物理文件位置是否真實存在的判斷邏輯,但畢竟做了一次物理文件位置的轉換操作(調用LogSegment#translateOffset),應該說是做了一些無用功的——后續我可能會提一個jira跟社區討論一下此事,不過目前我們還是先認定HW值處的消息是不能被消費的。


  在討論HW、LEO工作原理的時候,下面這張圖能夠很好地解釋leader副本對象和follower副本對象的主要區別: 

 

   如前所述,所有的副本對象都保存在分區Partition對象的一個列表中。為了區別leader副本和follower副本,上圖中我還是把它們拆開分別表示,這樣會更加清晰一些。圖中灰色字段表示不會被更新,也就是說leader Replica對象是不會更新remote HW值的(這里的remote含義與remote LEO相同)。有了這些概念我們現在可以討論更新時機的問題了:

一、上圖右邊的follower Replica對象何時更新LEO?

  Follower副本使用專屬線程不斷地向leader副本所在broker發送FETCH請求,然后leader發送FETCH response給follower。Follower拿到response之后取出里面的數據寫入到本地底層日志中,在該過程中其LEO值會被更新。

二、上圖左邊的leader Replica對象何時更新LEO?

  和follower Replica更新LEO道理相同,leader寫底層日志時就會自動地更新它的LEO值。對於leader來說何時會寫底層日志呢?最容易想到的一個場景就是producer生產消息時。由此可見,不管是在leader端還是在 follower端,只有寫入本地底層日志時才會觸發對本地Replica對象上LEO值的更新。

三、上圖左下的Other Replicas何時更新LEO?

  首先思考一下為什么leader Partition對象需要保存所有Replica副本的LEO?事實上,它們的主要作用是幫助leader Replica對象確定其HW值之用,而由於leader Replica的HW值就是整個分區的HW值,故這些other Replicas實際上是用來確定分區HW值的。Other Replicas LEO值是在leader端broker處理FETCH請求過程中被更新的。當follower發送一個FETCH請求時,它會告訴leader要從那個位移值開始讀取,即FetchRequest中的fetchOffset字段。leader端在更新Other Replicas的LEO時會將其更新成這個fetchOffset值。

四、上圖右上的follower Replica對象何時更新HW?

  Follower Replica對象更新HW是在其更新本地LEO之后。一旦follower向本地日志寫完數據后它就會嘗試更新其HW值。具體算法是取本地LEO與FETCH response中HW值的較小值,因此follower Replica的HW值不會大於其本地LEO值。

五、上圖左上的leader Replica對象何時更新HW?

  前面說過了, leader Replica的HW值實際上就是分區的HW值,因此何時更新該值才是我們最關心的,因為它將直接影響分區數據對於consumer的可見性。以下4種情況Kafka會嘗試去更新leader Replica對象的HW值:

  1. 該Replica成為leader Replica時:當某個Replica成為分區的leader副本后,Kafka會嘗試去更新其HW值
  2. Broker崩潰導致副本被踢出ISR時:此時Kafka會執行ISR的縮減操作,故必須要檢查下分區HW值是否需要更新
  3. Producer向leader Replica寫入消息時:寫入消息會更新leader Replica的LEO,故有必要檢查下其HW值是否需要修改
  4. Leader Replica處理FETCH請求時:Leader Replica處理FETCH請求時在更新完Other Replicas的LEO后會嘗試更新其HW值 

  上面的條件揭示了一個重要的事實:如果沒有出現broker failure或leader變更等情形,分區HW值更新時機只可能有兩個:1. leader broker處理PRODUCE請求;2. leader broker處理FETCH請求。Leader Replica HW值變更的算法很簡單:首先找出leader Partition對象保存的所有與leader Replica保持同步的Replica對象(leader Replica + other Replicas)的LEO值,然后選擇其中航最小的LEO值作為分區HW值。這里的同步判斷條件有兩個:

  1. 該副本在ISR中
  2. 該副本LEO落后於leader Replica LEO的時間≤ replica.lag.time.max.ms

   乍看上去好像這兩個條件說的是一回事,畢竟ISR的定義就是第二個條件描述的那樣。但在某些情況下Kafka的確可能出現follower副本已經“追上”了leader的進度,但卻不在ISR中——比如某個從failure中恢復的副本。如果Kafka只判斷第一個條件的話,確定分區HW值時就不會考慮這些未在ISR中的副本,但這些副本已經具備了“立刻進入ISR”的資格,因此就可能出現分區HW值越過ISR中副本LEO的情況——這肯定是不允許的,因為超過ISR副本LEO的那些消息屬於未提交消息。

  在舉實際例子之前,我們先確認一下這些更新步驟的順序。首先是處理PRODUCE請求的邏輯順序:

之后是leader端broker處理FETCH請求:

最后是follower端broker處理FETCH response:

  下面舉個一個實際的例子,該例子中的topic是單分區,副本因子是2。我們首先看下當producer發送一條消息時,leader/follower端broker的副本對象到底會發生什么事情以及分區HW是如何被更新的。首先是初始狀態:

此時producer給該topic分區發送了一條消息。此時的狀態如下圖所示:

 

如上圖所見,producer發送消息成功后(假設acks=1, leader成功寫入即返回),follower發來了新的FECTH請求,依然請求fetchOffset = 0的數據。和上次不同的是,這次是有數據可以讀取的,因此整個處理流程如下圖:

 顯然,現在leader和follower都保存了位移是0的這條消息,但兩邊的HW值都沒有被更新,它們需要在下一輪FETCH請求處理中被更新,如下圖所示:

  簡單解釋一下, 第二輪FETCH請求中,follower發送fetchOffset = 1的FETCH請求——因為fetchOffset = 0的消息已經成功寫入follower本地日志了,所以這次請求fetchOffset = 1的數據了。Leader端broker接收到FETCH請求后首先會更新other replicas中的LEO值,即將remote LEO更新成1,然后更新分區HW值為1——具體的更新規則參見上面的解釋。做完這些之后將當前分區HW值(1)封裝進FETCH response發送給follower。Follower端broker接收到FETCH response之后從中提取出當前分區HW值1,然后與自己的LEO值比較,從而將自己的HW值更新成1,至此完整的HW、LEO更新周期結束。

  由上面的分析可知,兩邊HW值的更新是在后面一輪(如果有多個follower副本,也許是多輪)FETCH請求處理中完成的,這種“時間”上的錯配也是導致出現各種“數據丟失”或“數據不一致”的原因。基於此社區才引入了leader epoch機制試圖規避因使用HW而帶來的這個問題。不過本文並不關注leader epoch,只是單純希望我能把HW、LEO這件事情講明白。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM