先導入spring boot整合kafka的依賴
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
然后是application.yml中關於kafka的配置
1 spring: 2 kafka: 3 bootstrap-servers: ip:端口 #kafka服務器地址 多個以,號隔開 4 #生產者配置 5 producer: 6 batch-size: 16384 #每次批量發送信息的數量 7 buffer-memory: 33554432 #達到該緩沖區大小就發送數據 8 key-serializer: org.apache.kafka.common.serialization.StringSerializer #key的序列化器 9 value-serializer: org.apache.kafka.common.serialization.StringSerializer #value的序列化器 10 #消費者配置 11 consumer: 12 group-id: test #消費者分組id,同一個分組的消費者不會讀取到同一個消息 13 enable-auto-commit: true #啟用自動提交偏移量 14 auto-commit-interval: 100 #設置偏移量提交間隔 15 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer #key的反序列化器 16 value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #value的反序列化器 17 18 #自定義主題名,在代碼中可以讀取,方便修改主題 19 topic: testTopic
這是基礎配置,我生產者和消費者這里都是同一個工程進行測試了,如果是兩個不同的工程,只寫生產者或消費者的配置就行
1 @Component 2 public class KafkaProducer { 3 4 @Autowired 5 private KafkaTemplate<String,Object> kafkaTemplate; 6 7 @Value("${topic}") 8 private String topic; 9 10 public void sendMessage(String message){ 11 kafkaTemplate.send(topic,message); 12 } 13 14 }
這里新建了一個kafka生產者類用於發送信息到kafka,並且交給了spring 管理,在我們需要使用的地方直接注入進來即可.
這個KafkaTemplate<>類是spring boot提供的模板類,封裝了關於kafka的操作.
kafka消費者
1 @Component 2 public class KafkaConsumer { 3 4 @KafkaListener(topics = {"${topic}"}) 5 public void getMessage(String message){ 6 System.out.println(message); 7 } 8 9 }
同樣交由spring管理,然后使用@KafkaListener注解,定義要監聽的主題,至於{"${topic}"}這個是讀取配置文件中的topic配置項,也是我所定義的主題,但監聽到有信息時,
就會調用這個方法,將信息作為一個字符串參數傳遞進來,里面就可以寫你的業務邏輯了.
kafka大致使用就是這樣,很多細節配置,暫時也不清楚,有機會在補充吧