stom消費kafka消息速度慢的問題


原來代碼如下

        KafkaSpoutConfig<String, String> kafkaSpoutConfig = KafkaSpoutConfig.builder(kafka_server,
                "monmetric")
//                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
                .setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 200).setRecordTranslator(JUST_VALUE_FUNC, new Fields("str"))
                .setRetry(newRetryService()).setOffsetCommitPeriodMs(10000).setFirstPollOffsetStrategy(LATEST)
                .setMaxUncommittedOffsets(250).build();

 主要問題出在setMaxUncommittedOffsets(250)上,該屬性默認值為1000w,其含義為:

它和另外一個參數有關:offset.commit.period.ms,這個參數是控制多久向 Kafka commit 一次。

maxUncommittedOffset = 1000 的執行過程是這樣的:

  1. 當我們消費了 1000條消息之后,達到了 maxUncommittedOffset,這個時候客戶端(Kafka Spout)已經不能再 poll 了,它要等待 commit;
  2. 此時 commit 的默認周期是 30000 ms(也就是 30 秒鍾提價一次),這意味着如果不改 commit 的周期,30s 只能消費 1000條。

這就導致了kafka消費慢的問題,我把參數改成默認值就能很快消費了。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM