spring boot集成kafka簡單實用


 

先導入spring boot整合kafka的依賴

 <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

然后是application.yml中關於kafka的配置

 1 spring:
 2   kafka:
 3     bootstrap-servers: ip:端口 #kafka服務器地址 多個以,號隔開
 4     #生產者配置
 5     producer:
 6       batch-size: 16384  #每次批量發送信息的數量
 7       buffer-memory: 33554432 #達到該緩沖區大小就發送數據
 8       key-serializer: org.apache.kafka.common.serialization.StringSerializer #key的序列化器
 9       value-serializer: org.apache.kafka.common.serialization.StringSerializer #value的序列化器
10     #消費者配置
11     consumer:
12       group-id: test #消費者分組id,同一個分組的消費者不會讀取到同一個消息
13       enable-auto-commit: true #啟用自動提交偏移量
14       auto-commit-interval: 100 #設置偏移量提交間隔
15       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer #key的反序列化器
16       value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #value的反序列化器
17 
18 #自定義主題名,在代碼中可以讀取,方便修改主題
19 topic: testTopic

這是基礎配置,我生產者和消費者這里都是同一個工程進行測試了,如果是兩個不同的工程,只寫生產者或消費者的配置就行

 1 @Component
 2 public class KafkaProducer {
 3 
 4     @Autowired
 5     private KafkaTemplate<String,Object> kafkaTemplate;
 6 
 7     @Value("${topic}")
 8     private  String topic;
 9 
10     public  void sendMessage(String message){
11          kafkaTemplate.send(topic,message);
12     }
13 
14 }

這里新建了一個kafka生產者類用於發送信息到kafka,並且交給了spring 管理,在我們需要使用的地方直接注入進來即可.

這個KafkaTemplate<>類是spring boot提供的模板類,封裝了關於kafka的操作.

 

kafka消費者

1 @Component
2 public class KafkaConsumer {
3 
4     @KafkaListener(topics = {"${topic}"})
5     public void getMessage(String message){
6         System.out.println(message);
7     }
8 
9 }

同樣交由spring管理,然后使用@KafkaListener注解,定義要監聽的主題,至於{"${topic}"}這個是讀取配置文件中的topic配置項,也是我所定義的主題,但監聽到有信息時,

就會調用這個方法,將信息作為一個字符串參數傳遞進來,里面就可以寫你的業務邏輯了.

 

kafka大致使用就是這樣,很多細節配置,暫時也不清楚,有機會在補充吧 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM