Kafka創建topic命令很簡單,一條命令足矣:bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic test
這條命令會創建一個名為test的topic,有3個分區,每個分區需分配3個副本。那么在這條命令之后Kafka又做了什么事情呢?本文將對此進行一下梳理,完整地闡述Kafka topic是如何創建的。
topic創建主要分為兩個部分:命令行部分+后台(controller)邏輯部分,如下圖所示。主要的思想就是后台邏輯會監聽zookeeper下對應的目錄節點,一旦發起topic創建命令,該命令會創建新的數據節點從而觸發后台的創建邏輯。
簡單來說我們發起的命令行主要做兩件事情:1. 確定分區副本的分配方案(就是每個分區的副本都分配到哪些broker上);2. 創建zookeeper節點,把這個方案寫入/brokers/topics/<topic>節點下
Kafka controller部分主要做下面這些事情:1. 創建分區;2. 創建副本;3. 為每個分區選舉leader、ISR;4.更新各種緩存
下面我們詳細說說其中的邏輯。在開始之前,我們假設本例中要創建的topic名字是test,有3個分區,副本因子(replication-factor)也是3。注意:本文只涉及主要的邏輯,一些非默認行為不在此次討論之中。
命令行部分
我們發起topic創建命令之后,Kafka會做一些基本的校驗,比如是否同時指定了分區數、副本因子或是topic名字中是否含有非法字符等。值得一提的是,0.10版本支持指定broker的機架信息,類似於Hadoop那樣,可以更好地利用局部性原理減少集群中網絡開銷。如果指定了機架信息(broker.rack), Kafka在為分區做副本分配時就會考慮這部分信息,盡可能地為副本挑選不同機架的broker。當然本例中我們暫時不考慮機架信息對於創建topic的影響。
做完基本的校驗之后,Kafka會從zookeeper的/brokers/ids下獲取集群當前存活broker列表然后開始執行副本的分配工作。首先,分區副本的分配有以下3個目標:
- 盡可能地在各個broker之間均勻地分配副本
- 如果分區的某個副本被分配到了一個broker,那么要盡可能地讓該分區的其他副本均勻地分配到其他broker上
- 如果所有broker都指定了機架信息,那么盡可能地讓每個分區的副本都分配到不同的機架上
第3個目標目前對於我們沒什么用,那前兩點是如何做到的?如果直接看源碼可能有些晦澀難懂,概括起來就一句話:隨機挑選一個broker采用輪詢的方式分配每個分區的第一個副本,然后采用增量右移的方式分配其他的副本。好像還是比較難理解,是吧? 那我舉個例子吧:假設你有10個分區p0, p1, p2, ..., p9,每個分區的副本因子都是3,即總共30個副本,要分配在5個broker(b0, b1, b2, b3, b4)上,采用上面的策略就是這樣的:
1 Kafka會從5個broker中隨機選一個broker,假設它選了b0
2 它會依次采用輪詢的方式為所有分區分配第一個副本,如下表所示。即從b0開始依次順序分配broker給10個分區的第一個副本。
3 目前Kafka已經分配了10個副本,剩下的20個副本Kafka會采用增量右移的方式,比如如果前兩行是1,2,3,4,5(第一行) 6,7,8,9,10(第二行),那么第3行右移1位,變成5,1,2,3,4,第4行右移2位,變成:9, 10, 6, 7, 8,以此類推。那么采用這種方式分配的副本方案如下表所示:
當然,如果考慮機架信息,分配算法會有所調整,但基本上也是滿足上面那3個目標的。
對於本文中使用的例子,我們假設分配方案如下:(格式是分區號 -> 副本所在broker Id集合)
0 -> [0,1,2]
1 -> [1,0,2]
2 -> [2,0,1]
確定了分區副本分配方案之后,Kafka會把這個分配方案持久化到zookeeper的/brokers/topics/<topic>節點下,類似於這樣的信息:{"version":1,"partitions":{"0":[0,1,2],"1":[1,0,2],"2":[2,0,1]}}
okay,至此命令行部分的工作就算完成了,此時你應該可以看到Kafka會返回Created topic "test"給你,表明topic創建成功。但是,千萬不要以為Kafka創建topic的工作就完成了,后面還有很多事情要做,即controller要登場了。
后台邏輯部分
所謂的后台邏輯其實是由Kafka的controller負責提供的。Kafka的controller內部保存了很多信息,其中有一個分區狀態機,用於記錄topic各個分區的狀態。這個狀態機內部注冊了一些zookeeper監聽器。Controller在啟動的時候會創建這些監聽器。其中一個監聽器(TopicChangeListener)就是用於監聽zookeeper的/brokers/topics目錄的子節點變化的。一旦該目錄子節點數發生變化就會調用這個監聽器的處理方法。對於上面的例子來說,由於命令行已將分配方案持久化到/brokers/topics/test下,所以會觸發該監聽器的處理方法。
TopicChangeListener監聽器一方面會更新controller的緩存信息(比如更新集群當前所有的topic列表以及更新新增topic的分區副本分配方案緩存等),另一方面就是創建對應的分區及其副本對象並為每個分區確定leader副本及ISR。
至此,整個topic的創建就完成了!
====================================================================================================================
顯然,剛才關於后台controller邏輯部分幾乎就是一筆帶過了,沒有詳細展開。畢竟如果直接講代碼會比較枯燥。一般情況下,我們了解到此程度就可以了。下面將針對代碼詳細分析下controller是如何創建topic的。
上邊提到過,controller內部定義了很多數據結構用於記錄當前集群的各種狀態。在Controller中還分別定義了一個分區狀態機(PartitionStateMachine)和副本狀態機(ReplicaStateMachine),分別記錄各個分區的狀態和狀態流轉,如下面兩張圖所示:
咋一看,這兩張圖似乎差不多,但一個是分區狀態流轉,一個是副本狀態流轉。不管是分區還是副本,只有處於Online狀態的才能正常工作。當然在設置這個狀態之前必須要先完成一些工作。下面詳細說說:
1 首先,分區狀態機的registerPartitionChangeListener方法會注冊一個zookeeper監聽器,監聽到/brokers/topics下新增了test節點之后,立即處理TopicChangeListener的handleChildChange方法
2 handleChildChange方法的具體邏輯是:
2.1 結合controller緩存的topic列表和/brokers/topics目錄下的topic列表,找出新增的topic:test。假設controller topic列表是A,/brokers/topics下列表是B,新增topic列表可由A - B求得
2.2 使用類似的方法,確定已經被刪除的topic集合,即B - A
2.3 更新controller緩存的topic列表(把test加進去,把那些已經被刪除的topic從緩存中踢出去)
2.4 從/brokers/topics/test節點中取出這個topic所有分區的副本分配方案,然后去更新controller對應的這部分信息(其實也是把test的方案加入到緩存中,另外也會把已刪除的topic對應的方案也踢出去)
2.5 調用onNewTopicCreation開始創建topic
3 onNewTopicCreation:創建topic的回調方法,實現真正的創建topic的邏輯:
3.1 注冊分區變更監聽器——之前說過了分區狀態機會注冊一些zookeeper監聽器,剛剛提到的TopicChangeListener只是其中之一,而這里的監聽器是監聽topic的分區變化的。該監聽器就是PartitionModificationListener類,顧名思義,它負責監聽topic下分區的變化情況,具體來說就是監聽/brokers/topics/topic節點的數據,一旦發生變化該監聽器就會被觸發。當然對於創建topic而言,這一步僅僅是注冊而已並不會被觸發,因為在注冊這個監聽器之前Kafka已經把數據寫入這個節點了。所以此時該監聽器不會觸發操作,這是為以后修改topic時候使用的。 既然本次不會觸發監聽器,代碼里面就手動調用onNewPartitionCreation來創建分區了
3.2 調用onNewPartitionCreation方法創建分區
4 onNewPartitionCreation: 這個方法的目的就是創建topic的所有分區對象,主要涉及4個步驟:
4.1 創建分區對象,並設置成NewPartition狀態:既然叫分區狀態機,必然有個地方要保存Kafka集群下所有topic的所有分區的狀態。每當有新topic創建時,就需要把新增topic所有分區加入這部分緩存,以達到同步的效果。新增的分區狀態統一設置成NewPartition
4.2 為每個分區創建對應的副本對象:Kafka首先從controller緩存中找出這個分區對應的分配方案(還記得吧,controller有個地方保存了所有topic的分區副本分配方案,就是從這里找),然后把這個分區下的所有副本都設置成NewReplica狀態——具體來說Kafka是怎么做的呢?首先,它會嘗試去獲取zookeeper中/brokers/topics/test/partitions/<partitionId>/state節點的數據,該節點保存了每個分區的leader副本和ISR信息。不過對於創建topic來說,目前這個topic的所有分區都沒有leader和ISR信息,所以該節點應該還不存在,應該是空——這是正常的,因為后面會開始選舉!所以這里Kafka僅僅是更新副本狀態機的狀態緩存就可以了(忘了說了,既然分區狀態機有個緩存保存集群中所有分區的狀態,那么副本狀態機自然也有類似的緩存來保存集群中所有topic下所有分區的副本的狀態,所以此時還需要更新這部分緩存)
4.3 前2步創建了分區對象和副本對象,並分別設置成了NewPartition和NewReplica狀態。那么這一步就要把分區狀態轉換到OnlinePartition,只有處於此狀態才可以正常使用。這也是這一步需要做的事情:leader選舉! 代碼寫的很冗長,但簡單來說就是選取副本集合中的第一個副本作為leader副本,並把整個副本集合作為ISR。舉例來說,對於test的分區0,它的副本集合是0,1,2,那么分區0的leader副本就是0,ISR就是[0,1,2]。之后Kafka會把這些信息連同controller的epoch和leader的epoch(多說一句,controller epoch值表示controller被易主的次數,leader epoch也是同理)一同寫入zookeeper的/brokers/topics/test/partitions/0/state節點下,之后更新controller的leader緩存。(再多說一句,controller有個地方記錄了topic所有分區的leader和ISR信息)。 okay,現在新增topic的所有分區都選好了leader和ISR,那么就需要讓集群中其他broker知曉—— 因此需要發送UpdateMetadataRequest給當前所有broker——具體的發送方法其實就是將分區的leader和ISR信息打包封裝進一個map然后為map中的每一項都構造一個UpdateMetadataReuqest對象並通過controller的sendRequest方法發給所有存活着的broker(為什么要發送給所有broker?因為LeaderAndIsr請求是唯一一個所有broker都能立即響應而不需要求助於leader broker的請求!) 具體的發送邏輯由於涉及了Kafka底層網絡協議及KafkaApi機制,等以后有機會再詳談吧。。。
4.4 設置副本對象為OnlineReplica:目前所有的分區都已經選好了leader和ISR並已經持久化到zookeeper中,當然還都傳播到了其他broker上。那么這最后一步就是將副本狀態機中緩存的副本狀態從NewReplica轉換到OnlineReplica
okay,至此一個topic就完整地創建出來了~~