Spring Boot 的便捷,無出其右,仍然是三部曲,創建springboot 項目,配置項目,編寫示例代碼。
安裝 Kafka 測試環境請參加: https://blog.csdn.net/fishpro/article/details/105761986
本示例代碼https://github.com/fishpro/spring-boot-study/tree/master/spring-boot-study-kafka
1 新建 Spring Boot Maven 示例工程項目
注意:是用來 IDEA 開發工具
File > New > Project
,如下圖選擇Spring Initializr
然后點擊 【Next】下一步- 填寫
GroupId
(包名)、Artifact
(項目名) 即可。點擊 下一步
groupId=com.fishpro
artifactId=kafka - 選擇依賴
Spring Web Starter
前面打鈎。 - 項目名設置為
spring-boot-study-kafka
.
2 引入依賴 Pom
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.0.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.fishpro</groupId>
<artifactId>kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>kafka</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>public</id>
<name>aliyun nexus</name>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<releases>
<enabled>true</enabled>
</releases>
</repository>
</repositories>
</project>
3 application 配置
spring:
kafka:
# 指定kafka server的地址,集群配多個,中間,逗號隔開
bootstrap-servers: 127.0.0.1:9092
# 生產者
producer:
# 寫入失敗時,重試次數。當leader節點失效,一個repli節點會替代成為leader節點,此時可能出現寫入失敗,
# 當retris為0時,produce不會重復。retirs重發,此時repli節點完全成為leader節點,不會產生消息丟失。
retries: 0
# 每次批量發送消息的數量,produce積累到一定數據,一次發送
batch-size: 16384
# produce積累數據一次發送,緩存大小達到buffer.memory就發送數據
buffer-memory: 33554432
# 指定消息key和消息體的編解碼方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
properties:
linger.ms: 1
# 消費者
consumer:
enable-auto-commit: false
auto-commit-interval: 100ms
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
session.timeout.ms: 15000
group-id: group
server:
port: 8081
4 代碼實例
4.1 發送消息
建立文件 KafkaDemoController.java 設置為 RestController
package com.fishpro.kafka.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class KafkaDemoController {
@Autowired
private KafkaTemplate<String,Object> kafkaTemplate;
@GetMapping("/message/send")
public boolean send(@RequestParam String message){
kafkaTemplate.send("testTopic",message);
return true;
}
}
4.2 接收消息
新建文件 CustomerListener 設置標簽為 @Component
package com.fishpro.kafka.listener;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class CustomerListener {
@KafkaListener(topics="testTopic")
public void onMessage(String message){
System.out.println(message);
}
}
4.3 測試發送與接收
輸入發送消息 url http://localhost:8081/message/send?message=abc
此時 CustomerListener 也會實時接收到消息。
4.4 問題
- 出現了:springboot整合kafka出現No group.id found in consumer config
- 原因是 未配置消費端
# 消費者
consumer:
enable-auto-commit: false
auto-commit-interval: 100ms
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
session.timeout.ms: 15000
group-id: group
4.5 異步同步消息
建立異步消息同步消息發送
package com.fishpro.kafka.service;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@Service
public class KafkaSendService {
@Autowired
private KafkaTemplate<String,Object> kafkaTemplate;
/**
* 異步示例
* */
public void sendAnsyc(final String topic,final String message){
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic,message);
future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onSuccess(SendResult<String, Object> result) {
System.out.println("發送消息成功:" + result);
}
@Override
public void onFailure(Throwable ex) {
System.out.println("發送消息失敗:"+ ex.getMessage());
}
});
}
/**
* 同步示例
* */
public void sendSync(final String topic,final String message){
ProducerRecord<String, Object> producerRecord = new ProducerRecord<>(topic, message);
try {
kafkaTemplate.send(producerRecord).get(10, TimeUnit.SECONDS);
System.out.println("發送成功");
}
catch (ExecutionException e) {
System.out.println("發送消息失敗:"+ e.getMessage());
}
catch (TimeoutException | InterruptedException e) {
System.out.println("發送消息失敗:"+ e.getMessage());
}
}
}
修改 KafkaDemoController 增加異步同步消息測試
/**
* kafka 消息發送
* */
@RestController
public class KafkaDemoController {
@Autowired
private KafkaTemplate<String,Object> kafkaTemplate;
@Autowired
private KafkaSendService kafkaSendService;
@GetMapping("/message/send")
public boolean send(@RequestParam String message){
kafkaTemplate.send("testTopic",message);
return true;
}
//同步
@GetMapping("/message/sendSync")
public boolean sendSync(@RequestParam String message){
kafkaSendService.sendSync("synctopic",message);
return true;
}
//異步示例
@GetMapping("/message/sendAnsyc")
public boolean sendAnsys(@RequestParam String message){
kafkaSendService.sendAnsyc("ansyctopic",message);
return true;
}
//事務消息發送
// @GetMapping("/message/sendTransaction")
// public boolean sendTransaction(){
// kafkaTemplate.executeInTransaction(kafkaTemplate -> {
// kafkaTemplate.send("transactionTopic", "TransactionMessage");
// return true;
// });
// return true;
// }
}