一般在SpringBoot使用kafka,通常用
@KafkaListener
注解來進行監聽消費。然而某些時候,我們不需要監聽而要以定時拉取的方式進行消費,本文主要就是簡單記錄此方式的實現方法。
//批次大小
private static Integer batchSize = 3;
//批次時間
private static Integer batchTime = 5;
@Resource
private KafkaProperties kafkaProperties;
@Test
void kafkaTest() {
//配置消費者
Map<String, Object> properties = kafkaProperties.buildConsumerProperties();
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");//指定消費組
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, batchSize); //指定批次消費條數
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); //禁用自動提交
//建立消費者
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
//獲取所有partition信息
List<PartitionInfo> partitionList = kafkaConsumer.partitionsFor("test-consumer");
Map<TopicPartition, Integer> topicPartitionMap = MapUtil.newHashMap();
partitionList.forEach(item
-> topicPartitionMap.put(new TopicPartition(item.topic(), item.partition()), item.partition()));
//訂閱topic並設置起始offset
kafkaConsumer.assign(topicPartitionMap.keySet());
topicPartitionMap.forEach(kafkaConsumer::seek);
//啟動消費線程(僅用作示例)
((Runnable) () -> {
Duration duration = Duration.ofSeconds(batchTime);
long batchTimeMs = batchTime * 1000L;
Map<Integer, ConsumerRecord<String, String>> recordMap = MapUtil.newHashMap();
while (true) {
try {
TimeInterval interval = DateUtil.timer();
ConsumerRecords<String, String> records = kafkaConsumer.poll(duration);
int count = records.count();
log.info("測試消費獲取到數據 => {} 條", count);
if (count > 0) {
//處理數據
List<String> values = CollUtil.newArrayList();
records.forEach(item -> values.add(item.value()));
//記錄當前批次每個Partition最小offset
for (ConsumerRecord<String, String> item : records) {
values.add(item.value());
if (recordMap.containsKey(item.partition())) {
ConsumerRecord<String, String> original = recordMap.get(item.partition());
if (item.offset() < original.offset()) {
recordMap.put(item.partition(), item);
}
} else {
recordMap.put(item.partition(), item);
}
}
//執行業務,拋出異常
throw new RuntimeException("測試錯誤");
//同步提交offset
kafkaConsumer.commitSync();
//正常提交后清除記錄
recordMap.clear();
}
//批次消費達到上限,不休眠直接進行下一次消費
if (batchSize == count) {
continue;
}
//計算消費耗時並休眠
long used = interval.intervalMs();
if (used < batchTimeMs) {
ThreadUtil.safeSleep(batchTimeMs - used);
}
} catch (Exception e) {
log.error("消費出錯 => {}", e.getMessage());
recordMap.forEach((k, v) -> kafkaConsumer.seek(new TopicPartition(v.topic(), v.partition()), v.offset()));
log.error(ExceptionUtil.stacktraceToString(e));
ThreadUtil.safeSleep(batchTimeMs);
}
}
}).run();
}
(備注:主要涉及依賴:spring-kafka、hutool)
文章轉載自我的個人博客:https://blog.fordes.top,歡迎訪問交流,文章如有謬誤請務必指出~