Spring Boot Kafka Getting-Started Example


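Spring Boot makes this about as convenient as it gets. As usual there are three steps: create the Spring Boot project, configure it, and write the example code.
To set up a Kafka test environment, see: https://blog.csdn.net/fishpro/article/details/105761986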

Source code for this example: https://github.com/fishpro/spring-boot-study/tree/master/spring-boot-study-kafka

1 Create a Spring Boot Maven example project

Note: this example uses IntelliJ IDEA as the development tool.

  1. File > New > Project, select Spring Initializr, then click Next.
  2. Fill in the GroupId (package name) and Artifact (project name), then click Next.
    groupId=com.fishpro
    artifactId=kafka
  3. Under dependencies, check Spring Web Starter.
  4. Set the project name to spring-boot-study-kafka.

2 Add the dependencies to the POM

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.0.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.fishpro</groupId>
    <artifactId>kafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
    <repositories>
        <repository>
            <id>public</id>
            <name>aliyun nexus</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
            <releases>
                <enabled>true</enabled>
            </releases>
        </repository>
    </repositories>

</project>


3 Application configuration


spring:
  kafka:
    # Kafka broker address; for a cluster, list several addresses separated by commas
    bootstrap-servers: 127.0.0.1:9092
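    # Producer
    producer:
      # Number of retries after a failed write. When the leader fails, a replica takes over as the new leader,
      # and writes can fail during the switch. With retries set to 0 the producer never resends, so nothing is
      # duplicated; with retries > 0 it resends once the replica has become leader, so the message is not lost.
      retries: 0
      # Upper bound, in bytes, on one batch of records for a partition; the producer accumulates records and sends them together
      batch-size: 16384
      # Total memory, in bytes, available for buffering records that are waiting to be sent; when it is exhausted, send() blocks
      buffer-memory: 33554432
      # Serializers for the message key and the message value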
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        linger.ms: 1
    # Consumer
    consumer:
      enable-auto-commit: false
      auto-commit-interval: 100ms
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        session.timeout.ms: 15000
      group-id: group
server:
  port: 8081
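
Spring Boot turns these spring.kafka.* keys into native Kafka client properties before it creates the producer and the consumer. As a rough illustration of that mapping (a sketch, not what Spring Boot does internally), the same settings written as plain client properties might look like the following. The class and method names are hypothetical; the property keys are the standard Kafka client keys, and only the JDK is used, so the snippet runs without a broker.

```java
import java.util.Properties;

// Hypothetical helper: mirrors the YAML above using native Kafka client keys.
final class KafkaPropsSketch {

    // Producer settings corresponding to spring.kafka.producer.*
    static Properties producerProps() {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", "127.0.0.1:9092");
        p.setProperty("retries", "0");
        p.setProperty("batch.size", "16384");        // bytes per partition batch (16 KiB)
        p.setProperty("buffer.memory", "33554432");  // total send buffer in bytes (32 MiB)
        p.setProperty("linger.ms", "1");
        p.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return p;
    }

    // Consumer settings corresponding to spring.kafka.consumer.*
    static Properties consumerProps() {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", "127.0.0.1:9092");
        p.setProperty("group.id", "group");
        p.setProperty("enable.auto.commit", "false");
        p.setProperty("auto.commit.interval.ms", "100"); // only used when auto commit is enabled
        p.setProperty("session.timeout.ms", "15000");
        p.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        p.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return p;
    }

    // How many full batches fit into the producer's send buffer.
    static long batchesThatFitInBuffer(Properties producer) {
        long buffer = Long.parseLong(producer.getProperty("buffer.memory"));
        long batch = Long.parseLong(producer.getProperty("batch.size"));
        return buffer / batch;
    }

    public static void main(String[] args) {
        System.out.println(batchesThatFitInBuffer(producerProps())); // prints 2048
    }
}
```

Both sizes are byte counts, which is why the 32 MiB buffer has room for 2048 full 16 KiB batches.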

4 Code examples

4.1 Sending messages

Create the file KafkaDemoController.java and annotate the class with @RestController.

package com.fishpro.kafka.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaDemoController {
    @Autowired
    private KafkaTemplate<String,Object> kafkaTemplate;

    @GetMapping("/message/send")
    public boolean send(@RequestParam String message){
        kafkaTemplate.send("testTopic",message);
        return  true;
    }
}


4.2 Receiving messages

Create the file CustomerListener.java and annotate the class with @Component.

package com.fishpro.kafka.listener;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class CustomerListener {

    @KafkaListener(topics="testTopic")
    public void onMessage(String message){
        System.out.println(message);
    }
}

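4.3 Testing sending and receiving

Request the send URL, for example in a browser: http://localhost:8081/message/send?message=abc
CustomerListener then receives the message in real time and prints it to the console.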
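
The message travels as a query parameter, so text that contains spaces or characters such as & has to be URL-encoded before the request is made. Below is a minimal sketch of building the request URL with the JDK only. The class and method names are hypothetical, and the host and port are taken from the configuration in section 3.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical helper for building the test URL of the /message/send endpoint.
final class SendUrlSketch {

    static final String BASE = "http://localhost:8081/message/send";

    // Appends the message as a URL-encoded query parameter.
    static String sendUrl(String message) {
        try {
            return BASE + "?message=" + URLEncoder.encode(message, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on the JVM, so this should not happen.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(sendUrl("abc"));         // prints http://localhost:8081/message/send?message=abc
        System.out.println(sendUrl("hello kafka")); // prints http://localhost:8081/message/send?message=hello+kafka
    }
}
```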

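4.4 Troubleshooting

  • Symptom: the application fails with "No group.id found in consumer config".
  • Cause: the consumer side is not configured. Add a consumer section that sets group-id:
    # Consumer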
    consumer:
      enable-auto-commit: false
      auto-commit-interval: 100ms
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        session.timeout.ms: 15000
      group-id: group

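4.5 Asynchronous and synchronous sending

Create a service class, KafkaSendService, that sends messages both asynchronously and synchronously. With the Spring Boot 2.3.0 parent used in this POM, KafkaTemplate.send returns a ListenableFuture; Spring for Apache Kafka 3.0 and later return a CompletableFuture instead, so the callback code would need adapting there.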


package com.fishpro.kafka.service;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

@Service
public class KafkaSendService {

    @Autowired
    private KafkaTemplate<String,Object> kafkaTemplate;

    /**
     * Asynchronous example: register a callback and return without waiting for the broker
     * */
    public void sendAnsyc(final String topic,final String message){
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic,message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onSuccess(SendResult<String, Object> result) {
                System.out.println("Message sent: " + result);
            }

            @Override
            public void onFailure(Throwable ex) {
                System.out.println("Failed to send message: "+ ex.getMessage());
            }
        });
    }

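    /**
     * Synchronous example: block until the broker acknowledges the record or 10 seconds pass
     * */
    public void sendSync(final String topic,final String message){
        ProducerRecord<String, Object> producerRecord = new ProducerRecord<>(topic, message);
        try {
            kafkaTemplate.send(producerRecord).get(10, TimeUnit.SECONDS);
            System.out.println("Message sent");
        }
        catch (ExecutionException | TimeoutException e) {
            System.out.println("Failed to send message: "+ e.getMessage());
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still see the interruption
            Thread.currentThread().interrupt();
            System.out.println("Failed to send message: "+ e.getMessage());
        }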
    }
}
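
The difference between the two styles does not depend on Kafka itself: the asynchronous method attaches a callback to the future and returns, while the synchronous one blocks on the same future with a timeout. The sketch below shows that pattern with the JDK's CompletableFuture. Here fakeSend is a hypothetical stand-in for a broker send, not the KafkaTemplate API, and all class and method names are assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration of callback-based versus blocking sends.
final class SendPatternSketch {

    // Stand-in for a broker send: completes on another thread with "topic:message".
    static CompletableFuture<String> fakeSend(String topic, String message) {
        return CompletableFuture.supplyAsync(() -> topic + ":" + message);
    }

    // Asynchronous style: attach a callback and return immediately.
    static CompletableFuture<String> sendAsync(String topic, String message,
                                               AtomicReference<String> log) {
        return fakeSend(topic, message).whenComplete((result, ex) -> {
            if (ex == null) {
                log.set("sent " + result);
            } else {
                log.set("failed " + ex.getMessage());
            }
        });
    }

    // Synchronous style: block until the send completes or the timeout expires.
    static boolean sendSync(String topic, String message) {
        try {
            fakeSend(topic, message).get(10, TimeUnit.SECONDS);
            return true;
        } catch (ExecutionException | TimeoutException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to callers
            return false;
        }
    }

    public static void main(String[] args) {
        AtomicReference<String> log = new AtomicReference<>();
        // join() is used only so the demo can print what the callback recorded.
        sendAsync("ansyctopic", "abc", log).join();
        System.out.println(log.get());
        System.out.println(sendSync("synctopic", "abc"));
    }
}
```

The synchronous form is simpler to reason about but ties up the calling thread for up to the full timeout, which matters when it is called from a web request handler.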


Modify KafkaDemoController to add endpoints for testing the synchronous and asynchronous sends.

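package com.fishpro.kafka.controller;

import com.fishpro.kafka.service.KafkaSendService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * Kafka message sending
 * */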
@RestController
public class KafkaDemoController {
    @Autowired
    private KafkaTemplate<String,Object> kafkaTemplate;
    @Autowired
    private KafkaSendService kafkaSendService;

    @GetMapping("/message/send")
    public boolean send(@RequestParam String message){
        kafkaTemplate.send("testTopic",message);
        return  true;
    }

    // Synchronous send
    @GetMapping("/message/sendSync")
    public boolean sendSync(@RequestParam String message){
        kafkaSendService.sendSync("synctopic",message);
        return  true;
    }

    // Asynchronous send
    @GetMapping("/message/sendAnsyc")
    public boolean sendAnsys(@RequestParam String message){
        kafkaSendService.sendAnsyc("ansyctopic",message);
        return  true;
    }

    // Transactional send, left commented out: it requires a producer factory with transactions enabled
//    @GetMapping("/message/sendTransaction")
//    public  boolean sendTransaction(){
//        kafkaTemplate.executeInTransaction(kafkaTemplate -> {
//            kafkaTemplate.send("transactionTopic", "TransactionMessage");
//            return true;
//        });
//        return true;
//    }
}

