SpringBoot進階教程(六十二)整合Kafka


在上一篇文章《Linux安裝Kafka》中,已經介紹了如何在Linux安裝Kafka,以及Kafka的啟動/關閉和創建發話題並產生消息和消費消息。這篇文章就介紹介紹SpringBoot整合Kafka。

v創建項目

若是已有的項目中添加kafka, 請直接跳至1.3

1.1 創建springboot:

SpringBoot進階教程(六十二)整合Kafka

SpringBoot進階教程(六十二)整合Kafka

1.2 選web和kafka:

SpringBoot進階教程(六十二)整合Kafka

SpringBoot進階教程(六十二)整合Kafka

1.3 已有的項目中添加kafka, pom.xml中添加依賴

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

1.4 整體架構目錄:

SpringBoot進階教程(六十二)整合Kafka

v配置項目

2.1 設置application.yml

spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092 #指定kafka server的地址,集群配多個,中間,逗號隔開
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: default_consumer_group #群組ID
      enable-auto-commit: true
      auto-commit-interval: 1000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
server:
  port: 8500

項目默認生成的是applicaiton.properties,直接重命名修改文件后綴名為yml即可。

2.2 添加生產者ProducerController

package com.toutou.Controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author toutou
 * @date by 2019/08
 */
@RestController
public class ProducerController {
    @Autowired
    private KafkaTemplate<String,Object> kafkaTemplate;
    @RequestMapping("message/send")
    public String send(String msg){
        kafkaTemplate.send("demo", msg); //使用kafka模板發送信息
        return "success";
    }
}

2.3 添加消費者ConsumerDemo

package com.toutou.Consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * @author toutou
 * 監聽服務器上的kafka是否有相關的消息發過來
 * @date by 2019/08
 */
@Component
public class ConsumerDemo {
    /**
     * 定義此消費者接收topics = "demo"的消息,與controller中的topic對應上即可
     * @param record 變量代表消息本身,可以通過ConsumerRecord<?,?>類型的record變量來打印接收的消息的各種信息
     */
    @KafkaListener(topics = "demo")
    public void listen (ConsumerRecord<?, ?> record){
        System.out.printf("topic is %s, offset is %d, value is %s \n", record.topic(), record.offset(), record.value());
    }
}

v啟動測試

3.1 測試生產者

SpringBoot進階教程(六十二)整合Kafka

3.2 消費者效果

SpringBoot進階教程(六十二)整合Kafka

v源碼地址

https://github.com/toutouge/javademosecond/tree/master/hellokafka


作  者:請叫我頭頭哥
出  處:http://www.cnblogs.com/toutou/
關於作者:專注於基礎平台的項目開發。如有問題或建議,請多多賜教!
版權聲明:本文版權歸作者和博客園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文鏈接。
特此聲明:所有評論和私信都會在第一時間回復。也歡迎園子的大大們指正錯誤,共同進步。或者直接私信
聲援博主:如果您覺得文章對您有幫助,可以點擊文章右下角推薦一下。您的鼓勵是作者堅持原創和持續寫作的最大動力!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM