Notes from my own experience consuming messages with Spring Kafka consumers:
The interface KafkaDataListener is the top-level interface that spring-kafka provides for consumers to receive messages; it is an empty (marker) interface:

public interface KafkaDataListener<T> {}

On the consumer side, spring-kafka's design is: a single top-level interface with two sub-interfaces, one for automatic offset commits and one for manual offset commits. Each of those is in turn split into two variants, one that handles a single record per call and one that handles a batch of records per call.

The top-level KafkaDataListener interface has two direct sub-interfaces: GenericMessageListener and GenericAcknowledgingMessageListener. The difference is that the former is used with automatic offset commits, while the latter is used with manual offset commits.

1. GenericMessageListener

This interface is for automatic offset commits. Its onMessage method takes a single parameter, the incoming message:

public interface GenericMessageListener<T> extends KafkaDataListener<T> {
    void onMessage(T data);
}

It has two sub-interfaces, MessageListener and BatchMessageListener, both of which are also empty. The former handles one record per call, the latter a batch of records per call.

// Handles one record at a time.
// If your consumer implements this interface, setting max.poll.records greater than 1 does not change what onMessage sees:
// the container still polls records in batches, but dispatches them to onMessage one at a time.
public interface MessageListener<K, V> extends GenericMessageListener<ConsumerRecord<K, V>> {}

// Handles a batch of records per call. The number of records per batch varies, but it can be capped in the consumer
// configuration via max.poll.records (e.g. with max.poll.records set to 100, onMessage receives a varying number of
// records per call, but never more than 100).
public interface BatchMessageListener<K, V> extends GenericMessageListener<List<ConsumerRecord<K, V>>> {}

2. GenericAcknowledgingMessageListener

This interface is for manual offset commits. Its onMessage method takes two parameters: the incoming message and an Acknowledgment object used to commit the offset:

public interface GenericAcknowledgingMessageListener<T> extends KafkaDataListener<T> {
    void onMessage(T data, Acknowledgment acknowledgment);
}

It also has two sub-interfaces, AcknowledgingMessageListener and BatchAcknowledgingMessageListener, both of which are empty as well.

// Handles one record per call and commits the offset manually;
// requires <property name="ackMode" value="MANUAL_IMMEDIATE"/> in the consumer container configuration.
public interface AcknowledgingMessageListener<K, V> extends GenericAcknowledgingMessageListener<ConsumerRecord<K, V>> {}

// Handles a batch of records per call and commits the offsets in bulk after the batch has been processed;
// requires <property name="ackMode" value="MANUAL"/> in the consumer container configuration.
public interface BatchAcknowledgingMessageListener<K, V> extends GenericAcknowledgingMessageListener<List<ConsumerRecord<K, V>>> {}
The consumer below implements the MessageListener interface: it processes one record per call, and offsets are committed automatically:
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.xxxxxx.consumer.dto.FriendRelationDto;
import com.xxxxxx.consumer.dto.MessageDto;
import com.xxxxxx.consumer.service.FriendRelationService;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.stereotype.Service;
import java.io.IOException;

/**
 * Created by SYJ on 2017/3/21.
 */
@Service
public class ConsumerService implements MessageListener<Integer, String> {

    private static final Logger logger = LoggerFactory.getLogger(ConsumerService.class);

    @Autowired
    private FriendRelationService friendRelationService;

    /**
     * Message listener method
     * @param record the incoming record
     */
    @Override
    public void onMessage(ConsumerRecord<Integer, String> record) {
        logger.info("Before receiving:" + record.toString());
        String value = record.value();
        MessageDto<FriendRelationDto> message = JSON.parseObject(value, new TypeReference<MessageDto<FriendRelationDto>>(){});
        try {
            friendRelationService.process(message.getData());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
The consumer below implements the BatchMessageListener interface: it receives a batch of records per call; the batch size varies, but it will not exceed the value configured via "max.poll.records":
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.xxxxxx.consumer.dto.FriendRelationDto;
import com.xxxxxx.consumer.dto.MessageDto;
import com.xxxxxx.consumer.service.FriendRelationService;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.listener.BatchMessageListener;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.List;

/**
 * Created by SYJ on 2017/3/21.
 */
@Service
public class ConsumerService implements BatchMessageListener<Integer, String> {

    private static final Logger logger = LoggerFactory.getLogger(ConsumerService.class);

    @Autowired
    private FriendRelationService friendRelationService;

    @Override
    public void onMessage(List<ConsumerRecord<Integer, String>> recordList) {
        for (ConsumerRecord<Integer, String> record : recordList) {
            logger.info("Before receiving:" + record.toString());
            String value = record.value();
            MessageDto<FriendRelationDto> message = JSON.parseObject(value, new TypeReference<MessageDto<FriendRelationDto>>(){});
            try {
                friendRelationService.process(message.getData());
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
The consumer below implements the AcknowledgingMessageListener interface: it receives one record per call and commits the offset manually through the Acknowledgment argument; the consumer container must be configured with <property name="ackMode" value="MANUAL_IMMEDIATE"/>:
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.xxxxxx.consumer.dto.FriendRelationDto;
import com.xxxxxx.consumer.dto.MessageDto;
import com.xxxxxx.consumer.service.FriendRelationService;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

import java.io.IOException;

/**
 * Created by SYJ on 2017/3/21.
 */
@Service
public class ConsumerService implements AcknowledgingMessageListener<Integer, String> {

    private static final Logger logger = LoggerFactory.getLogger(ConsumerService.class);

    @Autowired
    private FriendRelationService friendRelationService;

    @Override
    public void onMessage(ConsumerRecord<Integer, String> record, Acknowledgment acknowledgment) {
        logger.info("Before receiving:" + record.toString());
        String value = record.value();
        MessageDto<FriendRelationDto> message = JSON.parseObject(value, new TypeReference<MessageDto<FriendRelationDto>>(){});
        try {
            friendRelationService.process(message.getData());
            logger.info("=========== committing offset ===========");
            acknowledgment.acknowledge(); // commit the offset
            logger.info("=========== offset committed ===========");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
The consumer below implements the BatchAcknowledgingMessageListener interface: it processes a batch of records per call and commits the offsets after the whole batch has been processed. Set "max.poll.records" in the consumer configuration to cap the batch size, and configure the container with <property name="ackMode" value="MANUAL"/>:
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.xxxxxx.consumer.dto.FriendRelationDto;
import com.xxxxxx.consumer.dto.MessageDto;
import com.xxxxxx.consumer.service.FriendRelationService;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.listener.BatchAcknowledgingMessageListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.List;

/**
 * Created by SYJ on 2017/3/21.
 */
@Service
public class ConsumerService implements BatchAcknowledgingMessageListener<Integer, String> {

    private static final Logger logger = LoggerFactory.getLogger(ConsumerService.class);

    @Autowired
    private FriendRelationService friendRelationService;

    @Override
    public void onMessage(List<ConsumerRecord<Integer, String>> recordList, Acknowledgment acknowledgment) {
        logger.info("Before receiving:" + recordList.toString());
        logger.info("Number of records in this batch: " + recordList.size());
        for (ConsumerRecord<Integer, String> record : recordList) {
            String value = record.value();
            MessageDto<FriendRelationDto> message = JSON.parseObject(value, new TypeReference<MessageDto<FriendRelationDto>>() {});
            try {
                friendRelationService.process(message.getData());
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        logger.info("=========== committing offsets ===========");
        acknowledgment.acknowledge(); // commit the offsets for the whole batch
        logger.info("=========== offsets committed ===========");
    }
}
Below is a sample spring-kafka consumer-side configuration file:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd">

    <bean id="consumerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="${bootstrap.servers}"/>
                <!-- Consumer group id -->
                <entry key="group.id" value="friend-group"/>
                <entry key="enable.auto.commit" value="false"/>
                <entry key="auto.commit.interval.ms" value="1000"/>
                <entry key="session.timeout.ms" value="15000"/>
                <!-- When processing messages in batches, the number of records passed to each onMessage call varies,
                     but it will never exceed this maximum -->
                <entry key="max.poll.records" value="50"/>
                <entry key="key.deserializer" value="org.apache.kafka.common.serialization.IntegerDeserializer"/>
                <!--<entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>-->
                <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
            </map>
        </constructor-arg>
    </bean>

    <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
        <constructor-arg>
            <ref bean="consumerProperties"/>
        </constructor-arg>
    </bean>

    <!-- The service class that consumes the messages -->
    <bean id="messageListenerConsumerService" class="com.xxxxxxx.consumer.ConsumerService"/>

    <!-- Consumer container configuration -->
    <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
        <constructor-arg value="friend"/>
        <!--<constructor-arg>
            <list>
                <value>zptopic</value>
                <value>ssmk</value>
                <value>friend</value>
            </list>
        </constructor-arg>-->
        <property name="messageListener" ref="messageListenerConsumerService"/>

        <!-- Commit offsets manually, in bulk after each batch -->
        <property name="ackMode" value="MANUAL"/>
        <!-- Commit mode that commits immediately after each record is acknowledged -->
        <!--<property name="ackMode" value="MANUAL_IMMEDIATE"/>-->
    </bean>

    <!-- Single-threaded message listener container: each consumer client starts only one consuming thread -->
    <!--<bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" init-method="doStart">
        <constructor-arg ref="consumerFactory"/>
        <constructor-arg ref="containerProperties"/>
    </bean>-->

    <!-- Concurrent message listener container: each consumer client can start multiple consuming threads;
         the number of threads is set via the concurrency property -->
    <bean id="messageListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer" init-method="doStart">
        <constructor-arg ref="consumerFactory"/>
        <constructor-arg ref="containerProperties"/>
        <property name="concurrency" value="5"/>
    </bean>

</beans>
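For comparison, the same wiring can also be expressed in plain Java instead of XML. The sketch below is a minimal, hand-written equivalent of the configuration above, not code from the original project: it assumes the spring-kafka 1.x API (where ContainerProperties lives in org.springframework.kafka.listener.config), uses a placeholder broker address instead of ${bootstrap.servers}, and the class name ConsumerContainerBootstrap is made up for illustration. The listener passed in is meant to be the BatchAcknowledgingMessageListener variant of ConsumerService shown earlier.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.config.ContainerProperties;

public class ConsumerContainerBootstrap {

    public static ConcurrentMessageListenerContainer<Integer, String> buildContainer(ConsumerService listener) {
        // Same consumer properties as in the XML example; the broker address is a placeholder
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "friend-group");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        DefaultKafkaConsumerFactory<Integer, String> consumerFactory =
                new DefaultKafkaConsumerFactory<>(props);

        // Subscribe to the "friend" topic and register the listener bean
        ContainerProperties containerProps = new ContainerProperties("friend");
        containerProps.setMessageListener(listener);
        // MANUAL: offsets are committed in bulk when acknowledge() is called after a batch
        containerProps.setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);

        ConcurrentMessageListenerContainer<Integer, String> container =
                new ConcurrentMessageListenerContainer<>(consumerFactory, containerProps);
        container.setConcurrency(5); // number of consumer threads
        return container;
    }
}

Calling start() on the returned container (or letting a Spring context manage its lifecycle) begins consumption, which is what the XML version achieves with its container bean at startup.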