原來代碼如下
KafkaSpoutConfig<String, String> kafkaSpoutConfig = KafkaSpoutConfig.builder(kafka_server, "monmetric") // .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup") .setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 200).setRecordTranslator(JUST_VALUE_FUNC, new Fields("str")) .setRetry(newRetryService()).setOffsetCommitPeriodMs(10000).setFirstPollOffsetStrategy(LATEST) .setMaxUncommittedOffsets(250).build();
主要問題出在setMaxUncommittedOffsets(250)上,該屬性默認值為1000w,其含義為:
它和另外一個參數有關:offset.commit.period.ms,這個參數是控制多久向 Kafka commit 一次。
maxUncommittedOffset = 1000 的執行過程是這樣的:
- 當我們消費了 1000條消息之后,達到了 maxUncommittedOffset,這個時候客戶端(Kafka Spout)已經不能再 poll 了,它要等待 commit;
- 此時 commit 的默認周期是 30000 ms(也就是 30 秒鍾提價一次),這意味着如果不改 commit 的周期,30s 只能消費 1000條。
這就導致了kafka消費慢的問題,我把參數改成默認值就能很快消費了。