具體目錄:
一、生產者實踐
-
普通生產者
-
帶回調的生產者
-
自定義分區器
-
kafka事務提交
二、消費者實踐
-
簡單消費
-
指定topic、partition、offset消費
-
批量消費
-
監聽異常處理器
-
消息過濾器
-
消息轉發
-
定時啟動/停止監聽器
package com.ylt.kafkaspringboot.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.concurrent.ListenableFutureCallback; import org.springframework.web.bind.annotation.*; @RestController public class KafkaProducer { @Autowired private KafkaTemplate<String, Object> kafkaTemplate; // 發送消息 @GetMapping("/kafka/normal/{message}") public void sendMessage1(@PathVariable("message") String normalMessage) { kafkaTemplate.send("topic1", normalMessage); } /** * 發送消息,消息發送成功就執行回調方法,參數包含發送消息參數。 流程:先發送成功,listener 后續才去topic中監聽執行消費 * @param callbackMessage */ @GetMapping("/kafka/callbackOne/{message}") @Transactional(rollbackFor = RuntimeException.class) public void sendMessage2(@PathVariable("message") String callbackMessage) { kafkaTemplate.send("topic1", callbackMessage).addCallback(success -> { // 消息發送到的topic String topic = success.getRecordMetadata().topic(); // 消息發送到的分區 int partition = success.getRecordMetadata().partition(); // 消息在分區內的offset long offset = success.getRecordMetadata().offset(); System.out.println("發送消息成功:" + topic + "-" + partition + "-" + offset); }, failure -> { System.out.println("發送消息失敗:" + failure.getMessage()); }); } @GetMapping("/kafka/callbackTwo/{message}") @Transactional(rollbackFor = RuntimeException.class) public void sendMessage3(@PathVariable("message") String callbackMessage) { kafkaTemplate.send("topic1", callbackMessage).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() { @Override public void onFailure(Throwable ex) { System.out.println("發送消息失敗:"+ex.getMessage()); } @Override public void onSuccess(SendResult<String, Object> result) { System.out.println("發送消息成功:" + result.getRecordMetadata().topic() + "-" + result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset()); } }); } // 第一種 配置事務 @GetMapping("/kafka/transaction") public void sendMessage7(){ //使用executeInTransaction 需要在yml中配置事務參數,配置成功才能使用executeInTransaction方法,且運行報錯后回滾 kafkaTemplate.executeInTransaction(operations->{ operations.send("topic1","test executeInTransaction"); throw new RuntimeException("fail"); }); //沒有在yml配置事務,這里就會出現消息發送成功,異常也出現了。如果配置事務,則改用executeInTransaction 替代send方法 kafkaTemplate.send("topic1","test executeInTransaction"); throw new RuntimeException("fail"); } //第二種 配置事務 (注解方式) // [1] 需要在yml 配置 transaction-id-prefix: kafka_tx. // [2] 在方法上添加@Transactional(rollbackFor = RuntimeException.class) 做為開啟事務並回滾 // [3] 在注解方式中 任然可以使用.send方法,不需要使用executeInTransaction方法 @GetMapping("/send2/{input}") @Transactional(rollbackFor = RuntimeException.class) public String sendToKafka2(@PathVariable String input){ // this.template.send(topic,input); //事務的支持 kafkaTemplate.send("topic1",input); if("error".equals(input)) { throw new RuntimeException("input is error"); } kafkaTemplate.send("topic1",input+"anthor"); return "send success!"+input; } }
@Component public class KafkaConsumer { // 消費監聽 /* @KafkaListener(topics = {"topic1"}) public void onMessage1(ConsumerRecord<?, ?> record){ // 消費的哪個topic、partition的消息,打印出消息內容 System.out.println("簡單消費:"+record.topic()+"-"+record.partition()+"-"+record.value()); }*/ /** * @Title 指定topic、partition、offset消費 * @Description 同時監聽topic1和topic2,監聽topic1的0號分區、topic2的 "0號和1號" 分區,指向1號分區的offset初始值為8 * @Author long.yuan * @Date 2020/3/22 13:38 * @Param [record] * @return void **/ // @KafkaListener(id = "consumer1",groupId = "felix-group",topicPartitions = { // @TopicPartition(topic = "topic1", partitions = { "0" }), // @TopicPartition(topic = "topic2", partitions = {"0"}) // //沒有1號分區,先去掉 //// @TopicPartition(topic = "topic2", partitions = {"0"}, partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "8")) // }) // public void onMessage2(ConsumerRecord<?, ?> record) { // System.out.println("topic:"+record.topic()+"|partition:"+record.partition()+"|offset:"+record.offset()+"|value:"+record.value()); // } /** * 屬性解釋: * * ① id:消費者ID; * * ② groupId:消費組ID; * * ③ topics:監聽的topic,可監聽多個; * * ④ topicPartitions:可配置更加詳細的監聽信息,可指定topic、parition、offset監聽。 * * 上面onMessage2監聽的含義:監聽topic1的0號分區,同時監聽topic2的0號分區和topic2的1號分區里面offset從8開始的消息。 * 注意:topics和topicPartitions不能同時使用; */ /** * 批量消費, 批量消費和普通一條的消費 只能二選一 使用 * @param records */ // @KafkaListener(id = "consumer2",groupId = "felix-group", topics = "topic1") // public void onMessage3(List<ConsumerRecord<?, ?>> records) { // System.out.println(">>>批量消費一次,records.size()="+records.size()); // for (ConsumerRecord<?, ?> record : records) { // System.out.println(record.value()); // } // } // // 將這個異常處理器的BeanName放到@KafkaListener注解的errorHandler屬性里面 // @KafkaListener(topics = {"topic1"},errorHandler = "consumerAwareErrorHandler") // public void onMessage4(ConsumerRecord<?, ?> record) throws Exception { // throw new Exception("簡單消費-模擬異常"); // } // 批量消費也一樣,異常處理器的message.getPayload()也可以拿到各條消息的信息 // @KafkaListener(topics = "topic1",errorHandler="consumerAwareErrorHandler") // public void onMessage5(List<ConsumerRecord<?, ?>> records) throws Exception { // System.out.println("批量消費一次..."); // throw new Exception("批量消費-模擬異常"); // } /** * 綁定一個filter bean 的name; 如果調用會先判斷filter, 進來或者消失 * @param record */ // 消息過濾監聽 // @KafkaListener(topics = {"topic1"}/*,containerFactory = "filterContainerFactory"*/) // public void onMessage6(ConsumerRecord<?, ?> record) { // System.out.println(record.value()); // } /** * @Title 消息轉發 * @Description 從topic1接收到的消息經過處理后轉發到topic2 * @Author long.yuan * @Date 2020/3/23 22:15 * @Param [record] * @return void **/ // @KafkaListener(topics = {"topic1"}) // @SendTo("topic2") // public String onMessage7(ConsumerRecord<?, ?> record) { // return record.value()+"-forward message"; // } // // @KafkaListener(topics = {"topic2"}) // public void onMessage8(ConsumerRecord<?, ?> record) { // System.out.println( record.value()+"");; // }
代碼地址:
轉:https://mp.weixin.qq.com/s/JvKPbvJ7q0ypqfF3ARBW7Q
