Springboot集成KafKa


一、windows安裝ZooKeeper

1、 下載安裝包 我下的是zookeeper-3.5.9 

http://zookeeper.apache.org/releases.html#download

2、 解壓並進入ZooKeeper目錄,進入目錄中的conf文件夾

3、 將“zoo_sample.cfg”重命名為“zoo.cfg”

4、 打開“zoo.cfg”找到並編輯dataDir=D:\Kafka\zookeeper-3.4.9\tmp(必須以\分割)

5、 添加系統變量:ZOOKEEPER_HOME=D:\Kafka\zookeeper-3.4.9

6、 編輯path系統變量,添加路徑:%ZOOKEEPER_HOME%\bin

7、 在zoo.cfg文件中修改默認的Zookeeper端口(默認端口2181)

8、 打開新的cmd,輸入“zkServer“,運行Zookeeper

9、 命令行提示如下:說明本地Zookeeper啟動成功

注意:不要關了這個窗口

當然你如果不確定有沒有啟動成功

在命令行中使用netstat -ano | findstr 2181(zookeeper默認是2181端口)

二、安裝Kafka

1、 下載安裝包

http://kafka.apache.org/downloads

注意:要下載二進制版本並且版本不要太高,太高window環境可能出錯,最好不要下載3.0以上版本

2、 解壓並進入Kafka目錄,筆者:D:\Kafka\kafka_2.12-0.11.0.0

3、 進入config目錄找到文件server.properties並打開

4、 找到並編輯log.dirs=D:\Kafka\kafka_2.12-0.11.0.0\kafka-logs

5、 找到並編輯zookeeper.connect=localhost:2181

6、 Kafka會按照默認,在9092端口上運行,並連接zookeeper的默認端口:2181

7、 進入Kafka安裝目錄D:\Kafka\kafka_2.12-0.11.0.0,按下Shift+右鍵,選擇“打開命令窗口”選項,打開命令行,輸入:

.\bin\windows\kafka-server-start.bat .\config\server.properties
或bin\kafka-server-start.sh config\server.properties

 

 注意:不要關了這個窗口,啟用Kafka前請確保ZooKeeper實例已經准備好並開始運行

三、創建SpringBoot項目

1、pom.xml中添加依賴   

注意:項目版本2.5.6即可 太高也容易報錯

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <!-- swagger-api 依賴開始-->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
            <!-- 排除自身依賴的1.5.20版本 -->
            <exclusions>
                <!--<exclusion>
                    <groupId>io.swagger</groupId>
                    <artifactId>swagger-annotations</artifactId>
                </exclusion>-->
                <exclusion>
                    <groupId>io.swagger</groupId>
                    <artifactId>swagger-models</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- 引入1.5.21版本,解決訪問swagger文檔報空指針異常 -->
        <dependency>
            <groupId>io.swagger</groupId>
            <artifactId>swagger-models</artifactId>
            <version>1.5.21</version>
        </dependency>
        <!-- swagger使用bootstrap-ui -->
        <dependency>
            <groupId>com.github.xiaoymin</groupId>
            <artifactId>swagger-bootstrap-ui</artifactId>
            <version>1.9.2</version>
        </dependency>
        <!--<dependency>-->
        <!--<groupId>io.springfox</groupId>-->
        <!--<artifactId>springfox-swagger-ui</artifactId>-->
        <!--<version>2.9.2</version>-->
        <!--</dependency>-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
            <version>2.9.7</version>
        </dependency>

2、配置application.yml文件

spring:
  kafka:
    producer:
      # 發生錯誤后,消息重發的次數。
      retries: 0
      #當有多個消息需要被發送到同一個分區時,生產者會把它們放在同一個批次里。該參數指定了一個批次可以使用的內存大小,按照字節數計算。
      batch-size: 16384
      # 設置生產者內存緩沖區的大
      buffer-memory: 33554432
      # 鍵的序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 值的序列化方式
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # acks=0 : 生產者在成功寫入消息之前不會等待任何來自服務器的響應。
      # acks=1 : 只要集群的首領節點收到消息,生產者就會收到一個來自服務器成功響應。
      # acks=all :只有當所有參與復制的節點全部收到消息時,生產者才會收到一個來自服務器的成功響應。
      acks: 1
    consumer:
      # 自動提交的時間間隔 在spring boot 2.X 版本中這里采用的是值的類型為Duration 需要符合特定的格式,如1S,1M,2H,5D
      auto-commit-interval: 1S
      # 該屬性指定了消費者在讀取一個沒有偏移量的分區或者偏移量無效的情況下該作何處理:
      # latest(默認值)在偏移量無效的情況下,消費者將從最新的記錄開始讀取數據(在消費者啟動之后生成的記錄)
      # earliest :在偏移量無效的情況下,消費者將從起始位置讀取分區的記錄
      auto-offset-reset: earliest
      # 是否自動提交偏移量,默認值是true,為了避免出現重復數據和數據丟失,可以把它設置為false,然后手動提交偏移量
      enable-auto-commit: false
      # 鍵的反序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 值的反序列化方式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # 在偵聽器容器中運行的線程數。
      concurrency: 5
      #listner負責ack,每調用一次,就立即commit
      ack-mode: manual_immediate
      missing-topics-fatal: false
server:
  port: 8888

3、生產者Producer

package com.baba.kafka.controller;
@Slf4j
@RestController
@Api(value = "kafka操作類",description = "kafka操作類")
public class KafkaProducer {
    @Autowired
    private KafkaTemplate kafkaTemplate;

    /**
     * 自定義topic
     */
    public static final String TOPIC_TEST = "topic.test";

    /**
     * 組別1
     */
    public static final String TOPIC_GROUP1 = "topic.group1";

    /**
     * 組別2
     */
    public static final String TOPIC_GROUP2 = "topic.group2";

    @PostMapping(value = "/pushMessage")
    @ApiOperation(notes = "kafka推送消息",value = "kafka推送消息")
    public void pushMessage(String message){
        log.info("准備發送消息信息:{}",message);
        //發送消息
        ListenableFuture<SendResult<String, Object>> send = kafkaTemplate.send(TOPIC_TEST, message);
        DateTime time= new DateTime();
        String dateTime = time.toString("yyyy-MM-dd hh:mm:ss");
        log.info("topic:{}發送消息完成,完成時間:{}",TOPIC_TEST,dateTime);
        send.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable throwable) {
                log.error(TOPIC_TEST+"-生產者發送消息失敗"+throwable.getMessage());
            }

            @Override
            public void onSuccess(@Nullable SendResult<String, Object> stringObjectSendResult) {
                DateTime time= new DateTime();
                String dateTime = time.toString("yyyy-MM-dd hh:mm:ss");
                log.info(TOPIC_TEST+"-生產者發送消息成功"+stringObjectSendResult.toString()+"時間:"+dateTime);
            }
        });
    }
}

4、消費者Consumer

package com.baba.kafka.controller;
@Slf4j
@Component
public class KafkaConsumer {
    @KafkaListener(topics = KafkaProducer.TOPIC_TEST,groupId = KafkaProducer.TOPIC_GROUP1)
    public void consumerGroup1(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic){
        DateTime time= new DateTime();
        String dateTime = time.toString("yyyy-MM-dd hh:mm:ss");
        log.info("group1接收到消息時間:{}",dateTime);
        Optional message = Optional.ofNullable(record.value());
        if(message.isPresent()){
            Object msg = message.get();
            log.info("consumerGroup1 消費了: Topic:" + topic + ",Message:" + msg);
            //手動提交偏移量
            ack.acknowledge();
        }
    }

    @KafkaListener(topics = KafkaProducer.TOPIC_TEST,groupId = KafkaProducer.TOPIC_GROUP2)
    public void consumerGroup2(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic){
        DateTime time= new DateTime();
        String dateTime = time.toString("yyyy-MM-dd hh:mm:ss");
        log.info("group2接收到消息時間:{}",dateTime);
        Optional message = Optional.ofNullable(record.value());
        if(message.isPresent()){
            Object msg = message.get();
            log.info("consumerGroup2 消費了:Topic:" + topic + ",Message:" + msg);
            //手動提交偏移量
            ack.acknowledge();
        }
    }
}

5、Swagger配置  

package com.baba.kafka.swagger;

@Configuration
@EnableSwagger2
public class Swagger2 {

    @Bean
    public Docket createRestApi() {
        return new Docket(DocumentationType.SWAGGER_2)
                .apiInfo(apiInfo())
                .select()
                .apis(RequestHandlerSelectors.basePackage("com.baba.kafka.controller"))
                .paths(PathSelectors.any())
                .build();
    }

    private ApiInfo apiInfo() {
        return new ApiInfoBuilder()
                .title("KafKa測試接口")
                .version("1.0")
                .build();
    }

}

6、swagger測試

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM