Spring Boot 整合 KafkaTemplate


具體目錄:

一、生產者實踐

  • 普通生產者

  • 帶回調的生產者

  • 自定義分區器

  • kafka事務提交

二、消費者實踐

  • 簡單消費

  • 指定topic、partition、offset消費

  • 批量消費

  • 監聽異常處理器

  • 消息過濾器

  • 消息轉發

  • 定時啟動/停止監聽器

 

package com.ylt.kafkaspringboot.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.*;


@RestController
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;


    // 發送消息
    @GetMapping("/kafka/normal/{message}")
    public void sendMessage1(@PathVariable("message") String normalMessage) {
        kafkaTemplate.send("topic1", normalMessage);
    }


    /**
     * 發送消息,消息發送成功就執行回調方法,參數包含發送消息參數。  流程:先發送成功,listener 后續才去topic中監聽執行消費
     * @param callbackMessage
     */

    @GetMapping("/kafka/callbackOne/{message}")
    @Transactional(rollbackFor = RuntimeException.class)
    public void sendMessage2(@PathVariable("message") String callbackMessage) {
        kafkaTemplate.send("topic1", callbackMessage).addCallback(success -> {
            // 消息發送到的topic
            String topic = success.getRecordMetadata().topic();
            // 消息發送到的分區
            int partition = success.getRecordMetadata().partition();
            // 消息在分區內的offset
            long offset = success.getRecordMetadata().offset();
            System.out.println("發送消息成功:" + topic + "-" + partition + "-" + offset);
        }, failure -> {
            System.out.println("發送消息失敗:" + failure.getMessage());
        });
    }


    @GetMapping("/kafka/callbackTwo/{message}")
    @Transactional(rollbackFor = RuntimeException.class)
    public void sendMessage3(@PathVariable("message") String callbackMessage) {
        kafkaTemplate.send("topic1", callbackMessage).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable ex) {
                System.out.println("發送消息失敗:"+ex.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, Object> result) {
                System.out.println("發送消息成功:" + result.getRecordMetadata().topic() + "-"
                        + result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());
            }
        });
    }

    // 第一種 配置事務
    @GetMapping("/kafka/transaction")
    public void sendMessage7(){
        //使用executeInTransaction 需要在yml中配置事務參數,配置成功才能使用executeInTransaction方法,且運行報錯后回滾
        kafkaTemplate.executeInTransaction(operations->{
             operations.send("topic1","test executeInTransaction");
             throw new RuntimeException("fail");
        });

        //沒有在yml配置事務,這里就會出現消息發送成功,異常也出現了。如果配置事務,則改用executeInTransaction 替代send方法
        kafkaTemplate.send("topic1","test executeInTransaction");
        throw new RuntimeException("fail");
    }


    //第二種 配置事務 (注解方式)
    // [1] 需要在yml 配置 transaction-id-prefix: kafka_tx.
    // [2] 在方法上添加@Transactional(rollbackFor = RuntimeException.class)  做為開啟事務並回滾
    // [3] 在注解方式中 任然可以使用.send方法,不需要使用executeInTransaction方法
    @GetMapping("/send2/{input}")
    @Transactional(rollbackFor = RuntimeException.class)
    public String sendToKafka2(@PathVariable String input){
//        this.template.send(topic,input);
        //事務的支持

        kafkaTemplate.send("topic1",input);
        if("error".equals(input))
        {
            throw new RuntimeException("input is error");
        }
        kafkaTemplate.send("topic1",input+"anthor");

        return "send success!"+input;

    }

}

 

@Component
public class KafkaConsumer {


    // 消費監聽
/*
    @KafkaListener(topics = {"topic1"})
    public void onMessage1(ConsumerRecord<?, ?> record){
        // 消費的哪個topic、partition的消息,打印出消息內容
        System.out.println("簡單消費:"+record.topic()+"-"+record.partition()+"-"+record.value());
    }*/



    /**
     * @Title 指定topic、partition、offset消費
     * @Description 同時監聽topic1和topic2,監聽topic1的0號分區、topic2的 "0號和1號" 分區,指向1號分區的offset初始值為8
     * @Author long.yuan
     * @Date 2020/3/22 13:38
     * @Param [record]
     * @return void
     **/
//    @KafkaListener(id = "consumer1",groupId = "felix-group",topicPartitions = {
//            @TopicPartition(topic = "topic1", partitions = { "0" }),
//            @TopicPartition(topic = "topic2", partitions = {"0"})
//            //沒有1號分區,先去掉
////            @TopicPartition(topic = "topic2", partitions = {"0"}, partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "8"))
//    })
//    public void onMessage2(ConsumerRecord<?, ?> record) {
//        System.out.println("topic:"+record.topic()+"|partition:"+record.partition()+"|offset:"+record.offset()+"|value:"+record.value());
//    }

    /**
     * 屬性解釋:
     *
     * ① id:消費者ID;
     *
     * ② groupId:消費組ID;
     *
     * ③ topics:監聽的topic,可監聽多個;
     *
     * ④ topicPartitions:可配置更加詳細的監聽信息,可指定topic、parition、offset監聽。
     *
     * 上面onMessage2監聽的含義:監聽topic1的0號分區,同時監聽topic2的0號分區和topic2的1號分區里面offset從8開始的消息。
     * 注意:topics和topicPartitions不能同時使用;
     */

    /**
     * 批量消費,  批量消費和普通一條的消費 只能二選一 使用
     * @param records
     */
//    @KafkaListener(id = "consumer2",groupId = "felix-group", topics = "topic1")
//    public void onMessage3(List<ConsumerRecord<?, ?>> records) {
//        System.out.println(">>>批量消費一次,records.size()="+records.size());
//        for (ConsumerRecord<?, ?> record : records) {
//            System.out.println(record.value());
//        }
//    }

//    // 將這個異常處理器的BeanName放到@KafkaListener注解的errorHandler屬性里面
//    @KafkaListener(topics = {"topic1"},errorHandler = "consumerAwareErrorHandler")
//    public void onMessage4(ConsumerRecord<?, ?> record) throws Exception {
//        throw new Exception("簡單消費-模擬異常");
//    }


    // 批量消費也一樣,異常處理器的message.getPayload()也可以拿到各條消息的信息
//    @KafkaListener(topics = "topic1",errorHandler="consumerAwareErrorHandler")
//    public void onMessage5(List<ConsumerRecord<?, ?>> records) throws Exception {
//        System.out.println("批量消費一次...");
//        throw new Exception("批量消費-模擬異常");
//    }


    /**
     * 綁定一個filter bean 的name; 如果調用會先判斷filter, 進來或者消失
     * @param record
     */
    // 消息過濾監聽
//    @KafkaListener(topics = {"topic1"}/*,containerFactory = "filterContainerFactory"*/)
//    public void onMessage6(ConsumerRecord<?, ?> record) {
//        System.out.println(record.value());
//    }


    /**
     * @Title 消息轉發
     * @Description 從topic1接收到的消息經過處理后轉發到topic2
     * @Author long.yuan
     * @Date 2020/3/23 22:15
     * @Param [record]
     * @return void
     **/
//    @KafkaListener(topics = {"topic1"})
//    @SendTo("topic2")
//    public String onMessage7(ConsumerRecord<?, ?> record) {
//        return record.value()+"-forward message";
//    }
//
//    @KafkaListener(topics = {"topic2"})
//    public void onMessage8(ConsumerRecord<?, ?> record) {
//        System.out.println( record.value()+"");;
//    }

代碼地址:

 轉:https://mp.weixin.qq.com/s/JvKPbvJ7q0ypqfF3ARBW7Q


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM