Integrating Kafka with Spring Boot for Message Push


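Preface

This article explains how to integrate Kafka with Spring Boot.

Installing Kafka

1. To install Kafka with Docker, see https://www.cnblogs.com/lixianguo/p/13254950.html

Creating the Project

1. Create a pom project named springboot-kafka to serve as the parent project, delete its main and resource folders, and add the following configuration to its pom file: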
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.lxg</groupId>
    <artifactId>springboot-kafka</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>pom</packaging>

    <name>springboot-kafka</name>
    <modules>
        <module>springboot-kafka-common</module>
        <module>springboot-kafka-consumer</module>
        <module>springboot-kafka-producer</module>
    </modules>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.3.RELEASE</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.67</version>
        </dependency>
    </dependencies>

</project>
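3. Create the common module

Create a module named springboot-kafka-common, packaged as a jar, to hold shared configuration and shared classes such as utilities.
1. Configure the pom file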

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>com.lxg</groupId>
        <artifactId>springboot-kafka</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>springboot-kafka-common</artifactId>
</project>

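Because the pom file declares the parent project as its parent, the dependencies are inherited and nothing else needs to be added.
2. Create a User entity class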

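import java.io.Serializable;

import lombok.Data;

// Lombok's @Data generates the getters, setters, equals, hashCode and toString.
@Data
public class User implements Serializable {
    /**
     * id
     */
    private Integer id;

    /**
     * User name
     */
    private String username;

    /**
     * Password
     */
    private String password;
}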

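3. Create the application-common.yml configuration file, which holds the shared Kafka configuration

spring:
  kafka:
    # Kafka configuration
    bootstrap-servers: 192.168.56.102:9092
    producer:
      retries: 0
      # Upper bound, in bytes, on a batch of records sent to one partition (a size, not a message count)
      batch-size: 16384
      # Total bytes of memory the producer may use to buffer records waiting to be sent
      buffer-memory: 33554432
      # Serializers for the message key and the message body
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      # Default consumer group id
      group-id: test-consumer-group
      auto-offset-reset: earliest
      enable-auto-commit: true
      auto-commit-interval: 5000
      # Deserializers for the message key and the message body
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    # Custom topic name, injected into the services with the @Value annotation. If the topic does not exist in Kafka it is created automatically, provided the broker allows automatic topic creation (auto.create.topics.enable, which defaults to true).
    topic:
      userTopic: userInfo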
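
The bootstrap-servers value above points at a specific virtual machine, so it usually has to be changed for a local setup. The sketch below is a hypothetical helper that is not part of the original project; it uses only the JDK to check that a host:port entry accepts TCP connections before the services are started. An open port only shows that something is listening there; it does not prove that Kafka's advertised listeners are configured correctly.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper (not in the original project): probes one bootstrap-servers entry. */
final class BootstrapServerCheck {

    /** Splits a "host:port" entry into an unresolved socket address. */
    static InetSocketAddress parse(String hostAndPort) {
        int colon = hostAndPort.lastIndexOf(':');
        if (colon <= 0 || colon == hostAndPort.length() - 1) {
            throw new IllegalArgumentException("expected host:port but got " + hostAndPort);
        }
        String host = hostAndPort.substring(0, colon);
        int port = Integer.parseInt(hostAndPort.substring(colon + 1));
        return InetSocketAddress.createUnresolved(host, port);
    }

    /** Returns true if a TCP connection can be opened within the timeout. */
    static boolean isReachable(String hostAndPort, int timeoutMillis) {
        InetSocketAddress target = parse(hostAndPort);
        try (Socket socket = new Socket()) {
            // Resolve the host name here; resolution or connection failures both count as unreachable.
            socket.connect(new InetSocketAddress(target.getHostString(), target.getPort()), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults to the address used in application-common.yml.
        String server = args.length > 0 ? args[0] : "192.168.56.102:9092";
        System.out.println(server + " reachable: " + isReachable(server, 3000));
    }
}
```

Running it with a different address as the first argument checks that address instead of the default.
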
4. Create the message producer, i.e. an ordinary Spring Boot project named springboot-kafka-producer

1. pom file configuration

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <groupId>com.lxg</groupId>
    <artifactId>springboot-kafka-producer</artifactId>
    <version>1.0-SNAPSHOT</version>

    <modelVersion>4.0.0</modelVersion>

    <dependencies>
        <dependency>
            <groupId>com.lxg</groupId>
            <artifactId>springboot-kafka-common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
    </dependencies>

</project>

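2. application.yml configuration file: set the port and the microservice name, and pull in application-common.yml from the common module by activating the common profile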

server:
  port: 8081
spring:
  application:
    name: kafka-producer
  profiles:
    active: common

3. Controller layer
Create UserController

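import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

@Slf4j
@Controller
@RequestMapping("/api/user")
public class UserController {
    @Autowired
    private UserService userService;

    // Despite the name, this endpoint publishes a user message to Kafka and returns an empty response.
    @ResponseBody
    @GetMapping("/getUser")
    public void getUser() {
        userService.sendUserMsg();
        log.info("getUser");
    }
}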

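4. Service layer
Create UserService

public interface UserService {
    /**
     * Sends the user information to Kafka.
     *
     * @return true once the message has been handed to the KafkaTemplate
     */
    Boolean sendUserMsg();
}

Create UserServiceImpl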

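import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class UserServiceImpl implements UserService {
    // Topic name taken from application-common.yml
    @Value("${spring.kafka.topic.userTopic}")
    private String userTopic;

    // Typed to match the String serializers configured for keys and values
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Override
    public Boolean sendUserMsg() {
        User user = new User();
        user.setId(1);
        user.setUsername("lxg");
        user.setPassword("6767167");
        // Serialize the user to JSON and send it asynchronously; the result is not awaited
        kafkaTemplate.send(userTopic, JSONObject.toJSONString(user));
        log.info("lxg");
        return Boolean.TRUE;
    }
}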
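
The service above returns TRUE as soon as the message is handed to the template, whether or not the broker ever accepts it. One possible refinement is to wait on the future that send returns. The following sketch assumes only that the returned object implements java.util.concurrent.Future, which holds for the ListenableFuture returned in the spring-kafka 2.x line. SendConfirmation and its await method are hypothetical names, and the futures in main are stand-ins rather than real Kafka results.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper: waits for an asynchronous send instead of assuming it succeeded. */
final class SendConfirmation {

    /** Returns true only if the future completes normally within the timeout. */
    static boolean await(Future<?> sendFuture, long timeoutSeconds) {
        try {
            sendFuture.get(timeoutSeconds, TimeUnit.SECONDS);
            return true;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException | TimeoutException e) {
            // The send failed or did not finish in time.
            return false;
        }
    }

    public static void main(String[] args) {
        // Stand-ins for the future returned by kafkaTemplate.send(...).
        Future<String> acknowledged = CompletableFuture.completedFuture("acknowledged");
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("broker unavailable"));

        System.out.println(await(acknowledged, 1)); // prints true
        System.out.println(await(failed, 1));       // prints false
    }
}
```

In sendUserMsg this could be used as `return SendConfirmation.await(kafkaTemplate.send(userTopic, json), 10);`, where json is the serialized user. Blocking like this trades throughput for a meaningful return value, so it suits a demo endpoint better than a high-volume producer.
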

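5. Create the startup class

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class WebApplication {
    public static void main(String[] args) {
        SpringApplication.run(WebApplication.class, args);
    }
}

5. Create the message consumer

1. pom file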

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <groupId>com.lxg</groupId>
    <artifactId>springboot-kafka-consumer</artifactId>
    <version>1.0-SNAPSHOT</version>

    <modelVersion>4.0.0</modelVersion>

    <dependencies>
        <dependency>
            <groupId>com.lxg</groupId>
            <artifactId>springboot-kafka-common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
    </dependencies>
</project>

2. Create the yml configuration file

server:
  port: 8082
spring:
  application:
    name: kafka-consumer
  profiles:
    active: common

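3. Create the consumer class

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class UserConsumer {

    // Receives every message published to the configured user topic as a JSON string
    @KafkaListener(topics = {"${spring.kafka.topic.userTopic}"})
    public void userConsumer(String message) {
        log.info("receive msg {}", message);
    }
}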

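4. Startup class

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
}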

Testing

Start both the producer and the consumer modules.
Call the producer's endpoint at http://localhost:8081/api/user/getUser
The consumer's console then prints the user entity that the producer created and pushed.
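
Instead of a browser, the endpoint can also be called from a few lines of plain Java. This is a minimal sketch that assumes the producer is running locally on port 8081 as configured earlier. ProducerSmokeTest is a hypothetical class name, and only JDK classes are used.

```java
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;

/** Hypothetical smoke test: triggers the producer endpoint once and reports the HTTP status. */
final class ProducerSmokeTest {

    /** Builds the URL of the producer's getUser endpoint. */
    static String endpoint(String host, int port) {
        return "http://" + host + ":" + port + "/api/user/getUser";
    }

    /** Sends a GET request and returns the HTTP status code. */
    static int get(String url) throws IOException {
        HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection();
        connection.setRequestMethod("GET");
        connection.setConnectTimeout(3000);
        connection.setReadTimeout(3000);
        try {
            return connection.getResponseCode();
        } finally {
            connection.disconnect();
        }
    }

    public static void main(String[] args) throws IOException {
        String url = endpoint("localhost", 8081);
        System.out.println("GET " + url + " -> HTTP " + get(url));
    }
}
```

If the request succeeds, the consumer's log should show a "receive msg" line containing the JSON form of the user.
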

GitHub source for this article: https://github.com/lixianguo5097/springboot/tree/master/springboot-kafka

CSDN: https://blog.csdn.net/qq_27682773
Jianshu: https://www.jianshu.com/u/e99381e6886e
cnblogs: https://www.cnblogs.com/lixianguo
Personal blog: https://www.lxgblog.com

