kafka-partition分配的策略


partition是kafka中的重要設計概念,處於topic之下,消息都是存在partition中的,

生產的消息實際是發到partition中的,消費某個topic,實際也是從partition中拉取的消息

topic創建時,若不指定分區個數,則使用server.properties中配置的num.partitions值,也可以自己指定

比如我創建了一個10分區的topic:TEST,查看TEST結果如下:

./kafka-topics  --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 10 --topic TEST
./kafka-topics --describe --bootstrap-server localhost:9092 --topic TEST
Topic:TEST    PartitionCount:10    ReplicationFactor:1    Configs:min.insync.replicas=1,segment.bytes=1073741824,retention.ms=604800000,max.message.bytes=1000000,min.cleanable.dirty.ratio=0.5,unclean.leader.election.enable=false,retention.bytes=-1,delete.retention.ms=604800000
    Topic: TEST    Partition: 0    Leader: 44    Replicas: 44    Isr: 44
    Topic: TEST    Partition: 1    Leader: 44    Replicas: 44    Isr: 44
    Topic: TEST    Partition: 2    Leader: 44    Replicas: 44    Isr: 44
    Topic: TEST    Partition: 3    Leader: 44    Replicas: 44    Isr: 44
    Topic: TEST    Partition: 4    Leader: 44    Replicas: 44    Isr: 44
    Topic: TEST    Partition: 5    Leader: 44    Replicas: 44    Isr: 44
    Topic: TEST    Partition: 6    Leader: 44    Replicas: 44    Isr: 44
    Topic: TEST    Partition: 7    Leader: 44    Replicas: 44    Isr: 44
    Topic: TEST    Partition: 8    Leader: 44    Replicas: 44    Isr: 44
    Topic: TEST    Partition: 9    Leader: 44    Replicas: 44    Isr: 44

producer與partition

生產者在往topic發送數據時,ProducerRecord<K,V>有這樣幾個屬性:

    private final String topic;
    private final Integer partition;
    private final Headers headers;
    private final K key;
    private final V value;
    private final Long timestamp;

其中決定這條record發送到那個分區,主要由partition和key兩個屬性決定,partition的選取也有一個策略,官網描述如下:

The default partitioning strategy:

If a partition is specified in the record, use it     指定了分區,則消息投遞到指定的分區

If no partition is specified but a key is present choose a partition based on a hash of the key   未指定分區,但指定了key,則基於hash(key)選擇一個分區

If no partition or key is present choose a partition in a round-robin fashion   分區編號和key均未指定,則輪詢選擇,round-robin,老熟客了

未指定分區時,貼一下源碼是怎么計算的:(org.apache.kafka.clients.producer.internals.DefaultPartitioner.partitiono())

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = this.nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return ((PartitionInfo)availablePartitions.get(part)).partition();
            } else {
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

consumer與partition

 借一張網圖說明:假設topic有4個分區(P0-P4),有兩個組(Group A和Group B),A和B分別有2個(C1-C2)和4個消費者(C3-C6)

已知如下事實:

主題下有多個分區;

消費者以組的名義訂閱主題

組內有一個或多個消費者實例

同一時刻,一條消息只能被組里的一個消費者消費

每個分區在某一時刻只會被組里的一個消費者消費(rebalance時這個實施仍然成立)

默認情況下:

如果分區數大於消費者實例個數,按照抽屜原理,必定有一個消費者同時負責多個分區(2個 or 以上)

如果分區數等於消費者實例個數,正好一個消費者消費一個分區

如果分區數小於消費者實例個數,必然有消費者空閑

官網上,通過配置partition.assignment.strategy來規定分區分配策略,默認是range,是一個class:org.apache.kafka.clients.consumer.RangeAssignor

查看源碼如下圖,可配置的Partition分配策略有三個,range,roundRobin,Sticky

  •  Range

以topic為單位處理分區分配(對每個topic獨立分配),先對所有分區按照分區ID進行排序,然后對消費組中的所有消費者進行排序

下圖是三種情況下的分配結果舉例:分別是分區數大於且能整除,分區數大於但不能整除,分區數小於consumer數

 挖了下源碼,截取核心的一段邏輯比較清晰:

public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic, Map<String, Subscription> subscriptions) {
        Map<String, List<String>> consumersPerTopic = this.consumersPerTopic(subscriptions);
        Map<String, List<TopicPartition>> assignment = new HashMap();
        Iterator i$ = subscriptions.keySet().iterator();
        while(i$.hasNext()) {
            String memberId = (String)i$.next();
            assignment.put(memberId, new ArrayList());
        }
        i$ = consumersPerTopic.entrySet().iterator();
        while(true) {
            String topic;
            List consumersForTopic;
            Integer numPartitionsForTopic;
            do {
                if (!i$.hasNext()) {
                    return assignment;
                }

                Entry<String, List<String>> topicEntry = (Entry)i$.next();
                topic = (String)topicEntry.getKey();
                consumersForTopic = (List)topicEntry.getValue();
                numPartitionsForTopic = (Integer)partitionsPerTopic.get(topic);
            } while(numPartitionsForTopic == null);
//在此之前都是在處理分區和消費者的數據,算法部分從這里開始 //對消費者進行排序,可以看到是一個List<String>,所以這里是按照字典序排序 Collections.sort(consumersForTopic);
//求商和求余:分區數/消費者數 分區數%消費者數
int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size(); int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size(); List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic); int i = 0; //這個算法做的就是求出每個消費者最終分配到的分區,上圖就是這么算出來的 for(int n = consumersForTopic.size(); i < n; ++i) { int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition); int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1); ((List)assignment.get(consumersForTopic.get(i))).addAll(partitions.subList(start, start + length)); } } }

可以看到字典序排在前面的會分配到更多的分區(假設消費者少於分區),如果topic繼續增多,排頭的這個消費者會分到越來越多額外的分區

  • RoundRobin

將消費組內訂閱的所有Topic的分區及組內所有消費者進行排序后盡量均衡的分配

如果組內的消費者消費的topic相同,可以得出,消費者之間分配到的分區數差值不會超過1,輪詢的意思可自行理解

RoundRobin策略比range策略要稍微進步一點點,分配結果更均衡一些

  • StickyAssignor

有些版本的分配策略,只支持前兩種,稍微新一點的版本,支持這個策略(至少是0.11以上)

前兩種策略,如果消費者已經分配過一次之后,遇到重新分配的情況(比如rebalance),分區的調整會非常大,雖然重新分配的場景屬於少數

那么需要一種策略,在保證盡量均衡的情況下,能盡量減少已經分配過的結果的改動,這樣也能減少很多重新分配的開銷

就是兩個目標:盡量均衡+最少改動,當然,實現算法上也比前兩者復雜很多

broker與partition

創建topic時指定分區數是可以自定義的,通常kafka集群有若干個broker,partition分配到broker也有一個設計

對於這個設計,需要這樣一個思路:

1、每個topic下的partition盡量均勻分布到broker上;

2、每個broker上被分配到的partition個數盡量均勻;

3、分區也需要有副本保證高可用,副本需要盡量均勻的分布到broker上;

基於以上,網上講的通過講分區排序,broker排序,然后簡單的運算取模顯然是不行的,因為這樣的話,排在前面的broker顯然會更辛苦

分區分配的源碼是scala版本,雖然跟java語法有區別,看代碼邏輯推測一下語法,完全能看懂(scala.kafka.admin.AdminUtils.assignReplicasToBrokers())

還是先看作者的注釋,做了下簡短翻譯:

* There are 3 goals of replica assignment:
*
* 1. Spread the replicas evenly among brokers.(分區副本要均勻分布在broker上) * 2. For partitions assigned to a particular broker, their other replicas are spread over the other brokers.(分區的每個副本要分配到不同的broker上) * 3. If all brokers have rack information, assign the replicas for each partition to different racks if possible(如果broker部署在不同機架上,副本還需要分布到不同機架上) * * To achieve this goal for replica assignment without considering racks, we:(不考慮機架的話,按照如下設計實現:) * 1. Assign the first replica of each partition by round-robin, starting from a random position in the broker list. * 2. Assign the remaining replicas of each partition with an increasing shift.

實現思路:

1、從broker-list中選定一個隨機的位置作為開始,將每個partition的第一個replica,按round-robin(輪詢)方式分配到這些broker上

2、每個分區剩下的replica的位置,以一個遞增的shift(挪動)方式分配到其余的broker上

以下是根據源碼邏輯做的簡單的注釋:

def assignReplicasToBrokers(brokerList: Seq[Int],        -----broker列表
                            nPartitions: Int,            -----待分配的partition數
                            replicationFactor: Int,      -----定義的每個分區的副本數
                            fixedStartIndex: Int = -1,   ----- 兩個副本之間的增長間隔值
                            startPartitionId: Int = -1)  -----從topic的哪個分區開始分配
: Map[Int, Seq[Int]] = {                                 
  if (nPartitions <= 0)
    throw new AdminOperationException("number of partitions must be larger than 0")
  if (replicationFactor <= 0)
    throw new AdminOperationException("replication factor must be larger than 0")
  if (replicationFactor > brokerList.size)               -----這個地方說明分區副本數量不能超過broker個數
    throw new AdminOperationException("replication factor: " + replicationFactor +
      " larger than available brokers: " + brokerList.size)
  val ret = new mutable.HashMap[Int, List[Int]]()        -----分配結果,保存為一個HashMap,key為partition id,value為分配的brokers列表
  val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size) ---隨機選取一個startingBroker
  var currentPartitionId = if (startPartitionId >= 0) startPartitionId else 0  ----指定一個分區作為開始,否則從第一個分區開始(編號0)
  var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size)  ----動態的增長間隔值
  for (i <- 0 until nPartitions) {                       ----- 遍歷每個分區來做分配
    if (currentPartitionId > 0 && (currentPartitionId % brokerList.size == 0))
      nextReplicaShift += 1                              ----- 分區編號能整除brokers.size()時,說明輪詢到brokers末尾了,shift+1
    val firstReplicaIndex = (currentPartitionId + startIndex) % brokerList.size  ----每個分區的第一個副本單獨分配,一般作為副本的leader
    var replicaList = List(brokerList(firstReplicaIndex))
    for (j <- 0 until replicationFactor - 1)             ---- 給當前分區的每個副本做分配
      replicaList ::= brokerList(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerList.size))
    ret.put(currentPartitionId, replicaList.reverse)
    currentPartitionId = currentPartitionId + 1
  }
  ret.toMap
}
//每個分區除第一個副本外,其他副本通過該算法計算這個副本應該放在哪個broker上
private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
  val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
  (firstReplicaIndex + shift) % nBrokers
}

大體過程為,定義分配的broker(自定義否則隨機生成),partition(自定義否則從0開始),shift(自定義否則隨機生成)初始值

for循環nPartition次:

  partitionId是否輪詢到brokers末尾,如果是,則shift+1;

  分配第一個副本到某個broker;

  for循環分配當前分區剩下的副本到broker上;

  分區id+1,繼續分配;

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM