一、含義
CommitFailedException異常:位移提交失敗時候拋出的異常。通常該異常被拋出時還會攜帶這樣的一段話:
Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
簡單翻譯一下,“位移提交失敗,原因是消費者組開啟了rebalance且已然分配對應分區給其他消費者。這表明poll調用間隔超過了max.poll.interval.ms的值,這通常表示poll循環中的消息處理花費了太長的時間。解決方案有兩個:1. 增加session.timeout.ms值;2. 減少max.poll.records值”
下面這段話完全是我自己的理解,請謹慎聽取~~
在我看來,上面英文中的最后一句話實際上依然是0.10.0.0或之前版本時的解決之道,因為在那些版本中尚未提供max.poll.interval.ms參數,因此session.timeout.ms既用於失敗檢測,也用於控制消息處理時間,同時還承擔着rebalance過程的超時控制。在0.10.1.0版本時社區對該參數的含義進行了解耦,推出了max.poll.interval.ms參數。實際上,在0.10.1.0.0或之后的版本中,作者推薦用戶將session.timeout.ms設置一個很小的值,比如5s,但需要把max.poll.interval.ms設置成平均的消息處理時間。舉個例子,假設你一次poll調用返回的消息數是N,你處理每條消息的平均時間是t0,那么你需要設置max.poll.interval.ms稍稍大於N * t0以保證poll調用間隔不會超過該閾值。
如此來看,上面英文最后一句話中的第一個解決辦法應該修改成:增加max.poll.interval.ms值,而非session.timeout.ms
二、拋出時機
從源代碼方面說,CommitFailedException異常通常發生在手動提交位移時,即用戶顯式調用KafkaConsumer.commitSync()方法。從使用場景來說,有兩種場景可以拋出該異常
2.1 消息處理時間>max.poll.interval.ms時: 如前所述,這是該異常最“正宗”的出現場景。復現也比較容易,用戶只需寫一個consumer程序,訂閱topic(即使用consumer.subscribe),設置max.poll.interval.ms=N,然后在consumer.poll循環中Thread.sleep(>N),之后手動提交位移即可復現,比如:
... props.put("max.poll.interval.ms", 5000); consumer.subscribe(Arrays.asList("topic1", "topic2", ...)); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); // 處理消息 Thread.sleep(6000L); consumer.commitSync(); }
2.2 standalone consumer與consumer group沖突時:這里所說的standalone consumer指的是使用KafkaConsumer.assign()而非subscribe()的消費者。當用戶系統中同時出現了standalone consumer和consumer group,並且它們的group id相同時,此時standalone consumer手動提交位移時就會立刻拋出此異常。這是因為group coordinator無法識別這個具有相同group id的consumer,從而向它返回“你不是一個合法的組成員”錯誤。目前Kafka consumer提交位移的代碼中一旦碰到這個錯誤會立即拋出CommitFailedException。
三、個人對CommitFailedException的看法
針對上面的第二種場景,我覺得初始的英文描述中完全沒有提及,這實際上是該異常表述不清晰的一個表現。因為在提交位移的源代碼中broker端返回“無效組成員”后,coordinator有可能認為這是一個新的成員,需要批准它加入組。這對於正常的組rebalance流程來說並沒有什么問題,但對於standalone consumer而言該邏輯就顯得有點不適用了。縱然不修改這個邏輯,至少也要完善CommitFailedException的表述,把這種情況加到異常說明里面。這樣用戶就能明確知曉誘發這種異常的所有場景,而不是像現在這樣:只能嘗試修改max.poll.records或max.poll.interval.ms。要知道對於第二種情況,無論用戶如何設置max.poll.interval.ms或max.poll.records都無法規避。
當然,考慮到consumer端會拋出“The coordinator is not aware of this member”錯誤顯式提醒用戶此問題,因此假設給社區提了jira,開發人員估計也會以此為由給“拍”回來,不過依然還是有優化的空間的。比如在輸出日志中更加清晰提醒用戶這個錯誤是因為用戶的某個“粗心”的standalone consumer“無意冒犯”了一個consumer group而導致的錯誤。