SpringBoot與Kafka整合實現簡單分布式消息隊列
1、此處只是單純的梳理一下SpringBoot整合kafka,其他像Zookeeper、kafka等環境的安裝就不在詳
細說明,kafka安裝可參考https://www.cnblogs.com/jhtian/p/13708679.html Zookeeper安裝里面也有
相應的鏈接。
環境說明: Kafka:192.168.232.3:9020
Zookeeper:192.168.232.3:2181
192.168.232.4:2181(master)
192.168.232.5:2181
2、工程目錄(生產者-producer)
既然是SpringBoot整合kafka,那肯定是要搭建一個SpringBoot工程目錄,SpringBoot的搭建就不在
此處說明,搭建好之后的生產端工程如下
引入相應的依賴到pom.xml文件中去
1 <?xml version="1.0" encoding="UTF-8"?> 2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 3 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> 4 <modelVersion>4.0.0</modelVersion> 5 <parent> 6 <groupId>org.springframework.boot</groupId> 7 <artifactId>spring-boot-starter-parent</artifactId> 8 <version>2.1.5.RELEASE</version> 9 <relativePath/> <!-- lookup parent from repository --> 10 </parent> 11 <groupId>com.tianjh</groupId> 12 <artifactId>kafka-producer</artifactId> 13 <version>0.0.1-SNAPSHOT</version> 14 <name>kafka-producer</name> 15 <description>Demo project for Spring Boot</description> 16 17 <properties> 18 <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 19 <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> 20 <java.version>1.8</java.version> 21 </properties> 22 <!--SpringBoot 啟動相關的依賴--> 23 <dependencies> 24 <dependency> 25 <groupId>org.springframework.boot</groupId> 26 <artifactId>spring-boot-starter</artifactId> 27 </dependency> 28 <dependency> 29 <groupId>org.springframework.boot</groupId> 30 <artifactId>spring-boot-starter-web</artifactId> 31 </dependency> 32 <!-- 日志文件依賴--> 33 <dependency> 34 <groupId>org.projectlombok</groupId> 35 <artifactId>lombok</artifactId> 36 </dependency> 37 <!-- ## SpringBoot 整合 kafka核心依賴 ##--> 38 <dependency> 39 <groupId>org.springframework.kafka</groupId> 40 <artifactId>spring-kafka</artifactId> 41 </dependency> 42 43 <!-- Springboot 單元測試--> 44 <dependency> 45 <groupId>org.springframework.boot</groupId> 46 <artifactId>spring-boot-starter-test</artifactId> 47 <scope>test</scope> 48 <exclusions> 49 <exclusion> 50 <groupId>org.junit.vintage</groupId> 51 <artifactId>junit-vintage-engine</artifactId> 52 </exclusion> 53 </exclusions> 54 </dependency> 55 </dependencies> 56 57 <build> 58 <plugins> 59 <plugin> 60 <groupId>org.springframework.boot</groupId> 61 <artifactId>spring-boot-maven-plugin</artifactId> 62 </plugin> 63 </plugins> 64 </build> 65 66 </project>
Springboot工程創建好之后會自動生成一個application.properties文件
1 server.servlet.context-path=/producer 2 server.port=8001 3 4 ## Spring 整合 kafka kafka的ip:port 5 spring.kafka.bootstrap-servers=192.168.232.3:9092 6 ## kafka producer 發送消息失敗時的重試次數 7 spring.kafka.producer.retries=0 8 ## 批量發送數據的配置 9 spring.kafka.producer.batch-size=16384 10 ## 設置kafka 生產者內存緩存區的大小(32M) 11 spring.kafka.producer.buffer-memory=33554432 12 ## kafka消息的序列化配置 13 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer 14 spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer 15 16 # acks=0 : 生產者在成功寫入消息之前不會等待任何來自服務器的響應。 17 # acks=1 : 只要集群的首領節點收到消息,生產者就會收到一個來自服務器成功響應。 18 # acks=-1: 表示分區leader必須等待消息被成功寫入到所有的ISR副本(同步副本)中才認為producer請求成功。 19 # 這種方案提供最高的消息持久性保證,但是理論上吞吐率也是最差的。 20 21 ## 這個是kafka生產端最核心的配置 22 spring.kafka.producer.acks=1
啟動類Application就很簡單了
1 package com.tianjh.kafka; 2 3 import org.springframework.boot.SpringApplication; 4 import org.springframework.boot.autoconfigure.SpringBootApplication; 5 6 @SpringBootApplication 7 public class Application { 8 9 public static void main(String[] args) { 10 SpringApplication.run(Application.class, args); 11 } 12 13 }
接下來就是最重要的kafka發送消息的方法 KafkaProducerService.java
1 package com.tianjh.kafka.producer; 2 3 import lombok.extern.slf4j.Slf4j; 4 import org.springframework.beans.factory.annotation.Autowired; 5 import org.springframework.kafka.core.KafkaTemplate; 6 import org.springframework.kafka.support.SendResult; 7 import org.springframework.stereotype.Component; 8 import org.springframework.util.concurrent.ListenableFuture; 9 import org.springframework.util.concurrent.ListenableFutureCallback; 10 11 /** 12 * $KafkaProducerService 13 * @author tianjh 14 * Component 注解交由Spring管理 15 * Slf4j 引入日志 16 */ 17 @Slf4j 18 @Component 19 public class KafkaProducerService { 20 21 @Autowired 22 private KafkaTemplate<String, Object> kafkaTemplate; 23 24 /** 25 * 生產端真正發消息的方法 直接調用kafkaTemplate.send 26 * 方法進行發送 返回ListenableFuture<SendResult<K, V>> 27 * 使用future.addCallback 創建ListenableFutureCallback<SendResult<String, Object>>()對象 28 * 此處需要實現它的onSuccess、onFailure兩個方法 29 * @param topic 發到哪個topic上面 30 * @param object 要發送的消息內容 31 */ 32 public void sendMessage(String topic, Object object) { 33 ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, object); 34 future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() { 35 @Override 36 public void onSuccess(SendResult<String, Object> result) { 37 log.info("Kafka-Producer發放消息成功:" + result.toString()); 38 } 39 40 @Override 41 public void onFailure(Throwable throwable) { 42 log.info("Kafka-Producer發放消息失敗:" + throwable.getMessage()); 43 } 44 45 }); 46 } 47 48 }
3、工程目錄(消費端-consumer)
工程目錄結構和生成端差不多,可以直接拷貝生產端代碼進行簡單修改
pom.xml文件
1 <?xml version="1.0" encoding="UTF-8"?> 2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 3 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> 4 <modelVersion>4.0.0</modelVersion> 5 <parent> 6 <groupId>org.springframework.boot</groupId> 7 <artifactId>spring-boot-starter-parent</artifactId> 8 <version>2.1.5.RELEASE</version> 9 <relativePath/> <!-- lookup parent from repository --> 10 </parent> 11 <groupId>com.tianjh</groupId> 12 <artifactId>kafka-consumer</artifactId> 13 <version>0.0.1-SNAPSHOT</version> 14 <name>kafka-consumer</name> 15 <description>Demo project for Spring Boot</description> 16 17 <properties> 18 <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 19 <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> 20 <java.version>1.8</java.version> 21 </properties> 22 23 <dependencies> 24 <dependency> 25 <groupId>org.springframework.boot</groupId> 26 <artifactId>spring-boot-starter</artifactId> 27 </dependency> 28 <dependency> 29 <groupId>org.springframework.boot</groupId> 30 <artifactId>spring-boot-starter-web</artifactId> 31 </dependency> 32 <dependency> 33 <groupId>org.projectlombok</groupId> 34 <artifactId>lombok</artifactId> 35 </dependency> 36 <dependency> 37 <groupId>org.springframework.kafka</groupId> 38 <artifactId>spring-kafka</artifactId> 39 </dependency> 40 41 <dependency> 42 <groupId>org.springframework.boot</groupId> 43 <artifactId>spring-boot-starter-test</artifactId> 44 <scope>test</scope> 45 <exclusions> 46 <exclusion> 47 <groupId>org.junit.vintage</groupId> 48 <artifactId>junit-vintage-engine</artifactId> 49 </exclusion> 50 </exclusions> 51 </dependency> 52 </dependencies> 53 54 <build> 55 <plugins> 56 <plugin> 57 <groupId>org.springframework.boot</groupId> 58 <artifactId>spring-boot-maven-plugin</artifactId> 59 </plugin> 60 </plugins> 61 </build> 62 63 </project>
消費端的SpringBoot的配置文件application.properties
1 server.servlet.context-path=/consumer 2 server.port=8002 3 4 spring.kafka.bootstrap-servers=192.168.232.3:9092 5 6 ## consumer 消息的簽收機制:手工簽收 等於false 表示需要手工簽收 7 spring.kafka.consumer.enable-auto-commit=false 8 spring.kafka.listener.ack-mode=manual 9 # 該屬性指定了消費者在讀取一個沒有偏移量的分區或者偏移量無效的情況下該作何處理: 10 # latest(默認值)在偏移量無效的情況下,消費者將從最新的記錄開始讀取數據(在消費者啟動之后生成的記錄) 11 # earliest :在偏移量無效的情況下,消費者將從起始位置讀取分區的記錄 12 spring.kafka.consumer.auto-offset-reset=earliest 13 ## 序列化配置 14 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer 15 spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer 16 17 spring.kafka.listener.concurrency=5
消費端消費消息的具體方法KafkaConsumerService
1 package com.tianjh.kafka.consumer; 2 3 import org.apache.kafka.clients.consumer.Consumer; 4 import org.apache.kafka.clients.consumer.ConsumerRecord; 5 import org.springframework.kafka.annotation.KafkaListener; 6 import org.springframework.kafka.support.Acknowledgment; 7 import org.springframework.stereotype.Component; 8 9 import lombok.extern.slf4j.Slf4j; 10 11 @Slf4j 12 @Component 13 public class KafkaConsumerService { 14 15 @KafkaListener(groupId = "group02", topics = "topic02") 16 public void onMessage(ConsumerRecord<String, Object> record, Acknowledgment acknowledgment, Consumer<?, ?> consumer) { 17 log.info("消費端接收消息: {}", record.value()); 18 // 手工簽收機制 19 acknowledgment.acknowledge(); 20 } 21 22 23 }
4、測試
在生產端的src/test/java下新建一個Java測試類,我這兒新建的是ApplicationTests.java
1 package com.tianjh.kafka.test; 2 3 import com.tianjh.kafka.producer.KafkaProducerService; 4 import org.junit.Test; 5 import org.junit.runner.RunWith; 6 import org.springframework.beans.factory.annotation.Autowired; 7 import org.springframework.boot.test.context.SpringBootTest; 8 import org.springframework.test.context.junit4.SpringRunner; 9 10 /** 11 * 測試發現消息 12 */ 13 @RunWith(SpringRunner.class) 14 @SpringBootTest 15 public class ApplicationTests { 16 /** 17 * 依賴注入之前寫好的發送 18 * 消息的實現類,調用send方法進行發送 19 */ 20 @Autowired 21 private KafkaProducerService kafkaProducerService; 22 23 @Test 24 public void send() { 25 try { 26 String topic = "topic02"; 27 for (int i = 0; i < 10; i++) { 28 kafkaProducerService.sendMessage(topic, "hello kafka" + i); 29 } 30 Thread.sleep(1000); 31 } catch (InterruptedException e) { 32 e.printStackTrace(); 33 } 34 } 35 }
生產端運行之后控制台打印:
說明消息發送成功,現在就看看消費端有沒有進行消費消息,運行之后查看控制台發現收到了消息
雖然消費端收到了消息但是並沒有真正的被消費,因為在消費端消費消息的時候注釋了手工簽收的代碼
// acknowledgment.acknowledge();
通過kafka控制台查看 使用如下命令
./kafka-consumer-groups.sh --bootstrap-server 192.168.232.3:9092 --describe --group group02
現在把消費端注釋掉的代碼放開,確認手工簽收 acknowledgment.acknowledge(); 接着再重新運行一下消費端項目
再去kafka使用上訴命令進行查看,如下:
這表明咋們發送的十條消息都被真正的進行消費了。