Kafka副本管理—— 為何去掉replica.lag.max.messages參數


  今天查看Kafka 0.10.0的官方文檔,發現了這樣一句話:Configuration parameter replica.lag.max.messages was removed. Partition leaders will no longer consider the number of lagging messages when deciding which replicas are in sync. 即replica.lag.max.messages參數被正式地移除了,現在topic每個分區的leader副本都不再使用這個參數作為判斷follower副本同步狀態的依據。看到之后頓覺十分好奇於是抽出半天時間仔細研究了一下,終於弄明白了移除該參數的原因,特此記錄一下。

  首先我們來看一下這個參數本來的含義: If a replica falls more than this many messages behind the leader, the leader will remove the follower from ISR and treat it as dead. 即如果某個副本落后leader副本的消息數超過了這個值,那么leader副本就會把該follower副本從ISR中移除。Kafka 0.8.2.2的代碼是這樣使用該參數的:

val stuckReplicas = candidateReplicas.filter(r => (time.milliseconds - r.logEndOffsetUpdateTimeMs) > keepInSyncTimeMs)
val slowReplicas = candidateReplicas.filter(r =>
      r.logEndOffset.messageOffset >= 0 &&
      leaderLogEndOffset.messageOffset - r.logEndOffset.messageOffset > keepInSyncMessages)  // keepInSyncMessages即replica.lag.max.messages的值

   這段代碼表明若分區leader副本的結束位移(以下皆稱LEO, log end offset)與該follower副本LEO的差值超過了這個閾值,那么就會被視為slow副本,並加入到slowReplicas集合中。該集合中的所有副本都將被認為是與leader副本不同步(out of sync)。但是Kafka 0.9之后的代碼變成了這個樣子:

val laggingReplicas = candidateReplicas.filter(r => (time.milliseconds - r.lastCaughtUpTimeMs) > maxLagMs)

顯然,新版本(0.9及以后)的Kafka只使用一個參數來確定滯后副本(lagging replica),而不再使用replica.lag.max.messages參數。這是因為什么原因呢?

  在詳細解釋此事之前我們先明確一些公共的術語以方便后續的討論:

  • AR:assigned replicas。通常情況下,每個分區都會被分配多個副本。具體的副本數量由參數offsets.topic.replication.factor指定。分區的AR數據保存在Zookeeper的/brokers/topics/<topic>節點中
  • ISR:in-sync replicas。與leader副本保持同步狀態的副本集合(leader副本本身也在ISR中)。ISR數據保存在Zookeeper的/brokers/topics/<topic>/partitions/<partitionId>/state節點中
  • High Watermark:副本高水位值,簡稱HW,它表示該分區最新一條已提交消息(committed message)的位移
  • LEO:log end offset。從名字上來看似乎是日志結束位移,但其實是下一條消息的位移,即追加寫下一條消息的位移

  值得一提的,HW表示的是最新一條已提交消息的位移。注意這里是已提交的,說明這條消息已經完全備份過了(fully replicated),而LEO可能會比HW值大——因為對於分區的leader副本而言,它的日志隨時會被追加寫入新消息,而這些新消息很可能還沒有被完全復制到其他follower副本上,所以LEO值可能會比HW值大。兩者的關系可參考下圖:

消費者只能消費到HW線以下的消息,即上圖中綠色的部分;而紫色的消息就是未完全備份的消息,因而不能被消費者消費。

  明白了這些術語之后,還有個問題需要研究下: follower部分與leader副本不同步,這是什么意思?不同步(out of sync)意味着follower副本無法追上leader副本的LEO,而這又是什么意思呢?我們舉個簡單的例子來說明。設想我們有一個topic,它只有一個分區,備份因子是3。假設這三個副本分別保存在broker1,broker2和broker3上。leader副本在broker1上,其他兩個broker上的副本都是follower副本,且當前所有的副本都在ISR中。現在我們設置replica.lag.max.messages等於4——表示只要follower副本落后leader副本的消息數小於4,該follower副本就不會被踢出ISR。如果此時有個producer程序每次給這個topic發送3條消息,那么初始狀態如下:

 

很顯然,目前2個follower副本與leader副本是同步的,即它們都能追上leader副本的LEO。假設此時producer生產了1條新消息給leader副本,而同時broker3上的follower副本經歷了一次Full GC,那么現在的日志狀態如下圖:

 

從上圖可以發現,leader副本的HW值和LEO值已然變得不一樣了。不過更重要的是,最新生產的這條消息是不會被視為“已提交”的,除非broker3被踢出ISR或者broker3上的follower副本追上了leader的LEO。由於replica.lag.max.messages=4,而broker3上的follower副本也只是落后leader副本1條消息,所以此時broker3上的副本並不滿足條件因而也不會被踢出ISR。對於broker3上的副本而言,事情變得相當簡單——只需追上leader的LEO即可。如果我們假設broker3因為Full GC停頓了100ms之后追上了leader的進度,那么此時的日志狀態應該如下圖所示:

 此時一切都很完美了,leader的HW值與LEO值相同;2個follower副本都與leader副本是同步的。

   那么有什么可能的原因會使得follower副本與leader副本不同步呢?歸納起來有三種原因:

  • 速度跟不上——follower副本在一段時間內都沒法追上leader副本的消息寫入速度,比如follower副本所在broker的網絡IO開銷過大導致備份消息的速度慢於從leader處獲取消息的速度
  • 進程卡住了——follower副本在一段時間內根本就沒有向leader副本發起FetchRequest請求(該請求就是獲取消息數據),比如太過頻繁的GC或其他失敗導致
  • 新創建的——如果用戶增加了備份因子,很顯然新follower副本在啟動過程初始肯定是全力追趕leader副本,因而與其是不同步的

  replica.lag.max.messags參數就是用於檢測第一種情況的。當然Kafka還提供了一個參數 replica.lag.time.max.ms來檢測另外兩種情況。比如如果設置 replica.lag.time.max.ms=500ms,只要follower副本每隔500ms都能發送FetchRequest請求給leader,那么該副本就不會被標記成dead從而被踢出ISR。

  由於本文重點關注replica.lag.max.messages參數,那么我們來說一下Kafka檢測第一種情況會碰到的問題。回到之前提到的那個例子,如果producer一次性發送消息的速率是2條/秒,即一個batch都有2條消息,那么顯然設置replica.lag.max.messages=4是個相當安全且合適的數值。為什么? 因為在leader副本接收到producer發送過來的消息之后而follower副本開始備份這些消息之前,follower副本落后leader的消息數不會超過3條消息。但如果follower副本落后leader的消息數超過了3條,那么你肯定希望leader把這個特別慢的follower副本踢出ISR以防止增加producer消息生產的延時。從這個簡單的例子上來看,這個參數似乎工作得很好,為什么要移除它呢?根本原因在於如果要正確設置這個參數的值,需要用戶結合具體使用場景自己去評估——基於這個原因,新版本Kafka把這個參數移除了。

  好了,我來詳細解釋一下這個根本原因。首先,對於一個參數的設置,有一點是很重要的:用戶應該對他們知道的參數進行設置,而不是對他們需要進行猜測的參數進行設置。對於該參數來說,我們只能去猜它應該設置成哪些值,而不是根據我們的需要對其進行設置。為什么?舉個例子,假設在剛才那個topic的環境中producer程序突然發起了一波消息生產的瞬時高峰流量增加,比如producer現在一次性發送4條消息過來了,也就是說與replica.lag.max.messages值相等了。此時,這兩個follower副本都會被認為是與leader副本不同步了,從而被踢出ISR,具體日志狀態如下圖所示:

   從上圖看,這兩個follower副本與leader不再同步,但其實他們都是存活狀態(alive)的且沒有任何性能問題。那么在下次FetchRequest時它們就能追上leader的LEO,並重新被加入ISR——於是就出現了這樣的情況:它們不斷地被踢出ISR然后重新加回ISR,造成了與leader不同步、再同步、又不同步、再次同步的情況發生。想想就知道這是多大的開銷!問題的關鍵就在replica.lag.max.messages這個參數上。用戶通過猜測設置該值,猜測producer的速度,猜測leader副本的入站流量。

  可能有用戶會說該參數默認值是4000,應該足夠使用了吧。但有一點需要注意的是,這個參數是全局的!即所有topic都受到這個參數的影響。假設集群中有兩個topic: t1和t2。假設它們的流量差異非常巨大,t1的消息生產者一次性生產5000條消息,直接就突破了4000這個默認值;而另一個topic,t2,它的消息生產者一次性生產10條消息,那么Kafka就需要相當長的時間才能辨別出t2各個分區中那些滯后的副本。很顯然這種流量差異巨大的topic很容易地在同一個集群上部署,那么這種情況下replica.lag.max.messages參數應該設置什么值呢? 顯然沒有合適的值,對吧?

  綜上所述,新版本的Kafka去除了這個參數,改為只使用一個參數就能夠同時檢測由於slow以及由於進程卡殼而導致的滯后(lagging)——即follower副本落后leader副本的時間間隔。這個唯一的參數就是replica.lag.time.max.ms,默認是10秒。對於第2,3種不同步原因而言,該參數沒有什么具體的變化。但是對於第一種情況,檢測機制有了一些微調——如果一個follower副本落后leader的時間持續性地超過了這個閾值,那么這個副本就要被標記為dead從而被踢出ISR。這樣即使出現剛剛提到的producer瞬時峰值流量,只要follower沒有持續性地落后,它就不會反復地在ISR中移進移出。

  最后說一句,這是Kafka副本調優的一個需求,具體的細節詳見KIP-16 --- Automated Replica Lag Tuning 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM