Spring Kafka(五)@KafkaListener的花式操作


1. 消息監聽

對於Kafka中Topic的數據消費,我們一般都選擇使用消息監聽器進行消費,怎么把消息監聽器玩出花來呢,那就得看看它所實現的功能了。

Spring-Kafka中消息監聽大致分為兩種類型,

一種是單條數據消費,

一種是批量消費;

兩者的區別只是在於監聽器一次性獲取消息的數量。

GenericMessageListener是我們實現消息監聽的一個接口,向上擴展的接口有非常多,

比如:單數據消費的MessageListener、批量消費的BatchMessageListener、還有具備ACK機制的AcknowledgingMessageListener和BatchAcknowledgingMessageListener等等。

接下來我們就一一解析一下。

2. GenericMessageListener

這里可以看到GenericMessageListener使用注解標明這是一個函數式接口,默認實現了三種不同參數的onMessage方法。

data就是我們需要接收的數據,Consumer則是消費者類,Acknowledgment則是用來實現Ack機制的類。

這里需要注意一下的是,Consumer對象並不是線程安全的。

@FunctionalInterface public interface GenericMessageListener<T> { void onMessage(T var1); default void onMessage(T data, Acknowledgment acknowledgment) { throw new UnsupportedOperationException("Container should never call this"); } default void onMessage(T data, Consumer<?, ?> consumer) { throw new UnsupportedOperationException("Container should never call this"); } default void onMessage(T data, Acknowledgment acknowledgment, Consumer<?, ?> consumer) { throw new UnsupportedOperationException("Container should never call this"); } }
View Code

接下來先瀏覽一下繼承了GenericMessageListener接口的類。前綴為Batch的接口都是批處理類型的消息監聽接口,里面的參數也都講解過了

public interface MessageListener<K, V> { void onMessage(ConsumerRecord<K, V> data); } public interface AcknowledgingMessageListener<K, V> { void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment); } public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer); } public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer); } public interface BatchMessageListener<K, V> { void onMessage(List<ConsumerRecord<K, V>> data); } public interface BatchAcknowledgingMessageListener<K, V> { void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment); } public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer); } public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer); }
View Code

在把@KafkaListener玩出花前,我們還需要了解怎么使用非注解方式去監聽Topic。

我們在創建監聽容器前需要創建一個監聽容器工廠,這里只需要配置一下消費者工廠就好了

,之后我們使用它去創建我們的監聽容器。consumerFactory()這個參數在之前就已經定義過了,這里就不重復貼代碼了。

 @Bean public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; }
View Code

有了監聽容器工廠,我們就可以使用它去創建我們的監聽容器

Bean方式創建監聽容器

@Bean public KafkaMessageListenerContainer demoListenerContainer() { ContainerProperties properties = new ContainerProperties("topic.quick.bean"); properties.setGroupId("bean"); properties.setMessageListener(new MessageListener<Integer,String>() { private Logger log = LoggerFactory.getLogger(this.getClass()); @Override public void onMessage(ConsumerRecord<Integer, String> record) { log.info("topic.quick.bean receive : " + record.toString()); } }); return new KafkaMessageListenerContainer(consumerFactory(), properties); }
View Code

啟動項目我們可以看一下控制台的日志,監聽容器成功分配給某個消費者的結果很清晰的顯示出來了,

順便就寫個測試方法測試一下監聽器能不能正常運行。

2018-09-11 10:36:15.732  INFO 1168 --- [erContainer-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-4, groupId=bean] Successfully joined group with generation 1
2018-09-11 10:36:15.733  INFO 1168 --- [erContainer-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-4, groupId=bean] Setting newly assigned partitions [topic.quick.bean-0] 2018-09-11 10:36:15.733  INFO 1168 --- [erContainer-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [topic.quick.bean-0]
View Code

發送一條消息到指定監聽隊列中

 @Test public void test() { kafkaTemplate.send("topic.quick.bean", "send msg to beanListener"); }
View Code

3. @KafkaListener參數講解

在前幾章入門的時候就已經寫過一個用@KafkaListener注解實現監聽的代碼,這里就貼一下之前寫的代碼

    @KafkaListener(id = "demo", topics = "topic.quick.demo") public void listen(String msgData) { log.info("demo receive : "+msgData); }
View Code

使用@KafkaListener這個注解並不局限於這個監聽容器是單條數據消費還是批量消費,

區分單數據還是多數據消費只需要配置一下注解的containerFactory屬性即可,先講解一下這個監聽方法都能接收寫什么參數吧。

data : 對於data值的類型其實並沒有限定,根據KafkaTemplate所定義的類型來決定。data為List集合的則是用作批量消費。

ConsumerRecord:具體消費數據類,包含Headers信息、分區信息、時間戳等

Acknowledgment:用作Ack機制的接口

Consumer:消費者類,使用該類我們可以手動提交偏移量、控制消費速率等功能

public void listen1(String data) public void listen2(ConsumerRecord<K,V> data) public void listen3(ConsumerRecord<K,V> data, Acknowledgment acknowledgment) public void listen4(ConsumerRecord<K,V> data, Acknowledgment acknowledgment, Consumer<K,V> consumer) public void listen5(List<String> data) public void listen6(List<ConsumerRecord<K,V>> data) public void listen7(List<ConsumerRecord<K,V>> data, Acknowledgment acknowledgment) public void listen8(List<ConsumerRecord<K,V>> data, Acknowledgment acknowledgment, Consumer<K,V> consumer) 
View Code

接下來在看看@KafkaListener的注解都提供了什么屬性。

d:消費者的id,當GroupId沒有被配置的時候,默認id為GroupId

containerFactory:上面提到了@KafkaListener區分單數據還是多數據消費只需要配置一下注解的containerFactory屬性就可以了,

這里面配置的是監聽容器工廠,也就是ConcurrentKafkaListenerContainerFactory,配置BeanName

topics:需要監聽的Topic,可監聽多個

topicPartitions:可配置更加詳細的監聽信息,必須監聽某個Topic中的指定分區,或者從offset為200的偏移量開始監聽

errorHandler:監聽異常處理器,配置BeanName

groupId:消費組ID

idIsGroup:id是否為GroupId

clientIdPrefix:消費者Id前綴

beanRef:真實監聽容器的BeanName,需要在 BeanName前加 "__"

public @interface KafkaListener { String id() default ""; String containerFactory() default ""; String[] topics() default {}; String topicPattern() default ""; TopicPartition[] topicPartitions() default {}; String containerGroup() default ""; String errorHandler() default ""; String groupId() default ""; boolean idIsGroup() default true; String clientIdPrefix() default ""; String beanRef() default "__listener"; }
View Code

現在開始才是把監聽容器玩出花來的時刻

4. 使用ConsumerRecord類消費

用ConsumerRecord類接收的好處是什么呢,ConsumerRecord類里面包含分區信息、消息頭、消息體等內容,如果業務需要獲取這些參數時,使用ConsumerRecord會是個不錯的選擇。

如果使用具體的類型接收消息體則更加方便,比如說用String類型去接收消息體。

這里我們編寫一個consumerListener方法,監聽"topic.quick.consumer" Topic,並把ConsumerRecord里面所包含的內容打印到控制台中

@Component public class SingleListener { private static final Logger log = LoggerFactory.getLogger(SingleListener.class); @KafkaListener(id = "consumer", topics = "topic.quick.consumer") public void consumerListener(ConsumerRecord<Integer, String> record) { log.info("topic.quick.consumer receive : " + record.toString()); } }
View Code

編寫測試方法,發送數據到對應的Topic中,運行測試我們可以看到控制台打印的日志,

日志里面包含topic、partition、offset等信息,這其實就是完整的消息儲存結構。

 @Test public void testConsumerRecord() { kafkaTemplate.send("topic.quick.consumer", "test receive by consumerRecord"); }
View Code
2018-09-11 15:52:13.546  INFO 13644 --- [ consumer-0-C-1] com.viu.kafka.listen.SingleListener      : topic.quick.consumer receive : ConsumerRecord(topic = topic.quick.consumer, partition = 0, offset = 0, CreateTime = 1536652333476, serialized key size = -1, serialized value size = 30, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = test receive by consumerRecord)
View Code

5. 批量消費案例

重新創建一份新的消費者配置,配置為一次拉取5條消息
創建一個監聽容器工廠,設置其為批量消費並設置並發量為5,這個並發量根據分區數決定,必須小於等於分區數,否則會有線程一直處於空閑狀態
創建一個分區數為8的Topic
創建監聽方法,設置消費id為batch,clientID前綴為batch,監聽topic.quick.batch,使用batchContainerFactory工廠創建該監聽容器

@Component public class BatchListener { private static final Logger log= LoggerFactory.getLogger(BatchListener.class); private Map<String, Object> consumerProps() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); //一次拉取消息數量
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "5"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return props; } @Bean("batchContainerFactory") public ConcurrentKafkaListenerContainerFactory listenerContainer() { ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory(); container.setConsumerFactory(new DefaultKafkaConsumerFactory(consumerProps())); //設置並發量,小於或等於Topic的分區數
        container.setConcurrency(5); //設置為批量監聽
        container.setBatchListener(true); return container; } @Bean public NewTopic batchTopic() { return new NewTopic("topic.quick.batch", 8, (short) 1); } @KafkaListener(id = "batch",clientIdPrefix = "batch",topics = {"topic.quick.batch"},containerFactory = "batchContainerFactory") public void batchListener(List<String> data) { log.info("topic.quick.batch  receive : "); for (String s : data) { log.info( s); } } }
View Code

緊接着我們啟動項目,控制台的日志信息非常完整,我們可以看到batchListener這個監聽容器的partition分配信息。

我們設置concurrency為5,也就是將會啟動5條線程進行監聽,那我們創建的topic則是有8個partition,意味着將有3條線程分配到2個partition和2條線程分配到1個partition。

我們可以看到這段日志的最后5行,這就是每條線程分配到的partition。

2018-09-11 12:47:49.628  INFO 4708 --- [    batch-2-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=batch-2, groupId=batch] Successfully joined group with generation 98
2018-09-11 12:47:49.628  INFO 4708 --- [    batch-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=batch-2, groupId=batch] Setting newly assigned partitions [topic.quick.batch-4, topic.quick.batch-5] 2018-09-11 12:47:49.630  INFO 4708 --- [    batch-3-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=batch-3, groupId=batch] Successfully joined group with generation 98
2018-09-11 12:47:49.630  INFO 4708 --- [    batch-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=batch-0, groupId=batch] Successfully joined group with generation 98
2018-09-11 12:47:49.630  INFO 4708 --- [    batch-4-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=batch-4, groupId=batch] Successfully joined group with generation 98
2018-09-11 12:47:49.630  INFO 4708 --- [    batch-3-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=batch-3, groupId=batch] Setting newly assigned partitions [topic.quick.batch-6] 2018-09-11 12:47:49.630  INFO 4708 --- [    batch-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=batch-0, groupId=batch] Setting newly assigned partitions [topic.quick.batch-0, topic.quick.batch-1] 2018-09-11 12:47:49.630  INFO 4708 --- [    batch-4-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=batch-4, groupId=batch] Setting newly assigned partitions [topic.quick.batch-7] 2018-09-11 12:47:49.631  INFO 4708 --- [    batch-1-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=batch-1, groupId=batch] Successfully joined group with generation 98
2018-09-11 12:47:49.631  INFO 4708 --- [    batch-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=batch-1, groupId=batch] Setting newly assigned partitions [topic.quick.batch-2, topic.quick.batch-3] 2018-09-11 12:47:49.633  INFO 4708 --- [    batch-3-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [topic.quick.batch-6] 2018-09-11 12:47:49.633  INFO 4708 --- [    batch-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [topic.quick.batch-0, topic.quick.batch-1] 2018-09-11 12:47:49.633  INFO 4708 --- [    batch-4-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [topic.quick.batch-7] 2018-09-11 12:47:49.633  INFO 4708 --- [    batch-1-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [topic.quick.batch-2, topic.quick.batch-3] 2018-09-11 12:47:49.634  INFO 4708 --- [    batch-2-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [topic.quick.batch-4, topic.quick.batch-5]
View Code

那我們來編寫一下測試方法,在短時間內發送12條消息到topic中,可以看到運行結果,對應的監聽方法總共拉取了三次數據,

其中兩次為5條數據,一次為2條數據,加起來就是我們在測試方法發送的12條數據。證明我們的批量消費方法是按預期進行的。

 @Autowired private KafkaTemplate kafkaTemplate; @Test public void testBatch() { for (int i = 0; i < 12; i++) { kafkaTemplate.send("topic.quick.batch", "test batch listener,dataNum-" + i); } }
View Code
2018-09-11 12:08:51.840  INFO 12416 --- [    batch-0-C-1] com.viu.kafka.listen.BatchListener : topic.quick.batch receive : 2018-09-11 12:08:51.840  INFO 12416 --- [    batch-0-C-1] com.viu.kafka.listen.BatchListener       : test batch listener,dataNum-5
2018-09-11 12:08:51.840  INFO 12416 --- [    batch-0-C-1] com.viu.kafka.listen.BatchListener       : test batch listener,dataNum-2
2018-09-11 12:08:51.840  INFO 12416 --- [    batch-0-C-1] com.viu.kafka.listen.BatchListener       : test batch listener,dataNum-10
2018-09-11 12:08:51.840  INFO 12416 --- [    batch-0-C-1] com.viu.kafka.listen.BatchListener       : test batch listener,dataNum-6
2018-09-11 12:08:51.840  INFO 12416 --- [    batch-0-C-1] com.viu.kafka.listen.BatchListener       : test batch listener,dataNum-3
2018-09-11 12:08:51.841  INFO 12416 --- [    batch-0-C-1] com.viu.kafka.listen.BatchListener : topic.quick.batch receive : 2018-09-11 12:08:51.841  INFO 12416 --- [    batch-0-C-1] com.viu.kafka.listen.BatchListener       : test batch listener,dataNum-11
2018-09-11 12:08:51.841  INFO 12416 --- [    batch-0-C-1] com.viu.kafka.listen.BatchListener       : test batch listener,dataNum-0
2018-09-11 12:08:51.841  INFO 12416 --- [    batch-0-C-1] com.viu.kafka.listen.BatchListener       : test batch listener,dataNum-8
2018-09-11 12:08:51.841  INFO 12416 --- [    batch-0-C-1] com.viu.kafka.listen.BatchListener       : test batch listener,dataNum-7
2018-09-11 12:08:51.841  INFO 12416 --- [    batch-0-C-1] com.viu.kafka.listen.BatchListener       : test batch listener,dataNum-4
2018-09-11 12:08:51.842  INFO 12416 --- [    batch-0-C-1] com.viu.kafka.listen.BatchListener : topic.quick.batch receive : 2018-09-11 12:08:51.842  INFO 12416 --- [    batch-0-C-1] com.viu.kafka.listen.BatchListener       : test batch listener,dataNum-1
2018-09-11 12:08:51.842  INFO 12416 --- [    batch-0-C-1] com.viu.kafka.listen.BatchListener       : test batch listener,dataNum-9
View Code

注意:設置的並發量不能大於partition的數量,如果需要提高吞吐量,可以通過增加partition的數量達到快速提升吞吐量的效果。

6. 監聽Topic中指定的分區

緊接着剛才編寫的代碼里面編寫新的監聽器,第一眼看到這代碼,媽呀,這注解這么長,哈哈哈,我也不是故意的啊。

這里使用@KafkaListener注解的topicPartitions屬性監聽不同的partition分區。

@TopicPartition:topic--需要監聽的Topic的名稱,partitions --需要監聽Topic的分區id,

partitionOffsets --可以設置從某個偏移量開始監聽

@PartitionOffset:partition --分區Id,非數組,initialOffset --初始偏移量

 @Bean public NewTopic batchWithPartitionTopic() { return new NewTopic("topic.quick.batch.partition", 8, (short) 1); } @KafkaListener(id = "batchWithPartition",clientIdPrefix = "bwp",containerFactory = "batchContainerFactory", topicPartitions = { @TopicPartition(topic = "topic.quick.batch.partition",partitions = {"1","3"}), @TopicPartition(topic = "topic.quick.batch.partition",partitions = {"0","4"}, partitionOffsets = @PartitionOffset(partition = "2",initialOffset = "100")) } ) public void batchListenerWithPartition(List<String> data) { log.info("topic.quick.batch.partition  receive : "); for (String s : data) { log.info(s); } }
View Code

其實和我們剛才寫的批量消費區別只是在注解上多了個屬性,啟動項目我們仔細搜索一下控制台輸出的日志,如果存在該日志則說明成功。

同樣的我們往這個Topic里面寫入一些數據,運行后我們可以看到控制台只監聽到一部分消息,

這是因為創建的Topic的partition數量為8,而我們只監聽了0、1、2、3、4這幾個partition,也就是說5 6 7這三個分區的消息我們並沒有讀取出來。

2018-09-11 14:39:52.045  INFO 12412 --- [Partition-4-C-1] o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=bwp-4, groupId=batchWithPartition] Fetch offset 100 is out of range for partition topic.quick.batch-2, resetting offset
View Code
 @Test public void testBatch() throws InterruptedException { for (int i = 0; i < 12; i++) { kafkaTemplate.send("topic.quick.batch.partition", "test batch listener,dataNum-" + i); } }
View Code
2018-09-11 14:51:09.063  INFO 1532 --- [Partition-2-C-1] com.viu.kafka.listen.BatchListener : topic.quick.batch.partition receive : 2018-09-11 14:51:09.063  INFO 1532 --- [Partition-2-C-1] com.viu.kafka.listen.BatchListener       : test batch listener,dataNum-4
2018-09-11 14:51:09.064  INFO 1532 --- [Partition-1-C-1] com.viu.kafka.listen.BatchListener : topic.quick.batch.partition receive : 2018-09-11 14:51:09.064  INFO 1532 --- [Partition-1-C-1] com.viu.kafka.listen.BatchListener       : test batch listener,dataNum-2
2018-09-11 14:51:09.075  INFO 1532 --- [Partition-0-C-1] com.viu.kafka.listen.BatchListener : topic.quick.batch.partition receive : 2018-09-11 14:51:09.075  INFO 1532 --- [Partition-0-C-1] com.viu.kafka.listen.BatchListener       : test batch listener,dataNum-1
2018-09-11 14:51:09.078  INFO 1532 --- [Partition-1-C-1] com.viu.kafka.listen.BatchListener : topic.quick.batch.partition receive : 2018-09-11 14:51:09.078  INFO 1532 --- [Partition-1-C-1] com.viu.kafka.listen.BatchListener       : test batch listener,dataNum-10
2018-09-11 14:51:09.091  INFO 1532 --- [Partition-4-C-1] com.viu.kafka.listen.BatchListener : topic.quick.batch.partition receive : 2018-09-11 14:51:09.091  INFO 1532 --- [Partition-4-C-1] com.viu.kafka.listen.BatchListener       : test batch listener,dataNum-5
2018-09-11 14:51:09.095  INFO 1532 --- [Partition-0-C-1] com.viu.kafka.listen.BatchListener : topic.quick.batch.partition receive : 2018-09-11 14:51:09.096  INFO 1532 --- [Partition-0-C-1] com.viu.kafka.listen.BatchListener       : test batch listener,dataNum-9
2018-09-11 14:51:09.097  INFO 1532 --- [Partition-3-C-1] com.viu.kafka.listen.BatchListener : topic.quick.batch.partition receive : 2018-09-11 14:51:09.098  INFO 1532 --- [Partition-3-C-1] com.viu.kafka.listen.BatchListener       : test batch listener,dataNum-7
View Code

7. 注解方式獲取消息頭及消息體

當你接收的消息包含請求頭,以及你監聽方法需要獲取該消息非常多的字段時可以通過這種方式,畢竟get方法代碼量還是稍多點的。

這里使用的是默認的監聽容器工廠創建的,如果你想使用批量消費,把對應的類型改為List即可,比如List<String> data , List<Integer> key。

@Payload:獲取的是消息的消息體,也就是發送內容

@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY):獲取發送消息的key

@Header(KafkaHeaders.RECEIVED_PARTITION_ID):獲取當前消息是從哪個分區中監聽到的

@Header(KafkaHeaders.RECEIVED_TOPIC):獲取監聽的TopicName

@Header(KafkaHeaders.RECEIVED_TIMESTAMP):獲取時間戳

@KafkaListener(id = "anno", topics = "topic.quick.anno") public void annoListener(@Payload String data, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) { log.info("topic.quick.anno receive : \n"+
            "data : "+data+"\n"+
            "key : "+key+"\n"+
            "partitionId : "+partition+"\n"+
            "topic : "+topic+"\n"+
            "timestamp : "+ts+"\n" ); }
View Code

監聽容器編寫好了,那就寫個測試方法測試一下。

啟動測試后可以看到監聽方法成功的把我們所需要的數據提取出來了,說明這段代碼也是ojbk的。

 @Test public void testAnno() throws InterruptedException { Map map = new HashMap<>(); map.put(KafkaHeaders.TOPIC, "topic.quick.anno"); map.put(KafkaHeaders.MESSAGE_KEY, 0); map.put(KafkaHeaders.PARTITION_ID, 0); map.put(KafkaHeaders.TIMESTAMP, System.currentTimeMillis()); kafkaTemplate.send(new GenericMessage<>("test anno listener", map)); }
View Code
2018-09-11 15:27:47.108  INFO 7592 --- [     anno-0-C-1] com.viu.kafka.listen.SingleListener : topic.quick.anno receive : data : test anno listener key : 0 partitionId : 0 topic : topic.quick.anno timestamp : 1536650867015
View Code

8. 使用Ack機制確認消費

Kafka的Ack機制相對於RabbitMQ的Ack機制差別比較大,剛入門Kafka的時候我也被搞蒙了,不過能弄清楚Kafka是怎么消費消息的就能理解Kafka的Ack機制了

我先說說RabbitMQ的Ack機制,RabbitMQ的消費可以說是一次性的,也就是你確認消費后就立刻從硬盤或內存中刪除,

而且RabbitMQ粗糙點來說是順序消費,像排隊一樣,一個個順序消費,未被確認的消息則會重新回到隊列中,等待監聽器再次消費。

但Kafka不同,Kafka是通過最新保存偏移量進行消息消費的,而且確認消費的消息並不會立刻刪除,所以我們可以重復的消費未被刪除的數據,

當第一條消息未被確認,而第二條消息被確認的時候,Kafka會保存第二條消息的偏移量,

也就是說第一條消息再也不會被監聽器所獲取,除非是根據第一條消息的偏移量手動獲取。

使用Kafka的Ack機制比較簡單,只需簡單的三步即可:

設置ENABLE_AUTO_COMMIT_CONFIG=false,禁止自動提交

設置AckMode=MANUAL_IMMEDIATE

監聽方法加入Acknowledgment ack 參數

怎么拒絕消息呢,只要在監聽方法中不調用ack.acknowledge()即可

@Component public class AckListener { private static final Logger log= LoggerFactory.getLogger(AckListener.class); private Map<String, Object> consumerProps() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return props; } @Bean("ackContainerFactory") public ConcurrentKafkaListenerContainerFactory ackContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(new DefaultKafkaConsumerFactory(consumerProps())); factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE); factory.setConsumerFactory(new DefaultKafkaConsumerFactory(consumerProps())); return factory; } @KafkaListener(id = "ack", topics = "topic.quick.ack",containerFactory = "ackContainerFactory") public void ackListener(ConsumerRecord record, Acknowledgment ack) { log.info("topic.quick.ack receive : " + record.value()); ack.acknowledge(); } }
View Code

編寫測試方法,運行后可以方法監聽方法能收到消息,緊接着注釋ack.acknowledge()方法,重新測試,

同樣你會發現監聽容器能接收到消息,這個時候如果你重啟項目還是可以看到未被確認的那幾條消息。

 @Test public void testAck() throws InterruptedException { for (int i = 0; i < 5; i++) { kafkaTemplate.send("topic.quick.ack", i+""); } }
View Code

在這段章節開頭之初我就講解了Kafka機制會出現的一些情況,導致沒辦法重復消費未被Ack的消息,解決辦法有如下:

重新將消息發送到隊列中,這種方式比較簡單而且可以使用Headers實現第幾次消費的功能,用以下次判斷

    @KafkaListener(id = "ack", topics = "topic.quick.ack", containerFactory = "ackContainerFactory") public void ackListener(ConsumerRecord record, Acknowledgment ack, Consumer consumer) { log.info("topic.quick.ack receive : " + record.value()); //如果偏移量為偶數則確認消費,否則拒絕消費
        if (record.offset() % 2 == 0) { log.info(record.offset()+"--ack"); ack.acknowledge(); } else { log.info(record.offset()+"--nack"); kafkaTemplate.send("topic.quick.ack", record.value()); } }
View Code

使用Consumer.seek方法,重新回到該未ack消息偏移量的位置重新消費,這種可能會導致死循環,

原因出現於業務一直沒辦法處理這條數據,但還是不停的重新定位到該數據的偏移量上。

    @KafkaListener(id = "ack", topics = "topic.quick.ack", containerFactory = "ackContainerFactory") public void ackListener(ConsumerRecord record, Acknowledgment ack, Consumer consumer) { log.info("topic.quick.ack receive : " + record.value()); //如果偏移量為偶數則確認消費,否則拒絕消費
        if (record.offset() % 2 == 0) { log.info(record.offset()+"--ack"); ack.acknowledge(); } else { log.info(record.offset()+"--nack"); consumer.seek(new TopicPartition("topic.quick.ack",record.partition()),record.offset() ); } }
View Code

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM