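Preface
This article describes how to integrate Kafka with Spring Boot.
Installing Kafka
1. Install Kafka with Docker; see https://www.cnblogs.com/lixianguo/p/13254950.html
Creating the project
1. Create a Maven project named springboot-kafka with pom packaging to serve as the parent project. Delete its main and resources folders, then add the following configuration to the pom file: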
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.lxg</groupId>
    <artifactId>springboot-kafka</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>pom</packaging>
    <name>springboot-kafka</name>
    <modules>
        <module>springboot-kafka-common</module>
        <module>springboot-kafka-consumer</module>
        <module>springboot-kafka-producer</module>
    </modules>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.3.RELEASE</version>
    </parent>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.67</version>
        </dependency>
    </dependencies>
</project>
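3. Create the common module
Create a module named springboot-kafka-common, packaged as a jar, to hold shared configuration and shared classes such as utilities.
1. Configure the pom file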
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>com.lxg</groupId>
        <artifactId>springboot-kafka</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>springboot-kafka-common</artifactId>
</project>
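Because this pom declares the parent project as its parent, it inherits the parent's dependencies and does not need to declare any of its own.
2. Create a User entity class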
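import java.io.Serializable;

import lombok.Data;

// Lombok's @Data generates the getters and setters used by the producer
@Data
public class User implements Serializable {
    /**
     * id
     */
    private Integer id;
    /**
     * user name
     */
    private String username;
    /**
     * password
     */
    private String password;
}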
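3. Create the application-common.yml configuration file, which holds the shared Kafka configuration
spring:
  kafka:
    # Kafka configuration
    bootstrap-servers: 192.168.56.102:9092
    producer:
      retries: 0
      # Upper bound, in bytes, on the size of one batch of messages
      batch-size: 16384
      buffer-memory: 33554432
      # Serializers for the message key and the message body
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      # Default consumer group id
      group-id: test-consumer-group
      auto-offset-reset: earliest
      enable-auto-commit: true
      auto-commit-interval: 5000
      # Deserializers for the message key and the message body
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    # Custom topic name, injected into the services with the @Value annotation.
    # If the topic does not exist in Kafka it is created automatically
    # (this relies on the broker setting auto.create.topics.enable, which defaults to true)
    topic:
      userTopic: userInfo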
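The auto-offset-reset value in the configuration above only comes into play when the consumer group has no committed offset for a partition. The following is a simplified plain-Java model of that decision, offered as a sketch rather than as Kafka client code: the class and method names are hypothetical, and it leaves out the case where a committed offset has fallen out of range.

```java
import java.util.OptionalLong;

// Simplified model of how auto-offset-reset chooses a starting position.
// Not Kafka client code: names are made up for illustration.
final class OffsetResetSketch {

    /**
     * @param committed offset previously committed by the consumer group, if any
     * @param logStart  first offset still available in the partition
     * @param logEnd    offset the next produced record will receive
     * @param policy    value of auto-offset-reset: "earliest", "latest" or "none"
     */
    static long startingOffset(OptionalLong committed, long logStart, long logEnd, String policy) {
        if (committed.isPresent()) {
            // A committed offset always wins; the reset policy is not consulted.
            return committed.getAsLong();
        }
        switch (policy) {
            case "earliest":
                return logStart; // read everything still stored in the partition
            case "latest":
                return logEnd;   // read only records produced from now on
            default:
                throw new IllegalStateException("No committed offset and policy is " + policy);
        }
    }

    public static void main(String[] args) {
        System.out.println(startingOffset(OptionalLong.empty(), 0, 42, "earliest")); // 0
        System.out.println(startingOffset(OptionalLong.empty(), 0, 42, "latest"));   // 42
        System.out.println(startingOffset(OptionalLong.of(17), 0, 42, "latest"));    // 17
    }
}
```

With earliest, as configured in this article, a brand-new consumer group also receives messages that were sent before the consumer first started.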
4. Create the message producer: an ordinary Spring Boot project named springboot-kafka-producer
1. pom file configuration
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <groupId>com.lxg</groupId>
    <artifactId>springboot-kafka-producer</artifactId>
    <version>1.0-SNAPSHOT</version>
    <modelVersion>4.0.0</modelVersion>
    <dependencies>
        <dependency>
            <groupId>com.lxg</groupId>
            <artifactId>springboot-kafka-common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
    </dependencies>
</project>
2. application.yml: set the port and the service name, and activate the common profile so that application-common.yml from the common module is loaded
server:
  port: 8081
spring:
  application:
    name: kafka-producer
  profiles:
    active: common
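3. Controller layer
Create UserController
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
// Also import UserService if it lives in a different package.

@Slf4j
@Controller
@RequestMapping("/api/user")
public class UserController {
    @Autowired
    private UserService userService;

    // Calling this endpoint sends one user message to Kafka
    @ResponseBody
    @GetMapping("/getUser")
    public void getUser() {
        userService.sendUserMsg();
        log.info("getUser");
    }
}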
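4. Service layer
Create UserService
public interface UserService {
    /**
     * Sends the user information to Kafka.
     *
     * @return true once the message has been handed to the Kafka template
     */
    Boolean sendUserMsg();
}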
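Create UserServiceImpl
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
// Also import User from the common module (its package is not shown in this article).

@Slf4j
@Service
public class UserServiceImpl implements UserService {
    // Topic name defined in application-common.yml
    @Value("${spring.kafka.topic.userTopic}")
    private String userTopic;

    // Key and value are both strings, matching the configured StringSerializer
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Override
    public Boolean sendUserMsg() {
        User user = new User();
        user.setId(1);
        user.setUsername("lxg");
        user.setPassword("6767167");
        // Serialize the user to JSON and hand it to Kafka; send() returns without waiting for the broker
        kafkaTemplate.send(userTopic, JSONObject.toJSONString(user));
        log.info("sent user message to topic {}", userTopic);
        return Boolean.TRUE;
    }
}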
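Note that sendUserMsg() returns TRUE whether or not the broker accepted the message, because the result of send() is never inspected. One possible way to report the real outcome is to wait on the returned future. The sketch below shows the idea in plain Java, with CompletableFuture standing in for the future type that KafkaTemplate returns (a ListenableFuture in the spring-kafka version used here); the class name, the sender parameter and the timeout are assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;

// Hypothetical helper: reports whether an asynchronous send actually succeeded.
final class SendConfirmSketch {

    /**
     * @param sender    stand-in for the real send call: (topic, payload) -> future
     * @param timeoutMs how long to wait for the outcome before giving up
     * @return true only if the future completed normally within the timeout
     */
    static boolean sendAndConfirm(BiFunction<String, String, ? extends CompletableFuture<?>> sender,
                                  String topic, String payload, long timeoutMs) {
        try {
            sender.apply(topic, payload).get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        } catch (ExecutionException | TimeoutException e) {
            return false; // the send failed or took too long
        }
    }

    public static void main(String[] args) {
        // A sender that always succeeds
        BiFunction<String, String, CompletableFuture<Void>> ok =
                (topic, payload) -> CompletableFuture.completedFuture(null);
        // A sender that always fails, as if the broker were unreachable
        BiFunction<String, String, CompletableFuture<Void>> broken = (topic, payload) -> {
            CompletableFuture<Void> future = new CompletableFuture<>();
            future.completeExceptionally(new IllegalStateException("broker unreachable"));
            return future;
        };
        System.out.println(sendAndConfirm(ok, "userInfo", "{}", 1000));     // true
        System.out.println(sendAndConfirm(broken, "userInfo", "{}", 1000)); // false
    }
}
```

Blocking on every send trades throughput for certainty; registering a callback on the future instead keeps the request thread free.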
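5. Create the startup class
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class WebApplication {
    public static void main(String[] args) {
        SpringApplication.run(WebApplication.class, args);
    }
}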
5. Create the message consumer
1. pom file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <groupId>com.lxg</groupId>
    <artifactId>springboot-kafka-consumer</artifactId>
    <version>1.0-SNAPSHOT</version>
    <modelVersion>4.0.0</modelVersion>
    <dependencies>
        <dependency>
            <groupId>com.lxg</groupId>
            <artifactId>springboot-kafka-common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
    </dependencies>
</project>
2. Create the yml configuration file
server:
  port: 8082
spring:
  application:
    name: kafka-consumer
  profiles:
    active: common
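3. Create the consumer class
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class UserConsumer {
    // Listens on the topic configured in application-common.yml;
    // the message arrives as the JSON string sent by the producer
    @KafkaListener(topics = {"${spring.kafka.topic.userTopic}"})
    public void userConsumer(String message) {
        log.info("receive msg {}", message);
    }
}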
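4. Startup class
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
}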
Testing
Start both the producer and the consumer modules.
Call the producer's endpoint at http://localhost:8081/api/user/getUser
The consumer's console then prints the user entity that the producer created and sent.
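The endpoint can also be called from a small client instead of a browser. The sketch below assumes Java 11 or later (for java.net.http) and a producer running on localhost:8081 as configured above; the class name TriggerProducer is made up for illustration.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical test client that triggers the producer endpoint once.
final class TriggerProducer {

    /** Builds a GET request for the producer's getUser endpoint. */
    static HttpRequest buildRequest(String baseUrl) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/api/user/getUser"))
                .GET()
                .build();
    }

    public static void main(String[] args) throws InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        try {
            HttpResponse<String> response =
                    client.send(buildRequest("http://localhost:8081"), HttpResponse.BodyHandlers.ofString());
            System.out.println("HTTP status: " + response.statusCode());
        } catch (IOException e) {
            // Most likely the producer module has not been started yet
            System.out.println("Could not reach the producer: " + e.getMessage());
        }
    }
}
```

After a successful call, the consumer log should contain a line beginning with "receive msg" followed by the JSON form of the user.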
Source code for this article on GitHub: https://github.com/lixianguo5097/springboot/tree/master/springboot-kafka
CSDN: https://blog.csdn.net/qq_27682773
Jianshu: https://www.jianshu.com/u/e99381e6886e
cnblogs: https://www.cnblogs.com/lixianguo
Personal blog: https://www.lxgblog.com