SpringKafka——消息監聽


 

前言

Spring-Kafka中消息監聽大致分為兩種類型,一種是單條數據消費,一種是批量消費;兩者的區別只是在於監聽器一次性獲取消息的數量。
GenericMessageListener是我們實現消息監聽的一個接口,向上擴展的接口有非常多,
比如:單數據消費的MessageListener、批量消費的BatchMessageListener、還有具備ACK機制的AcknowledgingMessageListener和BatchAcknowledgingMessageListener等等。
 
 

GenericMessageListener

這里可以看到GenericMessageListener使用注解標明這是一個函數式接口,默認實現了三種不同參數的onMessage方法。
data就是我們需要接收的數據,Consumer則是消費者類,Acknowledgment則是用來實現Ack機制的類。
這里需要注意一下的是,Consumer對象並不是線程安全的。
@FunctionalInterface
public interface GenericMessageListener<T> {
    void onMessage(T var1);

    default void onMessage(T data, Acknowledgment acknowledgment) {
        throw new UnsupportedOperationException("Container should never call this");
    }

    default void onMessage(T data, Consumer<?, ?> consumer) {
        throw new UnsupportedOperationException("Container should never call this");
    }

    default void onMessage(T data, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
        throw new UnsupportedOperationException("Container should never call this");
    }
}

接下來先瀏覽一下繼承了GenericMessageListener接口的類。前綴為Batch的接口都是批處理類型的消息監聽接口,里面的參數也都講解過了

public interface MessageListener<K, V> {
    void onMessage(ConsumerRecord<K, V> data);
}

public interface AcknowledgingMessageListener<K, V> { 
    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
}

public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
    void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);
}

public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { 
    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}

public interface BatchMessageListener<K, V> { 
    void onMessage(List<ConsumerRecord<K, V>> data);
}

public interface BatchAcknowledgingMessageListener<K, V> {
    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
}

public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { 
    void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);
}

public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { 
    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}

 

@KafkaListener參數講解

@KafkaListener(id = "demo", topics = "topic.quick.demo")
public void listen(String msgData) {
    log.info("demo receive : "+msgData);
}
使用@KafkaListener這個注解並不局限於這個監聽容器是單條數據消費還是批量消費,區分單數據還是多數據消費只需要配置一下注解的containerFactory屬性即可,先講解一下這個監聽方法都能接收寫什么參數吧。
  • data : 對於data值的類型其實並沒有限定,根據KafkaTemplate所定義的類型來決定。data為List集合的則是用作批量消費。
  • ConsumerRecord:具體消費數據類,包含Headers信息、分區信息、時間戳等
  • Acknowledgment:用作Ack機制的接口
  • Consumer:消費者類,使用該類我們可以手動提交偏移量、控制消費速率等功能
public void listen1(String data) 

public void listen2(ConsumerRecord<K,V> data) 

public void listen3(ConsumerRecord<K,V> data, Acknowledgment acknowledgment) 

public void listen4(ConsumerRecord<K,V> data, Acknowledgment acknowledgment, Consumer<K,V> consumer) 

public void listen5(List<String> data) 

public void listen6(List<ConsumerRecord<K,V>> data) 

public void listen7(List<ConsumerRecord<K,V>> data, Acknowledgment acknowledgment) 

public void listen8(List<ConsumerRecord<K,V>> data, Acknowledgment acknowledgment, Consumer<K,V> consumer)

接下來在看看@KafkaListener的注解都提供了什么屬性。

  • id:消費者的id,當GroupId沒有被配置的時候,默認id為GroupId
  • containerFactory:上面提到了@KafkaListener區分單數據還是多數據消費只需要配置一下注解的containerFactory屬性就可以了,這里面配置的是監聽容器工廠,也就是ConcurrentKafkaListenerContainerFactory,配置BeanName
  • topics:需要監聽的Topic,可監聽多個
  • topicPartitions:可配置更加詳細的監聽信息,必須監聽某個Topic中的指定分區,或者從offset為200的偏移量開始監聽
  • errorHandler:監聽異常處理器,配置BeanName
  • groupId:消費組ID
  • idIsGroup:id是否為GroupId
  • clientIdPrefix:消費者Id前綴
  • beanRef:真實監聽容器的BeanName,需要在 BeanName前加 "__"
public @interface KafkaListener {
    String id() default "";

    String containerFactory() default "";

    String[] topics() default {};

    String topicPattern() default "";

    TopicPartition[] topicPartitions() default {};

    String containerGroup() default "";

    String errorHandler() default "";

    String groupId() default "";

    boolean idIsGroup() default true;

    String clientIdPrefix() default "";

    String beanRef() default "__listener";
}

 

使用ConsumerRecord類消費

用ConsumerRecord類接收的好處是什么呢,ConsumerRecord類里面包含分區信息、消息頭、消息體等內容,如果業務需要獲取這些參數時,使用ConsumerRecord會是個不錯的選擇。

如果使用具體的類型接收消息體則更加方便,比如說用String類型去接收消息體。

這里我們編寫一個consumerListener方法,監聽"topic.quick.consumer" Topic,並把ConsumerRecord里面所包含的內容打印到控制台中

@Component
public class SingleListener {

    private static final Logger log = LoggerFactory.getLogger(SingleListener.class);

    @KafkaListener(id = "consumer", topics = "topic.quick.consumer")
    public void consumerListener(ConsumerRecord<Integer, String> record) {
        log.info("topic.quick.consumer receive : " + record.toString());
    }
}

 

批量消費

  • 重新創建一份新的消費者配置,配置為一次拉取5條消息
  • 創建一個監聽容器工廠,設置其為批量消費並設置並發量為5,這個並發量根據分區數決定,必須小於等於分區數,否則會有線程一直處於空閑狀態
  • 創建一個分區數為8的Topic
  • 創建監聽方法,設置消費id為batch,clientID前綴為batch,監聽topic.quick.batch,使用batchContainerFactory工廠創建該監聽容器
@Component
public class BatchListener {

    private static final Logger log= LoggerFactory.getLogger(BatchListener.class);

    private Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        //一次拉取消息數量
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "5");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean("batchContainerFactory")
    public ConcurrentKafkaListenerContainerFactory listenerContainer() {
        ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory();
        container.setConsumerFactory(new DefaultKafkaConsumerFactory(consumerProps()));
        //設置並發量,小於或等於Topic的分區數
        container.setConcurrency(5);
        //設置為批量監聽
        container.setBatchListener(true);
        return container;
    }

    @Bean
    public NewTopic batchTopic() {
        return new NewTopic("topic.quick.batch", 8, (short) 1);
    }


    @KafkaListener(id = "batch",clientIdPrefix = "batch",topics = {"topic.quick.batch"},containerFactory = "batchContainerFactory")
    public void batchListener(List<String> data) {
        log.info("topic.quick.batch  receive : ");
        for (String s : data) {
            log.info(  s);
        }
    }

}

注意:設置的並發量不能大於partition的數量,如果需要提高吞吐量,可以通過增加partition的數量達到快速提升吞吐量的效果。

 

注解方式獲取消息頭及消息體

當你接收的消息包含請求頭,以及你監聽方法需要獲取該消息非常多的字段時可以通過這種方式,畢竟get方法代碼量還是稍多點的。這里使用的是默認的監聽容器工廠創建的,如果你想使用批量消費,把對應的類型改為List即可,比如List<String> data , List<Integer> key。

  • @Payload:獲取的是消息的消息體,也就是發送內容
  • @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY):獲取發送消息的key
  • @Header(KafkaHeaders.RECEIVED_PARTITION_ID):獲取當前消息是從哪個分區中監聽到的
  • @Header(KafkaHeaders.RECEIVED_TOPIC):獲取監聽的TopicName
  • @Header(KafkaHeaders.RECEIVED_TIMESTAMP):獲取時間戳
    @KafkaListener(id = "anno", topics = "topic.quick.anno")
    public void annoListener(@Payload String data,
                             @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
                             @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                             @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                             @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) {
        log.info("topic.quick.anno receive : \n"+
            "data : "+data+"\n"+
            "key : "+key+"\n"+
            "partitionId : "+partition+"\n"+
            "topic : "+topic+"\n"+
            "timestamp : "+ts+"\n"
        );

    }

 

使用Ack機制確認消費

Kafka的Ack機制相對於RabbitMQ的Ack機制差別比較大,剛入門Kafka的時候我也被搞蒙了,不過能弄清楚Kafka是怎么消費消息的就能理解Kafka的Ack機制了。

我先說說RabbitMQ的Ack機制,RabbitMQ的消費可以說是一次性的,也就是你確認消費后就立刻從硬盤或內存中刪除,

而且RabbitMQ粗糙點來說是順序消費,像排隊一樣,一個個順序消費,未被確認的消息則會重新回到隊列中,等待監聽器再次消費。

但Kafka不同,Kafka是通過最新保存偏移量進行消息消費的,而且確認消費的消息並不會立刻刪除,所以我們可以重復的消費未被刪除的數據,

當第一條消息未被確認,而第二條消息被確認的時候,Kafka會保存第二條消息的偏移量,也就是說第一條消息再也不會被監聽器所獲取,除非是根據第一條消息的偏移量手動獲取。

使用Kafka的Ack機制比較簡單,只需簡單的三步即可:

  1. 設置ENABLE_AUTO_COMMIT_CONFIG=false,禁止自動提交
  2. 設置AckMode=MANUAL_IMMEDIATE
  3. 監聽方法加入Acknowledgment ack 參數

怎么拒絕消息呢,只要在監聽方法中不調用ack.acknowledge()即可

@Component
public class AckListener {

    private static final Logger log= LoggerFactory.getLogger(AckListener.class);

    private Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean("ackContainerFactory")
    public ConcurrentKafkaListenerContainerFactory ackContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory(consumerProps()));
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory(consumerProps()));
        return factory;
    }


    @KafkaListener(id = "ack", topics = "topic.quick.ack",containerFactory = "ackContainerFactory")
    public void ackListener(ConsumerRecord record, Acknowledgment ack) {
        log.info("topic.quick.ack receive : " + record.value());
        ack.acknowledge();
    }
}

 

重復消費未被Ack的消息

在這段章節開頭之初我就講解了Kafka機制會出現的一些情況,導致沒辦法重復消費未被Ack的消息,解決辦法有如下:

1、重新將消息發送到隊列中,這種方式比較簡單而且可以使用Headers實現第幾次消費的功能,用以下次判斷

    @KafkaListener(id = "ack", topics = "topic.quick.ack", containerFactory = "ackContainerFactory")
    public void ackListener(ConsumerRecord record, Acknowledgment ack, Consumer consumer) {
        log.info("topic.quick.ack receive : " + record.value());

        //如果偏移量為偶數則確認消費,否則拒絕消費
        if (record.offset() % 2 == 0) {
            log.info(record.offset()+"--ack");
            ack.acknowledge();
        } else {
            log.info(record.offset()+"--nack");
            kafkaTemplate.send("topic.quick.ack", record.value());
        }
    }

2、使用Consumer.seek方法,重新回到該未ack消息偏移量的位置重新消費,這種可能會導致死循環,原因出現於業務一直沒辦法處理這條數據,但還是不停的重新定位到該數據的偏移量上。

    @KafkaListener(id = "ack", topics = "topic.quick.ack", containerFactory = "ackContainerFactory")
    public void ackListener(ConsumerRecord record, Acknowledgment ack, Consumer consumer) {
        log.info("topic.quick.ack receive : " + record.value());

        //如果偏移量為偶數則確認消費,否則拒絕消費
        if (record.offset() % 2 == 0) {
            log.info(record.offset()+"--ack");
            ack.acknowledge();
        } else {
            log.info(record.offset()+"--nack");
            consumer.seek(new TopicPartition("topic.quick.ack",record.partition()),record.offset() );
        }
    }

 



引用:

https://www.jianshu.com/p/a64defb44a23

https://docs.spring.io/spring-kafka/docs/2.2.0.RELEASE/reference/html/_reference.html#message-listeners


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM