spring-kafka 使得在 Spring 環境中使用 Kafka 變的很簡單,只需少量的配置和少量的代碼就可以發送和接受消息了。本文主要介紹在 SpringBoot 中用 spring-kafka 操作 Kafka,文中使用到的軟件版本:Kafka 2.8.0、SpringBoot 2.4.6、Java 1.8.0_191。
1、參數說明
spring-kafka 中參數是以 spring.kafka 開頭的,后面的參數名稱和 Kafka 的原始參數很類似,只不過 spring-kafka 會把一些參數中的 "." 改為 "-",如 auto.offset.reset 改為 spring.kafka.consumer.auto-offset-reset。
前綴 | 描述 |
spring.kafka | Spring 中 Kafka 相關配置總的前綴 |
spring.kafka.consumer | 消費者相關參數 |
spring.kafka.producer | 生產者相關參數 |
spring.kafka.admin | Kafka 管理相關參數 |
kafka 的原始參數說明可參考:Kafka入門實戰(1)-概念、安裝及簡單使用;或參考官方文檔。
2、SpringBoot 整合 Kafka
2.1、引入依賴
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <!--流處理需要用到--> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> </dependency>
2.2、增加 Kafka 配置
spring: kafka: bootstrap-servers: 10.40.100.69:9092 producer: acks: all transaction-id-prefix: tx. #開啟事務,發送消息的方法需增加@Transactional注解 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: group-id: groupA auto-offset-reset: streams: application-id: streams-test properties: "[default.key.serde]": org.apache.kafka.common.serialization.Serdes$StringSerde "[default.value.serde]": org.apache.kafka.common.serialization.Serdes$StringSerde
2.3、發送消息
package com.abc.demo.kafka; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; @Component public class Producer { @Autowired private KafkaTemplate<String, String> kafkaTemplate; @Transactional @Scheduled(cron = "0/10 * * * * ?") public void sendMessage() { for (int i = 0; i < 10; i++) { kafkaTemplate.send("test", "消息" + i); } } // @Scheduled(cron = "0/10 * * * * ?") // public void sendMessage2() { // kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback(){ // @Override // public Object doInOperations(KafkaOperations kafkaOperations) { // for (int i = 0; i < 10; i++) { // kafkaTemplate.send("test", "消息" + i); // } // return null; // } // }); // } }
2.4、接受消息
package com.abc.demo.kafka; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @Component public class Consumer { private static Logger logger = LoggerFactory.getLogger(Consumer.class); @KafkaListener(topics = "test") public void recevieMessage(ConsumerRecord<String, String> record) { logger.info("offset={}, key={}, value={}", record.offset(), record.key(), record.value()); } }
2.5、流處理
package com.abc.demo.kafka; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Produced; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafkaStreams; import org.springframework.kafka.support.serializer.JsonSerde; import java.util.HashMap; import java.util.Map; @Configuration(proxyBeanMethods = false) @EnableKafkaStreams public class StreamConfig { @Bean public KStream<String, String> kStream(StreamsBuilder streamsBuilder) { KStream<String, String> stream = streamsBuilder.stream("stream-in"); //從 stream-in 隊列中讀取數據,處理后發送給 stream-out 隊列 //發送數據 key,value 分別使用的序列化類為Serdes.String(),JsonSerde stream.map(this::uppercaseValue).to("stream-out", Produced.with(Serdes.String(), new JsonSerde<>())); return stream; } /** * 消息轉換,新的消息:key-原來的value值,value-一個map */ private KeyValue<String, Map> uppercaseValue(String key, String value) { Map<String, String> map = new HashMap<>(); map.put("message", value.toUpperCase()); map.put("timestamp", System.currentTimeMillis() + ""); return new KeyValue(value, map); } }
程序從 stream-in 中讀取消息,對消息加工后再發送給 stream-out;打開兩個終端,一個往 stream-in 發送消息,一個接受 stream-out 的消息。
./kafka-console-producer.sh --broker-list 10.40.100.69:9092 --topic stream-in #發送消息 ./kafka-console-consumer.sh --bootstrap-server 10.40.100.69:9092 --topic stream-out --property print.key=true #接受消息
stream-in 的輸入:
stream-out 的輸出: