Spring Boot 集成 Kafka


 

搭建Kafka集群,參考:

https://www.cnblogs.com/jonban/p/kafka.html

 

源碼示例如下:

 

1、新建 Maven 項目 kafka

 

2、pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
        http://maven.apache.org/xsd/maven-4.0.0.xsd">


    <!-- Maven coordinates of this demo project -->
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.java</groupId>
    <artifactId>kafka</artifactId>
    <version>1.0.0</version>

    <!-- Inherit from the Spring Boot starter parent; dependencies below that
         declare no version rely on the parent's dependency management. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.5.RELEASE</version>
    </parent>


    <dependencies>

        <!-- Spring Boot web starter (used by the REST controller) and Spring for Apache Kafka -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>



        <!-- Hot deployment / hot reload helpers, both declared with provided scope -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>springloaded</artifactId>
            <version>1.2.8.RELEASE</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>provided</scope>
        </dependency>

    </dependencies>

    <build>
        <!-- Name the built artifact after the artifactId, without the version suffix -->
        <finalName>${project.artifactId}</finalName>
        <plugins>
            <!-- Compile for Java 1.8 and read sources as UTF-8 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>

            <!-- Run the Spring Boot plugin's repackage goal as part of the build -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

 

3、KafkaStarter.java

package com.java;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Application entry point: boots the Spring context with {@code KafkaStarter}
 * as the primary configuration source.
 *
 * @author Logan
 * @version 1.0.0
 * @createDate 2019-05-07
 */
@SpringBootApplication
public class KafkaStarter {

    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments, forwarded unchanged to Spring Boot
     */
    public static void main(String[] args) {
        // Explicit form of the static SpringApplication.run(Class, String...) helper.
        SpringApplication application = new SpringApplication(KafkaStarter.class);
        application.run(args);
    }
}

 

4、MessageHandler.java

package com.java.listener;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * Task handler: listens for messages on the Kafka topic {@code test-topic},
 * consumes them and processes them.
 *
 * <p>Processing is simulated by a fixed sleep; the payload is then printed to
 * standard output between a "start" and a "finished" marker line.
 *
 * <p>NOTE(review): both listener methods subscribe to the same topic and declare
 * no explicit group id, so they presumably share the configured consumer group.
 * In that case each record is delivered to only one of the two methods, not
 * both. Confirm this is intended.
 *
 * @author Logan
 * @version 1.0.0
 * @createDate 2019-05-07
 */
@Component
public class MessageHandler {

    /** Simulated processing time per message, in milliseconds. */
    private static final long PROCESSING_DELAY_MS = 5000L;

    /**
     * Handles a record received as its plain string value.
     *
     * @param message the message payload
     */
    @KafkaListener(topics = { "test-topic" })
    public void handle(String message) {
        process(message);
    }

    /**
     * Handles a record received as the full {@link ConsumerRecord}.
     *
     * @param record the consumed record; printed via its {@code toString()}
     */
    @KafkaListener(topics = { "test-topic" })
    public void handle(ConsumerRecord<String, String> record) {
        process(record);
    }

    /**
     * Shared processing routine: prints a start marker, waits for the simulated
     * processing time, prints the payload, then prints a completion marker.
     *
     * @param payload the object to print once the simulated work is done
     */
    private void process(Object payload) {
        System.out.println("[ 處理器開始處理消息 ]" + System.currentTimeMillis());

        try {
            Thread.sleep(PROCESSING_DELAY_MS);
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the interrupt flag so the calling listener thread can
            // still observe the interruption (e.g. during shutdown).
            Thread.currentThread().interrupt();
        }

        System.out.println(payload);

        System.out.println("[ 處理器處理消息完成 ]" + System.currentTimeMillis());
    }

}

 

5、SendMessageController.java

package com.java.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * REST controller that publishes the request parameter to a Kafka topic.
 *
 * @author Logan
 * @version 1.0.0
 * @createDate 2019-05-07
 */
@RestController
public class SendMessageController {

    /** Name of the topic every message is published to. */
    private static final String TOPIC = "test-topic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Publishes {@code params} to the topic and returns a fixed confirmation text.
     * The result of the send call is not inspected.
     *
     * @param params the message content, bound from the {@code params} request parameter
     * @return a fixed confirmation string
     */
    @GetMapping("/send")
    public String send(String params) {
        System.out.println("[ 收到請求 ]");
        kafkaTemplate.send(TOPIC, params);
        System.out.println("[ 返回響應 ]");

        final String reply = "您的任務已提交";
        return reply;
    }

}

 

6、application.properties

# Producer configuration; see org.springframework.boot.autoconfigure.kafka.KafkaProperties.Producer
# Comma-separated list of broker host:port pairs used by the producer.
spring.kafka.producer.bootstrapServers=s1:9092,s2:9092,s3:9092


# Consumer configuration; see org.springframework.boot.autoconfigure.kafka.KafkaProperties.Consumer
# Comma-separated list of broker host:port pairs used by the consumer.
spring.kafka.consumer.bootstrapServers=s1:9092,s2:9092,s3:9092
# Consumer group id used by the listeners in this application.
spring.kafka.consumer.groupId=kafka-test
# Where to start reading when the group has no committed offset.
spring.kafka.consumer.autoOffsetReset=latest
# Commit consumed offsets automatically.
spring.kafka.consumer.enableAutoCommit=true

 

 

7、運行KafkaStarter.java 啟動

瀏覽器輸入:http://127.0.0.1:8080/send?params=Good

可以向主題中發送消息 Good(params 參數就是發送的內容)。

程序中的消費者會監聽到消息並開始處理

在Kafka消費者控制台可以監聽到同樣的消息

 

同樣,在Kafka生產者控制台發送消息到主題 test-topic

程序中的消費者也會監聽到消息並開始處理。

 

結論:測試生產者和消費者功能一切正常!

 

 

 

Spring boot 集成Kafka

.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯繫本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM