Preface
GenericMessageListener
```java
@FunctionalInterface
public interface GenericMessageListener<T> {

    void onMessage(T data);

    default void onMessage(T data, Acknowledgment acknowledgment) {
        throw new UnsupportedOperationException("Container should never call this");
    }

    default void onMessage(T data, Consumer<?, ?> consumer) {
        throw new UnsupportedOperationException("Container should never call this");
    }

    default void onMessage(T data, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
        throw new UnsupportedOperationException("Container should never call this");
    }
}
```
Next, let's look through the interfaces that extend GenericMessageListener. The interfaces prefixed with Batch are batch-style message listener interfaces; the parameters they take are explained in the next section.
```java
public interface MessageListener<K, V> {
    void onMessage(ConsumerRecord<K, V> data);
}

public interface AcknowledgingMessageListener<K, V> {
    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
}

public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
    void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);
}

public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}

public interface BatchMessageListener<K, V> {
    void onMessage(List<ConsumerRecord<K, V>> data);
}

public interface BatchAcknowledgingMessageListener<K, V> {
    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
}

public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
    void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);
}

public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
```
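These interfaces are what the listener container ultimately invokes. As a minimal sketch (the bean and variable names here are illustrative, not from the original article), you can also wire a MessageListener into a container programmatically instead of using @KafkaListener; the group id is assumed to come from the consumer factory's configuration:

```java
// Hedged sketch: registering a MessageListener programmatically.
@Bean
public KafkaMessageListenerContainer<Integer, String> demoContainer(
        ConsumerFactory<Integer, String> consumerFactory) {
    ContainerProperties containerProps = new ContainerProperties("topic.quick.demo");
    // MessageListener has a single abstract method, so a lambda works here.
    containerProps.setMessageListener((MessageListener<Integer, String>) record ->
            System.out.println("demo receive : " + record.value()));
    return new KafkaMessageListenerContainer<>(consumerFactory, containerProps);
}
```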
Parameters of @KafkaListener
@KafkaListener(id = "demo", topics = "topic.quick.demo") public void listen(String msgData) { log.info("demo receive : "+msgData); }
- data: the type of data is not actually restricted; it follows the type the producer (KafkaTemplate) defined. When data is a List, the method consumes in batches.
- ConsumerRecord: the concrete record class for a consumed message, carrying the headers, partition information, timestamp, and so on.
- Acknowledgment: the interface used for the Ack mechanism.
- Consumer: the consumer itself; through it you can commit offsets manually, control the consumption rate, and more.
```java
public void listen1(String data)
public void listen2(ConsumerRecord<K, V> data)
public void listen3(ConsumerRecord<K, V> data, Acknowledgment acknowledgment)
public void listen4(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<K, V> consumer)
public void listen5(List<String> data)
public void listen6(List<ConsumerRecord<K, V>> data)
public void listen7(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment)
public void listen8(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<K, V> consumer)
```
Next, let's look at the attributes the @KafkaListener annotation provides.
- id: the listener's consumer id; when groupId is not configured, the id is used as the group id by default.
- containerFactory: as mentioned above, switching @KafkaListener between single-record and batch consumption is just a matter of setting this attribute; it takes the bean name of the listener container factory, i.e. a ConcurrentKafkaListenerContainerFactory.
- topics: the topics to listen to; several can be given.
- topicPartitions: allows more fine-grained listening configuration, such as listening only to certain partitions of a topic, or starting from a given offset such as 200 (see the sketch after the annotation definition below).
- errorHandler: the bean name of an error handler for listener exceptions.
- groupId: the consumer group id.
- idIsGroup: whether the id should be used as the group id.
- clientIdPrefix: the prefix for the consumer's client id.
- beanRef: the bean name under which the actual listener bean is exposed; it must be prefixed with "__".
```java
public @interface KafkaListener {
    String id() default "";
    String containerFactory() default "";
    String[] topics() default {};
    String topicPattern() default "";
    TopicPartition[] topicPartitions() default {};
    String containerGroup() default "";
    String errorHandler() default "";
    String groupId() default "";
    boolean idIsGroup() default true;
    String clientIdPrefix() default "";
    String beanRef() default "__listener";
}
```
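As a hedged sketch of the topicPartitions attribute (the pattern follows the Spring Kafka reference; the topic names here are illustrative, not from this article): @TopicPartition picks concrete partitions of a topic, and @PartitionOffset additionally sets the starting offset.

```java
// Listen to partitions 0 and 1 of one topic, and to partitions 0 and 1 of a second
// topic, starting partition 1 of the second topic at offset 200.
@KafkaListener(id = "partitioned", topicPartitions = {
        @TopicPartition(topic = "topic.quick.part", partitions = {"0", "1"}),
        @TopicPartition(topic = "topic.quick.part2", partitions = "0",
                partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "200"))
})
public void partitionListener(ConsumerRecord<Integer, String> record) {
    log.info("partitioned receive : " + record.toString());
}
```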
Consuming with the ConsumerRecord class
What do you gain by receiving messages as a ConsumerRecord? ConsumerRecord carries the partition information, the message headers, the message body, and more, so it is a good choice whenever your business logic needs those fields. If all you need is the message body, receiving it as a concrete type, such as String, is more convenient.
Here we write a consumerListener method that listens to the topic "topic.quick.consumer" and prints everything the ConsumerRecord contains to the console:
```java
@Component
public class SingleListener {

    private static final Logger log = LoggerFactory.getLogger(SingleListener.class);

    @KafkaListener(id = "consumer", topics = "topic.quick.consumer")
    public void consumerListener(ConsumerRecord<Integer, String> record) {
        log.info("topic.quick.consumer receive : " + record.toString());
    }
}
```
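If you only need some of those fields, ConsumerRecord exposes each one through a plain getter; a minimal sketch (the listener id and method name are illustrative):

```java
@KafkaListener(id = "consumerFields", topics = "topic.quick.consumer")
public void fieldListener(ConsumerRecord<Integer, String> record) {
    // Each piece of metadata mentioned above has its own accessor.
    log.info("key : " + record.key());
    log.info("value : " + record.value());
    log.info("partition : " + record.partition());
    log.info("timestamp : " + record.timestamp());
    record.headers().forEach(header -> log.info("header : " + header.key()));
}
```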
Batch consumption
- Create a new consumer configuration, set to pull at most 5 records per poll.
- Create a listener container factory, set it to batch mode, and set the concurrency to 5. The concurrency depends on the partition count: it must be less than or equal to the number of partitions, otherwise some threads will sit idle forever.
- Create a topic with 8 partitions.
- Create the listener method: consumer id "batch", client id prefix "batch", listening to topic.quick.batch, and built with the batchContainerFactory.
```java
@Component
public class BatchListener {

    private static final Logger log = LoggerFactory.getLogger(BatchListener.class);

    private Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        // maximum number of records returned by a single poll
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "5");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean("batchContainerFactory")
    public ConcurrentKafkaListenerContainerFactory listenerContainer() {
        ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory();
        container.setConsumerFactory(new DefaultKafkaConsumerFactory(consumerProps()));
        // concurrency must be less than or equal to the topic's partition count
        container.setConcurrency(5);
        // enable batch listening
        container.setBatchListener(true);
        return container;
    }

    @Bean
    public NewTopic batchTopic() {
        return new NewTopic("topic.quick.batch", 8, (short) 1);
    }

    @KafkaListener(id = "batch", clientIdPrefix = "batch", topics = {"topic.quick.batch"},
            containerFactory = "batchContainerFactory")
    public void batchListener(List<String> data) {
        log.info("topic.quick.batch receive : ");
        for (String s : data) {
            log.info(s);
        }
    }
}
```
Note: the configured concurrency must not be greater than the number of partitions. If you need higher throughput, increasing the number of partitions is a quick way to get it.
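To watch the batching behave, you can push a dozen records in one go. A hedged test sketch, assuming a KafkaTemplate<Integer, String> bean is configured elsewhere (the producer side is not shown in this article):

```java
@Autowired
private KafkaTemplate<Integer, String> kafkaTemplate;

@Test
public void testBatch() {
    // With MAX_POLL_RECORDS_CONFIG set to 5, twelve records should arrive
    // at the listener in roughly three batches of up to five.
    for (int i = 0; i < 12; i++) {
        kafkaTemplate.send("topic.quick.batch", "this is message " + i);
    }
}
```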
Getting the message headers and body via annotations
When the message you receive carries headers, and your listener method needs quite a few of its fields, this approach is handy, since getter calls do add up in code volume. The default listener container factory is used here; if you want batch consumption, simply change the corresponding types to List, for example List<String> data and List<Integer> key.
- @Payload: the message payload, i.e. the content that was sent.
- @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY): the key the message was sent with.
- @Header(KafkaHeaders.RECEIVED_PARTITION_ID): the partition the message was received from.
- @Header(KafkaHeaders.RECEIVED_TOPIC): the name of the topic being listened to.
- @Header(KafkaHeaders.RECEIVED_TIMESTAMP): the message timestamp.
@KafkaListener(id = "anno", topics = "topic.quick.anno") public void annoListener(@Payload String data, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) { log.info("topic.quick.anno receive : \n"+ "data : "+data+"\n"+ "key : "+key+"\n"+ "partitionId : "+partition+"\n"+ "topic : "+topic+"\n"+ "timestamp : "+ts+"\n" ); }
Confirming consumption with the Ack mechanism
Kafka's Ack mechanism differs quite a lot from RabbitMQ's; it confused me too when I first started with Kafka, but once you understand how Kafka consumes messages, the Ack mechanism falls into place.

Let me start with RabbitMQ. Consumption in RabbitMQ is effectively one-shot: once a message is acknowledged, it is immediately deleted from disk or memory. Put crudely, RabbitMQ also consumes in order, like people in a queue, one message after another, and unacknowledged messages return to the queue to wait for a listener to consume them again.

Kafka is different. Kafka tracks consumption via the latest committed offset, and acknowledged messages are not deleted right away, so messages that have not yet been deleted can be consumed repeatedly. If the first message is unacknowledged but the second one is acknowledged, Kafka saves the second message's offset, which means the listener will never receive the first message again, unless you fetch it manually by its offset.
Using Kafka's Ack mechanism is straightforward; it takes just three steps:

- Set ENABLE_AUTO_COMMIT_CONFIG=false to disable auto commit.
- Set the container's AckMode to MANUAL_IMMEDIATE.
- Add an Acknowledgment ack parameter to the listener method.
How do you reject a message, then? Simply do not call ack.acknowledge() in the listener method.
```java
@Component
public class AckListener {

    private static final Logger log = LoggerFactory.getLogger(AckListener.class);

    private Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // disable auto commit so that offsets are committed only on ack.acknowledge()
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // this interval is ignored because auto commit is disabled
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean("ackContainerFactory")
    public ConcurrentKafkaListenerContainerFactory ackContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory(consumerProps()));
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

    @KafkaListener(id = "ack", topics = "topic.quick.ack", containerFactory = "ackContainerFactory")
    public void ackListener(ConsumerRecord record, Acknowledgment ack) {
        log.info("topic.quick.ack receive : " + record.value());
        ack.acknowledge();
    }
}
```
Re-consuming messages that were not acked
At the start of this section I described the behaviour of Kafka's mechanism that prevents un-acked messages from being consumed again. There are workarounds:

1. Re-send the message to the topic. This approach is simple, and you can use headers to record how many times the message has been consumed, so the next attempt can act on that count.
@KafkaListener(id = "ack", topics = "topic.quick.ack", containerFactory = "ackContainerFactory") public void ackListener(ConsumerRecord record, Acknowledgment ack, Consumer consumer) { log.info("topic.quick.ack receive : " + record.value()); //如果偏移量為偶數則確認消費,否則拒絕消費 if (record.offset() % 2 == 0) { log.info(record.offset()+"--ack"); ack.acknowledge(); } else { log.info(record.offset()+"--nack"); kafkaTemplate.send("topic.quick.ack", record.value()); } }
2. Use Consumer.seek to move back to the offset of the un-acked message and consume it again. Beware that this can end up in an infinite loop if the business logic can never process the record yet keeps seeking back to its offset.
@KafkaListener(id = "ack", topics = "topic.quick.ack", containerFactory = "ackContainerFactory") public void ackListener(ConsumerRecord record, Acknowledgment ack, Consumer consumer) { log.info("topic.quick.ack receive : " + record.value()); //如果偏移量為偶數則確認消費,否則拒絕消費 if (record.offset() % 2 == 0) { log.info(record.offset()+"--ack"); ack.acknowledge(); } else { log.info(record.offset()+"--nack"); consumer.seek(new TopicPartition("topic.quick.ack",record.partition()),record.offset() ); } }
Reference: https://www.jianshu.com/p/a64defb44a23