關於SpringKafka消費者的幾個監聽器:[一次處理單條消息和一次處理一批消息]以及[自動提交offset和手動提交offset]


自己在使用Spring Kafka 的消費者消費消息的時候的實踐總結:

接口 KafkaDataListener 是spring-kafka提供的一個供消費者接受消息的頂層接口,也是一個空接口;
public interface KafkaDataListener<T> {}

對於消費端接收消息的時候,spring-kafka的設計思路是,提供一個頂層接口,提供兩個子類,一個子類是自動提交offset的,另一個子類是手動提交offset的.
無論是自動提交offset還是手動提交offset,又各分為兩種,一種是一次只處理一條消息,另一種是一次可以處理一批消息.

該 KafkaDataListener 頂層接口有兩個實現類:GenericMessageListener 和 GenericAcknowledgingMessageListener,
二者的區別是,前者是自動提交offset,后者是手動提交offset。

1、 GenericMessageListener 
    該接口是自動提交offset,它的onMessage方法的參數只有一個,就是傳遞過來的一條消息;
    public interface GenericMessageListener<T> extends KafkaDataListener<T> {void onMessage(T data);}
    這個接口又有兩個子接口:MessageListener 和 BatchMessageListener
    這兩個接口也都是空接口,二者的區別是,前者一次只處理一條消息,后者一次處理一批消息.
    
    //一次處理一條消息
    //消費者如果實現該接口的話,如果配置中設置max.poll.records參數大於1的話是無效的,因為它一次只處理一條消息
    public interface MessageListener<K, V> extends GenericMessageListener<ConsumerRecord<K, V>> {}
    //一次可以處理一批消息,每一批次的消息總條數是隨機的,但可以在消費者的配置中設置一個最大值(max.poll.records,
    //比如設置了最大拉取的消息條數為100,那么onMessage方法每次接受到的消息條數是隨機的,但最大不會超過100)
    public interface BatchMessageListener<K, V> extends GenericMessageListener<List<ConsumerRecord<K, V>>> {}
    
2、 GenericAcknowledgingMessageListener
    該接口是手動提交offset,它的onMessage方法的參數有兩個,第一個是傳遞過來的一條消息,第二個參數是用於提交offset的對象
    public interface GenericAcknowledgingMessageListener<T> extends KafkaDataListener<T> {void onMessage(T data, Acknowledgment acknowledgment);}
    
    這個接口也有兩個子接口:AcknowledgingMessageListener 和 BatchAcknowledgingMessageListener,這兩個接口也都是空接口.
    //一次只處理一條消息,並手動提交offset,需要在消費者的配置中設置<property name="ackMode" value="MANUAL_IMMEDIATE"/>
    public interface AcknowledgingMessageListener<K, V> extends GenericAcknowledgingMessageListener<ConsumerRecord<K, V>> {}
    //一次處理一批消息,處理完這一批消息之后,在批量提交offset,需要在消費者的配置中設置<property name="ackMode" value="MANUAL"/>
    public interface BatchAcknowledgingMessageListener<K, V> extends GenericAcknowledgingMessageListener<List<ConsumerRecord<K, V>>> {}

 

下面的消費者繼承的是MessageListener這個監聽器,就是一次處理一條消息,而且是自動提交offset:

 1 import com.alibaba.fastjson.JSON;
 2 import com.alibaba.fastjson.TypeReference;
 3 import com.xxxxxx.consumer.dto.FriendRelationDto;
 4 import com.xxxxxx.consumer.dto.MessageDto;
 5 import com.xxxxxx.consumer.service.FriendRelationService;
 6 import org.apache.kafka.clients.consumer.ConsumerRecord;
 7 import org.slf4j.Logger;
 8 import org.slf4j.LoggerFactory;
 9 import org.springframework.beans.factory.annotation.Autowired;
10 import org.springframework.kafka.listener.MessageListener;
11 import org.springframework.stereotype.Service;
12 import java.io.IOException;
13 
14 /**
15  * Created by SYJ on 2017/3/21.
16  */
17 @Service
18 public class ConsumerService implements MessageListener<Integer, String> {
19 
20     private static final Logger logger = LoggerFactory.getLogger(ConsumerService.class);
21     @Autowired
22     private FriendRelationService friendRelationService;
23 
24     /**
25      * 消息監聽方法
26      * @param record
27      */
28     @Override
29     public void onMessage(ConsumerRecord<Integer, String> record) {
30         logger.info("Before receiving:" + record.toString());
31         String value = record.value();
32         MessageDto<FriendRelationDto> message = JSON.parseObject(value, new TypeReference<MessageDto<FriendRelationDto>>(){});
33         try {
34             friendRelationService.process(message.getData());
35         } catch (IOException e) {
36             e.printStackTrace();
37         }
38     }
39 }

 

下面的消費者實現的BatchMessageListener這個監聽器,就是一次接受一批消息,消息的數量是隨機的,但最大不會超過"max.poll.records"參數配置的數量:

 1 import com.alibaba.fastjson.JSON;
 2 import com.alibaba.fastjson.TypeReference;
 3 import com.xxxxxx.consumer.dto.FriendRelationDto;
 4 import com.xxxxxx.consumer.dto.MessageDto;
 5 import com.xxxxxx.consumer.service.FriendRelationService;
 6 import org.apache.kafka.clients.consumer.ConsumerRecord;
 7 import org.slf4j.Logger;
 8 import org.slf4j.LoggerFactory;
 9 import org.springframework.beans.factory.annotation.Autowired;
10 import org.springframework.kafka.listener.BatchMessageListener;
11 import org.springframework.stereotype.Service;
12 
13 import java.io.IOException;
14 import java.util.List;
15 
16 /**
17  * Created by SYJ on 2017/3/21.
18  */
19 @Service
20 public class ConsumerService implements BatchMessageListener<Integer, String> {
21 
22     private static final Logger logger = LoggerFactory.getLogger(ConsumerService.class);
23     @Autowired
24     private FriendRelationService friendRelationService;
25 
26     @Override
27     public void onMessage(List<ConsumerRecord<Integer, String>> recordList) {
28         for (ConsumerRecord<Integer, String> record : recordList) {
29             logger.info("Before receiving:" + record.toString());
30             String value = record.value();
31             MessageDto<FriendRelationDto> message = JSON.parseObject(value, new TypeReference<MessageDto<FriendRelationDto>>(){});
32             try {
33                 friendRelationService.process(message.getData());
34             } catch (IOException e) {
35                 e.printStackTrace();
36             }
37         }
38 
39     }
40 }

下面的消費者實現的是AcknowledgingMessageListener這個監聽器,它的特點是一次接收一條消息,可以通過acknowledgment來手動提交offset,需要在消費者的配置中指定<property name="ackMode" value="MANUAL_IMMEDIATE"/>:

 1 import com.alibaba.fastjson.JSON;
 2 import com.alibaba.fastjson.TypeReference;
 3 import com.xxxxxx.consumer.dto.FriendRelationDto;
 4 import com.xxxxxx.consumer.dto.MessageDto;
 5 import com.xxxxxx.consumer.service.FriendRelationService;
 6 import org.apache.kafka.clients.consumer.ConsumerRecord;
 7 import org.slf4j.Logger;
 8 import org.slf4j.LoggerFactory;
 9 import org.springframework.beans.factory.annotation.Autowired;
10 import org.springframework.kafka.listener.AcknowledgingMessageListener;
11 import org.springframework.kafka.support.Acknowledgment;
12 import org.springframework.stereotype.Service;
13 
14 import java.io.IOException;
15 
16 /**
17  * Created by SYJ on 2017/3/21.
18  */
19 @Service
20 public class ConsumerService implements AcknowledgingMessageListener<Integer, String> {
21 
22     private static final Logger logger = LoggerFactory.getLogger(ConsumerService.class);
23     @Autowired
24     private FriendRelationService friendRelationService;
25 
26     @Override
27     public void onMessage(ConsumerRecord<Integer, String> record, Acknowledgment acknowledgment) {
28         logger.info("Before receiving:" + record.toString());
29         String value = record.value();
30         MessageDto<FriendRelationDto> message = JSON.parseObject(value, new TypeReference<MessageDto<FriendRelationDto>>(){});
31         try {
32             friendRelationService.process(message.getData());
33             logger.info("===========開始提交offset=============");
34             acknowledgment.acknowledge();//提交offset
35             logger.info("===========已經提交offset=============");
36         } catch (IOException e) {
37             e.printStackTrace();
38         }
39     }
40 }

下面的消費者實現的是BatchAcknowledgingMessageListener這個監聽器,它的特點是一次可以處理一批消息,並且可以在處理完這一批消息之后提交offset,需要在消費者的配置文件中配置"max.poll.records"參數指定本批消息可以達到的最大值,並指定<property name="ackMode" value="MANUAL"/>:

 1 import com.alibaba.fastjson.JSON;
 2 import com.alibaba.fastjson.TypeReference;
 3 import com.xxxxxx.consumer.dto.FriendRelationDto;
 4 import com.xxxxxx.consumer.dto.MessageDto;
 5 import com.xxxxxx.consumer.service.FriendRelationService;
 6 import org.apache.kafka.clients.consumer.ConsumerRecord;
 7 import org.slf4j.Logger;
 8 import org.slf4j.LoggerFactory;
 9 import org.springframework.beans.factory.annotation.Autowired;
10 import org.springframework.kafka.listener.BatchAcknowledgingMessageListener;
11 import org.springframework.kafka.support.Acknowledgment;
12 import org.springframework.stereotype.Service;
13 
14 import java.io.IOException;
15 import java.util.List;
16 
17 /**
18  * Created by SYJ on 2017/3/21.
19  */
20 @Service
21 public class ConsumerService implements BatchAcknowledgingMessageListener<Integer, String> {
22 
23     private static final Logger logger = LoggerFactory.getLogger(ConsumerService.class);
24     @Autowired
25     private FriendRelationService friendRelationService;
26     
27 
28     @Override
29     public void onMessage(List<ConsumerRecord<Integer, String>> recordList, Acknowledgment acknowledgment) {
30         logger.info("Before receiving:" + recordList.toString());
31         logger.info("本次消息總數:" + recordList.size());
32         for (ConsumerRecord<Integer, String> record : recordList) {
33             String value = record.value();
34             MessageDto<FriendRelationDto> message = JSON.parseObject(value, new TypeReference<MessageDto<FriendRelationDto>>() {
35             });
36             try {
37                 friendRelationService.process(message.getData());
38             } catch (IOException e) {
39                 e.printStackTrace();
40             }
41         }
42         logger.info("===========開始提交offset=============");
43         acknowledgment.acknowledge();//提交offset
44         logger.info("===========已經提交offset=============");
45     }
46 }

 

下面是spring-kafka消費端的配置文件示例:

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <beans xmlns="http://www.springframework.org/schema/beans"
 3        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 4        xsi:schemaLocation="http://www.springframework.org/schema/beans
 5          http://www.springframework.org/schema/beans/spring-beans.xsd">
 6     
 7     <bean id="consumerProperties" class="java.util.HashMap">
 8         <constructor-arg>
 9             <map>
10                 <entry key="bootstrap.servers" value="${bootstrap.servers}"/>
11                 <!-- 指定消費組名 -->
12                 <entry key="group.id" value="friend-group"/>
13                 <entry key="enable.auto.commit" value="false"/>
14                 <entry key="auto.commit.interval.ms" value="1000"/>
15                 <entry key="session.timeout.ms" value="15000"/>
16                 <!-- 當使用批量處理消息的時候,每次onMessage方法獲取到的消息總條數雖然是隨機的,但是不會超過此最大值 -->
17                 <entry key="max.poll.records" value="50"/>
18                 <entry key="key.deserializer" value="org.apache.kafka.common.serialization.IntegerDeserializer"/>
19                 <!--<entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>-->
20                 <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
21             </map>
22         </constructor-arg>
23     </bean>
24 
25     <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
26         <constructor-arg>
27             <ref bean="consumerProperties"/>
28         </constructor-arg>
29     </bean>
30 
31     <!-- 消費消息的服務類 -->
32     <bean id="messageListernerConsumerService" class="com.xxxxxxx.consumer.ConsumerService"/>
33 
34     <!-- 消費者容器配置信息 -->
35     <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
36         <constructor-arg value="friend"/>
37         <!--<constructor-arg>
38             <list>
39                 <value>zptopic</value>
40                 <value>ssmk</value>
41                 <value>friend</value>
42             </list>
43         </constructor-arg>-->
44         <property name="messageListener" ref="messageListernerConsumerService"/>
45 
46         <!-- 提交offset,批量提交 -->
47         <property name="ackMode" value="MANUAL"/>
48         <!-- 提交offset的方式,處理完一條消息就立即提交 -->
49         <!--<property name="ackMode" value="MANUAL_IMMEDIATE"/>-->
50     </bean>
51 
52     <!-- 單線程消息監聽容器,每啟動一個消費者客戶端,只會開啟一個線程來消費 -->
53     <!--<bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" init-method="doStart">
54         <constructor-arg ref="consumerFactory"/>
55         <constructor-arg ref="containerProperties"/>
56     </bean>-->
57 
58     <!-- 多線程消息監聽容器,每啟動一個消費者客戶端,可以開啟多個線程,開啟多少個線程自己可以通過concurrency來指定 -->
59     <bean id="messageListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer" init-method="doStart">
60         <constructor-arg ref="consumerFactory"/>
61         <constructor-arg ref="containerProperties"/>
62         <property name="concurrency" value="5"/>
63     </bean>
64 
65 </beans>

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM