Spring Boot 自定義kafka 消費者配置 ContainerFactory最佳實踐


Spring Boot 自定義kafka 消費者配置 ContainerFactory最佳實踐

本篇博文主要提供一個在 SpringBoot 中自定義 kafka配置的實踐,想象這樣一個場景:你的系統需要監聽多個不同集群的消息,在不同的集群中topic沖突了,所以你需要分別定義kafka消息配置。

此篇文章會在SpringBoot 提供的默認模板上提供擴展,不會因為你自定義了消費者配置,而導致原生SpringBoot的Kakfa模板配置失效。

引入 MAVEN 依賴

版本需要你自己指定

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>xxx</version>
</dependency>

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>xxx</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>xxx</version>
</dependency>

引入Java配置類

/**
 * 手動自定義 kafka 消費者 ContainerFactory 配置demo
 */
@Configuration
@EnableConfigurationProperties(KafkaProperties.class)
public class KafkaConsumerConfig {

    @Autowired
    private KafkaProperties properties;

    @Value("${監聽服務地址}")
    private List<String> myServers;

    @Bean("myKafkaContainerFactory")
    @ConditionalOnBean(ConcurrentKafkaListenerContainerFactoryConfigurer.class)
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, consumerFactory());
        return factory;
    }

    //獲得創建消費者工廠
    public ConsumerFactory<Object, Object> consumerFactory() {
        KafkaProperties myKafkaProperties = JSON.parseObject(JSON.toJSONString(this.properties), KafkaProperties.class);
        //對模板 properties 進行定制化
        //....
        //例如:定制servers
        myKafkaProperties.setBootstrapServers(myServers);
        return new DefaultKafkaConsumerFactory<>(myKafkaProperties.buildConsumerProperties());
    }

}

yml模板

#kafka配置,更多配置請參考:KafkaProperties
spring.kafka:
  #公共參數,其他的timeout.ms, request.timeout.ms, metadata.fetch.timeout.ms保持默認值
  properties:
    #這個參數指定producer在發送批量消息前等待的時間,當設置此參數后,即便沒有達到批量消息的指定大小(batch-size),到達時間后生產者也會發送批量消息到broker。默認情況下,生產者的發送消息線程只要空閑了就會發送消息,即便只有一條消息。設置這個參數后,發送線程會等待一定的時間,這樣可以批量發送消息增加吞吐量,但同時也會增加延遲。
    linger.ms: 50 #默認值:0毫秒,當消息發送比較頻繁時,增加一些延遲可增加吞吐量和性能。
    #這個參數指定producer在一個TCP connection可同時發送多少條消息到broker並且等待broker響應,設置此參數較高的值可以提高吞吐量,但同時也會增加內存消耗。另外,如果設置過高反而會降低吞吐量,因為批量消息效率降低。設置為1,可以保證發送到broker的順序和調用send方法順序一致,即便出現失敗重試的情況也是如此。
    #注意:當前消息符合at-least-once,自kafka1.0.0以后,為保證消息有序以及exactly once,這個配置可適當調大為5。
    max.in.flight.requests.per.connection: 1 #默認值:5,設置為1即表示producer在connection上發送一條消息,至少要等到這條消息被broker確認收到才繼續發送下一條,因此是有序的。

  #生產者的配置,可參考org.apache.kafka.clients.producer.ProducerConfig
  producer:
    #這個參數可以是任意字符串,它是broker用來識別消息是來自哪個客戶端的。在broker進行打印日志、衡量指標或者配額限制時會用到。
    clientId: ${spring.application.name} #方便kafkaserver打印日志定位請求來源
    bootstrap-servers: 127.0.0.1:8080 #kafka服務器地址,多個以逗號隔開
    #acks=0:生產者把消息發送到broker即認為成功,不等待broker的處理結果。這種方式的吞吐最高,但也是最容易丟失消息的。
    #acks=1:生產者會在該分區的leader寫入消息並返回成功后,認為消息發送成功。如果群首寫入消息失敗,生產者會收到錯誤響應並進行重試。這種方式能夠一定程度避免消息丟失,但如果leader宕機時該消息沒有復制到其他副本,那么該消息還是會丟失。另外,如果我們使用同步方式來發送,延遲會比前一種方式大大增加(至少增加一個網絡往返時間);如果使用異步方式,應用感知不到延遲,吞吐量則會受異步正在發送中的數量限制。
    #acks=all:生產者會等待所有副本成功寫入該消息,這種方式是最安全的,能夠保證消息不丟失,但是延遲也是最大的。
    #如果是發送日志之類的,允許部分丟失,可指定acks=0,如果想不丟失消息,可配置為all,但需密切關注性能和吞吐量。
    acks: all #默認值:1
    #當生產者發送消息收到一個可恢復異常時,會進行重試,這個參數指定了重試的次數。在實際情況中,這個參數需要結合retry.backoff.ms(重試等待間隔)來使用,建議總的重試時間比集群重新選舉leader的時間長,這樣可以避免生產者過早結束重試導致失敗。
    #另外需注意,當開啟重試時,若未設置max.in.flight.requests.per.connection=1,則可能出現發往同一個分區的兩批消息的順序出錯,比如,第一批發送失敗了,第二批成功了,然后第一批重試成功了,此時兩者的順序就顛倒了。
    retries: 2  #發送失敗時重試多少次,0=禁用重試(默認值)
    #默認情況下消息是不壓縮的,此參數可指定采用何種算法壓縮消息,可取值:none,snappy,gzip,lz4。snappy壓縮算法由Google研發,這種算法在性能和壓縮比取得比較好的平衡;相比之下,gzip消耗更多的CPU資源,但是壓縮效果也是最好的。通過使用壓縮,我們可以節省網絡帶寬和Kafka存儲成本。
    compressionType: "none" #如果不開啟壓縮,可設置為none(默認值),比較大的消息可開啟。
    #當多條消息發送到一個分區時,Producer會進行批量發送,這個參數指定了批量消息大小的上限(以字節為單位)。當批量消息達到這個大小時,Producer會一起發送到broker;但即使沒有達到這個大小,生產者也會有定時機制來發送消息,避免消息延遲過大。
    batch-size: 16384 #默認16K,值越小延遲越低,但是吞吐量和性能會降低。0=禁用批量發送
    #這個參數設置Producer暫存待發送消息的緩沖區內存的大小,如果應用調用send方法的速度大於Producer發送的速度,那么調用會阻塞一定(max.block.ms)時間后拋出異常。
    buffer-memory: 33554432 #緩沖區默認大小32M
  #消費者的配置,可參考:org.apache.kafka.clients.consumer.ConsumerConfig
  consumer:
    #這個參數可以為任意值,用來指明消息從哪個客戶端發出,一般會在打印日志、衡量指標、分配配額時使用。
    #暫不用提供clientId,2.x版本可放出來,1.x有多個topic且concurrency>1會出現JMX注冊時異常
    #clientId: ${spring.application.name} #方便kafkaserver打印日志定位請求來源
    # 簽中kafka集群
    bootstrap-servers: 127.0.0.1:8080 #kafka服務器地址,多個以逗號隔開
    #這個參數指定了當消費者第一次讀取分區或者無offset時拉取那個位置的消息,可以取值為latest(從最新的消息開始消費),earliest(從最老的消息開始消費),none(如果無offset就拋出異常)
    autoOffsetReset: latest #默認值:latest
    #這個參數指定了消費者是否自動提交消費位移,默認為true。如果需要減少重復消費或者數據丟失,你可以設置為false,然后手動提交。如果為true,你可能需要關注自動提交的時間間隔,該間隔由auto.commit.interval.ms設置。
    enable-auto-commit: false
    #周期性自動提交的間隔,單位毫秒
    auto-commit-interval: 2000 #默認值:5000
    #這個參數允許消費者指定從broker讀取消息時最小的Payload的字節數。當消費者從broker讀取消息時,如果數據字節數小於這個閾值,broker會等待直到有足夠的數據,然后才返回給消費者。對於寫入量不高的主題來說,這個參數可以減少broker和消費者的壓力,因為減少了往返的時間。而對於有大量消費者的主題來說,則可以明顯減輕broker壓力。
    fetchMinSize: 1 #默認值: 1
    #上面的fetch.min.bytes參數指定了消費者讀取的最小數據量,而這個參數則指定了消費者讀取時最長等待時間,從而避免長時間阻塞。這個參數默認為500ms。
    fetchMaxWait: 500 #默認值:500毫秒
    #這個參數控制一個poll()調用返回的記錄數,即consumer每次批量拉多少條數據。
    maxPollRecords: 500 #默認值:500
  listener:
    #創建多少個consumer,值必須小於等於Kafk Topic的分區數。
    ack-mode: MANUAL_IMMEDIATE
    concurrency: 1  #推薦設置為topic的分區數

配置釋義

點開 KafkaProperties 這個類,可以看到這個是SpringBoot 自動配置kafka的配置類,引入這個實例,就相當於你拿到了SpringBoot kafka配置模板的參數,就是上述貼的配置,然后再此基礎上重新定義你需要改變的配置,這里主要講消費者配置。

代碼中舉了個重寫監聽servers的例子:

//例如:定制servers
myKafkaProperties.setBootstrapServers(myServers);

@KafkaListener 使用 containerFactory

@Slf4j
@Component
public class ConsumerDemo {

    //聲明consumerID為demo,監聽topicName為topic.quick.demo的Topic
    //這個消費者的 containerFactory 是SpringBoot 提供的 kafkaListenerContainerFactory 這個bean
    @KafkaListener(id = "demo", topics = "topic.quick.demo")
    public void listen(String msgData) {
        log.info("demo receive : " + msgData);
    }

    @KafkaListener(topics = "k010", containerFactory = "myKafkaContainerFactory")
    public void listen(String msgData, Acknowledgment ack) {
        log.info("demo receive : " + msgData);
        //手動提交
        //enable.auto.commit參數設置成false。那么就是Spring來替為我們做人工提交,從而簡化了人工提交的方式。
        //所以kafka和springboot結合中的enable.auto.commit為false為spring的人工提交模式。
        //enable.auto.commit為true是采用kafka的默認提交模式。
        ack.acknowledge();
    }
}

如果在@KafkaListener屬性中沒有指定 containerFactory 那么Spring Boot 會默認注入 name 為“kafkaListenerContainerFactory” 的 containerFactory。具體源碼可跟蹤:KafkaListenerAnnotationBeanPostProcessor中的常量:

public static final String DEFAULT_KAFKA_LISTENER_CONTAINER_FACTORY_BEAN_NAME = "kafkaListenerContainerFactory";


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM