1)引入maven依賴
我這里使用的是springboot 2.1.3.RELEASE 版本:
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
會引入一對的kafka包:
2)生產者配置:
所有配置參考kafka-clients-2.5.0.jar包里的org.apache.kafka.clients.producer.ProducerConfig類,並且在該類中可以查看所有配置項的默認值: CONFIG = (new ConfigDef()).define( 這里的define方法的第三個參數就是默認值
application.properties里可以這樣配置:
##################### 重要配置 ###################### spring.kafka.producer.bootstrap.servers=192.168.2.60:9092,192.168.2.62:9092 spring.kafka.producer.key.serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value.serializer=org.apache.kafka.common.serialization.StringSerializer # acks=0 如果設置為0,生產者將不等待任何來自服務器的確認。每個記錄返回的偏移量將始終設置為-1。 # acks=1 這意味着leader確認消息即可,但不等待所有副本的完全確認的情況下進行響應。在這種情況下,如果leader在確認記錄后立即失敗,但是在副本復制它之前,那么記錄將丟失。 # acks=all 不僅需要leader確認收到消息,還將等待全部的副本確認。這保證了只要至少有一個副本保持活動狀態,記錄就不會丟失。這是最有力的保證。這相當於ack =-1設置。 # acks=-1 跟集群有關 # 默認 1 spring.kafka.producer.acks=1 # 一個批次發送的大小,默認16KB,超過這個大小就會發送數據 spring.kafka.producer.batch.size=16384 # 一個批次最長等待多久就發送數據,默認0,即馬上發送 spring.kafka.producer.linger.ms=5000 # 控制生產者最大發送大小,默認 1MB。這個值必須小於kafka服務器server.properties配置文件里的最大可接收數據大小配置:socket.request.max.bytes=104857600 (默認104857600 = 100MB) spring.kafka.producer.max.request.size=1048576 ##################### 非重要配置 ###################### # 生產者內存緩沖區大小。默認33554432bytes=32MB spring.kafka.producer.buffer.memory=33554432 # 發送重試次數,默認 2147483647,接近無限大 spring.kafka.producer.retries=3 # 請求超時時間,默認30秒 spring.kafka.producer.request.timeout.ms=30000 # 默認值5。並發狀態下,kafka生產者允許存在最大的kafka服務端未確認接收的消息個數最大值。 # 注意,如果該值設置為1,並且開啟重試機制,則會在允許的重試次數內,阻塞其他消息發送到kafka Server端。並且為1的話,會嚴重影響生產者的吞吐量。僅適用於對數據有嚴格順序要求的場景。 spring.kafka.producer.max.in.flight.requests.per.connection=5 # 最大阻塞時間,超過則拋出異常。默認60秒 spring.kafka.max.block.ms=60000 # 數據壓縮類型:none、gzip、snappy、lz4、zstd,默認none什么都不做 spring.kafka.compression.type=none # 客戶端在進行發送和消費的時候,會緩存kafka的元數據。默認30秒 spring.kafka.producer.metadata.max.age.ms=30000
在springboot框架里,手動封裝kafka生產者對象,並@bean對象注入到SpringBoot容器中去:
@Configuration @EnableKafka public class KafkaProducerConfig { @Value("${kafka.producer.servers}") private String servers; @Value("${kafka.producer.retries}") private int retries; @Value("${kafka.producer.batch.size}") private int batchSize; @Value("${kafka.producer.linger}") private int linger; @Value("${kafka.producer.buffer.memory}") private int bufferMemory; @Bean public KafkaTemplate<String, String> kafkaTemplate() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); props.put(ProducerConfig.RETRIES_CONFIG, retries); props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); props.put(ProducerConfig.LINGER_MS_CONFIG, linger); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory<>(props); KafkaTemplate kafkaTemplate = new KafkaTemplate<String, String>(factory) ; //kafkaTemplate.setProducerListener(); return kafkaTemplate; } }
key和value可以自定義序列化類,參考《kafka2.5.0自定義數據序列化類》
重要知識:
如果該topic的分區大於1,那么生產者生產的數據存放到哪個分區,完全取決於key值,比如key=A,那么存到分區0,key=B,那么存到分區1,如果key為null,那么負載均衡存儲到每個分區!
再均衡監聽器:無論分區個數還是消費者個數發生變化,都會觸發再均衡,重新分配分區的消費者。如果需要自定義分區,請參考《kafka2.5.0自定義分區器》
3)消費者配置:
所有配置參考kafka-clients-2.5.0.jar包里的org.apache.kafka.clients.consumer.ConsumerConfig類,並且在該類中可以查看所有配置項的默認值: CONFIG = (new ConfigDef()).define( 這里的define方法的第三個參數就是默認值
kafka.consumer.bootstrap-servers=192.168.2.61:9092,192.168.2.61:9093 # 注意:相同的Topic下,相同的群組ID,只有一個消費者能消費到消息 #kafka.consumer.group-id=myGroupId1 # 消費者在讀取一個沒有偏移量的分區或者偏移量無效的情況下,讀取設置。 # latest: (默認)讀取最新的,earliest: 讀取最早的,none: 如果沒有為使用者的組找到偏移量,則consumer拋出異常,anything else: consumer拋出異常 kafka.consumer.auto-offset-reset=latest # 是否自動提交偏移,默認true。偏移量自己控制,可以有效避免重復讀、漏讀 kafka.consumer.enable-auto-commit=false # 自動提交間隔,默認5秒。從開始消費一條數據到業務結束,必須在5秒內完成,否則會造成提前提交偏移量,如果出現事務失敗,將會漏掉該條消費 #kafka.consumer.auto.commit.interval.ms=5000 # 把分區分配給消費者的策略。RangeAssignor:默認。采用大部分分區都分配給消費者群組里的群主(即消費者0)的策略。RoundRobinAssignor:采用所有消費者平均分配分區策略 # 注意:無論分區個數變化或者消費者個數變化,都會觸發再分配 kafka.consumer.partition-assignment-strategy=org.apache.kafka.clients.consumer.RangeAssignor.class # 客戶端在進行發送和消費的時候,會緩存kafka的元數據。默認30秒 kafka.consumer.metadata-max-age-ms=30000 # consumer最小拉取多大的數據,默認值1,就是立即發送。達不到這個數據就等待。注意:這里不是根據消費數據條數,而是數據大小,這樣設計主要避免每個數據之間大小差距過大。 kafka.consumer.fetch.min.bytes=1 # consumer最多等待10秒就消費一次數據,默認500ms kafka.consumer.fetch.max.wait.ms=10000 # 控制每次poll方法返回的記錄數量,默認500。這個配置僅僅作用於手動 poll消費的情況下,在springboot中由於使用 @KafkaListener注解消費所以基本沒用 kafka.consumer.max-poll-records=500
在springboot框架里,手動封裝kafka生產者對象,並@bean對象注入到SpringBoot容器中去:
先定義pojo 類:
@Component @ConfigurationProperties(prefix = "kafka.consumer") public class KafkaConsumerConfigModel { // 這里就是一個簡單的pojo類,定義application.properties配置文件的kafka.consumer開頭的所有字段. private String bootstrapServers; ...... // getter and setter }
再定義kafka consumer 工廠類:
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.RangeAssignor; import org.apache.kafka.common.serialization.StringDeserializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.support.TopicPartitionOffset; import org.springframework.util.CollectionUtils; import java.util.Arrays; import java.util.HashMap; import java.util.Map; @Configuration public class KafkaConsumerConfig { private Logger logger = LoggerFactory.getLogger(KafkaProducerConfig.class); @Autowired KafkaConsumerConfigModel config; @Bean("consumerFactory") public ConsumerFactory<String, String> consumerFactory() { Map<String, Object> propsMap = new HashMap<>(); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers()); propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, config.getEnableAutoCommit()); propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, Arrays.asList( RangeAssignor.class)); propsMap.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, config.getFetchMinBytes()); propsMap.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, config.getFetchMaxWaitMs()); propsMap.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, config.getMetadataMaxAgeMs()); ConsumerFactory<String, String> factory = new DefaultKafkaConsumerFactory<>(propsMap); return factory; } @Bean("kafkaListenerContainerFactory") public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> getKafkaListenerContainerFactory( @Autowired ConsumerFactory<String, String> consumerFactory ) { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory); factory.setConcurrency(1); factory.getContainerProperties().setPollTimeout(1500); factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); // factory.createContainer( new TopicPartitionOffset(Constant.TOPIC, 0)); return factory; } }
最后kafka consumer消費者長這樣:
import com.joyce.kafka.Constant; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Component; @Component public class KafkaConsumer { private Logger logger = LoggerFactory.getLogger(KafkaConsumer.class); // 相同的groupId的消費者只能有一個接收到消息 @KafkaListener(groupId="mygroup-1",topics = Constant.TOPIC ) public void listen1(String data) { logger.info("消費到消息1: [{}]", data); } @KafkaListener(groupId="mygroup-2",topics = Constant.TOPIC) public void listen(ConsumerRecord<?, ?> record, Acknowledgment ack) { logger.info("消費到消息2: [{}]", record.value()); logger.info("消費到消息2|"+String.format( "主題:%s,分區:%d,偏移量:%d,key:%s,value:%s", record.topic(),record.partition(),record.offset(), record.key(),record.value()));//提交offset ack.acknowledge(); } @KafkaListener(groupId="mygroup-3", topics = Constant.TOPIC) public void test(String data, Acknowledgment ack) { // ConsumerRecord<String, String> record logger.info("消費到消息3: [{}]", data); //提交offset ack.acknowledge(); } }
end.