Manually Pulling and Consuming a Kafka Topic


In Spring Boot, Kafka consumption is usually done with the @KafkaListener annotation. Sometimes, however, we don't want a listener but a timed pull: fetch a batch at a fixed interval and process it. This post is a short note on how to implement that.
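The pacing idea behind such a timed pull — poll once, then sleep only for whatever remains of the batch window — can be sketched in plain Java. This is an illustrative helper, not part of any Kafka API; the names are made up:

```java
// Illustrative sketch (not a Kafka API): compute how long to sleep after a
// poll so that successive polls are spaced one batch window apart.
public class BatchPacing {
    // batchWindowMs: desired interval between polls; pollCostMs: how long the poll took
    static long remainingSleepMs(long batchWindowMs, long pollCostMs) {
        // if the poll already overran the window, do not sleep at all
        return Math.max(0L, batchWindowMs - pollCostMs);
    }

    public static void main(String[] args) {
        System.out.println(remainingSleepMs(5000, 1200)); // fast poll: sleep the remaining 3800 ms
        System.out.println(remainingSleepMs(5000, 6000)); // slow poll: 0, poll again immediately
    }
}
```

The same subtraction appears in the example below ("compute elapsed time and sleep"), using hutool's timer to measure the poll cost.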


//batch size
private static Integer batchSize = 3;
//batch window (seconds)
private static Integer batchTime = 5;

@Resource
private KafkaProperties kafkaProperties;

@Test
void kafkaTest() {

    //configure the consumer
    Map<String, Object> properties = kafkaProperties.buildConsumerProperties();
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); //consumer group
    properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, batchSize); //max records per poll
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); //disable auto-commit
    //create the consumer
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
    //fetch all partition info for the topic
    List<PartitionInfo> partitionList = kafkaConsumer.partitionsFor("test-consumer");
    List<TopicPartition> topicPartitions = CollUtil.newArrayList();
    partitionList.forEach(item -> topicPartitions.add(new TopicPartition(item.topic(), item.partition())));
    //manually assign the partitions; poll() then resumes from the committed offsets
    //(call seek(partition, offset) here instead to start from a specific offset)
    kafkaConsumer.assign(topicPartitions);

    //consumer loop (run inline here for demonstration only)
    ((Runnable) () -> {
        Duration duration = Duration.ofSeconds(batchTime);
        long batchTimeMs = batchTime * 1000L;
        Map<Integer, ConsumerRecord<String, String>> recordMap = MapUtil.newHashMap();
        while (true) {
            try {
                TimeInterval interval = DateUtil.timer();
                ConsumerRecords<String, String> records = kafkaConsumer.poll(duration);

                int count = records.count();
                log.info("test consumer fetched {} records", count);
                if (count > 0) {
                    //collect values and record the minimum offset of each partition in this batch
                    List<String> values = CollUtil.newArrayList();
                    for (ConsumerRecord<String, String> item : records) {
                        values.add(item.value());
                        ConsumerRecord<String, String> original = recordMap.get(item.partition());
                        if (original == null || item.offset() < original.offset()) {
                            recordMap.put(item.partition(), item);
                        }
                    }
                    //run business logic here; uncomment the next line to simulate a failure
                    //(an unconditional throw would make the statements below unreachable and fail to compile)
                    //throw new RuntimeException("test error");
                    //commit offsets synchronously
                    kafkaConsumer.commitSync();
                    //clear the bookkeeping after a successful commit
                    recordMap.clear();
                }

                //the batch was full, so poll again immediately without sleeping
                if (batchSize == count) {
                    continue;
                }
                //compute how long the poll took and sleep for the rest of the batch window
                long used = interval.intervalMs();
                if (used < batchTimeMs) {
                    ThreadUtil.safeSleep(batchTimeMs - used);
                }

            } catch (Exception e) {
                log.error("consume error => {}", e.getMessage());
                //roll back each partition to the smallest offset of the failed batch
                recordMap.forEach((k, v) -> kafkaConsumer.seek(new TopicPartition(v.topic(), v.partition()), v.offset()));
                log.error(ExceptionUtil.stacktraceToString(e));
                ThreadUtil.safeSleep(batchTimeMs);
            }
        }
    }).run();
}
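The error-handling branch above depends on remembering the smallest offset seen per partition in the current batch, so that a failed batch can be re-read from its start after a seek. Stripped of Kafka types, that bookkeeping amounts to the following sketch (class and method names are hypothetical, mirroring the `recordMap` above):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper mirroring recordMap above: track the minimum offset
// observed per partition in the current batch, so the consumer can seek
// back to it if processing fails before the commit.
public class BatchOffsetTracker {
    private final Map<Integer, Long> minOffsetByPartition = new HashMap<>();

    // record one consumed offset; keep only the smallest per partition
    public void record(int partition, long offset) {
        minOffsetByPartition.merge(partition, offset, Math::min);
    }

    // offset to seek back to on failure (null if nothing was recorded)
    public Long rollbackOffset(int partition) {
        return minOffsetByPartition.get(partition);
    }

    // after a successful commit the batch is done, so forget it
    public void clear() {
        minOffsetByPartition.clear();
    }
}
```

On error, the consumer would call `seek(partition, rollbackOffset(p))` for each tracked partition; after a successful `commitSync()` it calls `clear()`, exactly as the catch block and commit path do above.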

(Note: the main dependencies involved are spring-kafka and hutool.)


Reposted from my personal blog: https://blog.fordes.top — feel free to visit and discuss; if you spot any mistakes in this article, please point them out!

