Kafka的普及在很大程度上歸功於它的設計和操作簡單,如何自動調優Kafka副本的工作,挑戰之一:如何避免follower進入和退出同步副本列表(即ISR)。如果某些topic的部分partition長期處於“under replicated”狀態,會增加數據丟失的概率。Kafka通過“多副本機制”實現高可用,當Kafka集群中一個Broker失效情況下仍然保證服務可用。
Kafka日志復制算法保證,如果leader發生故障或掛掉,一個新leader被選舉並且客戶端的消息成功寫入。Kafka確保從同步副本列表中選舉一個副本為leader。
0.副本知識
每個Partition有一個預寫式日志文件,每個Partition都由一系列有序的、不可變的消息組成,這些消息被連續的追加到Partition中,Partition中的每個消息都有一個連續的序列號叫做offset, 確定它在分區日志中唯一的位置。

- leader處理對這個partition的所有讀寫請求。
- follower會去復制leader上的數據。
1. in sync 條件
Leader負責跟蹤同步副本列表中所有follower滯后狀態。
同步中的(in sync),Kafka判斷一個節點是否活着有兩個條件:
- 節點必須可以維護和ZooKeeper的連接,Zookeeper通過心跳機制檢查每個節點的連接。——由參數
request.required.acks決定,如果是這個參數生效而移除一個follower,說明這個follower 失效或者死亡。 - 如果節點是個follower,他必須能及時的同步leader的寫操作,延時不能太久。—— 由參數
replica.lag.max.messages決定的,如果是這個參數生效而移除一個follower,說明這個follow是一個“慢副本”。
- 一條消息只有被“in sync” list里的所有follower都從leader復制過去才會被認為已提交。這樣就避免了部分數據被寫進了leader,還沒來得及被任何follower復制就宕機了,而造成數據丟失。
- 而對於producer而言,它可以選擇是否等待消息commit,這可以通過request.required.acks來設置。
- 對於Consumer而言,只能看見被commit的消息。
1.1問題:
- 被移除后的under replica的follower 會繼續拉取leader的數據,等追趕上之后,會被重新加入到“同步副本”。
- 一個消息什么時候被認為是提交的?(意味着可以被consumer消費)
- 直到follower Broker 從同步副本列表中移除
- 或者追趕上leader log end offset,最新的消息才會認為提交。
- 是什么原因導致分區的副本與leader不同步
- 慢副本:在一定周期時間內follower不能追趕上leader。最常見的原因之一是I / O瓶頸導致follower追加復制消息速度慢於從leader拉取速度。
- 卡住副本:在一定周期時間內follower停止從leader拉取請求。follower replica卡住了是由於GC暫停或follower失效或死亡。
- 新啟動副本:當用戶給主題增加副本因子時,新的follower不在同步副本列表中,直到他們完全趕上了leader日志。
- kafka-0.8 相關集群參數配置
replica.lag.time.max.ms=10000 // 根據隊列流量大小和集群負載情況做出判斷並設置一個合適值 replica.lag.max.messages=4000
2. Leader 選舉
當leader宕機了,怎樣在follower中選舉出新的leader?
- 實際上,leader election算法非常多,比如Zookeper的Zab, Raft和Viewstamped Replication。而Kafka所使用的leader election算法更像微軟的PacificA算法。
一種非常常用的選舉leader的方式是“majority vote”(“少數服從多數”),但Kafka並未采用這種方式。這種模式下,如果我們有2f+1個replica(包含leader和follower),那在commit之前必須保證有f+1個replica復制完消息,為了保證正確選出新的leader,fail的replica不能超過f個。---(類似pasox算法)
- 缺點:需要的replica的數量太多,造成性能瓶頸。
leader 選舉算法
Kafka在Zookeeper中動態維護了一個ISR(in-sync replicas) set,這個set里的所有replica都跟上了leader,只有ISR里的成員才有被選為leader的可能。在這種模式下,對於f+1個replica,一個Kafka topic能在保證不丟失已經ommit的消息的前提下容忍f個replica的失敗。在大多數使用場景中,這種模式是非常有利的。
在ISR中至少有一個follower時,Kafka可以確保已經commit的數據不丟失,但如果某一個partition的所有replica都掛了,就無法保證數據不丟失了。這種情況下有兩種可行的方案:
- 等待ISR中的任一個replica“活”過來,並且選它作為leader
- 選擇第一個“活”過來的replica(不一定是ISR中的)作為leader
這就需要在可用性和一致性當中作出一個簡單的平衡。(Kafka0.8.*使用了第二種方式。)
3.平衡partition
- 默認情況下,kafka以RoundRobin方式寫各個partition,讓各個partition的消息量均衡。
- 平衡partition的leader在所有的broker上。
優化leadership election的過程也是很重要的,畢竟這段時間相應的partition處於不可用狀態。
一種簡單的實現是暫停宕機的broker上的所有partition,並為之選舉leader。實際上,Kafka選舉一個broker作為controller,這個controller通過watch Zookeeper檢測所有的broker failure,並負責為所有受影響的parition選舉leader,再將相應的leader調整命令發送至受影響的broker,過程如下圖所示。

4.Controller
負責leader 選舉,每個broker都可成為Controller。
它可以批量的通知leadership的變化,從而使得選舉過程成本更低。如果controller失敗了,所有broker都會嘗試在Zookeeper中創建/controller->{this broker id},如果創建成功(只可能有一個創建成功),則該broker會成為controller。
Controller對Broker failure的處理過程
- Controller在Zookeeper的/brokers/ids節點上注冊Watch。一旦有Broker宕機(本文用宕機代表任何讓Kafka認為其Broker die的情景,包括但不限於機器斷電,網絡不可用,GC導致的Stop The World,進程crash等),其在Zookeeper對應的Znode會自動被刪除,Zookeeper會fire Controller注冊的Watch,Controller即可獲取最新的幸存的Broker列表。
- Controller決定set_p,該集合包含了宕機的所有Broker上的所有Partition。
- 對set_p中的每一個Partition:
3.1 從/brokers/topics/[topic]/partitions/[partition]/state讀取該Partition當前的ISR。
3.2 決定該Partition的新Leader。如果當前ISR中有至少一個Replica還幸存,則選擇其中一個作為新Leader,新的ISR則包含當前ISR中所有幸存的Replica。否則選擇該Partition中任意一個幸存的Replica作為新的Leader以及ISR(該場景下可能會有潛在的數據丟失)。如果該Partition的所有Replica都宕機了,則將新的Leader設置為-1。
3.3 將新的Leader,ISR和新的leader_epoch及controller_epoch寫入/brokers/topics/[topic]/partitions/[partition]/state。注意,該操作只有Controller版本在3.1至3.3的過程中無變化時才會執行,否則跳轉到3.1。
- 直接通過RPC向set_p相關的Broker發送LeaderAndISRRequest命令。Controller可以在一個RPC操作中發送多個命令從而提高效率.
Broker failover順序圖如下所示。
5. 消息保障
kafka能夠保障以下兩點:
- At most once 消息可能會丟,但絕不會重復傳輸
- At least once 消息絕不會丟,但可能會重復傳輸
對於Producer
- 發送不管,at most once
- 發送管ack,at least once
對於Consumer
- 記錄Offset,at least once。
