stom消费kafka消息速度慢的问题


原来代码如下

        KafkaSpoutConfig<String, String> kafkaSpoutConfig = KafkaSpoutConfig.builder(kafka_server,
                "monmetric")
//                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
                .setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 200).setRecordTranslator(JUST_VALUE_FUNC, new Fields("str"))
                .setRetry(newRetryService()).setOffsetCommitPeriodMs(10000).setFirstPollOffsetStrategy(LATEST)
                .setMaxUncommittedOffsets(250).build();

 主要问题出在setMaxUncommittedOffsets(250)上,该属性默认值为1000w,其含义为:

它和另外一个参数有关:offset.commit.period.ms,这个参数是控制多久向 Kafka commit 一次。

maxUncommittedOffset = 1000 的执行过程是这样的:

  1. 当我们消费了 1000条消息之后,达到了 maxUncommittedOffset,这个时候客户端(Kafka Spout)已经不能再 poll 了,它要等待 commit;
  2. 此时 commit 的默认周期是 30000 ms(也就是 30 秒钟提价一次),这意味着如果不改 commit 的周期,30s 只能消费 1000条。

这就导致了kafka消费慢的问题,我把参数改成默认值就能很快消费了。


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM