kafka數據如何被重復消費


近段時間學習極客時間李玥老師的后端存儲實戰課時,看到一個很多意思的東西:用kafka存儲點擊流的數據,並重復處理。在以往的使用中,kafka只是一個消息傳輸的載體,消息被消費后就不能再次消費。新知識與印象相沖突,於是就有了本篇文章:kafka數據如何被重復消費。

前期理論了解

首先我先去官網糾正了我對kafka的整體了解。

官網對kafka的描述是:一個分布式流平台。怪自己的學藝不精。

其次,我重新看了一下kafka消費者的消費過程:kafka首先通過push/poll(默認為poll)獲取消息,接收消息處理完成后手動/自動提交消費成功,kafka服務器則根據提交情況決定是否移動當前偏移量。

方案確定

kafka消費者讀取數據的位置是通過偏移量判斷,那如果我能將偏移量手動設置為起始位置,就能實現重復消費?這個有搞頭。

如何手動設置偏移量是關鍵。

show me the code

代碼的關鍵主要在於偏移量設置 api 的調用,其余沒什么特別。

要注意的是,代碼中我分別調用了作用不同的設置偏移量,僅作為展示,可按需取用。

最后消費者消息消息時,我只使用默認的拉取條數設置消費一次,可按需進行修改。

/**
 * repeat kafka message
 * @param host kafka host
 * @param groupId kafka consumer group id
 * @param autoCommit whether auto commit consume
 * @param topic consume topic
 * @param consumeTimeOut consume time out
*/
    private void textResetOffset(String host, String groupId, Boolean autoCommit, String topic, Long consumeTimeOut){
        //form a properties to new consumer
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, host);
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit.toString());
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        //subscribe incoming topic
        consumer.subscribe(Collections.singletonList(topic));
        //get consumer consume partitions
        List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
        List<TopicPartition> topicPartitions = new ArrayList<>();
        for(PartitionInfo partitionInfo : partitionInfos){
            TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
            topicPartitions.add(topicPartition);
        }
        // poll data from kafka server to prevent lazy operation
        consumer.poll(Duration.ofSeconds(consumeTimeOut));
        //reset offset from beginning
        consumer.seekToBeginning(topicPartitions);
        //reset designated partition offset by designated spot
        int offset = 20;
        consumer.seek(topicPartitions.get(0), offset);
        //reset offset to end
        consumer.seekToEnd(topicPartitions);
        //consume message as usual
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
        while (iterator.hasNext()){
            ConsumerRecord<String, String> record = iterator.next();
            log.info("consume data: {}", record.value());
        }
    }
運行結果

需注意的點

在手動設置偏移量時,遇到了一個exception

java.lang.IllegalStateException: No current assignment for partition test-0

翻了一下stackoverflow以及官方文檔后,才了解到設置偏移量是一個lazy operation,官網的解釋如下。

Seek to the first offset for each of the given partitions. This function evaluates lazily, seeking to the first offset in all partitions only when poll(long) or position(TopicPartition) are called. If no partition is provided, seek to the first offset for all of the currently assigned partitions.

於是我先進行一次 poll 操作后再設置偏移量。

    本文首發於 cartoon的博客
    轉載請注明出處:https://cartoonyu.github.io/cartoon-blog/post/message-queue/kafka數據如何被重復消費/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM