本地啟動kafka后,不斷報一下信息:
表示本地consumer節點在不斷的重新加入group,並且不斷伴隨着offset commit失敗。
具體原因是因為ConsumerCoordinator沒有向GroupCoordinator在規定的時間內同步心跳導致GroupCoordinator以為本地consumer節點掛掉了,引發了partition在consumerGroup里的rebalance。當rebalance后,之前該consumer擁有的分區和offset信息就失效了,同時導致不斷的報auto offset commit failed。
為什么ConsumerCoordinator會同步心跳失敗呢,那是因為我把consumer的maxPollIntervalMs設置得太小,1s,而kafka默認是300s。
這個參數的作用主要是如下:
分割----------------------------------------------------------------------------------------------------------------------
最近用kafka 做一個監控
通過flume采集數據推給kafka producer ,再由consumer來消費,過了一天發現消息隊列有堆積,
去查日志發現報錯:
Auto offset commit failed for group 0: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
查了一下
大概的意思是消費者的消費速度小於生產者的產生速度,所以導致了消息堆積,但是通過之前的測試可以知道.消費者的速度其實是遠大於生產者的. 那么是什么原因呢? 真相只有一個:重復消費!
consumer在堆積的消息隊列中拿出部分消息來消費,但是這個拿取是隨機的(不確定,目前看來是),如果生產者一次產生的數據量過大,1秒鍾5W條,那么consumer是有可能在規定的時間內(session.timeout.ms)消費不完的.
如果設置的是自動提交(enable.auto.commit),就會出現提交失敗的情況,提交失敗就會回滾,這部分數據就相當於沒有被消費過,然后consumer繼續去拿數據如果還沒消費完就還是回滾,這樣循環下去.(但是我發現有時候數據還是會被消費的,比如說數據小於兩萬條/次,所以我說取得數量是隨機的,應該可以設置).
那么就目前解決方法來看:
一個是增加session.timeout.ms的時間,
一個是設置不要自動提交(enable.auto.commit=false).
最后我采取的方案是不要自動提交,因為如果隨着我增加session.timeout.ms的時間,取得數據量也增加了那還是會超時(而且我也不知道怎么設置每次取得數量)
最后成功解決問題.
接觸kafka時間不足一天,以上是本人一些愚見.
附spring-kafka配置圖
原文鏈接:https://blog.csdn.net/StrideBin/article/details/78040177
原文鏈接:https://blog.csdn.net/pml18710973036/article/details/87207071