kafka提交異常org.apache.kafka.clients.consumer.CommitFailedException


來源  https://blog.csdn.net/charry_a/article/details/79892107

一、背景
       kafka使用版本為0.10.1.0,今天在測試kafkaConsumer的時候,代碼如下

public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.80.132:9092");
        props.put("group.id", "testId");
        props.put("enable.auto.commit", "false");
        props.put("session.timeout.ms", "10000");
        props.put("max.poll.interval.ms", "3000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("kafka-topic-02"));
        try {
            while(true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                System.out.println("*********開始休眠6s,模擬消息處理時間**********");
                Thread.sleep(6000);
                System.out.println("***************休眠結束*****************");
                consumer.commitSync();
            }
        }catch(Exception e) {
            e.printStackTrace();
        }finally {
            consumer.close();
        }
    一運行,報錯了,報錯如下,意思是我們手動提交的時候失敗了,因為消費者組開啟了rebalanced操作且將對應的分區分配給了其它消費者

*********開始休眠6s,模擬消息處理時間**********
***************休眠結束*****************
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:600)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:498)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1104)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1072)
    at com.tanjie.kafka.ConsumerDemo.main(ConsumerDemo.java:29)
      經過查看官網,發現參數max.poll.interval.ms(官網給得默認值為3000)的意思為,當我們從kafkaServer端poll消息時,poll()的調用之間的最大延遲。 這提供了消費者在獲取更多記錄之前可以空閑的時間量的上限。 如果在此超時到期之前未調用poll(),則認為使用者失敗,並且消費者組將重新平衡以便將分區重新分配給其他消費者,而恰好這里我們設置了Thread.sleep(6000) > max.poll.interval.ms值,也就是我們在手動提交的時候,實際上分區信息已經被分配到整個消費者組里面的其它消費者了。

     了解了上面的原因,修改也很簡單,第一是增大max.poll.interval.ms的值,不過我們在實際的生產中,可以測試一下,假如你每次poll 100條消息,每條消息處理N/s 那么最好將max.poll.interval.ms值設置為 > 100*N,這樣就不會出現上面的異常了。

     經過修改后代碼如下

public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.80.132:9092");
props.put("group.id", "testId");
props.put("enable.auto.commit", "false");
props.put("session.timeout.ms", "10000");
props.put("max.poll.interval.ms", "3000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("kafka-topic-02"));
try {
while(true) {
ConsumerRecords<String, String> records = consumer.poll(100);
System.out.println("*********開始休眠2s,模擬消息處理時間**********");
Thread.sleep(2000);
System.out.println("***************休眠結束*****************");
for(ConsumerRecord<String,String> record : records) {
System.out.println("offest:" + record.offset() + ";value: " + record.value());
}
consumer.commitSync();
}
}catch(Exception e) {
e.printStackTrace();
}finally {
consumer.close();
}
}
     運行結果如下:

*********開始休眠2s,模擬消息處理時間**********
***************休眠結束*****************
*********開始休眠2s,模擬消息處理時間**********
***************休眠結束*****************
offest:18;value: 3f85df8b-4ec6-4b9d-be22-0c269ead55a9
offest:19;value: 34d8b20d-eb24-4cd5-a249-75b41a6fda5b
offest:20;value: dcc4bd2f-912d-4fda-b141-7c759ea3dc24
offest:21;value: d9a7db66-d9b5-4c0f-9287-f0197a7dcdf0
offest:14;value: 2dfab504-da05-4f38-a7be-cb1e0c7a8ea9
offest:15;value: c3170f1b-bbe6-4c58-8fda-c7a548ca4e3c
offest:18;value: 966faa1c-4cf1-44cd-a63e-818940fcb26a
offest:19;value: 7bb5b5aa-bd74-4c2c-9008-58714028a882
offest:20;value: 0f79cacf-c558-4696-985c-0ea0949acd4e
offest:21;value: 471f54a0-529f-4f54-b9fb-706c042839e2
*********開始休眠2s,模擬消息處理時間**********
***************休眠結束*****************
*********開始休眠2s,模擬消息處理時間**********
***************休眠結束*****************
*********開始休眠2s,模擬消息處理時間**********

————————————————
版權聲明:本文為CSDN博主「農村外出務工男」的原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/charry_a/article/details/79892107


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM