Kafka消费者组再均衡问题


在Kafka中,当有新消费者加入或者订阅的topic数发生变化时,会触发Rebalance(再均衡:在同一个消费者组当中,分区的所有权从一个消费者转移到另外一个消费者)机制,Rebalance顾名思义就是重新均衡消费者消费。Rebalance的过程如下:

第一步:所有成员都向coordinator发送请求,请求入组。一旦所有成员都发送了请求,coordinator会从中选择一个consumer担任leader的角色,并把组成员信息以及订阅信息发给leader。

第二步:leader开始分配消费方案,指明具体哪个consumer负责消费哪些topic的哪些partition。一旦完成分配,leader会将这个方案发给coordinator。coordinator接收到分配方案之后会把方案发给各个consumer,这样组内的所有成员就都知道自己应该消费哪些分区了。

所以对于Rebalance来说,Coordinator起着至关重要的作用,那么怎么查看消费者对应的Coordinator呢,我们知道某个消费者组对应__consumer_offsets中的哪个Partation是通过hash计算出来的:partation=hash("test_group_1")%50=28,表示test_group_1这个消费者组属于28号partation,通过命令:

 

./kafka-topics.sh --zookeeper 192.168.33.11:2181 --describe --topic __consumer_offsets

可以找到28号Partation所对应的信息:

从而可以知道coordinator对应的broker为1

 

在Rebalance期间,消费者会出现无法读取消息,造成整个消费者群组一段时间内不可用,假设现在消费者组当中有A,代码逻辑执行10s,如果消费者组在消费的过程中consumer B加入到了该消费者组,并且B的代码逻辑执行20s,那么当A处理完后先进入Rebalance状态等待,只有当B也处理完后,A和B才真正通过Rebalance重新分配,这样显然A在等待的过程中浪费了资源。

消费者A:

 1 """
 2 consumer_rebalance_a.py a消费者  3 """
 4 import pickle  5 import uuid  6 import time  7 from kafka import KafkaConsumer  8 from kafka.structs import TopicPartition, OffsetAndMetadata  9 from kafka import ConsumerRebalanceListener 10 
11 consumer = KafkaConsumer( 12     bootstrap_servers=['192.168.33.11:9092'], 13     group_id="test_group_1", 14     client_id="{}".format(str(uuid.uuid4())), 15     enable_auto_commit=False, 16     key_deserializer=lambda k: pickle.loads(k), 17     value_deserializer=lambda v: pickle.loads(v) 18 ) 19 
20 # 用来记录最新的偏移量信息.
21 consumer_offsets = {} 22 
23 
24 class MineConsumerRebalanceListener(ConsumerRebalanceListener): 25     def on_partitions_revoked(self, revoked): 26         """
27  再均衡开始之前 下一轮poll之前触发 28  :param revoked: 29  :return: 30         """
31         print('再均衡开始之前被自动触发.') 32         print(revoked, type(revoked)) 33         consumer.commit_async(offsets=consumer_offsets) 34 
35     def on_partitions_assigned(self, assigned): 36         """
37  再均衡完成之后 即将下一轮poll之前 触发 38  :param assigned: 39  :return: 40         """
41         print('在均衡完成之后自动触发.') 42         print(assigned, type(assigned)) 43 
44 
45 consumer.subscribe(topics=('round_topic',), listener=MineConsumerRebalanceListener()) 46 
47 
48 def _on_send_response(*args, **kwargs): 49     """
50  提交偏移量涉及回调函数 51  :param args: 52  :param kwargs: 53  :return: 54     """
55     if isinstance(args[1], Exception): 56         print('偏移量提交异常. {}'.format(args[1])) 57     else: 58         print('偏移量提交成功') 59 
60 
61 try: 62     start_time = time.time() 63     while True: 64         # 再均衡其实是在poll之前完成的
65         consumer_records_dict = consumer.poll(timeout_ms=100) 66 
67         # 处理逻辑.
68         for k, record_list in consumer_records_dict.items(): 69             for record in record_list: 70                 print("topic = {},partition = {},offset = {},key = {},value = {}".format( 71  record.topic, record.partition, record.offset, record.key, record.value) 72  ) 73 
74  consumer_offsets[ 75  TopicPartition(record.topic, record.partition) 76                 ] = OffsetAndMetadata( 77                     record.offset + 1, metadata='偏移量.'
78  ) 79 
80         try: 81             consumer.commit_async(callback=_on_send_response) 82             time.sleep(10) 83         except Exception as e: 84             print('commit failed', str(e)) 85 
86 except Exception as e: 87     print(str(e)) 88 finally: 89     try: 90         # 同步提交偏移量,在消费者异常退出的时候再次提交偏移量,确保偏移量的提交.
91  consumer.commit() 92         print("同步补救提交成功") 93     except Exception as e: 94         consumer.close()

 消费者B:

 1 """
 2 consumer b.py 消费者B  3 """
 4 
 5 import pickle  6 import uuid  7 import time  8 from kafka import KafkaConsumer  9 from kafka.structs import TopicPartition, OffsetAndMetadata  10 from kafka import ConsumerRebalanceListener  11 
 12 consumer = KafkaConsumer(  13     bootstrap_servers=['192.168.33.11:9092'],  14     group_id="test_group_1",  15     client_id="{}".format(str(uuid.uuid4())),  16     enable_auto_commit=False,  # 设置为手动提交偏移量.
 17     key_deserializer=lambda k: pickle.loads(k),  18     value_deserializer=lambda v: pickle.loads(v)  19 )  20 
 21 consumer_offsets = {}  # 用来记录最新的偏移量信息.
 22 
 23 
 24 class MineConsumerRebalanceListener(ConsumerRebalanceListener):  25     def on_partitions_revoked(self, revoked):  26         """
 27  再均衡开始之前 下一轮poll之前触发  28  :param revoked:  29  :return:  30         """
 31         print('再均衡开始之前被自动触发.')  32         print(revoked, type(revoked))  33         consumer.commit_async(offsets=consumer_offsets)  34 
 35     def on_partitions_assigned(self, assigned):  36         """
 37  再均衡完成之后 即将下一轮poll之前 触发  38  :param assigned:  39  :return:  40         """
 41 
 42         print('在均衡完成之后自动触发.')  43         print(assigned, type(assigned))  44 
 45 
 46 consumer.subscribe(topics=('round_topic',), listener=MineConsumerRebalanceListener())  47 
 48 
 49 def _on_send_response(*args, **kwargs):  50     """
 51  提交偏移量涉及回调函数  52  :param args:  53  :param kwargs:  54  :return:  55     """
 56 
 57     if isinstance(args[1], Exception):  58         print('偏移量提交异常. {}'.format(args[1]))  59     else:  60         print('偏移量提交成功')  61 
 62 
 63 try:  64     start_time = time.time()  65     while True:  66         # 再均衡其实是在poll之前完成的
 67         consumer_records_dict = consumer.poll(timeout_ms=100)  68 
 69         record_num = 0  70         for key, record_list in consumer_records_dict.items():  71             for record in record_list:  72                 record_num += 1
 73         print("---->当前批次获取到的消息个数是:{}".format(record_num))  74 
 75         # 处理逻辑.
 76         for k, record_list in consumer_records_dict.items():  77             for record in record_list:  78                 print("topic = {},partition = {},offset = {},key = {},value = {}".format(  79  record.topic, record.partition, record.offset, record.key, record.value)  80  )  81 
 82  consumer_offsets[  83  TopicPartition(record.topic, record.partition)  84                 ] = OffsetAndMetadata(record.offset + 1, metadata='偏移量.')  85 
 86         try:  87             # 轮询一个batch 手动提交一次
 88             consumer.commit_async(callback=_on_send_response)  89             time.sleep(20)  90         except Exception as e:  91             print('commit failed', str(e))  92 
 93 except Exception as e:  94     print(str(e))  95 finally:  96     try:  97         # 同步提交偏移量,在消费者异常退出的时候再次提交偏移量,确保偏移量的提交.
 98  consumer.commit()  99         print("同步补救提交成功") 100     except Exception as e: 101         consumer.close()

消费者A和消费者B是同一个消费者组(test_group_1)的两个消费者,用time.sleep的方式模拟执行时间,A:10s,B:20s;首先A开始消费,当B新加入消费者组的时候会触发Rebalance,可以通过实现再均衡监听器(RebalanceListener)中的on_partitions_revoked和on_partitions_assigned方法来查看再均衡触发前后的partition变化情况,依次启动消费者A和B之后:

消费者A: 再均衡开始之前被自动触发. {TopicPartition(topic='round_topic', partition=0), TopicPartition(topic='round_topic', partition=1), TopicPartition(topic='round_topic', partition=2)} <class 'set'>
<----------------------------------------
----------------------------------------> 在均衡完成之后自动触发. {TopicPartition(topic='round_topic', partition=0), TopicPartition(topic='round_topic', partition=1)} <class 'set'>
<---------------------------------------- 消费者B: 再均衡开始之前被自动触发. set() <class 'set'>
<----------------------------------------
----------------------------------------> 在均衡完成之后自动触发. {TopicPartition(topic='round_topic', partition=2)} <class 'set'>
<----------------------------------------

在等待B的逻辑执行完后,A和B进入再均衡状态;再均衡前A处于partition 0、1、 2三个分区,B不占有任何partition;当再均衡结束后,A占有partition 0、1,B占有partition 2;然后A和B分别开始消费对应的partition。

在上述消费者A和B的代码中重写了RebalanceListener,主要是为了在发生再均衡之前提交最后一个已经处理记录的偏移量,因为再均衡时消费者将失去对一个分区的所有权,如果消费者已经消费了当前partition还没提交offset,这时候发生再均衡会使得消费者重新分配partition,可能使得同一个消息先后被两个消费者消费的情况,实现MineConsumerRebalanceListener再均衡前提交一次offset,确保每一个消费者在触发再均衡前提交最后一次offset:

 1 class MineConsumerRebalanceListener(ConsumerRebalanceListener):  2     def on_partitions_revoked(self, revoked):  3         """
 4  再均衡开始之前 下一轮poll之前触发  5  :param revoked:  6  :return:  7         """
 8         print('再均衡开始之前被自动触发.')  9         print(revoked, type(revoked)) 10         consumer.commit_async(offsets=consumer_offsets) 11 
12     def on_partitions_assigned(self, assigned): 13         """
14  再均衡完成之后 即将下一轮poll之前 触发 15  :param assigned: 16  :return: 17         """
18 
19         print('在均衡完成之后自动触发.') 20         print(assigned, type(assigned))

 

再均衡发生的场景有以下几种:

1. 组成员发生变更(新consumer加入组、已有consumer主动离开组或已有consumer崩溃了)
2. 订阅主题数发生变更,如果你使用了正则表达式的方式进行订阅,那么新建匹配正则表达式的topic就会触发rebalance
3. 订阅主题的分区数发生变更
鉴于触发再均衡后会造成资源浪费的问题,所以我们尽量不要触发再均衡

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM