Step 1: Add Kafka support to pom.xml
<dependencies>
    <!-- Versions are typically inherited from the Spring Boot parent/BOM -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
    </dependency>
</dependencies>
Step 2: Add the configuration properties
#kafka
# Broker list and SASL settings. Note that the spring.kafka.inner.* keys and the
# *.sasl-jaas-config keys below are custom properties: they are not bound by Spring Boot
# auto-configuration, but read via @Value in the configuration class in Step 3.
spring.kafka.inner.bootstrap-servers=172.17.0.2:9095,172.17.0.12:9095,172.17.0.13:9095
spring.kafka.inner.security-protocol=SASL_PLAINTEXT
spring.kafka.inner.sasl-mechanism=PLAIN
#=============== producer =======================
# Number of retries after a failed send. When a leader fails, a replica is elected as
# the new leader, and sends during that window can fail. With retries=0 the producer
# does not resend; with retries > 0 it resends once the replica has fully taken over
# as leader, so the message is not lost.
spring.kafka.producer.retries=0
# JAAS credentials (username/password) for the producer
spring.kafka.producer.sasl-jaas-config=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"kafka\" password=\"kafkapswd\";
# Upper bound, in bytes, of a batch; the producer accumulates records and sends them in one request
spring.kafka.producer.batch-size=16384
# Total memory, in bytes, the producer may use to buffer records waiting to be sent
spring.kafka.producer.buffer-memory=33554432
# How long to wait for more records before sending a partially full batch
spring.kafka.producer.linger-ms=5
#=============== consumer =======================
# Default consumer group id. Within one group, each message is delivered to only one
# consumer; group.id names the group.
spring.kafka.consumer.group-id=kafkaGroup
# JAAS credentials (username/password) for the consumer
spring.kafka.consumer.sasl-jaas-config=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"mooc\" password=\"moocpswd\";
# What to do when there is no committed offset: 'earliest' re-reads from the beginning
# of the log, 'latest' starts from the current end. (These replace the old
# smallest/largest values of the pre-0.9 consumer.) We usually use earliest.
spring.kafka.consumer.auto-offset-reset=earliest
# enable.auto.commit=true --> commit offsets automatically
spring.kafka.consumer.enable-auto-commit=true
# If enable.auto.commit is true, the interval in milliseconds at which consumer offsets
# are auto-committed to Kafka; the Kafka default is 5000.
spring.kafka.consumer.auto-commit-interval=100
# Maximum number of records returned by a single poll()
spring.kafka.consumer.max-poll-records=1000
Step 3: Create the configuration class
package com.xxx.service.config;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.util.StringUtils;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaConfiguration {

    @Value("${spring.kafka.inner.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.inner.security-protocol}")
    private String kafkaSecurityProtocol;
    @Value("${spring.kafka.inner.sasl-mechanism}")
    private String kafkaSASLMechanism;
    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;
    @Value("${spring.kafka.consumer.sasl-jaas-config}")
    private String kafkaConsumerSASLJaasConfig;
    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;
    @Value("${spring.kafka.consumer.max-poll-records}")
    private String maxPollRecords;
    @Value("${spring.kafka.producer.retries}")
    private String producerRetries;
    @Value("${spring.kafka.producer.sasl-jaas-config}")
    private String kafkaProducerSASLJaasConfig;
    @Value("${spring.kafka.producer.batch-size}")
    private String producerBatchSize;
    @Value("${spring.kafka.producer.linger-ms}")
    private String producerLingerMs;
    @Value("${spring.kafka.producer.buffer-memory}")
    private String bufferMemory;
    @Bean
    public KafkaListenerContainerFactory<?> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        factory.setBatchListener(true); // enable batch listening
        return factory;
    }
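If one listener needs to keep up with several partitions, the same factory can run more than one consumer thread. A minimal variant, sketched under the assumption of a topic with at least three partitions; the bean name concurrentBatchFactory and the concurrency value of 3 are illustrative, not part of the original setup:

    @Bean
    public KafkaListenerContainerFactory<?> concurrentBatchFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        factory.setBatchListener(true);
        // Illustrative value: each unit of concurrency is one KafkaConsumer thread;
        // more threads than partitions would leave the extra threads idle.
        factory.setConcurrency(3);
        return factory;
    }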
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> config = new HashMap<>();
        // Kafka broker list
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // consumer group id
        config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        config.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Only apply the SASL settings when all three values are present
        if (!StringUtils.isEmpty(kafkaSecurityProtocol) && !StringUtils.isEmpty(kafkaSASLMechanism)
                && !StringUtils.isEmpty(kafkaConsumerSASLJaasConfig)) {
            config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, kafkaSecurityProtocol);
            config.put(SaslConfigs.SASL_MECHANISM, kafkaSASLMechanism);
            config.put(SaslConfigs.SASL_JAAS_CONFIG, kafkaConsumerSASLJaasConfig);
        }
        return config;
    }
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // Retries; 0 disables the retry mechanism
        properties.put(ProducerConfig.RETRIES_CONFIG, producerRetries);
        // Batch size in bytes; the default is 16384
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, producerBatchSize);
        // Wait 5 ms before sending a partially full batch; this reduces the number of
        // requests the producer makes and so raises throughput. The default is 0.
        properties.put(ProducerConfig.LINGER_MS_CONFIG, producerLingerMs);
        // Total memory, in bytes, for buffering records waiting to be sent to the
        // broker; the default of 33554432 is usually fine.
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Only apply the SASL settings when all three values are present
        if (!StringUtils.isEmpty(kafkaSecurityProtocol) && !StringUtils.isEmpty(kafkaSASLMechanism)
                && !StringUtils.isEmpty(kafkaProducerSASLJaasConfig)) {
            properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, kafkaSecurityProtocol);
            properties.put(SaslConfigs.SASL_MECHANISM, kafkaSASLMechanism);
            properties.put(SaslConfigs.SASL_JAAS_CONFIG, kafkaProducerSASLJaasConfig);
        }
        return new DefaultKafkaProducerFactory<>(properties);
    }
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
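The listener in Step 4 assumes its topic already exists. If it may not, spring-kafka's KafkaAdmin can create NewTopic beans at startup. A minimal sketch to add inside KafkaConfiguration, assuming the SASL account is allowed to create topics; the topic name "topic", partition count, and replication factor are illustrative (imports: org.springframework.kafka.core.KafkaAdmin, org.apache.kafka.clients.admin.NewTopic):

    @Bean
    public KafkaAdmin kafkaAdmin() {
        // Reuse the same broker list and SASL settings for the admin client
        Map<String, Object> config = new HashMap<>();
        config.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        if (!StringUtils.isEmpty(kafkaSecurityProtocol) && !StringUtils.isEmpty(kafkaSASLMechanism)
                && !StringUtils.isEmpty(kafkaProducerSASLJaasConfig)) {
            config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, kafkaSecurityProtocol);
            config.put(SaslConfigs.SASL_MECHANISM, kafkaSASLMechanism);
            config.put(SaslConfigs.SASL_JAAS_CONFIG, kafkaProducerSASLJaasConfig);
        }
        return new KafkaAdmin(config);
    }

    @Bean
    public NewTopic demoTopic() {
        // Illustrative values: 3 partitions, replication factor 3
        return new NewTopic("topic", 3, (short) 3);
    }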
Step 4: Usage
// Inject the KafkaTemplate configured in KafkaConfiguration
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

@RequestMapping(value = "put_kafka")
public String put_kafka(String topic, String message) {
    kafkaTemplate.send(topic, message);
    /*
     * The template offers several send variants:
     *   kafkaTemplate.send(topic, message);
     *   kafkaTemplate.send(topic, key, message);
     *   kafkaTemplate.sendDefault(message);      // requires a default topic set on the template
     *   kafkaTemplate.sendDefault(key, message);
     */
    return "ok";
}
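Note that send() is asynchronous and returns at once; the broker's answer arrives later on the returned future. To log success or failure, attach a callback. A minimal sketch assuming Spring Kafka 2.x, where send() returns a ListenableFuture<SendResult<K, V>> (Spring Kafka 3.x returns a CompletableFuture instead, so the code differs there):

kafkaTemplate.send(topic, message).addCallback(
        // Success: the broker assigned a partition and offset to the record
        result -> System.out.println("sent to partition "
                + result.getRecordMetadata().partition()
                + " at offset " + result.getRecordMetadata().offset()),
        // Failure: the send was not acknowledged (e.g. broker down, auth error)
        ex -> System.err.println("send failed: " + ex.getMessage()));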
// Batch consumption. containerFactory must reference the batch-enabled factory
// ("batchFactory") defined in KafkaConfiguration above.
// Requires: java.util.List, java.util.Optional,
// org.apache.kafka.clients.consumer.ConsumerRecord,
// org.springframework.kafka.annotation.KafkaListener
@KafkaListener(topics = "topic", containerFactory = "batchFactory")
public void onMessage(List<ConsumerRecord<?, ?>> records) {
    for (ConsumerRecord<?, ?> record : records) {
        System.out.println("Received: " + record);
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            System.out.println("Payload: " + message);
        }
    }
}
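One caveat of the configuration above: with enable-auto-commit=true, offsets are committed on a timer, so records can be marked consumed even when processing fails. A common alternative is manual acknowledgment. The sketch below assumes you set spring.kafka.consumer.enable-auto-commit=false and, in batchFactory(), call factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); it is a variant, not part of the original setup:

// Requires org.springframework.kafka.support.Acknowledgment and
// org.springframework.kafka.listener.ContainerProperties (for AckMode.MANUAL)
@KafkaListener(topics = "topic", containerFactory = "batchFactory")
public void onMessage(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
    try {
        for (ConsumerRecord<?, ?> record : records) {
            System.out.println("Received: " + record.value());
        }
        // Commit offsets only after the whole batch was processed successfully
        ack.acknowledge();
    } catch (Exception e) {
        // Without acknowledge() the offsets stay uncommitted and the batch is redelivered
        System.err.println("processing failed: " + e.getMessage());
    }
}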