依據Partition和Consumer的Rebalance策略,找到Kafka.Client Rebalance代碼塊,還原本地環境,跟蹤調試,發現自定義Consumer Group 的Consumer並沒有分配到PartionID,如下圖、
據此,基本就可以定位到不同組Consumer無法覆蓋Partition的問題根源了。
仔細閱讀Rebalance代碼,發現Kafka.Client 在獲取consumer時,並沒有根據Group做篩選,獲取到的是所有組的Consumer,如下圖
(此處只有兩個不同組的Consumer類型,每個Consumer會生成五個ConsumerThreadID,用於覆蓋partition)
定位curConsumer變量,從consumerPerTopicMap中獲取,找到consumerPerTopicMap的實現
此處確實沒有過濾
增加group過濾代碼。問題解決。