一、Kafka核心總控制器
定義:kafka集群中的一個負責管理所有分區和副本的狀態的broker。
PS:kafka單台機器也叫集群。
職能:選舉新的leader副本、ISR變更通知所有broker更新其元數據、讓新分區被其他節點感知。
- 當某個分區的leader副本出現故障時,由控制器負責為該分區選舉新的leader副本。
- 當檢測到某個分區的ISR集合發生變化時,由控制器負責通知所有broker更新其元數據信息。
- 當使用kafka-topics.sh腳本為某個topic增加分區數量時,同樣還是由控制器負責讓新分區被其他節點感知到。
Controller選舉原理
- kafka集群啟動時,每個borker去ZK上創建一個“/controller 臨時節點”,誰創建成功誰就是Controller。
- 當這個controller角色的broker宕機了,此時ZK上的臨時節點會消失,集群里其他broker會一直監聽這個臨時節點,發現臨時節點消失了,就競爭再次創建臨時節點,ZK又會保證只有一個broker成為新的controller。
Partition副本選舉Leader原理
- controller感知到分區leader所在的broker掛了,則會從ISR列表里挑第一個broker作為leader。
- 若參數unclean.leader.election.enable為true,代表在ISR列表里所有副本都掛了,則可以在ISR列表以外的副本中選leader。
副本進入ISR的條件
- 副本節點不能產生分區,必須能與zookeeper保持會話以及跟leader副本網絡連通。
- 副本能復制leader上的所有寫操作,並且不能落后太多。
PS:之所以取ISR中的第一個broker升級Leader,是因為第一個broker最先放進ISR 列表,可能是同步數據最多的副本。
二、消費者offset記錄機制
offset記錄機制
- 每個consumer會定期將自己消費分區的offset提交給kafka內部topic:__consumer_offsets。
- 提交過去的時候,key是consumerGroupId+topic+分區號,value就是當前offset的值。
- kafka會定期清理topic里的消息,最后就保留最新的那條數據。
PS:因為__consumer_offsets可能會接收高並發的請求,kafka默認給其分配50個分區(可以通過offsets.topic.num.partitions設置),這樣可以通過加機器的方式抗大並發。



三、消費者Rebalance機制
Rebalance機制
定義:如果消費組里的消費者數量有變化或消費的分區數有變化,kafka會重新分配消費者和消費分區的關系。
PS:每個消費者都會有消費組,如果不指定會生成默認的組。
PS:rebalance只針對subscribe這種不指定分區消費的情況,如果通過assign這種消費方式指定了分區,kafka不會進行rebanlance。
PS:rebalance過程中,消費者無法從kafka消費消息!!!【盡量避免在系統高峰期時發生重平衡。】
觸發場景:
- 1、消費組里的consumer增加或減少了
- 2、動態給topic增加了分區
- 3、消費組訂閱了更多的topic
Rebalance過程
設計原理:分區方案制定過程中有兩個組長,消費者組長負責制訂分區策略,生產者組長負責通知其他消費者分區策略。【同一個消費者組中的消費者沒有聯系,需要通過kafka組長來協調。】

兩個重要角色:
- 組協調器【生產者組長】:每個消費者組都會選擇一個broker作為自己的組協調器coordinator,負責監控這個消費組里的所有消費者的心跳,以及判斷是否宕機,然后開啟消費者rebalance。
- 消費組協調器【消費者組長】:負責制定分區方案,並與組協調器進行通信。
PS:消費者組長不是我們所說的分區Leader!這個概念要區分好!
第一階段:選擇組協調器【生產者組長】
消費者組中的每個consumer啟動時會向kafka集群中的某個節點發送查找組協調器的請求,並跟其建立網絡連接。
組協調器選擇方式
公式:hash(consumer_group_id) % _consumer_offsets主題的分區數。
根據公式獲取到分區地址后,這個分區leader對應的broker就是這個消費者組的組協調器。
第二階段:消費者入組
在成功找到消費組所對應的組協調器后就進入消費組入組的階段,在此階段的消費者會向組協調器發送入組請求。
組協調器從一個消費者組中選擇第一個加入group的consumer作為消費者組長,把消費者組的情況發送給這個broker,接着這個broker會負責制定分區方案。
第三階段:組協調器下發分區方案
消費者組長通過給組協調器發送下發分區策略請求,接着組協調器就把分區方案下發給各個consumer,他們會與指定分區的leader對應的broker進行網絡連接以及消息消費。
Rebalance分區分配策略
- range【默認】:根據分區數/消費者數量,然后給每個消費者分配n個分區。【消費者1:0~3 消費者2:4~6 消費者3:7~9】
- round-robin:輪詢分配機制。【消費者1:0、3、6 消費者2:1、4、7 消費者3:2、5、8】
- sticky:粘性輪詢機制。【當分區增加或者增加消費者時,只會重新分配掛掉的那台或者新的分區。】
PS:range和輪詢當分區增加或者增加消費者時,會重新輪詢。而粘性策略則不會,粘性策略只會重新分配掛掉的那台或者新的分區。
四、生產者發布消息機制
寫入方式
producer采用push模式將消息發布到broker,每條消息都被append到patition中,屬於順序寫磁盤。
PS:順序寫磁盤效率比隨機寫內存要高,可以保障kafka吞吐量。
消息路由
producer發送消息到broker時,會根據分區算法選擇將其存儲到哪一個partition。其路由機制為:
- 1、指定了patition,則直接使用。
- 2、未指定patition但指定key,通過對key的value進行hash選出一個patition。【key的組成見本文上半部分的解析】
- 3、patition和key都未指定,使用輪詢選出一個 patition。
寫入流程

五、高水位
基本概念
- HW:高水位
- LEO:日志末端位移

HW即一個partition對應的ISR中最小的LEO(log-end-offset), consumer最多只能消費到HW所在的位置。
每個replica都有HW,leader和follower各自負責更新自己的HW的狀態。
PS:對於leader新寫入的消息,consumer不能立刻消費,leader會等待該消息被所有ISR中的replicas同步后更新HW,此時消息才能被consumer消費。
快樂圖解~


六、日志分段存儲原理
Kafka一個分區的消息數據對應存儲在一個文件夾下,以topic名稱+分區號命名:

消息在分區內是分段(segment)存儲,每個段的消息都存儲在不一樣的log文件里,這種特性方便old segment file快速被刪除,kafka規定了一個段位的log文 件最大為1G,做這個限制目的是為了方便把 log 文件加載到內存去。
# 【部分消息的offset索引文件】,kafka每次往分區發4K(可配置)消息就會記錄一條當前消息的offset到index文件 # 如果要定位消息的offset會先在這個文件里快速定位,再去log文件里找具體消息 00000000000000000000.index
# 【消息存儲文件】,主要存offset和消息體 00000000000000000000.log
# 【消息的發送時間索引文件】,kafka每次往分區發4K(可配置)消息就會記錄一條當前消息的發送時間戳與對應的offset到timeindex文件 # 如果需要按照時間來定位消息的offset,會先在這個文件里查找 00000000000000000000.timeindex 00000000000005367851.index 00000000000005367851.log 00000000000005367851.timeindex 00000000000009936472.index 00000000000009936472.log 00000000000009936472.timeindex
這個partition有三組segment文件,當儲存滿時每個log文件的大小是一樣的,但是存儲的message數量是不一定相等的(每條的message大小不一致)。
PS:每個日志段文件最大為1G。
- 文件的命名是以該segment最小offset來命名的,如00000000000000.index存儲offset為0~5367850的消息...以此類推。
- kafka就是利用分段+索引的方式來解決查找效率的問題。
