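Partition is an important design concept in Kafka. It sits below the topic, and all messages are stored in partitions:
produced messages are actually sent to a partition, and consuming a topic actually means pulling messages from its partitions.
When a topic is created without an explicit partition count, the num.partitions value from server.properties is used; you can also specify the count yourself.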
For example, I created a 10-partition topic named TEST and then described it:
./kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 10 --topic TEST
./kafka-topics --describe --bootstrap-server localhost:9092 --topic TEST
Topic:TEST PartitionCount:10 ReplicationFactor:1 Configs:min.insync.replicas=1,segment.bytes=1073741824,retention.ms=604800000,max.message.bytes=1000000,min.cleanable.dirty.ratio=0.5,unclean.leader.election.enable=false,retention.bytes=-1,delete.retention.ms=604800000
Topic: TEST Partition: 0 Leader: 44 Replicas: 44 Isr: 44
Topic: TEST Partition: 1 Leader: 44 Replicas: 44 Isr: 44
Topic: TEST Partition: 2 Leader: 44 Replicas: 44 Isr: 44
Topic: TEST Partition: 3 Leader: 44 Replicas: 44 Isr: 44
Topic: TEST Partition: 4 Leader: 44 Replicas: 44 Isr: 44
Topic: TEST Partition: 5 Leader: 44 Replicas: 44 Isr: 44
Topic: TEST Partition: 6 Leader: 44 Replicas: 44 Isr: 44
Topic: TEST Partition: 7 Leader: 44 Replicas: 44 Isr: 44
Topic: TEST Partition: 8 Leader: 44 Replicas: 44 Isr: 44
Topic: TEST Partition: 9 Leader: 44 Replicas: 44 Isr: 44
producer and partition
When a producer sends data to a topic, a ProducerRecord<K,V> has the following fields:
private final String topic;
private final Integer partition;
private final Headers headers;
private final K key;
private final V value;
private final Long timestamp;
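Which partition a record is sent to is decided mainly by two of these fields, partition and key. The choice follows a strategy that the official docs describe as follows: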
The default partitioning strategy:
- If a partition is specified in the record, use it (the message goes to exactly that partition).
- If no partition is specified but a key is present, choose a partition based on a hash of the key.
- If no partition or key is present, choose a partition in a round-robin fashion (good old round-robin).
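When no partition is specified, this is how the source code computes one (org.apache.kafka.clients.producer.internals.DefaultPartitioner.partition()). Note that this is the implementation in clients before 2.4; from 2.4 on, records without a key use the sticky partitioning introduced by KIP-480 instead of per-record round-robin: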
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if (keyBytes == null) {
        // No key: take the next value of this topic's round-robin counter
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) {
            // Prefer partitions that currently have a leader
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // No partition is available: fall back to any partition
            return Utils.toPositive(nextValue) % numPartitions;
        }
    } else {
        // Key present: murmur2 hash of the key bytes, modulo the partition count
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}
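To try the pre-2.4 logic without a Kafka dependency, here is a minimal self-contained sketch. The murmur2 method is intended to mirror Kafka's Utils.murmur2 (same seed and mixing constants), and toPositive mirrors Utils.toPositive by masking off the sign bit. Assumptions and simplifications: the class name PartitionerSketch is hypothetical, every partition is treated as available, and the round-robin counter starts at 0, whereas the real client seeds it with a random value.

```java
import java.util.concurrent.atomic.AtomicInteger;

class PartitionerSketch {
    // Round-robin counter for keyless records (the real client starts it at a random value).
    private final AtomicInteger counter = new AtomicInteger(0);

    // Same decision as DefaultPartitioner.partition(): hash the key if present, else round-robin.
    int partition(byte[] keyBytes, int numPartitions) {
        if (keyBytes == null) {
            return toPositive(counter.getAndIncrement()) % numPartitions;
        }
        return toPositive(murmur2(keyBytes)) % numPartitions;
    }

    // Clears the sign bit so the modulo result is never negative.
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    // 32-bit murmur2 with the seed and constants used by Kafka's Utils.murmur2.
    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            final int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the trailing 1-3 bytes; the cases fall through on purpose.
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }
}
```

On a fresh instance, four calls to partition(null, 3) return 0, 1, 2, 0, and the same key bytes always map to the same partition as long as the partition count does not change.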
consumer and partition

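To illustrate with a figure borrowed from the web: suppose a topic has 4 partitions (P0-P3) and there are two consumer groups, Group A and Group B, with 2 consumers (C1-C2) and 4 consumers (C3-C6) respectively.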
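The following facts hold:
A topic has multiple partitions;
Consumers subscribe to topics as a group;
A group contains one or more consumer instances;
At any given time, a message is consumed by only one consumer in the group;
At any given time, each partition is consumed by only one consumer in the group (this still holds during a rebalance).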
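By default:
If there are more partitions than consumer instances, then by the pigeonhole principle at least one consumer must handle several partitions (2 or more);
If the counts are equal, each consumer consumes exactly one partition;
If there are fewer partitions than consumer instances, some consumers are necessarily idle.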
According to the official docs, the partition assignment strategy is set with partition.assignment.strategy. The default is range, implemented by the class org.apache.kafka.clients.consumer.RangeAssignor.
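As a sketch of how the strategy is chosen on the client side, the Properties below would be passed to a KafkaConsumer constructor to switch from range to round-robin. The property keys and the org.apache.kafka.clients.consumer.RoundRobinAssignor class name are real Kafka settings; the class name AssignorConfigExample, the group id and the broker address are assumptions for illustration. Only java.util.Properties is used, so the snippet compiles without the Kafka client on the classpath.

```java
import java.util.Properties;

class AssignorConfigExample {
    // Builds consumer settings that replace the default range assignor with round-robin.
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed local broker, as in the topic example
        props.put("group.id", "group-a");                 // hypothetical group name
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // The assignor class the group coordinator protocol will use for this consumer
        props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");
        return props;
    }
}
```

All consumers in a group should agree on a common strategy, so in practice the same value is set on every member.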
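Looking at the source (see the figure below), the version examined here offers three configurable partition assignment strategies: range, roundRobin and sticky.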

- Range
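Partitions are assigned per topic, each topic independently: the topic's partitions are sorted by partition ID, the consumers subscribed to it are sorted by name, and the partitions are then cut into contiguous ranges, one per consumer. When the division leaves a remainder, the first consumers each take one extra partition.
The figure below shows example results for three cases: more partitions than consumers and evenly divisible, more partitions but not evenly divisible, and fewer partitions than consumers.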

Digging into the source, the core logic is fairly clear:
public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                Map<String, Subscription> subscriptions) {
    Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions);
    Map<String, List<TopicPartition>> assignment = new HashMap<>();
    for (String memberId : subscriptions.keySet())
        assignment.put(memberId, new ArrayList<>());

    for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {
        String topic = topicEntry.getKey();
        List<String> consumersForTopic = topicEntry.getValue();
        Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
        if (numPartitionsForTopic == null)
            continue;

        // Everything above only prepares partition and consumer data; the algorithm starts here.
        // Sort the consumers: consumersForTopic is a List<String>, so this is lexicographic order.
        Collections.sort(consumersForTopic);

        // Quotient and remainder: partitions / consumers and partitions % consumers
        int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
        int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();

        List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
        // Compute the contiguous range each consumer ends up with; this is how the figure above was produced.
        for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
            int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
            int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
            assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));
        }
    }
    return assignment;
}
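So the consumers that sort first lexicographically receive the extra partitions whenever the partition count is not a multiple of the consumer count. Because every topic is assigned independently, the remainder lands on the same front consumers each time, and as the number of subscribed topics grows, those consumers accumulate more and more extra partitions.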
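The same arithmetic can be tried outside Kafka with the minimal sketch below. It is a simplification under stated assumptions: it works on plain partition numbers and consumer-name strings instead of TopicPartition and Subscription, and the class and method names are hypothetical. countAcrossTopics reproduces the skew just described when several topics are assigned independently.

```java
import java.util.*;

class RangeSketch {
    // Range-assigns partitions 0..numPartitions-1 of ONE topic to the given consumers.
    static Map<String, List<Integer>> assignTopic(int numPartitions, List<String> consumers) {
        List<String> sorted = new ArrayList<>(consumers);
        Collections.sort(sorted); // lexicographic order, as in RangeAssignor
        int perConsumer = numPartitions / sorted.size();
        int withExtra = numPartitions % sorted.size();
        Map<String, List<Integer>> result = new TreeMap<>();
        for (int i = 0; i < sorted.size(); i++) {
            // Same start/length formulas as the source above
            int start = perConsumer * i + Math.min(i, withExtra);
            int length = perConsumer + (i + 1 > withExtra ? 0 : 1);
            List<Integer> range = new ArrayList<>();
            for (int p = start; p < start + length; p++) range.add(p);
            result.put(sorted.get(i), range);
        }
        return result;
    }

    // Totals per consumer when each topic is assigned independently.
    static Map<String, Integer> countAcrossTopics(List<Integer> partitionsPerTopic, List<String> consumers) {
        Map<String, Integer> counts = new TreeMap<>();
        for (int n : partitionsPerTopic) {
            for (Map.Entry<String, List<Integer>> e : assignTopic(n, consumers).entrySet()) {
                counts.merge(e.getKey(), e.getValue().size(), Integer::sum);
            }
        }
        return counts;
    }
}
```

For 10 partitions and consumers C0, C1, C2 it yields [0-3], [4-6] and [7-9]; with 2 partitions and 3 consumers, C2 gets nothing; and for two topics with 3 partitions each and consumers C1, C2, C1 ends up with 4 partitions while C2 has 2.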
- RoundRobin
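All partitions of all topics subscribed within the group, and all consumers in the group, are sorted; the partitions are then handed out as evenly as possible.
If every consumer in the group subscribes to the same topics, the partition counts of any two consumers differ by at most 1: as the name says, partitions are dealt out one at a time, cycling through the consumers.
So RoundRobin is a modest step up from range: its assignments are more balanced.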
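A minimal sketch of the round-robin idea, assuming every consumer subscribes to every topic. Kafka's RoundRobinAssignor also handles members with differing subscriptions, which this sketch does not. Partitions are represented as "topic-partition" strings for readability, and the class name is hypothetical.

```java
import java.util.*;

class RoundRobinSketch {
    // Simplified: assumes every consumer subscribes to every topic in partitionsPerTopic.
    static Map<String, List<String>> assign(Map<String, Integer> partitionsPerTopic, List<String> consumers) {
        List<String> sortedConsumers = new ArrayList<>(consumers);
        Collections.sort(sortedConsumers);
        Map<String, List<String>> result = new TreeMap<>();
        for (String c : sortedConsumers) result.put(c, new ArrayList<>());
        int next = 0;
        // All partitions of ALL topics, sorted by topic name, then by partition id
        for (Map.Entry<String, Integer> topic : new TreeMap<>(partitionsPerTopic).entrySet()) {
            for (int p = 0; p < topic.getValue(); p++) {
                // Deal the partition to the next consumer in turn
                String consumer = sortedConsumers.get(next % sortedConsumers.size());
                result.get(consumer).add(topic.getKey() + "-" + p);
                next++;
            }
        }
        return result;
    }
}
```

With two topics of 3 partitions each and consumers C1, C2, each consumer gets 3 partitions (C1: T0-0, T0-2, T1-1; C2: T0-1, T1-0, T1-2), where range would give 4 and 2.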
- StickyAssignor
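Some versions only support the first two strategies; slightly newer versions (0.11 and later, at least) also support this one.
With the first two strategies, when partitions have to be reassigned after an initial assignment (for example during a rebalance), the assignment can change a lot, even though reassignment is relatively rare.
What is needed is a strategy that stays as balanced as possible while changing the existing assignment as little as possible, which also avoids much of the cost of reassignment.
In short, there are two goals: as balanced as possible, and as few changes as possible. Naturally, the algorithm is considerably more complex than the other two.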
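To make the two goals concrete, here is a deliberately simplified greedy illustration for a single topic, assuming all consumers subscribe to it and at least one consumer is present. It is NOT the algorithm inside Kafka's StickyAssignor, which is considerably more involved; the class and method names are hypothetical. Surviving consumers first keep what they already own (fewest changes), orphaned partitions go to the least-loaded consumer, and then one partition at a time moves from the most-loaded to the least-loaded consumer until sizes differ by at most 1 (balance).

```java
import java.util.*;

class StickySketch {
    // Orders partition lists by how many partitions they hold.
    private static final Comparator<List<Integer>> BY_SIZE = Comparator.comparingInt(List::size);

    // prev: last assignment; consumers: current members (at least one); partitions are 0..numPartitions-1.
    static Map<String, List<Integer>> reassign(Map<String, List<Integer>> prev,
                                               List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> result = new TreeMap<>();
        Set<Integer> claimed = new HashSet<>();
        // Goal "fewest changes": surviving consumers keep the valid partitions they already own.
        for (String c : consumers) {
            List<Integer> kept = new ArrayList<>();
            for (int p : prev.getOrDefault(c, Collections.emptyList())) {
                if (p >= 0 && p < numPartitions && claimed.add(p)) kept.add(p);
            }
            result.put(c, kept);
        }
        // Orphaned partitions (new, or owned by departed consumers) go to the least-loaded consumer.
        for (int p = 0; p < numPartitions; p++) {
            if (!claimed.contains(p)) Collections.min(result.values(), BY_SIZE).add(p);
        }
        // Goal "balance": move one partition at a time until sizes differ by at most 1.
        while (true) {
            List<Integer> most = Collections.max(result.values(), BY_SIZE);
            List<Integer> least = Collections.min(result.values(), BY_SIZE);
            if (most.size() - least.size() <= 1) break;
            least.add(most.remove(most.size() - 1));
        }
        return result;
    }
}
```

For example, if C1 owns [0, 1, 2], C2 owns [3, 4, 5] and C3 joins, the sketch returns C1 = [0, 1], C2 = [3, 4], C3 = [2, 5]: only two partitions move, whereas a fresh range assignment (C1 = [0, 1], C2 = [2, 3], C3 = [4, 5]) moves three.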
broker and partition
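A topic's partition count is configurable at creation time, and a Kafka cluster usually has several brokers, so Kafka also needs a design for placing partitions on brokers.
That design has to satisfy these goals:
1. The partitions of each topic are spread as evenly as possible across the brokers;
2. Each broker hosts roughly the same number of partitions;
3. Partitions also need replicas for high availability, and the replicas must be spread as evenly as possible across the brokers.
Given these goals, the approach described in some online posts (sort the partitions, sort the brokers, then simply take a modulo) clearly does not work: every topic would start from the same broker, so the brokers at the front of the list would consistently end up with more work.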
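The partition assignment code is written in Scala. Its syntax differs from Java, but by inferring the syntax from the logic it is entirely readable (scala.kafka.admin.AdminUtils.assignReplicasToBrokers()).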
As before, start with the author's comment:
* There are 3 goals of replica assignment:
*
* 1. Spread the replicas evenly among brokers.
* 2. For partitions assigned to a particular broker, their other replicas are spread over the other brokers.
* 3. If all brokers have rack information, assign the replicas for each partition to different racks if possible
*
* To achieve this goal for replica assignment without considering racks, we:
* 1. Assign the first replica of each partition by round-robin, starting from a random position in the broker list.
* 2. Assign the remaining replicas of each partition with an increasing shift.
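Implementation idea:
1. Starting from a random position in the broker list, assign the first replica of each partition to the brokers round-robin;
2. Place each partition's remaining replicas on the other brokers using an increasing shift. The offset computed in replicaIndex is always between 1 and nBrokers - 1, so no other replica ever lands on the broker holding the partition's first replica.
Below is the source, with brief comments based on its logic: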
def assignReplicasToBrokers(brokerList: Seq[Int],          // broker ids
                            nPartitions: Int,              // number of partitions to assign
                            replicationFactor: Int,        // replicas per partition
                            fixedStartIndex: Int = -1,     // fixed start index, also used as the initial shift; random if negative
                            startPartitionId: Int = -1)    // partition id to start from; 0 if negative
: Map[Int, Seq[Int]] = {
  if (nPartitions <= 0)
    throw new AdminOperationException("number of partitions must be larger than 0")
  if (replicationFactor <= 0)
    throw new AdminOperationException("replication factor must be larger than 0")
  if (replicationFactor > brokerList.size) // the replication factor cannot exceed the number of brokers
    throw new AdminOperationException("replication factor: " + replicationFactor +
      " larger than available brokers: " + brokerList.size)
  // Result: partition id -> list of brokers holding its replicas
  val ret = new mutable.HashMap[Int, List[Int]]()
  // Pick a random starting broker unless a fixed index is given
  val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size)
  // Start from the given partition, otherwise from partition 0
  var currentPartitionId = if (startPartitionId >= 0) startPartitionId else 0
  // Shift between the first replica and the others; it grows as assignment proceeds
  var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size)
  for (i <- 0 until nPartitions) { // assign each partition in turn
    // Partition id divisible by the broker count: round-robin wrapped to the end of the list, so shift + 1
    if (currentPartitionId > 0 && (currentPartitionId % brokerList.size == 0))
      nextReplicaShift += 1
    // The first replica of each partition is placed on its own; it is normally the preferred leader
    val firstReplicaIndex = (currentPartitionId + startIndex) % brokerList.size
    var replicaList = List(brokerList(firstReplicaIndex))
    // Place the remaining replicas of the current partition
    for (j <- 0 until replicationFactor - 1)
      replicaList ::= brokerList(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerList.size))
    ret.put(currentPartitionId, replicaList.reverse)
    currentPartitionId = currentPartitionId + 1
  }
  ret.toMap
}

// For every replica except the first, compute which broker it goes on
private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
  val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
  (firstReplicaIndex + shift) % nBrokers
}
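The overall flow: initialize the start broker index (given, otherwise random), the start partition (given, otherwise 0) and the shift (given, otherwise random), then loop nPartitions times:
if the partition id has wrapped around to the end of the broker list, increase the shift by 1;
assign the first replica to a broker;
loop over the current partition's remaining replicas and assign each one to a broker;
increment the partition id and continue.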
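To check this flow by hand, here is a Java port of the rack-unaware Scala code above. It is a sketch under assumptions: the class name is hypothetical, IllegalArgumentException stands in for AdminOperationException, and the Random is passed in rather than taken from AdminUtils' own rand field. Passing fixedStartIndex = 0 makes the result deterministic.

```java
import java.util.*;

class ReplicaAssignmentSketch {
    // Java port of the rack-unaware logic in assignReplicasToBrokers.
    static Map<Integer, List<Integer>> assign(List<Integer> brokerList, int nPartitions, int replicationFactor,
                                              int fixedStartIndex, int startPartitionId, Random rand) {
        if (nPartitions <= 0) throw new IllegalArgumentException("number of partitions must be larger than 0");
        if (replicationFactor <= 0) throw new IllegalArgumentException("replication factor must be larger than 0");
        if (replicationFactor > brokerList.size())
            throw new IllegalArgumentException("replication factor larger than available brokers");
        int n = brokerList.size();
        Map<Integer, List<Integer>> ret = new TreeMap<>();
        int startIndex = fixedStartIndex >= 0 ? fixedStartIndex : rand.nextInt(n);
        int currentPartitionId = startPartitionId >= 0 ? startPartitionId : 0;
        int nextReplicaShift = fixedStartIndex >= 0 ? fixedStartIndex : rand.nextInt(n);
        for (int i = 0; i < nPartitions; i++) {
            // Wrapped around the broker list: grow the shift
            if (currentPartitionId > 0 && currentPartitionId % n == 0) nextReplicaShift += 1;
            // First replica: plain round-robin from the start index
            int firstReplicaIndex = (currentPartitionId + startIndex) % n;
            List<Integer> replicas = new ArrayList<>();
            replicas.add(brokerList.get(firstReplicaIndex));
            // Remaining replicas: offset from the first by the (growing) shift
            for (int j = 0; j < replicationFactor - 1; j++)
                replicas.add(brokerList.get(replicaIndex(firstReplicaIndex, nextReplicaShift, j, n)));
            ret.put(currentPartitionId, replicas);
            currentPartitionId++;
        }
        return ret;
    }

    // Offset is 1 + (...) % (nBrokers - 1), i.e. in [1, nBrokers - 1], so it never hits the first replica's broker.
    static int replicaIndex(int firstReplicaIndex, int secondReplicaShift, int replicaIndex, int nBrokers) {
        int shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1);
        return (firstReplicaIndex + shift) % nBrokers;
    }
}
```

With brokers 0-4, 10 partitions, replication factor 3 and fixedStartIndex = 0, partition 0 lands on brokers [0, 1, 2], partition 5 (after the shift has grown by 1) on [0, 2, 3], partition 9 on [4, 1, 2], and every broker ends up with exactly 6 replicas.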
