具体目录:
一、生产者实践
-
普通生产者
-
带回调的生产者
-
自定义分区器
-
kafka事务提交
二、消费者实践
-
简单消费
-
指定topic、partition、offset消费
-
批量消费
-
监听异常处理器
-
消息过滤器
-
消息转发
-
定时启动/停止监听器
package com.ylt.kafkaspringboot.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.concurrent.ListenableFutureCallback; import org.springframework.web.bind.annotation.*; @RestController public class KafkaProducer { @Autowired private KafkaTemplate<String, Object> kafkaTemplate; // 发送消息 @GetMapping("/kafka/normal/{message}") public void sendMessage1(@PathVariable("message") String normalMessage) { kafkaTemplate.send("topic1", normalMessage); } /** * 发送消息,消息发送成功就执行回调方法,参数包含发送消息参数。 流程:先发送成功,listener 后续才去topic中监听执行消费 * @param callbackMessage */ @GetMapping("/kafka/callbackOne/{message}") @Transactional(rollbackFor = RuntimeException.class) public void sendMessage2(@PathVariable("message") String callbackMessage) { kafkaTemplate.send("topic1", callbackMessage).addCallback(success -> { // 消息发送到的topic String topic = success.getRecordMetadata().topic(); // 消息发送到的分区 int partition = success.getRecordMetadata().partition(); // 消息在分区内的offset long offset = success.getRecordMetadata().offset(); System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset); }, failure -> { System.out.println("发送消息失败:" + failure.getMessage()); }); } @GetMapping("/kafka/callbackTwo/{message}") @Transactional(rollbackFor = RuntimeException.class) public void sendMessage3(@PathVariable("message") String callbackMessage) { kafkaTemplate.send("topic1", callbackMessage).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() { @Override public void onFailure(Throwable ex) { System.out.println("发送消息失败:"+ex.getMessage()); } @Override public void onSuccess(SendResult<String, Object> result) { System.out.println("发送消息成功:" + result.getRecordMetadata().topic() + "-" + result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset()); } }); } // 第一种 配置事务 @GetMapping("/kafka/transaction") public void sendMessage7(){ //使用executeInTransaction 需要在yml中配置事务参数,配置成功才能使用executeInTransaction方法,且运行报错后回滚 kafkaTemplate.executeInTransaction(operations->{ operations.send("topic1","test executeInTransaction"); throw new RuntimeException("fail"); }); //没有在yml配置事务,这里就会出现消息发送成功,异常也出现了。如果配置事务,则改用executeInTransaction 替代send方法 kafkaTemplate.send("topic1","test executeInTransaction"); throw new RuntimeException("fail"); } //第二种 配置事务 (注解方式) // [1] 需要在yml 配置 transaction-id-prefix: kafka_tx. // [2] 在方法上添加@Transactional(rollbackFor = RuntimeException.class) 做为开启事务并回滚 // [3] 在注解方式中 任然可以使用.send方法,不需要使用executeInTransaction方法 @GetMapping("/send2/{input}") @Transactional(rollbackFor = RuntimeException.class) public String sendToKafka2(@PathVariable String input){ // this.template.send(topic,input); //事务的支持 kafkaTemplate.send("topic1",input); if("error".equals(input)) { throw new RuntimeException("input is error"); } kafkaTemplate.send("topic1",input+"anthor"); return "send success!"+input; } }
@Component public class KafkaConsumer { // 消费监听 /* @KafkaListener(topics = {"topic1"}) public void onMessage1(ConsumerRecord<?, ?> record){ // 消费的哪个topic、partition的消息,打印出消息内容 System.out.println("简单消费:"+record.topic()+"-"+record.partition()+"-"+record.value()); }*/ /** * @Title 指定topic、partition、offset消费 * @Description 同时监听topic1和topic2,监听topic1的0号分区、topic2的 "0号和1号" 分区,指向1号分区的offset初始值为8 * @Author long.yuan * @Date 2020/3/22 13:38 * @Param [record] * @return void **/ // @KafkaListener(id = "consumer1",groupId = "felix-group",topicPartitions = { // @TopicPartition(topic = "topic1", partitions = { "0" }), // @TopicPartition(topic = "topic2", partitions = {"0"}) // //没有1号分区,先去掉 //// @TopicPartition(topic = "topic2", partitions = {"0"}, partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "8")) // }) // public void onMessage2(ConsumerRecord<?, ?> record) { // System.out.println("topic:"+record.topic()+"|partition:"+record.partition()+"|offset:"+record.offset()+"|value:"+record.value()); // } /** * 属性解释: * * ① id:消费者ID; * * ② groupId:消费组ID; * * ③ topics:监听的topic,可监听多个; * * ④ topicPartitions:可配置更加详细的监听信息,可指定topic、parition、offset监听。 * * 上面onMessage2监听的含义:监听topic1的0号分区,同时监听topic2的0号分区和topic2的1号分区里面offset从8开始的消息。 * 注意:topics和topicPartitions不能同时使用; */ /** * 批量消费, 批量消费和普通一条的消费 只能二选一 使用 * @param records */ // @KafkaListener(id = "consumer2",groupId = "felix-group", topics = "topic1") // public void onMessage3(List<ConsumerRecord<?, ?>> records) { // System.out.println(">>>批量消费一次,records.size()="+records.size()); // for (ConsumerRecord<?, ?> record : records) { // System.out.println(record.value()); // } // } // // 将这个异常处理器的BeanName放到@KafkaListener注解的errorHandler属性里面 // @KafkaListener(topics = {"topic1"},errorHandler = "consumerAwareErrorHandler") // public void onMessage4(ConsumerRecord<?, ?> record) throws Exception { // throw new Exception("简单消费-模拟异常"); // } // 批量消费也一样,异常处理器的message.getPayload()也可以拿到各条消息的信息 // @KafkaListener(topics = "topic1",errorHandler="consumerAwareErrorHandler") // public void onMessage5(List<ConsumerRecord<?, ?>> records) throws Exception { // System.out.println("批量消费一次..."); // throw new Exception("批量消费-模拟异常"); // } /** * 绑定一个filter bean 的name; 如果调用会先判断filter, 进来或者消失 * @param record */ // 消息过滤监听 // @KafkaListener(topics = {"topic1"}/*,containerFactory = "filterContainerFactory"*/) // public void onMessage6(ConsumerRecord<?, ?> record) { // System.out.println(record.value()); // } /** * @Title 消息转发 * @Description 从topic1接收到的消息经过处理后转发到topic2 * @Author long.yuan * @Date 2020/3/23 22:15 * @Param [record] * @return void **/ // @KafkaListener(topics = {"topic1"}) // @SendTo("topic2") // public String onMessage7(ConsumerRecord<?, ?> record) { // return record.value()+"-forward message"; // } // // @KafkaListener(topics = {"topic2"}) // public void onMessage8(ConsumerRecord<?, ?> record) { // System.out.println( record.value()+"");; // }
代码地址:
转:https://mp.weixin.qq.com/s/JvKPbvJ7q0ypqfF3ARBW7Q