Kafka Topic Partition Replica Assignment實現原理及資源隔離方案


本文共分為三個部分:
 
  • Kafka Topic創建方式
  • Kafka Topic Partitions Assignment實現原理
  • Kafka資源隔離方案
 
1. Kafka Topic創建方式
 
Kafka Topic創建方式有以下兩種表現形式:
 
(1)創建Topic時直接指定Topic Partition Replica與Kafka Broker之間的存儲映射關系
 
/usr/lib/kafka_2.10-0.8.2.1/bin/kafka-topics.sh --zookeeper ZooKeeperHost:ZooKeeperPort --create --topic TopicName --replica-assignment id0:id1:id2,id3:id4:id5,id6:id7:id8
 
其中,“id0:id1:id2,id3:id4:id5,id6:id7:id8”表示Topic TopicName一共有3個Partition(以“,”分隔),每個Partition均有3個Replica(以“:”分隔),Topic Partition Replica與Kafka Broker之間的對應關系如下:
 
Partition0 Replica:Broker id0、Broker id1、Broker id2;
Partition1 Replica:Broker id3、Broker id4、Broker id5;
Partition2 Replica:Broker id6、Broker id7、Broker id8;
 
(2)創建Topic時由Kafka自動分配Topic Partition Replica與Kafka Broker之間的存儲映射關系
 
/usr/lib/kafka_2.10-0.8.2.1/bin/kafka-topics.sh --zookeeper ZooKeeperHost:ZooKeeperPort --create --topic TopicName
 
第(1)種方式完全依靠人為手工指定,這里僅僅探討使用第(2)種方式創建Topic時,“自動分配”是如何實現的。
 
2. Kafka Topic Partition Replica Assignment實現原理
 
Replica Assignment的目標有兩個:
 
(1)使Partition Replica能夠均勻地分配至各個Kafka Broker(負載均衡);
(2)如果Partition的第一個Replica分配至某一個Kafka Broker,那么這個Partition的其它Replica則需要分配至其它的Kafka Brokers,即Partition Replica分配至不同的Broker;
 
注意,這里有一個約束條件:Topic Partition Replicas Size <= Kafka Brokers Size。
 
“自動分配”的核心工作過程如下:
 
隨機選取一個StartingBroker(Broker id0、Broker id1、Broker id2、...),隨機選取IncreasingShift初始值([0,nBrokers - 1])
 
(1)從StartingBroker開始,使用輪詢的方式依次將各個Partition的Replicas分配至各個Broker;
 
對於每一個Partition,Replicas的分配過程如下:
 
(2)Partition的第一個Replica分配至StartingBroker;
 
(3)根據IncreasingShift計算第n(n>=2)個Replica的Shift(即與第1個Replica的間隔量),依據Shift將其分配至相應的Broker;
 
(4)StartingBroker移至下一個Broker;
 
(5)如果Brokers已經被輪詢完一次,則IncreasingShift遞增一;否則,繼續(2)。
 
假設有5個Brokers(broker-0、broker-1、broker-2、broker-3、broker-4),Topic有10個Partition(p0、p1、p2、p3、p4、p5、p6、p7、p8、p9),每一個Partition有3個Replica,依據上述工作過程,分配結果如下:
 
broker-0  broker-1  broker-2  broker-3  broker-4
p0           p1            p2           p3            p4       (1st replica)
p5           p6            p7           p8            p9       (1st replica)
p4           p0            p1           p2            p3       (2nd replica)
p8           p9            p5           p6            p7       (2nd replica)
p3           p4            p0           p1            p2       (3nd replica)
p7           p8            p9           p5            p6       (3nd replica)
 
詳細步驟如下:
 
選取broker-0作為StartingBroker,IncreasingShift初始值為1,
 
對於p0,replica1分配至broker-0,IncreasingShift為1,所以replica2分配至broker-1,replica3分配至broker-2;
對於p1,replica1分配至broker-1,IncreasingShift為1,所以replica2分配至broker-2,replica3分配至broker-3;
對於p2,replica1分配至broker-2,IncreasingShift為1,所以replica2分配至broker-3,replica3分配至broker-4;
對於p3,replica1分配至broker-3,IncreasingShift為1,所以replica2分配至broker-4,replica3分配至broker-1;
對於p4,replica1分配至broker-4,IncreasingShift為1,所以replica2分配至broker-0,replica3分配至broker-1;
 
注:IncreasingShift用於計算Shift,Shift表示Partition的第n(n>=2)個Replica與第1個Replica之間的間隔量。如果IncreasingShift值為m,那么Partition的第2個Replica與第1個Replica的間隔量為m + 1,第3個Replica與第1個Replica的間隔量為m + 2,...,依次類推。Shift的取值范圍:[1,brokerSize - 1]。
 
此時,broker-0、broker-1、broker-2、broker-3、broker-4分別作為StartingBroker被輪詢分配一次,繼續輪詢;但IncreasingShift遞增為2。
 
對於p5,replica1分配至broker-0,IncreasingShift為2,所以replica2分配至broker-2,replica3分配至broker-3;
對於p6,replica1分配至broker-1,IncreasingShift為2,所以replica2分配至broker-3,replica3分配至broker-4;
對於p7,replica1分配至broker-2,IncreasingShift為2,所以replica2分配至broker-4,replica3分配至broker-0;
對於p8,replica1分配至broker-3,IncreasingShift為2,所以replica2分配至broker-0,replica3分配至broker-1;
對於p9,replica1分配至broker-4,IncreasingShift為2,所以replica2分配至broker-1,replica3分配至broker-2;
 
此時,broker-0、broker-1、broker-2、broker-3、broker-4分別作為StartingBroker再次被輪詢一次,如果還有其它Partition,則繼續輪詢,IncreasingShift遞增為3,依次類推。
 
這里有幾點需要注意:
 
(1)為什么要隨機選取StartingBroker,而不是每次都選取broker-0作為StartingBroker?
 
以broker-0、broker-1、broker-2、broker-3、broker-4為例,因為分配過程是以輪詢方式進行的,如果每次都選取broker-0作為StartingBroker,那么Brokers列表中的前面部分將有可能被分配相對比較多的Partition Replicas,從而導致這部分Brokers負載較高,隨機選取可以保證相對比較好的均勻效果。
 
(2)為什么Brokers列表每次輪詢一次,IncreasingShift值都需要遞增1?
 
Kafka Topic Partition數目較多的情況下,Partition的第1個Replica與第n(n>=2)個Replica之間的間隔量隨着IncreasingShift的變化面變化,能夠更好的均勻分配Replica。
 
scala.kafka.admin.AdminUtils.assignReplicasToBrokers()實現上述Topic Partition Replica與Broker之間的分配過程,源碼如下:
 
 
brokerList:Kafka Brokers列表;
nPartitions:Topic待分配的Partition數目;
replicationFactor:Topic Partition Replica數目;
fixedStartIndex:如果顯示指定,默認值為0;它的值與兩個變量值相關:startIndex和nextReplicaShift,詳情見后;
startPartitionId:從Topic的哪一個Partition開始分配,通常情況下是0,Topic增加Partition時該值不為0。
 
val ret = new mutable.HashMap[Int, List[Int]]()
 
分配結果保存至一個Map變量ret,key為Partition Id,value為分配的Brokers列表。
 
 val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size)
 var currentPartitionId = if (startPartitionId >= 0) startPartitionId else 0
 
 var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size)
 
startIndex表示StartingBroker,currentPartitionId表示當前為哪個Partition分配Brokers,nextReplicaShift表示當前的IncreasingShit值。
 
 
接下來就是一個循環,用於為每一個Partition的Replicas分配Brokers,其中Partition的第1個Replica由“(currentPartitionId + startIndex) % brokerList.size”決定,其余的Replica由“replicaIndex()”決定。
 
 
shift表示着第n(n >= 2)個Replica與第一個Replica之間的間隔量,“1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)”的計算方式非常巧妙,它保證了shift的取值范圍:[1,nBrokers](大家可以自己體會一下)。
 
3. Kafka資源隔離方案
 
實時數據處理場景中,如果數據量比較大,為了保證寫入/消費的吞吐量,我們創建Topic時通常會指定比較大的Partition數目,從而使得數據盡可能地被分散至更多的Partition,Partition被盡可能均勻的分配至Kafka集群中的各個Broker,從負載均衡的角度看,一切都很美好。從業務的角度看,會有資源競爭的問題,畢竟Kafka Broker機器的帶寬資源是有限的,在帶寬比較緊張的情形下,任何一個業務方的數據量波動(這里僅指數據量增加),所有的業務方都會受到影響;從運維的角度看,會有可用性的問題,任何一台Kafka Broker機器都負載着所有Topic的數據傳輸、存儲,如果出現宕機的情況,將會波及到所有的Topic。針對這種情況,我們提出了划分資源池的資源隔離方案:
 
 
Kafka集群有9台Brokers組成:broker-1、broker-2、...、broker-9,創建9個Topic:t1、t2、...、t9,每個Topic有9個Partition(假設Replica為1),如上圖所示,我們將9台Brokers切分成3個資源池:Pool1(broker-1、broker-2、broker-3)、Pool2(broker-4、broker-5、broker-6)、Pool3(broker-7、broker-8、broker-9),Topic的分配情況如下:
 
Pool1:t1、t2、t3
Pool2:t4、t5、t6
Pool3:t7、t8、t9
 
可以看出,這三個資源池的物理資源是完全獨立的,三個資源池實際上相當於三個小集群。
 
這種資源池的划分方式不但可以做到物理資源的隔離,還可以一定程度上解決異構機型(MEM、DISK)帶來的問題,可以把機型相似的機器組成一個資源池。實際實施時需要綜合考慮業務情況、機器情況,合理划分資源池,並根據具體的Topic情況將其分配至合適的資源池內。
 
Kafka Topic的創建也變為兩步:
 
(1)使用kafka-topics.sh創建Topic;
(2)使用kafka-reassign-partitions.sh移動Topic Partition Replicas至指定的資源池(具體的Brokers列表)。
 
 
 
 
 
 
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM