轉載:https://www.cnblogs.com/lanston/p/14219473.html
消費組和消費者是kafka中比較重要的概念,理解和掌握原理有利於優化kafka性能和處理消費積壓問題。Kafka topic 由多個分區組成,分區分布在集群節點上;
Topic:topic01 PartitionCount:10 ReplicationFactor:2 Configs:
Topic: topic01 Partition: 0 Leader: 1 Replicas: 1,4 Isr: 1,4
Topic: topic01 Partition: 1 Leader: 2 Replicas: 2,5 Isr: 2,5
Topic: topic01 Partition: 2 Leader: 3 Replicas: 3,1 Isr: 3,1
Topic: topic01 Partition: 3 Leader: 4 Replicas: 4,2 Isr: 4,2
Topic: topic01 Partition: 4 Leader: 5 Replicas: 5,3 Isr: 5,3
Topic: topic01 Partition: 5 Leader: 1 Replicas: 1,3 Isr: 1,3
Topic: topic01 Partition: 6 Leader: 2 Replicas: 2,4 Isr: 2,4
Topic: topic01 Partition: 7 Leader: 3 Replicas: 3,5 Isr: 3,5
Topic: topic01 Partition: 8 Leader: 4 Replicas: 4,1 Isr: 4,1
Topic: topic01 Partition: 9 Leader: 5 Replicas: 5,2 Isr: 5,2
當外部程序消費topic數據時,kafka將其視為消費組(ConsumerGroup),每個消費組包含1個或多個消費者(Consumer),消費者數量最多可以為分區總數量,並不是可以無限量。當消費組中的任意一個消費者終止時,kafka會對消費組進行平衡(Rebalance),再根據存活消費數和消費者分配策略重新分配消費者。在0.10.x版本中,kafka提供兩種分配策略(RangeAssignor、RoundRobinAssignor),0.11.x 版本新增策略(StickyAssignor),結構如下;

1 RangeAssignor 策略
RangeAssignor 以主題為單位,以數據順序排列可用分區,以字典順序排列消費者,將topic分區數除以消費者總數,以確定分配給每個消費者的分區數;如果沒有平均分配,那么前幾個消費者將擁有一個額外的分區。實現代碼;
for (String memberId : subscriptions.keySet())
assignment.put(memberId, new ArrayList<TopicPartition>()); for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) { String topic = topicEntry.getKey(); List<String> consumersForTopic = topicEntry.getValue(); Integer numPartitionsForTopic = partitionsPerTopic.get(topic); if (numPartitionsForTopic == null) continue; Collections.sort(consumersForTopic); int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size(); //topic分區數除以消費者總數 int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size(); //計算額外分區 List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic); for (int i = 0, n = consumersForTopic.size(); i < n; i++) { int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition); int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1); assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length)); } }
比如有兩個topic(topic1 ,topic2) ,每個topic都有三個分區;
- topic1 ,分區:topic1p0,topic1p1,topic1p2
- topic2 ,分區:topic2p0,topic2p1,topic2p2
和一個消費組(consumer_group1),有(consumer1,consumer2)兩個消費者,使用RangeAssignor策略可能會得到如下的分配:
- consumer1: [topic1p0,topic1p1,topic2p0,topic2p1]
- consumer2: [topic1p2,topic2p2]
如果此時消費組(consumer_group1)有新的消費者consumer3加入,使用RangeAssignor策略可能會得到如下的分配:
- consumer1: [topic1p0,topic2p0]
- consumer2: [topic1p2,topic2p2]
- consumer3: [topic1p1,topic2p1]
2 RoundRobinAssignor 策略
RoundRobinAssignor 是kafka默認策略,對所有分區和所有消費者循環分配,分區更均衡;實現代碼;
Map<String, List<TopicPartition>> assignment = new HashMap<>(); for (String memberId : subscriptions.keySet()) assignment.put(memberId, new ArrayList<TopicPartition>()); CircularIterator<String> assigner = new CircularIterator<>(Utils.sorted(subscriptions.keySet())); for (TopicPartition partition : allPartitionsSorted(partitionsPerTopic, subscriptions)) { final String topic = partition.topic(); while (!subscriptions.get(assigner.peek()).topics().contains(topic)) assigner.next(); assignment.get(assigner.next()).add(partition); }
繼續以上例topic和消費組為例,RoundRobinAssignor 策略可能會得到如下的分配;
- consumer1: [topic1p0,topic1p1,topic2p2,]
- consumer2: [topic2p0,topic2p1,topic1p2]
3 StickyAssignor 策略
StickyAssignor 策略是最復雜且是0.11.x 版本出現的新策略,該策略主要作用:
- 使topic分區分配盡可能均勻的分配給消費者
- 當某個消費者終止觸發重新分配時,盡可能保留現有分配,將已經終止的消費者所分配的分區移動到另一個消費者,避免全部分區重新平衡,節省開銷。
這個策略自0.11.x 版本出現后,一直到新版本有不同bug被發現,低版本慎用。
4 java多線程消費實例
public class KafkaTopicConsumer {
private KafkaConsumer<String, String> consumer; private int consumerId=0; //消費實例id private final long timeOut=10000; public KafkaTopicConsumer(int consumerId){ this.consumerId=consumerId; Properties props = new Properties(); props.put("client.id", "client-" + consumerId); props.put("bootstrap.servers","192.168.1.10:9092,192.168.1.11:9092"); props.put("group.id", "test-group03"); props.put("zookeeper.session.timeout.ms", "4000"); props.put("zookeeper.sync.time.ms", "200"); props.put("enable.auto.commit", "false"); props.put("auto.offset.reset", "earliest"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //設置分區策略 props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RangeAssignor"); consumer = new KafkaConsumer<String, String>(props); consumer.subscribe(Arrays.asList("topic1","topic2")); } public void consume() { while (true){ ConsumerRecords<String, String> records=consumer.poll(timeOut); System.out.println("records count:"+records.count()); for (ConsumerRecord<String, String> record : records) { System.out.println(String.format("client-id = %d , topic = %s, partition = %d , offset = %d, key = %s, value = %s", this.consumerId,record.topic(), record.partition(), record.offset(), record.key(), record.value())); } consumer.commitSync(); } } public static void main(String[] args) { int threadSize=Integer.parseInt(args[0]); for (int i = 0; i < threadSize; i++) { int id = i; new Thread() { @Override public void run() { new KafkaTopicConsumer(id).consume(); } }.start(); } }// }
啟動三個多線程實例消費,分區分配到每個消費者的情況;
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER
test-group03 topic2 0 0 3333 3333 client-0_/192.168.1.13
test-group03 topic1 0 500 3333 2833 client-0_/192.168.1.13
test-group03 topic2 2 0 3333 3333 client-2_/192.168.1.13
test-group03 topic1 2 500 3333 2833 client-2_/192.168.1.13
test-group03 topic2 1 500 3334 2834 client-1_/192.168.1.13
test-group03 topic1 1 0 3334 3334 client-1_/192.168.1.13
對於大的topic,將topic單獨消費以避免數據積壓和topic各自影響數據處理速度,比如文章開始時提到的10分區的topic(topic01),根據硬件資源和分區策略設置合理的消費者,數據量大時最優的消費者數量為分區總數。
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER
test-group02 topic01 6 373460 1026328 652868 client-6_/192.168.1.13
test-group02 topic01 2 375660 1048756 673096 client-2_/192.168.1.13
test-group02 topic01 5 374625 1013157 638532 client-5_/192.168.1.13
test-group02 topic01 3 347001 1066967 719966 client-3_/192.168.1.13
test-group02 topic01 0 375570 1013261 637691 client-0_/192.168.1.13
test-group02 topic01 9 376545 1094088 717543 client-9_/192.168.1.13
test-group02 topic01 8 347082 1066948 719866 client-8_/192.168.1.13
test-group02 topic01 7 375100 1048827 673727 client-7_/192.168.1.13
test-group02 topic01 1 372447 1026467 654020 client-1_/192.168.1.13
test-group02 topic01 4 377052 1093926 716874 client-4_/192.168.1.13
5 總結
Kafka提供三種分配策略(RangeAssignor、RoundRobinAssignor、StickyAssignor),其中StickyAssignor策略是0.11.x 版本新增的,每種策略不盡相同,RangeAssignor策略以主題為單位,以數據順序排列可用分區,以字典順序排列消費者計算分配;RoundRobinAssignor 對所有分區和所有消費者循環均勻分配;但這兩種分配策略當有消費者終止或加入時均會觸發消費組平衡;StickyAssignor 策略當某個消費者終止時,盡可能保留現有分配,將已經終止的消費者所分配的分區移動到另一個消費者,避免全部分區重新平衡,節省開銷;對於topic分區數較多、數量較大使用StickyAssignor策略有較大優勢。
結合測試經驗的觀后感:
RangeAssignor 是針對單個 Topic 的分區進行排序分配的,分配完之后在增加消費者不會觸發再分配,導致后面增加的消費者無效。這個跟文中描述的再均衡有點不太一致
RoundRobinAssignor 的分配策略是將消費組內訂閱的所有 Topic 的分區及所有消費者進行排序后盡量均衡的分配,分配到的分區數的差值不會超過1。消費中間發現效率太低,增加消費者的時候會觸發在分配,可以達到再均衡的效果
StickyAssignor 考慮上一次的分配結果,1. 分區的分配盡量的均衡2. 每一次重分配的結果盡量與上一次分配結果保持一致
