Integrating Spring Boot with Kafka to Build a Simple Distributed Message Queue



 

1. This post only walks through integrating Spring Boot with Kafka; installing Kafka, Zookeeper, and the rest of the environment is not covered in detail. A Kafka installation guide is available at https://www.cnblogs.com/jhtian/p/13708679.html, which also links to Zookeeper installation instructions.

Environment:

    Kafka:     192.168.232.3:9092
    Zookeeper: 192.168.232.3:2181
               192.168.232.4:2181 (master)
               192.168.232.5:2181

2. Project layout (producer)

Since this is a Spring Boot + Kafka integration, the first step is a Spring Boot project. Creating the Spring Boot project itself is not covered here. With the producer project set up, add the required dependencies to pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.5.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.tianjh</groupId>
    <artifactId>kafka-producer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka-producer</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>
    <!-- Spring Boot starter dependencies -->
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Lombok, for the @Slf4j logging annotation -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!-- ## The core Spring Boot / Kafka integration dependency ## -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <!-- Spring Boot unit testing -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

Creating the Spring Boot project also generates an application.properties file automatically:

server.servlet.context-path=/producer
server.port=8001

## Spring-Kafka integration: the Kafka broker's ip:port
spring.kafka.bootstrap-servers=192.168.232.3:9092
## Number of retries when a producer send fails
spring.kafka.producer.retries=0
## Batch size for batched sends, in bytes
spring.kafka.producer.batch-size=16384
## Size of the producer's in-memory buffer (32 MB)
spring.kafka.producer.buffer-memory=33554432
## Message key/value serializers
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

# acks=0 : the producer does not wait for any response from the server before treating the write as successful.
# acks=1 : the producer gets a success response as soon as the partition leader has received the message.
# acks=-1: the partition leader must wait until the message is written to all in-sync replicas (ISR)
#          before the produce request succeeds. This gives the strongest durability guarantee,
#          but in theory also the worst throughput.

## The most important producer-side setting
spring.kafka.producer.acks=1
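For orientation, these Spring Boot properties are a convenience layer over the raw Kafka producer configuration keys. A minimal sketch of the equivalent raw key/value pairs follows; the `ProducerProps` helper name is made up for illustration, and no `KafkaProducer` is actually created here:

```java
import java.util.Properties;

// Hypothetical helper: the raw Kafka client keys that the
// spring.kafka.producer.* properties above translate into.
public class ProducerProps {
    public static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.232.3:9092"); // spring.kafka.bootstrap-servers
        props.put("retries", "0");                            // spring.kafka.producer.retries
        props.put("batch.size", "16384");                     // spring.kafka.producer.batch-size
        props.put("buffer.memory", "33554432");               // spring.kafka.producer.buffer-memory (32 MB)
        props.put("acks", "1");                               // spring.kafka.producer.acks
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("acks"));
    }
}
```

These are the same keys you would pass when constructing a `KafkaProducer` by hand instead of letting Spring Boot build it.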

The Application startup class is simple:

package com.tianjh.kafka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}

Next comes the most important part, the Kafka send method in KafkaProducerService.java:

package com.tianjh.kafka.producer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

/**
 * KafkaProducerService
 * @author tianjh
 * Component: hands the bean over to Spring
 * Slf4j: provides the logger
 */
@Slf4j
@Component
public class KafkaProducerService {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * The producer's actual send method: kafkaTemplate.send returns a
     * ListenableFuture<SendResult<K, V>>; register a
     * ListenableFutureCallback<SendResult<String, Object>> on it via
     * future.addCallback, implementing its onSuccess and onFailure methods.
     * @param topic  the topic to send to
     * @param object the message payload
     */
    public void sendMessage(String topic, Object object) {
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, object);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onSuccess(SendResult<String, Object> result) {
                log.info("Kafka producer sent message successfully: " + result.toString());
            }

            @Override
            public void onFailure(Throwable throwable) {
                log.error("Kafka producer failed to send message: " + throwable.getMessage());
            }

        });
    }

}
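The onSuccess/onFailure callback is a standard asynchronous-send pattern; in newer spring-kafka versions (3.x), kafkaTemplate.send in fact returns a CompletableFuture directly. The shape of the flow can be sketched with the JDK alone. The `send` stub below is an assumption standing in for the real broker call, and the "topic-partition@offset" result string is made up:

```java
import java.util.concurrent.CompletableFuture;

public class CallbackSketch {
    // Stand-in for kafkaTemplate.send: completes asynchronously with a
    // fake "topic-partition@offset" string instead of a real SendResult.
    static CompletableFuture<String> send(String topic, Object payload) {
        return CompletableFuture.supplyAsync(() -> topic + "-0@42");
    }

    static volatile String lastResult;

    public static void main(String[] args) {
        send("topic02", "hello kafka")
            .whenComplete((result, ex) -> {
                if (ex == null) {
                    lastResult = "success: " + result;          // plays the role of onSuccess(SendResult)
                } else {
                    lastResult = "failure: " + ex.getMessage(); // plays the role of onFailure(Throwable)
                }
            })
            .join(); // block only for this demo; real producer code stays asynchronous
        System.out.println(lastResult);
    }
}
```

The key point the sketch preserves: sendMessage returns immediately, and the success or failure branch runs later on a different thread once the broker (here, the stub) responds.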

3. Project layout (consumer)

The project structure is nearly identical to the producer's; the producer code can be copied over and lightly modified.

The pom.xml file:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.5.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.tianjh</groupId>
    <artifactId>kafka-consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka-consumer</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

The consumer's Spring Boot configuration file, application.properties:

server.servlet.context-path=/consumer
server.port=8002

spring.kafka.bootstrap-servers=192.168.232.3:9092

## Consumer acknowledgment: manual. false means offsets are not committed automatically
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=manual
# What the consumer does when reading a partition with no committed offset, or an invalid one:
# latest (default): start from the newest records (those produced after the consumer started)
# earliest: start reading the partition from the beginning
spring.kafka.consumer.auto-offset-reset=earliest
## Deserializers
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

## Number of concurrent listener threads
spring.kafka.listener.concurrency=5
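As on the producer side, most of these map onto raw Kafka consumer configuration keys; the exception is the spring.kafka.listener.* entries, which configure Spring's listener container rather than the underlying KafkaConsumer. A sketch of the equivalent raw keys (the `ConsumerProps` helper name is made up, and no consumer is created):

```java
import java.util.Properties;

// Hypothetical helper: raw Kafka client keys behind the
// spring.kafka.consumer.* properties above.
public class ConsumerProps {
    public static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.232.3:9092");
        props.put("group.id", "group02");            // set per listener via @KafkaListener(groupId = ...)
        props.put("enable.auto.commit", "false");    // spring.kafka.consumer.enable-auto-commit
        props.put("auto.offset.reset", "earliest");  // spring.kafka.consumer.auto-offset-reset
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Note: spring.kafka.listener.ack-mode and spring.kafka.listener.concurrency
        // configure the Spring listener container, not the KafkaConsumer,
        // so they have no raw-client equivalent here.
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("enable.auto.commit"));
    }
}
```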

The consumer's message-handling code, KafkaConsumerService:

package com.tianjh.kafka.consumer;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component
public class KafkaConsumerService {

    @KafkaListener(groupId = "group02", topics = "topic02")
    public void onMessage(ConsumerRecord<String, Object> record, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
        log.info("Consumer received message: {}", record.value());
        // manual acknowledgment
        acknowledgment.acknowledge();
    }

}
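The effect of manual acknowledgment can be illustrated with a toy model in plain Java (this is a deliberate simplification of Kafka's committed-offset mechanics, not real client code): the group's committed position only advances when acknowledge() is called, so records processed without an ack are delivered again after a restart.

```java
import java.util.Arrays;
import java.util.List;

// Toy model of manual offset commits for a single partition.
public class ManualAckModel {
    private final List<String> records; // the partition's record log
    private int committedOffset = 0;    // offset durably committed to the "broker"
    private int position = 0;           // in-memory consume position

    ManualAckModel(List<String> records) { this.records = records; }

    /** Poll the next record, or null if none remain. */
    String poll() {
        return position < records.size() ? records.get(position++) : null;
    }

    /** Manual ack: commit everything consumed so far. */
    void acknowledge() { committedOffset = position; }

    /** Restart: the consumer resumes from the last committed offset. */
    void restart() { position = committedOffset; }

    public static void main(String[] args) {
        ManualAckModel c = new ManualAckModel(Arrays.asList("m0", "m1", "m2"));
        c.poll();                     // consume m0 but "forget" to acknowledge
        c.restart();
        System.out.println(c.poll()); // m0 again: un-acked records are redelivered

        c.acknowledge();              // commit after reprocessing m0
        c.restart();
        System.out.println(c.poll()); // m1: committed progress survives the restart
    }
}
```

This is exactly the behavior observed in the test below: with acknowledge() commented out, the group's offset never advances even though the listener logged every message.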

4. Testing

Create a Java test class under src/test/java in the producer project; here it is named ApplicationTests.java:

package com.tianjh.kafka.test;

import com.tianjh.kafka.producer.KafkaProducerService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * Tests sending messages
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {
    /**
     * Inject the send implementation written earlier
     * and call its sendMessage method.
     */
    @Autowired
    private KafkaProducerService kafkaProducerService;

    @Test
    public void send() {
        try {
            String topic = "topic02";
            for (int i = 0; i < 10; i++) {
                kafkaProducerService.sendMessage(topic, "hello kafka" + i);
            }
            // give the async callbacks a moment to run before the test JVM exits
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

After running the producer, the console output confirms the messages were sent. Now check whether the consumer picks them up: running it and watching its console shows the messages arriving.

Although the consumer received the messages, they were not actually marked as consumed, because for this first run the manual acknowledgment in the consumer was commented out:

//     acknowledgment.acknowledge();

This can be verified from the Kafka console with the following command:

./kafka-consumer-groups.sh --bootstrap-server 192.168.232.3:9092 --describe --group group02

Now uncomment the manual acknowledgment, acknowledgment.acknowledge();, and rerun the consumer project. Running the command above again shows that all ten messages we sent have now actually been consumed.

