Spring Boot 整合 KafkaTemplate


具体目录:

一、生产者实践

  • 普通生产者

  • 带回调的生产者

  • 自定义分区器

  • kafka事务提交

二、消费者实践

  • 简单消费

  • 指定topic、partition、offset消费

  • 批量消费

  • 监听异常处理器

  • 消息过滤器

  • 消息转发

  • 定时启动/停止监听器

 

package com.ylt.kafkaspringboot.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.*;


@RestController
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;


    // 发送消息
    @GetMapping("/kafka/normal/{message}")
    public void sendMessage1(@PathVariable("message") String normalMessage) {
        kafkaTemplate.send("topic1", normalMessage);
    }


    /**
     * 发送消息,消息发送成功就执行回调方法,参数包含发送消息参数。  流程:先发送成功,listener 后续才去topic中监听执行消费
     * @param callbackMessage
     */

    @GetMapping("/kafka/callbackOne/{message}")
    @Transactional(rollbackFor = RuntimeException.class)
    public void sendMessage2(@PathVariable("message") String callbackMessage) {
        kafkaTemplate.send("topic1", callbackMessage).addCallback(success -> {
            // 消息发送到的topic
            String topic = success.getRecordMetadata().topic();
            // 消息发送到的分区
            int partition = success.getRecordMetadata().partition();
            // 消息在分区内的offset
            long offset = success.getRecordMetadata().offset();
            System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset);
        }, failure -> {
            System.out.println("发送消息失败:" + failure.getMessage());
        });
    }


    @GetMapping("/kafka/callbackTwo/{message}")
    @Transactional(rollbackFor = RuntimeException.class)
    public void sendMessage3(@PathVariable("message") String callbackMessage) {
        kafkaTemplate.send("topic1", callbackMessage).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable ex) {
                System.out.println("发送消息失败:"+ex.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, Object> result) {
                System.out.println("发送消息成功:" + result.getRecordMetadata().topic() + "-"
                        + result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());
            }
        });
    }

    // 第一种 配置事务
    @GetMapping("/kafka/transaction")
    public void sendMessage7(){
        //使用executeInTransaction 需要在yml中配置事务参数,配置成功才能使用executeInTransaction方法,且运行报错后回滚
        kafkaTemplate.executeInTransaction(operations->{
             operations.send("topic1","test executeInTransaction");
             throw new RuntimeException("fail");
        });

        //没有在yml配置事务,这里就会出现消息发送成功,异常也出现了。如果配置事务,则改用executeInTransaction 替代send方法
        kafkaTemplate.send("topic1","test executeInTransaction");
        throw new RuntimeException("fail");
    }


    //第二种 配置事务 (注解方式)
    // [1] 需要在yml 配置 transaction-id-prefix: kafka_tx.
    // [2] 在方法上添加@Transactional(rollbackFor = RuntimeException.class)  做为开启事务并回滚
    // [3] 在注解方式中 任然可以使用.send方法,不需要使用executeInTransaction方法
    @GetMapping("/send2/{input}")
    @Transactional(rollbackFor = RuntimeException.class)
    public String sendToKafka2(@PathVariable String input){
//        this.template.send(topic,input);
        //事务的支持

        kafkaTemplate.send("topic1",input);
        if("error".equals(input))
        {
            throw new RuntimeException("input is error");
        }
        kafkaTemplate.send("topic1",input+"anthor");

        return "send success!"+input;

    }

}

 

@Component
public class KafkaConsumer {


    // 消费监听
/*
    @KafkaListener(topics = {"topic1"})
    public void onMessage1(ConsumerRecord<?, ?> record){
        // 消费的哪个topic、partition的消息,打印出消息内容
        System.out.println("简单消费:"+record.topic()+"-"+record.partition()+"-"+record.value());
    }*/



    /**
     * @Title 指定topic、partition、offset消费
     * @Description 同时监听topic1和topic2,监听topic1的0号分区、topic2的 "0号和1号" 分区,指向1号分区的offset初始值为8
     * @Author long.yuan
     * @Date 2020/3/22 13:38
     * @Param [record]
     * @return void
     **/
//    @KafkaListener(id = "consumer1",groupId = "felix-group",topicPartitions = {
//            @TopicPartition(topic = "topic1", partitions = { "0" }),
//            @TopicPartition(topic = "topic2", partitions = {"0"})
//            //没有1号分区,先去掉
////            @TopicPartition(topic = "topic2", partitions = {"0"}, partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "8"))
//    })
//    public void onMessage2(ConsumerRecord<?, ?> record) {
//        System.out.println("topic:"+record.topic()+"|partition:"+record.partition()+"|offset:"+record.offset()+"|value:"+record.value());
//    }

    /**
     * 属性解释:
     *
     * ① id:消费者ID;
     *
     * ② groupId:消费组ID;
     *
     * ③ topics:监听的topic,可监听多个;
     *
     * ④ topicPartitions:可配置更加详细的监听信息,可指定topic、parition、offset监听。
     *
     * 上面onMessage2监听的含义:监听topic1的0号分区,同时监听topic2的0号分区和topic2的1号分区里面offset从8开始的消息。
     * 注意:topics和topicPartitions不能同时使用;
     */

    /**
     * 批量消费,  批量消费和普通一条的消费 只能二选一 使用
     * @param records
     */
//    @KafkaListener(id = "consumer2",groupId = "felix-group", topics = "topic1")
//    public void onMessage3(List<ConsumerRecord<?, ?>> records) {
//        System.out.println(">>>批量消费一次,records.size()="+records.size());
//        for (ConsumerRecord<?, ?> record : records) {
//            System.out.println(record.value());
//        }
//    }

//    // 将这个异常处理器的BeanName放到@KafkaListener注解的errorHandler属性里面
//    @KafkaListener(topics = {"topic1"},errorHandler = "consumerAwareErrorHandler")
//    public void onMessage4(ConsumerRecord<?, ?> record) throws Exception {
//        throw new Exception("简单消费-模拟异常");
//    }


    // 批量消费也一样,异常处理器的message.getPayload()也可以拿到各条消息的信息
//    @KafkaListener(topics = "topic1",errorHandler="consumerAwareErrorHandler")
//    public void onMessage5(List<ConsumerRecord<?, ?>> records) throws Exception {
//        System.out.println("批量消费一次...");
//        throw new Exception("批量消费-模拟异常");
//    }


    /**
     * 绑定一个filter bean 的name; 如果调用会先判断filter, 进来或者消失
     * @param record
     */
    // 消息过滤监听
//    @KafkaListener(topics = {"topic1"}/*,containerFactory = "filterContainerFactory"*/)
//    public void onMessage6(ConsumerRecord<?, ?> record) {
//        System.out.println(record.value());
//    }


    /**
     * @Title 消息转发
     * @Description 从topic1接收到的消息经过处理后转发到topic2
     * @Author long.yuan
     * @Date 2020/3/23 22:15
     * @Param [record]
     * @return void
     **/
//    @KafkaListener(topics = {"topic1"})
//    @SendTo("topic2")
//    public String onMessage7(ConsumerRecord<?, ?> record) {
//        return record.value()+"-forward message";
//    }
//
//    @KafkaListener(topics = {"topic2"})
//    public void onMessage8(ConsumerRecord<?, ?> record) {
//        System.out.println( record.value()+"");;
//    }

代码地址:

 转:https://mp.weixin.qq.com/s/JvKPbvJ7q0ypqfF3ARBW7Q


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM