In one project I planned to run two consumers at the same time, each consuming a different topic. In the application configuration I only set a default consumer group id and did not give each consumer its own group id, so both consumers ended up using the same group id:
# Default consumer group id
spring.kafka.consumer.group-id=test-message-group
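For reference, a minimal sketch of what the two listeners looked like (the class and method names are illustrative, not the actual project code). Neither listener declares its own groupId, so both fall back to spring.kafka.consumer.group-id:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class MessageListeners {

    // No groupId declared here, so this listener uses the default
    // spring.kafka.consumer.group-id=test-message-group
    @KafkaListener(topics = "HolderMsg")
    public void onHolderMsg(String message) {
        // handle HolderMsg records
    }

    // Also falls back to the same default group id
    @KafkaListener(topics = "TcMsg")
    public void onTcMsg(String message) {
        // handle TcMsg records
    }
}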
While debugging with breakpoints, however, I found that the two consumers sometimes worked normally and sometimes kept rebalancing, accompanied by heartbeat failures. This happened especially often when data was being read frequently or when a breakpoint was held for a long time. And while a consumer was unable to complete the rebalance, it could not consume any data.
The log output in the abnormal state looks like the following: one consumer cannot finish rejoining the group, while the other keeps failing to send heartbeats.
[Consumer clientId=consumer-1, groupId=test-message-group] Attempt to heartbeat failed since group is rebalancing
[Consumer clientId=consumer-1, groupId=test-message-group] Attempt to heartbeat failed since group is rebalancing
[Consumer clientId=consumer-1, groupId=test-message-group] Attempt to heartbeat failed since group is rebalancing
[Consumer clientId=consumer-2, groupId=test-message-group] (Re-)joining group
[Consumer clientId=consumer-2, groupId=test-message-group] (Re-)joining group
[Consumer clientId=consumer-1, groupId=test-message-group] Attempt to heartbeat failed since group is rebalancing
[Consumer clientId=consumer-1, groupId=test-message-group] Attempt to heartbeat failed since group is rebalancing
When both consumers are working normally, the log output looks like the following; both report Successfully joined group with generation XXX.
[Consumer clientId=consumer-1, groupId=test-message-group] (Re-)joining group
[Consumer clientId=consumer-2, groupId=test-message-group] Successfully joined group with generation 125
[Consumer clientId=consumer-1, groupId=test-message-group] Successfully joined group with generation 125
[Consumer clientId=consumer-1, groupId=test-message-group] Setting newly assigned partitions: HolderMsg-0, HolderMsg-1, HolderMsg-2
[Consumer clientId=consumer-2, groupId=test-message-group] Setting newly assigned partitions: TcMsg-2, TcMsg-0, TcMsg-1
[Consumer clientId=consumer-1, groupId=test-message-group] Setting offset for partition HolderMsg-0 to the committed offset FetchPosition{offset=7, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=192.168.202.128:9092 (id: 0 rack: null), epoch=0}}
[Consumer clientId=consumer-2, groupId=test-message-group] Setting offset for partition TcMsg-2 to the committed offset FetchPosition{offset=4, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=192.168.202.128:9092 (id: 0 rack: null), epoch=0}}
[Consumer clientId=consumer-1, groupId=test-message-group] Setting offset for partition HolderMsg-1 to the committed offset FetchPosition{offset=5, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=192.168.202.128:9092 (id: 0 rack: null), epoch=0}}
[Consumer clientId=consumer-2, groupId=test-message-group] Setting offset for partition TcMsg-0 to the committed offset FetchPosition{offset=2, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=192.168.202.128:9092 (id: 0 rack: null), epoch=0}}
[Consumer clientId=consumer-1, groupId=test-message-group] Setting offset for partition HolderMsg-2 to the committed offset FetchPosition{offset=7, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=192.168.202.128:9092 (id: 0 rack: null), epoch=0}}
[Consumer clientId=consumer-2, groupId=test-message-group] Setting offset for partition TcMsg-1 to the committed offset FetchPosition{offset=3, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=192.168.202.128:9092 (id: 0 rack: null), epoch=0}}
During repeated debugging I realized that the two consumers kept rejoining the group, and that it was the same group for both. My guess was that the two consumers were interfering with each other because they shared one group id. After giving each consumer its own group id, as sketched below, the abnormal behavior did not occur again.
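A minimal sketch of the fix, continuing the illustrative listener class above (the group names holder-message-group and tc-message-group are made up for the example). The groupId attribute on @KafkaListener overrides the default from spring.kafka.consumer.group-id:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class MessageListeners {

    // Each listener now joins its own consumer group,
    // overriding spring.kafka.consumer.group-id
    @KafkaListener(topics = "HolderMsg", groupId = "holder-message-group")
    public void onHolderMsg(String message) {
        // handle HolderMsg records
    }

    @KafkaListener(topics = "TcMsg", groupId = "tc-message-group")
    public void onTcMsg(String message) {
        // handle TcMsg records
    }
}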
Based on this behavior I looked for more information and found a fairly detailed explanation in a blog post:
https://olnrao.wordpress.com/2015/05/15/apache-kafka-case-of-mysterious-rebalances/
The article explains that when consumers register in ZooKeeper, the Consumer Identifiers Registry is stored under the path /consumers/[group_id]/ids/[consumer_connector_id]. These registration nodes form a tree, and whenever a consumer joins or leaves, every consumer in the tree is notified and a rebalance is triggered.
The path under which a consumer registers in ZooKeeper is not tied to a topic but to the group id, because a single consumer group may consume several different topics. If different consumers use the same group id to consume different topics, then whenever a consumer of any one of those topics joins or leaves, every consumer in that group rebalances, which can lead to exactly the problem seen while debugging above.
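For example, with the shared configuration above, both consumer connectors would register under the same group node (the connector ids below are purely illustrative):

/consumers/test-message-group/ids/test-message-group_host1-1589012345678-a1b2c3d4
/consumers/test-message-group/ids/test-message-group_host2-1589012345999-e5f6a7b8

Any change under this subtree is seen by every consumer in the group, so a change affecting the HolderMsg consumer also forces the TcMsg consumer to rebalance.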
Therefore, Kafka consumers that consume different topics should use different group ids to minimize their impact on each other.
