SpringBoot集成Spring For Kafka操作Kafka詳解



參考信息:

環境說明:

  • Kafka 版本:2.3.0
  • Zookeeper 版本:3.4.14
  • SpringBoot 版本:2.1.7.RELEASE
  • Spring For Apache Kafka 版本:2.2.8

一、概念知識

什么是消息中間件

       消息中間件利用高效可靠的消息傳遞機制進行平台無關的數據交流,並基於數據通信來進行分布式系統的集成。通過提供消息傳遞和消息排隊模型,它可以在分布式環境下擴展進程間的通信。

什么是 Kafka

       Apache Kafka 是一個分布式高吞吐量的流消息系統,Kafka 建立在 ZooKeeper 同步服務之上。它與 Apache Storm 和 Spark 完美集成,用於實時流數據分析,與其他消息傳遞系統相比,Kafka具有更好的吞吐量,內置分區,數據副本和高度容錯功能,因此非常適合大型消息處理應用場景。

Kafka 特性

  • 高並發: 支持數千個客戶端同時讀寫。
  • 可擴展性: kafka集群支持熱擴展。
  • 容錯性: 允許集群中節點失敗(若副本數量為n,則允許n-1個節點失敗)。
  • 持久性、可靠性: 消息被持久化到本地磁盤,並且支持數據備份防止數據丟失。
  • 高吞吐量、低延遲: Kafka每秒可以處理幾十萬消息,延遲最低只有幾毫秒,每個消息主題topic可以分多個區,消費者組(consumer group)對消息分區(partition)進行消費。

使用場景

  • 日志收集: 可以用 kafka 收集各種服務的日志,通過kafka以統一接口服務的方式開放給各種消費者,如 hadoop,Hbase,Solr 等。
  • 消息系統: 解耦生產者和消費者、緩存消息等。
  • 用戶活動跟蹤: Kafka 經常被用來記錄web用戶或者app用戶的各種活動,如瀏覽網頁,搜索,點擊等活動,這些活動信息被各個服務器發布到 kafka 的 topic 中,然后訂閱者通過訂閱這些 topic 來做實時的監控分析,或者裝載到 hadoop、數據倉庫中做離線分析和挖掘。
  • 運營指標: Kafka也經常用來記錄運營監控數據,包括收集各種分布式應用的數據,比如報警和報告等。
  • 流式處理: 比如 spark streaming 和 storm。

基本概念

  • Broker: 消息中間件處理節點,一個 Kafka 節點就是一個 Broker,一個或者多個 Broker 可以組成一個 Kafka 集群。
  • Topic: Kafka 的消息通過 Topic 主題來分類,Topic類似於關系型數據庫中的表,每個 Topic 包含一個或多(Partition)分區。
  • Partition: 多個分區會分布在Kafka集群的不同服務節點上,消息以追加的方式寫入一個或多個分區中。
  • LogSegment: 每個分區又被划分為多個日志分段 LogSegment 組成,日志段是 Kafka 日志對象分片的最小單位。LogSegment 算是一個邏輯概念,對應一個具體的日志文件(”.log” 的數據文件)和兩個索引文件(”.index” 和 “.timeindex”,分別表示偏移量索引文件和消息時間戳索引文件)組成。
  • Offset: 每個分區中都由一系列有序的、不可變的消息組成,這些消息被順序地追加到 Partition 中,每個消息都有一個連續的序列號稱之為 Offset 偏移量,用於在 Partition 內唯一標識消息。
  • Message: 消息是 Kafka 中存儲的最小最基本的單位,即為一個 commit log,由一個固定長度的消息頭和一個可變長度的消息體組成。
  • Producer: 消息的生產者,負責發布消息到 Kafka Broker,生產者在默認情況下把消息均衡地分布到主題的所有分區上,用戶也可以自定義分區器來實現消息的分區路由。
  • Consumer: 消息的消費者,從 Kafka Broker 讀取消息的客戶端,消費者把每個分區最后讀取的消息的 Offset 偏移量保存在 Zookeeper 或 Kafka 上,如果消費者關閉或重啟,它的讀取狀態不會丟失。
  • Consumer Group: 每個 Consumer 屬於一個特定的 Consumer Group(若不指定 Group Name則屬於默認的 group),一個或多個 Consumer 組成的群組可以共同消費一個 Topic 中的消息,但每個分區只能被群組中的一個消費者操作。

生產者 ACKS 機制

       ACKS 參數指定了必須要有多少個分區副本接收到消息,生產者才會認為消息寫入是發送消息成功的,這個參數對消息丟失的可能性會產生重要影響,主參數有如下選項:

  • acks=0: 把消息發送到kafka就認為發送成功。
  • acks=1: 把消息發送到kafka leader分區,並且寫入磁盤就認為發送成功。
  • acks=all: 把消息發送到 Kafka Leader 分區,並且 Leader 分區的副本 Follower 對消息進行了同步就認為發送成功。

消費者更新 Offset 偏移量兩種方式

詳情可以查看參考的一篇文章:https://www.jianshu.com/p/d5cd34e429a2

       消費者把每個分區最后讀取的悄息偏移量提交保存在 Zookeeper 或 Kafka 上,如果消費者關閉或重啟,它的讀取狀態不會丟失,KafkaConsumer API 提供了很多種方式來提交偏移量,但是不同的提交方式會產生不同的數據影響。

  • 自動提交:

       如果 enable.auto.commit 被設置為 true,那么消費者會自動提交當前處理到的偏移量存入 Zookeeper,自動提交的時間間隔為5s,通過 atuo.commit.interval.ms 屬性設置,自動提交是非常方便,但是自動提交會出現消息被重復消費的風險,可以通過修改提交時間間隔來更頻繁地提交偏移量,減小可能出現重復悄息的時間窗,不過這種情況是無也完全避免的。

  • 手動提交:

       鑒於 Kafka 自動提交 Offset 的不靈活性和不精確性(只能是按指定頻率的提交),Kafka提供了手動提交 Offset 策略,將 auto.commit.offset 自動提交參數設置為 false 來關閉自動提交開啟手動模式,手動提交能對偏移量更加靈活精准地控制,以保證消息不被重復消費以及消息不被丟失。

二、SpringBoot 操作 Kafka 示例

1、Maven 引入 Kafka 相關組件

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.7.RELEASE</version> </parent> <groupId>club.mydlq</groupId> <artifactId>springboot-kafka-demo</artifactId> <version>0.0.1-SNAPSHOT</version> <name>springboot-kafka-demo</name> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project> 

2、Topic 配置

配置 Topic,每次程序啟動時檢測 Kafka 中是否存在已經配置的 Topic,如果不存在就創建。

import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.NewTopic; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.core.KafkaAdmin; import java.util.HashMap; import java.util.Map; @Configuration public class KafkaTopicConfig { /** * 定義一個KafkaAdmin的bean,可以自動檢測集群中是否存在topic,不存在則創建 */ @Bean public KafkaAdmin kafkaAdmin() { Map<String, Object> configs = new HashMap<>(); // 指定多個kafka集群多個地址,例如:192.168.2.11,9092,192.168.2.12:9092,192.168.2.13:9092 configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092"); return new KafkaAdmin(configs); } /** * 創建 Topic */ @Bean public NewTopic topicinfo() { // 創建topic,需要指定創建的topic的"名稱"、"分區數"、"副本數量(副本數數目設置要小於Broker數量)" return new NewTopic("test", 3, (short) 0); } } 

3、Producer 配置

(1)、創建 Producer 配置類

創建 Producer 配置類,對 Kafka 生產者進行配置,在配置中需要設置三個 Bean 分別為:

  • kafkaTemplate:kafka template 實例,用於 Spring 中的其它對象引入該 Bean,通過其向 Kafka 發送消息。
  • producerFactory:producer 工廠,用於對 kafka producer 進行配置。
  • producerConfigs:對 kafka producer 參數進行配置。
import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import java.util.HashMap; import java.util.Map; // 設置@Configuration、@EnableKafka兩個注解,聲明Config並且打開KafkaTemplate能力。 @Configuration @EnableKafka public class KafkaProducerConfig { /** * Producer Template 配置 */ @Bean(name="kafkaTemplate") public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } /** * Producer 工廠配置 */ public ProducerFactory<String, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } /** * Producer 參數配置 */ public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); // 指定多個kafka集群多個地址 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094"); // 重試次數,0為不啟用重試機制 props.put(ProducerConfig.RETRIES_CONFIG, 0); // acks=0 把消息發送到kafka就認為發送成功 // acks=1 把消息發送到kafka leader分區,並且寫入磁盤就認為發送成功 // acks=all 把消息發送到kafka leader分區,並且leader分區的副本follower對消息進行了同步就任務發送成功 props.put(ProducerConfig.ACKS_CONFIG,"1"); // 生產者空間不足時,send()被阻塞的時間,默認60s props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 6000); // 控制批處理大小,單位為字節 props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096); // 批量發送,延遲為1毫秒,啟用該功能能有效減少生產者發送消息次數,從而提高並發量 props.put(ProducerConfig.LINGER_MS_CONFIG, 1); // 生產者可以使用的總內存字節來緩沖等待發送到服務器的記錄 props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960); // 消息的最大大小限制,也就是說send的消息大小不能超過這個限制, 默認1048576(1MB) props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,1048576); // 鍵的序列化方式 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // 值的序列化方式 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // 壓縮消息,支持四種類型,分別為:none、lz4、gzip、snappy,默認為none。 // 消費者默認支持解壓,所以壓縮設置在生產者,消費者無需設置。 props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"none"); return props; } } 

(2)、創建 Producer Service 向 kafka 發送數據

創建 Producer Service 引入 KafkaTemplate 對象,再創建 sendMessageSyncsendMessageAsync 兩個方法,分別利用“同步/異步”兩種方法向 kafka 發送消息。

import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Service; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @Service public class KafkaProducerService { @Autowired private KafkaTemplate kafkaTemplate; /** * producer 同步方式發送數據 * @param topic topic名稱 * @param message producer發送的數據 */ public void sendMessageSync(String topic, String message) throws InterruptedException, ExecutionException, TimeoutException { kafkaTemplate.send(topic, message).get(10, TimeUnit.SECONDS); } /** * producer 異步方式發送數據 * @param topic topic名稱 * @param message producer發送的數據 */ public void sendMessageAsync(String topic, String message) { ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(topic, message); future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() { @Override public void onSuccess(SendResult<Integer, String> result) { System.out.println("success"); } @Override public void onFailure(Throwable ex) { System.out.println("failure"); } }); } } 

(3)、創建 Producer Controller 調用 Producer Service 產生數據

Spring Controller 類,用於調用 Producer Service 中的方法向 kafka 發送消息。

import club.mydlq.springbootkafkademo.service.ProducerService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; @RestController public class KafkaProducerController { @Autowired private KafkaProducerService producerService; @GetMapping("/sync") public void sendMessageSync() throws InterruptedException, ExecutionException, TimeoutException { producerService.sendMessageSync("test","同步發送消息測試"); } @GetMapping("/async") public void sendMessageAsync(){ producerService.sendMessageAsync("test","異步發送消息測試"); } } 

4、Consumer 配置

(1)、創建 Consumer 配置類

創建 Consumer 配置類,對 Kafka 消費者進行配置,在配置中需要設置三個 Bean 分別為:

  • kafkaListenerContainerFactory:kafka container 工廠,負責創 建container,當使用@KafkaListener時需要提供。
  • consumerFactory:consumer 工廠,用於對 kafka consumer 進行配置。
  • consumerConfigs:對 kafka consumer 參數進行配置。
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import java.util.HashMap; import java.util.Map; @Configuration @EnableKafka public class KafkaConsumerConfig { @Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); // 設置消費者工廠 factory.setConsumerFactory(consumerFactory()); // 消費者組中線程數量 factory.setConcurrency(3); // 拉取超時時間 factory.getContainerProperties().setPollTimeout(3000); return factory; } @Bean public ConsumerFactory<Integer, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> propsMap = new HashMap<>(); // Kafka地址 propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094"); // 是否自動提交offset偏移量(默認true) propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); // 自動提交的頻率(ms) propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); // Session超時設置 propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); // 鍵的反序列化方式 propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // 值的反序列化方式 propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // offset偏移量規則設置: // (1)、earliest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費 // (2)、latest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據 // (3)、none:topic各分區都存在已提交的offset時,從offset后開始消費;只要有一個分區不存在已提交的offset,則拋出異常 propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return propsMap; } } 

(2)、創建 Consumer Service 監聽 Kafka 數據

import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Service; @Service public class KafkaConsumerService { @KafkaListener(topics = {"test"},groupId = "group1", containerFactory="kafkaListenerContainerFactory") public void kafkaListener(String message){ System.out.println(message); } } 

三、SpringBoot 操作 Kafka 詳解

1、Producer Template 發送消息幾種方法

KafkaTemplate 類提供了非常方便的方法將數據發送到 kafka 的 Topic,以下清單顯示了該類的提供的相關方法,詳情可以查看 KafkaTemplate 類方法文檔

// 設定data,向kafka發送消息 ListenableFuture<SendResult<K, V>> sendDefault(V data); // 設定key、data,向kafka發送消息 ListenableFuture<SendResult<K, V>> sendDefault(K key, V data); // 設定partition、key、data,向kafka發送消息 ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data); // 設定partition、timestamp、key、data,向kafka發送消息 ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data); // 設定topic、data,向kafka發送消息 ListenableFuture<SendResult<K, V>> send(String topic, V data); // 設定topic、key、data,向kafka發送消息 ListenableFuture<SendResult<K, V>> send(String topic, K key, V data); // 設定topic、partition、key、data,向kafka發送消息 ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data); // 設定topic、partition、timestamp、 key、data,向kafka發送消息 ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data); // 創建ProducerRecord對象,在ProducerRecord中設置好topic、partion、key、value等信息,然后向kafka發送消息 ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record); // 創建Spring的Message對象,然后向kafka發送消息 ListenableFuture<SendResult<K, V>> send(Message<?> message); // 獲取指標信息 Map<MetricName, ? extends Metric> metrics(); // 顯示Topic分區信息 List<PartitionInfo> partitionsFor(String topic); //在生產者上執行一些任意操作並返回結果。 <T> T execute(ProducerCallback<K, V, T> callback); // 生產者刷新消息 void flush(); // 用於執行生產者方法后異步回調 interface ProducerCallback<K, V, T> { T doInKafka(Producer<K, V> producer); } 

下面將寫個使用示例,這里改下上面向 kafka service 發送數據的例子,通過不同的方法向 kafka 發送消息,具體代碼如下:

import org.apache.kafka.clients.producer.ProducerRecord; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.support.SendResult; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Service; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; import java.util.Date; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @Service public class ProducerService { @Autowired private KafkaTemplate kafkaTemplate; /** * producer 同步方式發送數據 * * @param topic topic名稱 * @param message producer發送的數據 */ public void sendMessageSync(String topic, String key, String message) throws InterruptedException, ExecutionException, TimeoutException { //------- 方法:send(String topic, @Nullable V data) kafkaTemplate.send(topic, message).get(10, TimeUnit.SECONDS); //------- 方法:send(String topic, K key, @Nullable V data) kafkaTemplate.send(topic, key, message).get(10, TimeUnit.SECONDS); //------- 方法:send(String topic, K key, @Nullable V data) kafkaTemplate.send(topic, 0, message).get(10, TimeUnit.SECONDS); //------- 方法:send(String topic, Integer partition, K key, @Nullable V data) kafkaTemplate.send(topic, 0, key, message).get(10, TimeUnit.SECONDS); //------- 方法:send(String topic, Integer partition, Long timestamp, K key, @Nullable V data) kafkaTemplate.send(topic, 0, new Date().getTime(),key, message).get(10, TimeUnit.SECONDS); //------- 方法:send(Message<?> message) Message msg = MessageBuilder.withPayload("Send Message(payload,headers) Test") .setHeader(KafkaHeaders.MESSAGE_KEY, key) .setHeader(KafkaHeaders.TOPIC, topic) .setHeader(KafkaHeaders.PREFIX,"kafka_") .build(); kafkaTemplate.send(msg).get(10, TimeUnit.SECONDS); //------- 方法:send(ProducerRecord<K, V> record) ProducerRecord<String, String> producerRecord1 = new ProducerRecord<>("test", "Send ProducerRecord(topic,value) Test"); ProducerRecord<String, String> producerRecord2 = new ProducerRecord<>("test", "", "Send ProducerRecord(topic,key,value) Test"); kafkaTemplate.send(producerRecord1).get(10, TimeUnit.SECONDS); kafkaTemplate.send(producerRecord2).get(10, TimeUnit.SECONDS); } /** * producer 異步方式發送數據 * * @param topic topic名稱 * @param message producer發送的數據 */ public void sendMessageAsync(String topic, String key, String message) { //------- 方法:send(String topic, @Nullable V data) ListenableFuture<SendResult<Integer, String>> future1 = kafkaTemplate.send(topic, message); //------- 方法:send(String topic, K key, @Nullable V data) ListenableFuture<SendResult<Integer, String>> future2 = kafkaTemplate.send(topic, key, message); //------- 方法:send(String topic, K key, @Nullable V data) ListenableFuture<SendResult<Integer, String>> future3 = kafkaTemplate.send(topic, 0, message); //------- 方法:send(String topic, Integer partition, K key, @Nullable V data) ListenableFuture<SendResult<Integer, String>> future4 = kafkaTemplate.send(topic, 0, key, message); //------- 方法:send(String topic, Integer partition, Long timestamp, K key, @Nullable V data) ListenableFuture<SendResult<Integer, String>> future5 = kafkaTemplate.send(topic, 0, new Date().getTime(),key, message); //------- 方法:send(Message<?> message) Message msg = MessageBuilder.withPayload("Send Message(payload,headers) Test") .setHeader(KafkaHeaders.MESSAGE_KEY, key) .setHeader(KafkaHeaders.TOPIC, topic) .setHeader(KafkaHeaders.PREFIX,"kafka_") .build(); ListenableFuture<SendResult<Integer, String>> future6 = kafkaTemplate.send(msg); //------- 方法:send(ProducerRecord<K, V> record) ProducerRecord<String, String> producerRecord1 = new ProducerRecord<>("test", "Send ProducerRecord(topic,value) Test"); ProducerRecord<String, String> producerRecord2 = new ProducerRecord<>("test", "", "Send ProducerRecord(topic,key,value) Test"); ListenableFuture<SendResult<Integer, String>> future7 = kafkaTemplate.send(producerRecord1); ListenableFuture<SendResult<Integer, String>> future8 = kafkaTemplate.send(producerRecord2); // 設置異步發送消息獲取發送結果后執行的動作 ListenableFutureCallback listenableFutureCallback = new ListenableFutureCallback<SendResult<Integer, String>>() { @Override public void onSuccess(SendResult<Integer, String> result) { System.out.println("success"); } @Override public void onFailure(Throwable ex) { System.out.println("failure"); } }; // 將listenableFutureCallback與異步發送消息對象綁定 future1.addCallback(listenableFutureCallback); future2.addCallback(listenableFutureCallback); future3.addCallback(listenableFutureCallback); future4.addCallback(listenableFutureCallback); future5.addCallback(listenableFutureCallback); future6.addCallback(listenableFutureCallback); future7.addCallback(listenableFutureCallback); future8.addCallback(listenableFutureCallback); } } 

2、Kafka Consumer 監聽 Kafka 消息

當我們需要接收 kafka 中的消息時需要使用消息監聽器,Spring For Kafka 提供了八種消息監聽器接口,接口如下:

/** * 當使用"自動提交"或"ontainer-managed"中一個提交方法提交offset偏移量時, * 使用此接口處理Kafka consumer poll()操作接收到的各個ConsumerRecord實例。 */ public interface MessageListener<K, V> { void onMessage(ConsumerRecord<K, V> data); } /** * 當使用手動提交offset偏移量時,使用此接口處理從Kafka consumer poll()操作接收到的各個ConsumerRecord實例。 */ public interface AcknowledgingMessageListener<K, V> { void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment); } /** * 當使用"自動提交"或"ontainer-managed"中一個提交方法提交offset偏移量時, * 使用此接口處理Kafka consumer poll()操作接收到的各個ConsumerRecord * 實例。並提供可訪問的consumer對象。 */ public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer); } /** * 當使用手動提交offset偏移量時,使用此接口處理從Kafka consumer poll()操作 * 接收到的各個ConsumerRecord實例。並提供可訪問的consumer對象。 */ public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer); } /** * 當使用"自動提交"或"ontainer-managed"中一個提交方法提交offset偏移量時, * 使用此接口處理從Kafka consumer poll()操作接收到的所有ConsumerRecord實例。 * * 注意:使用此接口時不支持ACK的AckMode.RECORD模式,因為監聽器已獲得完整的批處理。 */ public interface BatchMessageListener<K, V> { void onMessage(List<ConsumerRecord<K, V>> data); } /** * 當使用手動提交offset偏移量時,使用此接口處理從Kafka consumer poll()操作接收到的所有ConsumerRecord實例。 */ public interface BatchAcknowledgingMessageListener<K, V> { void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment); } /** * 當使用"自動提交"或"ontainer-managed"中一個提交方法提交offset偏移量時, * 使用此接口處理從Kafka consumer poll()操作接收到的所有ConsumerRecord實例。 * 並提供可訪問的consumer對象。 * * 注意:使用此接口時不支持ACK的AckMode.RECORD模式,因為監聽器已獲得完整的批處理。 */ public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer); } /** * 當使用手動提交offset偏移量時,使用此接口處理從Kafka consumer poll()操作接收到的 * 所有ConsumerRecord實例。並提供可訪問的consumer對象。 */ public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer); } 

上面接口中的方法歸總就是:

序號 消費方式 自動提交Offset偏移量 提供Consumer對象
1 單條
2 單條
3 單條
4 單條
5 批量
6 批量
7 批量
8 批量

Spring For Kafka 提供了消息監聽器接口的兩種實現類,分別是:

  • KafkaMessageListenerContainer
  • ConcurrentMessageListenerContainer

KafkaMessageListenerContainer 利用單個線程來接收全部主題中全部分區上的所有消息。
ConcurrentMessageListenerContainer 代理的一個或多個 KafkaMessageListenerContainer 實例,來實現多個線程消費。

下面將創建一個 KafkaMessageListenerContainer 實例來監聽 Kafka 消息:

@Configuration @EnableKafka public class ConsumerConfigDemo { @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> propsMap = new HashMap<>(); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return propsMap; } @Bean public ConsumerFactory<Integer, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } /** * 創建 KafkaMessageListenerContainer 實例監聽 kafka 消息 */ @Bean public KafkaMessageListenerContainer demoListenerContainer() { // 創建container配置參數,並指定要監聽的 topic 名稱 ContainerProperties properties = new ContainerProperties("test"); // 設置消費者組名稱 properties.setGroupId("group2"); // 設置監聽器監聽 kafka 消息 properties.setMessageListener(new MessageListener<Integer,String>() { @Override public void onMessage(ConsumerRecord<Integer, String> record) { System.out.println("消息:" + record); } }); return new KafkaMessageListenerContainer(consumerFactory(), properties); } } 

       上面示例啟動后將監聽 topic 名稱為 “test” 的 kafka 消息,不過這樣啟動只是單線程消費,如果想多線程消費就得創建多個實例來監控該 topic 不同的分區。但是這樣操作來完成消費者多線程消費比較麻煩,所以一般使用 Spring For Kafka 組件時都會創建 KafkaListenerContainerFactory Bean 來代理多個 KafkaMessageListenerContainer 完成消費者多線程消費。

3、使用 @KafkaListener 注解監聽 Kafka 消息

       為了使創建 kafka 監聽器更加簡單,Spring For Kafka 提供了 @KafkaListener 注解,該 @KafkaListener 注解配置方法上,凡是帶上此注解的方法就會被標記為是 Kafka 消息監聽器,所以可以用 @KafkaListener 注解快速創建消息監聽器。

下面寫幾個例子來簡單描述下使用方法:

(1)、監聽單個 Topic 示例

這里先寫一個簡單使用 @KafkaListener 完成消息監聽的示例。

@Configuration @EnableKafka public class ConsumerConfigDemo { @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> propsMap = new HashMap<>(); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return propsMap; } @Bean public ConsumerFactory<Integer, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); // 創建3個線程並發消費 factory.setConcurrency(3); // 設置拉取數據超時時間 factory.getContainerProperties().setPollTimeout(3000); return factory; } /** * ---使用@KafkaListener注解來標記此方法為kafka消息監聽器,創建消費組group1監聽test topic */ @KafkaListener(topics = {"test"},groupId = "group1") public void kafkaListener(String message){ System.out.println("消息:"+message); } } 

(2)、監聽多個 Topic 示例

使用 @KafkaListener 也可以監控多個 topic 的消息,示例如下:

@KafkaListener(topics = {"test1", "test2"}, groupId = "group1") public void kafkaListener(String message){ System.out.println("消息:"+message); } 

(3)、監聽某個 Topic 的某個分區示例

單獨監聽某個分區息,示例如下:

@KafkaListener(id = "id0", groupId = "group1", topicPartitions = { @TopicPartition(topic = "test", partitions = { "0" }) }) public void kafkaListener1(String message) { System.out.println("消息:"+message); } @KafkaListener(id = "id1", groupId = "group1", topicPartitions = { @TopicPartition(topic = "test", partitions = { "1", "2" }) }) public void kafkaListener2(String message) { System.out.println("消息:"+message); } 

(4)、監聽多個 Topic 的分區示例

同時監聽多個 topic 的分區,示例如下:

@KafkaListener(id = "test", group = "group1", topicPartitions = { @TopicPartition(topic = "test1", partitions = {"0"}), @TopicPartition(topic = "test2", partitions = {"0", "1"}) }) public void kafkaListener(String message) { System.out.print(message); } 

(5)、獲取監聽的 topic 消息頭中的元數據

可以從消息頭中獲取有關消息的元數據,例如:

@KafkaListener(topics = "test", groupId = "group1") public void kafkaListener(@Payload String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) { System.out.println("主題:" + topic); System.out.println("鍵key:" + key); System.out.println("消息:" + message); } 

(6)、監聽 topic 進行批量消費

如果參數配置中設置為批量消費,則 @KafkaListener 注解的方法的參數要使用 List 來接收,例如:

@KafkaListener(topics = "test", groupId = "group1") public void kafkaListener(List<String> messages) { for(String msg:messages){ System.out.println(msg); } } 

(7)、監聽 topic 並手動提交 Offset 偏移量

如果設置為手動提交 Offset 偏移量,並且設置 Ack 模式為 MANUAL 或 MANUAL_IMMEDIATE,則需要在方法參數中引入 Acknowledgment 對象,並執行它的 acknowledge() 方法來提交偏移量。

@KafkaListener(topics = "test",groupId = "group5") public void kafkaListener(List<String> messages, Acknowledgment acknowledgment) { for(String msg:messages){ System.out.println(msg); } // 觸發提交offset偏移量 acknowledgment.acknowledge(); } 

4、使用 @KafkaListener 模糊匹配多個 Topic

使用 @KafkaListener 注解時,可以添加參數 topicPattern ,輸入通配符來對多個 topic 進行監聽,例如這里使用 “test.*” 將監聽所有以 test 開頭的 topic 的消息。

@KafkaListener(topicPattern = "test.*",groupId = "group6") public void annoListener2(String messages) { System.err.println(messages); } 

5、使用 @SendTo 注解轉發消息

       在平時處理業務邏輯時候,經常需要接收 kafka 中某個 topic 的消息,進行一系列處理來完成業務邏輯,然后再進行轉發到一個新的 topic 中,由於這種業務需求,Spring For Kafka 提供了 @SendTo 注解,只要在 @KafkaListener 與 @SendTo 注解在同一個方法上,並且該方法存在返回值,那么就能將監聽的數據在方法內進行處理后 return,然后轉發到 @SendTo 注解內設置的 topic 中。

完成上面操作需要幾個步驟:

  1. 配置 Producer 參數,並創建 kafkaTemplate Bean。
  2. 配置KafkaListenerContainerFactory的ReplyTemplate,將 kafkaTemplate 對象添加到其中。
  3. 創建消息監聽器方法,設置該方法擁有返回值,並添加 @KafkaListener 與 @SendTo 兩個注解,並在 @SendTo 注解中輸入消息轉發的 Topic。

(1)、配置 Producer 參數,並創建 kafkaTemplate Bean

@Configuration @EnableKafka public class KafkaProducerConfig { /** * kafkaTemplate Bean */ @Bean(name="kafkaTemplate") public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } public ProducerFactory<String, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } } 

(2)、配置KafkaListenerContainerFactory的ReplyTemplate,將 kafkaTemplate 對象添加到其中

@Configuration @EnableKafka public class KafkaConsumerConfig { @Autowired private KafkaTemplate kafkaTemplate; @Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(3); factory.getContainerProperties().setPollTimeout(3000); // ---設置ReplyTemplate參數,將kafkaTemplate對象加入 factory.setReplyTemplate(kafkaTemplate); return factory; } @Bean public ConsumerFactory<Integer, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> propsMap = new HashMap<>(); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return propsMap; } 

(3)、創建消息監聽器方法,設置該方法擁有返回值,並添加 @KafkaListener 與 @SendTo 兩個注解,並在 @SendTo 注解中輸入消息轉發的 Topic。

@Service public class KafkaConsumerMessage { /** * 監聽test1 topic,設置返回值為string類型,並添加@SendTo注解,將消息轉發到 test2 */ @KafkaListener(topics = "test1",groupId = "group1") @SendTo("test2") public String kafkaListener1(String messages) { System.out.println(messages); String newMsg = messages + "消息轉發測試"; // 將處理后的消息返回 return newMsg; } /** * 監聽 test2 topic */ @KafkaListener(topics = "test2",groupId = "group2") public void kafkaListener2(String messages) { System.err.println(messages); } } 

6、Kafka Consumer 並發批量消費消息

(1)、設置並發數與開啟批量

  • kafkaListenerContainerFactory 設置 factory.setConcurrency(3) 設置並發,這個值不能超過topic分區數目
  • kafkaListenerContainerFactory 設置 factory.setBatchListener(true) 開啟批量
  • consumerConfigs 配置 ConsumerConfig.MAX_POLL_RECORDS_CONFIG 值,來設置批量消費每次最多消費多少條消息記錄
@Configuration @EnableKafka public class ConsumerConfigDemo1 { @Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); // 消費者組中線程數量,例如topic有3個分區,為了加快消費將並發設置為3 factory.setConcurrency(3); // 拉取超時時間 factory.getContainerProperties().setPollTimeout(3000); // 當使用批量監聽器時需要設置為true factory.setBatchListener(true); return factory; } @Bean public ConsumerFactory<Integer, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> propsMap = new HashMap<>(); // Kafka地址 propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); // 是否自動提交 propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); // 自動提交的頻率 propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); // Session超時設置 propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); // 鍵的反序列化方式 propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // 值的反序列化方式 propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // 批量消費每次最多消費多少條消息記錄 propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10"); return propsMap; } } 

(2)、設置分區消費

有多個分區的 Topic,可以設置多個注解單獨監聽 Topic 各個分區以提高效率。

@Component public class ConsumerMessage { @KafkaListener(id = "id0", topicPartitions = { @TopicPartition(topic = "test2", partitions = { "0" }) }) public void listenPartition0(List<ConsumerRecord<?, ?>> records) { System.out.println("Id0 Listener, Thread ID: " + Thread.currentThread().getId()); System.out.println("Id0 records size " + records.size()); for (ConsumerRecord<?, ?> record : records) { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); System.out.println("Received: " + record); if (kafkaMessage.isPresent()) { Object message = record.value(); String topic = record.topic(); System.out.printf(topic + " p0 Received message=" + message); } } } @KafkaListener(id = "id1", topicPartitions = { @TopicPartition(topic = "test2", partitions = { "1" }) }) public void listenPartition1(List<ConsumerRecord<?, ?>> records) { System.out.println("Id1 Listener, Thread ID: " + Thread.currentThread().getId()); System.out.println("Id1 records size " + records.size()); for (ConsumerRecord<?, ?> record : records) { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); System.out.println("Received: " + record); if (kafkaMessage.isPresent()) { Object message = record.value(); String topic = record.topic(); System.out.printf(topic + " p1 Received message=" + message); } } } @KafkaListener(id = "id2", topicPartitions = { @TopicPartition(topic = "test2", partitions = { "2" }) }) public void listenPartition2(List<ConsumerRecord<?, ?>> records) { System.out.println("Id2 Listener, Thread ID: " + Thread.currentThread().getId()); System.out.println("Id2 records size " + records.size()); for (ConsumerRecord<?, ?> record : records) { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); System.out.println("Received: " + record); if (kafkaMessage.isPresent()) { Object message = record.value(); String topic = record.topic(); System.out.printf(topic + " p2 Received message=" + message); } } } } 

7、暫停和恢復 Listener Containers

Spring For Kafka 提供 start()pause() 和 resume() 方法來操作監聽容器的啟動、暫停和恢復。

  • start():啟動監聽容器。
  • pause():暫停監聽容器。
  • resume():恢復監聽容器。

       這些方法一般可以靈活操作 kafka 的消費,例如進行服務進行升級,暫停消費者進行消費;例如在白天高峰期不進行服務消費,等到晚上再進行,這時候可以設置定時任務,白天關閉消費者消費到晚上開啟;考慮到這些情況,利用 start()、pause()、resume() 這些方法能很好控制消費者進行消費。這里寫一個簡單例子,通過 cotroller 操作暫停、恢復消費者監聽容器。

@RestController public class KafkaController { @Autowired private KafkaListenerEndpointRegistry registry; /** * 暫停監聽容器 */ @GetMapping("/pause") public void pause(){ registry.getListenerContainer("pause.resume").pause(); } /** * 恢復監聽容器 */ @GetMapping("/resume") public void resume(){ //判斷監聽容器是否啟動,未啟動則將其啟動,否則進行恢復監聽容器 if (!registry.getListenerContainer("pause.resume").isRunning()) { registry.getListenerContainer("pause.resume").start(); } registry.getListenerContainer("pause.resume").resume(); } } 

在上面例子中,調用 /pause 接口可以暫停消費者監聽容器,調用 /resume 接口可以恢復消費者監聽容器。

8、過濾監聽器中的消息

在接收消息時候可以創建一個過濾器來過濾接收的消息,這樣方便我們不必處理全部消息,只接收我們需要的消息進行處理。

在 kafkaListenerContainerFactory 中配置一個過濾器 RecordFilterStrategy 對象過濾消息,這里演示下如何操作:

@Configuration @EnableKafka public class ConsumerConfigDemo { @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> propsMap = new HashMap<>(); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return propsMap; } @Bean public ConsumerFactory<Integer, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(3); factory.getContainerProperties().setPollTimeout(3000); // 設置過濾器,只接收消息內容中包含 "test" 的消息 RecordFilterStrategy recordFilterStrategy = new RecordFilterStrategy() { @Override public boolean filter(ConsumerRecord consumerRecord) { String value = consumerRecord.value().toString(); if (value !=null && value.contains("test")) { System.err.println(consumerRecord.value()); // 返回 false 則接收消息 return false; } // 返回 true 則拋棄消息 return true; } }; // 將過濾器添添加到參數中 factory.setRecordFilterStrategy(recordFilterStrategy); return factory; } /** * 監聽消息,接收過濾器過濾后的消息 */ @KafkaListener(topics = {"test"},groupId = "group1") public void kafkaListener(String message){ System.out.println("消息:"+message); } } 

9、監聽器異常處理

(1)、單消息消費異常處理器

@Service public class ConsumerService { /** * 消息監聽器 */ @KafkaListener( topics = {"test"},groupId = "group1",errorHandler = "listenErrorHandler") public void listen(String message) { System.out.println(message); // 創建異常,觸發異常處理器 throw new NullPointerException("測試錯誤處理器"); } /** * 異常處理器 */ @Bean public ConsumerAwareListenerErrorHandler listenErrorHandler() { return new ConsumerAwareListenerErrorHandler() { @Override public Object handleError(Message<?> message, ListenerExecutionFailedException e, Consumer<?, ?> consumer) { System.out.println("message:" + message.getPayload()); System.out.println("exception:" + e.getMessage()); consumer.seek(new TopicPartition(message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, String.class), message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class)), message.getHeaders().get(KafkaHeaders.OFFSET, Long.class)); return null; } }; } } 

(2)、批量消費異常處理器

@Service public class ConsumerService { /** * 消息監聽器 */ @KafkaListener( topics = {"test"},groupId = "group1",errorHandler = "listenErrorHandler") public void listen(List<String> messages) { for(String msg:messages){ System.out.println(msg); } // 創建異常,觸發異常處理器 throw new NullPointerException("測試錯誤處理器"); } /** * 異常處理器 */ @Bean public ConsumerAwareListenerErrorHandler listenErrorHandler() { return new ConsumerAwareListenerErrorHandler() { @Override public Object handleError(Message<?> message, ListenerExecutionFailedException e, Consumer<?, ?> consumer) { System.out.println("message:" + message.getPayload()); System.out.println("exception:" + e.getMessage()); consumer.seek(new TopicPartition(message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, String.class), message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class)), message.getHeaders().get(KafkaHeaders.OFFSET, Long.class)); return null; } }; } } 

(3)、全局異常處理

將異常處理器添加到 kafkaListenerContainerFactory 中來設置全局異常處理。

@Configuration @EnableKafka public class ConsumerConfigDemo { @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> propsMap = new HashMap<>(); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return propsMap; } @Bean public ConsumerFactory<Integer, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(3); factory.getContainerProperties().setPollTimeout(3000); // 將單條消息異常處理器添加到參數中 factory.setErrorHandler(errorHandler); // 將批量消息異常處理器添加到參數中 //factory.setErrorHandler(errorHandler); return factory; } /** * 單消息消費異常處理器 */ @Bean public ConsumerAwareListenerErrorHandler listenErrorHandler() { return new ConsumerAwareListenerErrorHandler() { @Override public Object handleError(Message<?> message, ListenerExecutionFailedException e, Consumer<?, ?> consumer) { System.out.println("message:" + message.getPayload()); System.out.println("exception:" + e.getMessage()); consumer.seek(new TopicPartition(message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, String.class), message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class)), message.getHeaders().get(KafkaHeaders.OFFSET, Long.class)); return null; } }; } /** * 批量息消費異常處理器 */ @Bean public ConsumerAwareListenerErrorHandler listenErrorHandler() { return new ConsumerAwareListenerErrorHandler() { @Override public Object handleError(Message<?> message, ListenerExecutionFailedException e, Consumer<?, ?> consumer) { System.out.println("message:" + message.getPayload()); System.out.println("exception:" + e.getMessage()); consumer.seek(new TopicPartition(message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, String.class), message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class)), message.getHeaders().get(KafkaHeaders.OFFSET, Long.class)); return null; } }; } /** * 監聽消息,接收過濾器過濾后的消息 */ @KafkaListener(topics = {"test"},groupId = "group1") public void kafkaListener(String message){ System.out.println("消息:"+message); } } 

10、Kafka Consumer 手動/自動提交 Offset

       在kafka的消費者中有一個非常關鍵的機制,那就是 offset 機制。它使得 Kafka 在消費的過程中即使掛了或者引發再均衡問題重新分配 Partation,當下次重新恢復消費時仍然可以知道從哪里開始消費。

       Kafka中偏移量的自動提交是由參數 enable_auto_commit 和 auto_commit_interval_ms 控制的,當 enable_auto_commit=true 時,Kafka在消費的過程中會以頻率為 auto_commit_interval_ms 向 Kafka 自帶的 topic(__consumer_offsets) 進行偏移量提交,具體提交到哪個 Partation 是以算法:”partation=hash(group_id)%50” 來計算的。

在 Spring 中對 Kafka 設置手動或者自動提交Offset如下:

(1)、自動提交

自動提交需要配置下面兩個參數:

  • auto.commit.enable=true:是否將offset維護交給kafka自動提交到zookeeper中維護,設置為true。
  • auto.commit.interval.ms=10000:自動提交時間間隔。

配置示例如下:

@Configuration @EnableKafka public class ConsumerConfigDemo { @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> propsMap = new HashMap<>(); // ---設置自動提交Offset為true propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return propsMap; } @Bean public ConsumerFactory<Integer, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); // 消費者線程數 factory.setConcurrency(3); // 拉取超時時間 factory.getContainerProperties().setPollTimeout(3000); return factory; } /** * -------------接收消息------------- */ @KafkaListener(topics = {"test"}, groupId = "group1") public void kafkaListener(String message){ System.out.println("消息:"+message); } 

(2)、手動提交

手動提交需要配置下面一個參數:

  • auto.commit.enable=false:是否將offset維護交給kafka自動提交到zookeeper中維護,設置為false。

然后需要在程序中設置ack模式,從而進行手動提交維護offset。

@Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(3); factory.getContainerProperties().setPollTimeout(3000); 設置ACK模式(手動提交模式,這里有七種) factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD); return factory; } 

在 kafkaListenerContainerFactory 配置中設置 AckMode,它有七種模式分別為:

  • RECORD: 每處理完一條記錄后提交。
  • BATCH(默認): 每次poll一批數據后提交一次,頻率取決於每次poll的調用頻率。
  • TIME: 每次間隔ackTime的時間提交。
  • COUNT: 處理完poll的一批數據后並且距離上次提交處理的記錄數超過了設置的ackCount就提交。
  • COUNT_TIME: TIME和COUNT中任意一條滿足即提交。
  • MANUAL: 手動調用Acknowledgment.acknowledge()后,並且處理完poll的這批數據后提交。
  • MANUAL_IMMEDIATE: 手動調用Acknowledgment.acknowledge()后立即提交。

注意:如果設置 AckMode 模式為 MANUAL 或者 MANUAL_IMMEDIATE,則需要對監聽消息的方法中,引入 Acknowledgment 對象參數,並調用 acknowledge() 方法進行手動提交

手動提交下這里將列出七種ACK模式示例,如下:

  • ACK 模式: RECORD
  • 描述: 每處理完一條記錄后提交。
@Configuration @EnableKafka public class ConsumerConfigDemo { @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> propsMap = new HashMap<>(); // ---設置自動提交Offset為false propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return propsMap; } @Bean public ConsumerFactory<Integer, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(3); factory.getContainerProperties().setPollTimeout(3000); // 設置ACK模式為RECORD factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD); return factory; } /** * -------------接收消息------------- */ @KafkaListener(topics = {"test"}, groupId = "group1") public void kafkaListener(String message){ System.out.println("消息:"+message); } 
  • ACK 模式: BATCH
  • 描述: 每次poll一批數據后提交一次,頻率取決於每次poll的調用頻率。
@Configuration @EnableKafka public class ConsumerConfigDemo { @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> propsMap = new HashMap<>(); // ---設置自動提交Offset為false propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 設置每次批量消費數目,例如生產者生成10條數據,設置此值為4,那么需要三次批消費(三次中每次消費數目為:4,4,2)才能完成 propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "4"); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return propsMap; } @Bean public ConsumerFactory<Integer, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(3); factory.getContainerProperties().setPollTimeout(3000); // 開啟批量消費監聽器 factory.setBatchListener(true); // 設置ACK模式為BATCH factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH); return factory; } /** * -------------接收消息------------- * 批量消費時,設置參數為List來接收數據 */ @KafkaListener(topics = {"test"}, groupId = "group1") public void kafkaListener(List<String> message){ System.out.println("消息:"+message); } 
  • ACK 模式: COUNT
  • 描述: 處理完poll的一批數據后並且距離上次提交處理的記錄數超過了設置的ackCount值就提交。
@Configuration @EnableKafka public class ConsumerConfigDemo { @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> propsMap = new HashMap<>(); // ---設置自動提交Offset為false propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return propsMap; } @Bean public ConsumerFactory<Integer, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(3); factory.getContainerProperties().setPollTimeout(3000); // 設置ACK模式為COUNT factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.COUNT); // 設置AckCount數目,每接收AckCount條記錄數就提交Offset偏移量 factory.getContainerProperties().setAckCount(10); return factory; } /** * -------------接收消息------------- */ @KafkaListener(topics = {"test"}, groupId = "group1") public void kafkaListener(String message){ System.out.println("消息:"+message); } 
  • ACK 模式: TIME
  • 描述: 每次間隔ackTime的時間提交。
@Configuration @EnableKafka public class ConsumerConfigDemo { @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> propsMap = new HashMap<>(); // ---設置自動提交Offset為false propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return propsMap; } @Bean public ConsumerFactory<Integer, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(3); factory.getContainerProperties().setPollTimeout(3000); // 設置ACK模式為TIME factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.TIME); // 設置提交Ack的時間間隔,單位(ms) factory.getContainerProperties().setAckTime(1000); return factory; } /** * -------------接收消息------------- */ @KafkaListener(topics = {"test"}, groupId = "group1") public void kafkaListener(String message){ System.out.println("消息:"+message); } 
  • ACK 模式: COUNT_TIME。
  • 描述: 每次間隔ackTime的時間或處理完poll的一批數據后並且距離上次提交處理的記錄數超過了設置的ackCount值,任意一條滿足即提交。
@Configuration @EnableKafka public class ConsumerConfigDemo { @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> propsMap = new HashMap<>(); // ---設置自動提交Offset為false propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return propsMap; } @Bean public ConsumerFactory<Integer, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(3); factory.getContainerProperties().setPollTimeout(3000); // 設置ACK模式為COUNT_TIME factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.COUNT_TIME); // 設置提交Ack的時間間隔,單位(ms) factory.getContainerProperties().setAckTime(1000); // 設置AckCount數目,每接收AckCount條記錄數就提交Offset偏移量 factory.getContainerProperties().setAckCount(10); return factory; } /** * -------------接收消息------------- */ @KafkaListener(topics = {"test"}, groupId = "group1") public void kafkaListener(String message){ System.out.println("消息:"+message); } 
  • ACK 模式: MANUAL
  • 描述: 手動調用Acknowledgment.acknowledge()后,並且處理完poll的這批數據后提交。
@Configuration @EnableKafka public class ConsumerConfigDemo { @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> propsMap = new HashMap<>(); // ---設置自動提交Offset為false propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 設置每次批量消費數目,例如生產者生成10條數據,設置此值為4,那么需要三次批消費(三次中每次消費數目為:4,4,2)才能完成 propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "4"); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return propsMap; } @Bean public ConsumerFactory<Integer, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(3); factory.getContainerProperties().setPollTimeout(3000); // 開啟批量消費監聽器 factory.setBatchListener(true); // 設置ACK模式為MANUAL factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); return factory; } /** * -------------接收消息------------- * 批量消費時,設置參數為List來接收數據,並且因為ack模式為MANUAL,所以需要手動調用acknowledge()方法提交 */ @KafkaListener(topics = {"test"}, groupId = "group1") public void kafkaListener(List<String> message, Acknowledgment acknowledgment){ System.out.println("消息:"+message); // 手動執行acknowledge()提交offset偏移量 acknowledgment.acknowledge(); } 
  • ACK 模式: MANUAL_IMMEDIATE
  • 描述: 手動調用Acknowledgment.acknowledge()后立即提交。
@Configuration @EnableKafka public class ConsumerConfigDemo { @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> propsMap = new HashMap<>(); // ---設置自動提交Offset為false propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 設置每次批量消費數目,例如生產者生成10條數據,設置此值為4,那么需要三次批消費(三次中每次消費數目為:4,4,2)才能完成 propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "4"); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return propsMap; } @Bean public ConsumerFactory<Integer, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(3); factory.getContainerProperties().setPollTimeout(3000); // 開啟批量消費監聽器 factory.setBatchListener(true); // 設置ACK模式為MANUAL_IMMEDIATE factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); return factory; } /** * -------------接收消息------------- * 批量消費時,設置參數為List來接收數據,並且因為ack模式為MANUAL,所以需要手動調用acknowledge()方法提交 */ @KafkaListener(topics = {"test"}, groupId = "group1") public void kafkaListener(List<String> message, Acknowledgment acknowledgment){ System.out.println("消息:"+message); // 手動執行acknowledge()提交offset偏移量 acknowledgment.acknowledge(); } 

http://www.mydlq.club/article/34/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM