Kafka消費端數據過濾方案


前言:

kafka一些常用命令:

cd /opt/module/kafka

查看kafka主題:

./kafka-topics.sh --list --zookeeper localhost:2181

 查看主題詳情

./kafka-topics.sh --describe --zookeeper localhost:2181 --topic kafkaTopic1

kafka消費端數據過濾方案:

在生產端不做配置,只管按特定主題生產數據。

在消費端,對特定主題數據進行分組獲取,然后在獲取過程中對符合業務條件的數據進行處理,否則跳過,但還是會告訴kafka我已經消費過了。

示例代碼:

生產端:

 public String hello()
    {
        Properties props = new Properties();

        props.put("bootstrap.servers", "192.168.11.101:9092");
        props.put("acks", "all");
        props.put("retries", "5");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        //生產者實例
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        x++;
        producer.send(new ProducerRecord<String, String>(topic, "key1", "key1->value1:"+x));
        producer.send(new ProducerRecord<String, String>(topic, "key1", "key1->value11:"+x));
        producer.send(new ProducerRecord<String, String>(topic, "key2", "key2->value2:"+x));
        producer.send(new ProducerRecord<String, String>(topic, "key2", "key2->value22:"+x));
        producer.send(new ProducerRecord<String, String>(topic, "key3", "key3->value3:"+x));
        producer.send(new ProducerRecord<String, String>(topic, "key3", "key3->value33:"+x));


        return "ok";
    }

消費端:

 @KafkaListeners(value = {@KafkaListener(topics = {"kafkaTopic1"},groupId = "group1")})
    public void aaaa(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment) {
        String processRecord = (String) record.value();
        String x = (String)record.key();
        if(x.equals("key1"))
        {
            System.out.printf("group1獨立獲取key1:"+processRecord+"\r\n");

        }
        acknowledgment.acknowledge();
    }
    @KafkaListeners(value = {@KafkaListener(topics = {"kafkaTopic1"},groupId = "group2")})
    public void bbbb(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment) {
        String processRecord = (String) record.value();
        String x = (String)record.key();
        if(x.equals("key2"))
        {
            System.out.printf("group2獨立獲取key2:"+processRecord+"\r\n");

        }
        acknowledgment.acknowledge();
    }
   @KafkaListeners(value = {@KafkaListener(topics = {"kafkaTopic1"},groupId = "group3")})
    public void ccc(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment) {
        String processRecord = (String) record.value();
        String x = (String)record.key();
        if(x.equals("key3"))
        {
            System.out.printf("group3獨立獲取key3:"+processRecord+"\r\n");

        }
        acknowledgment.acknowledge();
    }

其中,group1和group2在調試環境下運行,group3打包成jar后,在命令行窗口中運行,在調用生產端產生數據后,消費端的消費情況如下:

完全符合預期。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM