kafka2.5.0生產者與消費者配置詳解


1)引入maven依賴

我這里使用的是springboot 2.1.3.RELEASE 版本:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

會引入一對的kafka包:

 

 2)生產者配置:

所有配置參考kafka-clients-2.5.0.jar包里的org.apache.kafka.clients.producer.ProducerConfig類,並且在該類中可以查看所有配置項的默認值: CONFIG = (new ConfigDef()).define(  這里的define方法的第三個參數就是默認值

application.properties里可以這樣配置:

#####################  重要配置  ######################
spring.kafka.producer.bootstrap.servers=192.168.2.60:9092,192.168.2.62:9092
spring.kafka.producer.key.serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value.serializer=org.apache.kafka.common.serialization.StringSerializer
# acks=0  如果設置為0,生產者將不等待任何來自服務器的確認。每個記錄返回的偏移量將始終設置為-1。
# acks=1  這意味着leader確認消息即可,但不等待所有副本的完全確認的情況下進行響應。在這種情況下,如果leader在確認記錄后立即失敗,但是在副本復制它之前,那么記錄將丟失。
# acks=all  不僅需要leader確認收到消息,還將等待全部的副本確認。這保證了只要至少有一個副本保持活動狀態,記錄就不會丟失。這是最有力的保證。這相當於ack =-1設置。
# acks=-1   跟集群有關
# 默認 1
spring.kafka.producer.acks=1
# 一個批次發送的大小,默認16KB,超過這個大小就會發送數據
spring.kafka.producer.batch.size=16384
# 一個批次最長等待多久就發送數據,默認0,即馬上發送
spring.kafka.producer.linger.ms=5000
# 控制生產者最大發送大小,默認 1MB。這個值必須小於kafka服務器server.properties配置文件里的最大可接收數據大小配置:socket.request.max.bytes=104857600 (默認104857600 = 100MB)
spring.kafka.producer.max.request.size=1048576

#####################  非重要配置  ######################
# 生產者內存緩沖區大小。默認33554432bytes=32MB
spring.kafka.producer.buffer.memory=33554432
# 發送重試次數,默認 2147483647,接近無限大
spring.kafka.producer.retries=3
# 請求超時時間,默認30秒
spring.kafka.producer.request.timeout.ms=30000
# 默認值5。並發狀態下,kafka生產者允許存在最大的kafka服務端未確認接收的消息個數最大值。
# 注意,如果該值設置為1,並且開啟重試機制,則會在允許的重試次數內,阻塞其他消息發送到kafka Server端。並且為1的話,會嚴重影響生產者的吞吐量。僅適用於對數據有嚴格順序要求的場景。
spring.kafka.producer.max.in.flight.requests.per.connection=5
# 最大阻塞時間,超過則拋出異常。默認60秒
spring.kafka.max.block.ms=60000
# 數據壓縮類型:none、gzip、snappy、lz4、zstd,默認none什么都不做
spring.kafka.compression.type=none
# 客戶端在進行發送和消費的時候,會緩存kafka的元數據。默認30秒
spring.kafka.producer.metadata.max.age.ms=30000

  

在springboot框架里,手動封裝kafka生產者對象,並@bean對象注入到SpringBoot容器中去:

@Configuration
@EnableKafka
public class KafkaProducerConfig {
    @Value("${kafka.producer.servers}")
    private String servers;
    @Value("${kafka.producer.retries}")
    private int retries;
    @Value("${kafka.producer.batch.size}")
    private int batchSize;
    @Value("${kafka.producer.linger}")
    private int linger;
    @Value("${kafka.producer.buffer.memory}")
    private int bufferMemory;

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // 
        DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory<>(props);
        KafkaTemplate kafkaTemplate
                = new KafkaTemplate<String, String>(factory) ;
        //kafkaTemplate.setProducerListener();
        return kafkaTemplate;
    }
}

key和value可以自定義序列化類,參考《kafka2.5.0自定義數據序列化類

 重要知識:

如果該topic的分區大於1,那么生產者生產的數據存放到哪個分區,完全取決於key值,比如key=A,那么存到分區0,key=B,那么存到分區1,如果key為null,那么負載均衡存儲到每個分區!

再均衡監聽器:無論分區個數還是消費者個數發生變化,都會觸發再均衡,重新分配分區的消費者。如果需要自定義分區,請參考《kafka2.5.0自定義分區器

3)消費者配置:

 所有配置參考kafka-clients-2.5.0.jar包里的org.apache.kafka.clients.consumer.ConsumerConfig類,並且在該類中可以查看所有配置項的默認值: CONFIG = (new ConfigDef()).define(  這里的define方法的第三個參數就是默認值

kafka.consumer.bootstrap-servers=192.168.2.61:9092,192.168.2.61:9093
# 注意:相同的Topic下,相同的群組ID,只有一個消費者能消費到消息
#kafka.consumer.group-id=myGroupId1
# 消費者在讀取一個沒有偏移量的分區或者偏移量無效的情況下,讀取設置。
# latest: (默認)讀取最新的,earliest: 讀取最早的,none: 如果沒有為使用者的組找到偏移量,則consumer拋出異常,anything else: consumer拋出異常
kafka.consumer.auto-offset-reset=latest
# 是否自動提交偏移,默認true。偏移量自己控制,可以有效避免重復讀、漏讀
kafka.consumer.enable-auto-commit=false
# 自動提交間隔,默認5秒。從開始消費一條數據到業務結束,必須在5秒內完成,否則會造成提前提交偏移量,如果出現事務失敗,將會漏掉該條消費
#kafka.consumer.auto.commit.interval.ms=5000

# 把分區分配給消費者的策略。RangeAssignor:默認。采用大部分分區都分配給消費者群組里的群主(即消費者0)的策略。RoundRobinAssignor:采用所有消費者平均分配分區策略
# 注意:無論分區個數變化或者消費者個數變化,都會觸發再分配
kafka.consumer.partition-assignment-strategy=org.apache.kafka.clients.consumer.RangeAssignor.class
# 客戶端在進行發送和消費的時候,會緩存kafka的元數據。默認30秒
kafka.consumer.metadata-max-age-ms=30000
# consumer最小拉取多大的數據,默認值1,就是立即發送。達不到這個數據就等待。注意:這里不是根據消費數據條數,而是數據大小,這樣設計主要避免每個數據之間大小差距過大。
kafka.consumer.fetch.min.bytes=1
# consumer最多等待10秒就消費一次數據,默認500ms
kafka.consumer.fetch.max.wait.ms=10000
# 控制每次poll方法返回的記錄數量,默認500。這個配置僅僅作用於手動 poll消費的情況下,在springboot中由於使用 @KafkaListener注解消費所以基本沒用
kafka.consumer.max-poll-records=500

在springboot框架里,手動封裝kafka生產者對象,並@bean對象注入到SpringBoot容器中去:

 先定義pojo 類:

@Component
@ConfigurationProperties(prefix = "kafka.consumer")
public class KafkaConsumerConfigModel {
    
     // 這里就是一個簡單的pojo類,定義application.properties配置文件的kafka.consumer開頭的所有字段.
     private String bootstrapServers;
     ......
     
     // getter and setter

}

再定義kafka consumer 工廠類:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.util.CollectionUtils;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConsumerConfig {
    private Logger logger = LoggerFactory.getLogger(KafkaProducerConfig.class);

    @Autowired
    KafkaConsumerConfigModel config;

    @Bean("consumerFactory")
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, config.getEnableAutoCommit());
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, Arrays.asList( RangeAssignor.class));
        propsMap.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, config.getFetchMinBytes());
        propsMap.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, config.getFetchMaxWaitMs());
        propsMap.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, config.getMetadataMaxAgeMs());
        ConsumerFactory<String, String> factory = new DefaultKafkaConsumerFactory<>(propsMap);
        return factory;
    }

    @Bean("kafkaListenerContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
    getKafkaListenerContainerFactory(
            @Autowired ConsumerFactory<String, String> consumerFactory
    ) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory
                = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(1);
        factory.getContainerProperties().setPollTimeout(1500);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
//        factory.createContainer( new TopicPartitionOffset(Constant.TOPIC, 0));
        return factory;
    }
}

最后kafka consumer消費者長這樣:

import com.joyce.kafka.Constant;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {

    private Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

    // 相同的groupId的消費者只能有一個接收到消息
    @KafkaListener(groupId="mygroup-1",topics = Constant.TOPIC )
    public void listen1(String data) {
        logger.info("消費到消息1: [{}]", data);
    }

    @KafkaListener(groupId="mygroup-2",topics =  Constant.TOPIC)
    public void listen(ConsumerRecord<?, ?> record, Acknowledgment ack) {
        logger.info("消費到消息2: [{}]", record.value());
        logger.info("消費到消息2|"+String.format(
                "主題:%s,分區:%d,偏移量:%d,key:%s,value:%s",
                record.topic(),record.partition(),record.offset(),
                record.key(),record.value()));//提交offset
        ack.acknowledge();
    }

    @KafkaListener(groupId="mygroup-3", topics =  Constant.TOPIC)
    public void test(String data, Acknowledgment ack) { // ConsumerRecord<String, String> record
        logger.info("消費到消息3: [{}]", data);
        //提交offset
        ack.acknowledge();
    }

}

 

end.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM