SpringBoot實戰(十四)之整合KafKa


 本人今天上午參考了不少博文,發現不少博文不是特別好,不是因為依賴沖突問題就是因為版本問題。

於是我結合相關的博文和案例,自己改寫了下並參考了下,於是就有了這篇文章。希望能夠給大家幫助,少走一些彎路。

 

一、KafKa的介紹

1.主要功能

根據官網的介紹,ApacheKafka®是一個分布式流媒體平台,它主要有3種功能:

  a.發布和訂閱消息流,這個功能類似於消息隊列,這也是kafka歸類為消息隊列框架的原因。

  b.以容錯的方式記錄消息流,kafka以文件的方式來存儲消息流。

  c.可以再消息發布的時候進行處理。

 

2.使用場景

a.在系統或應用程序之間構建可靠的用於傳輸實時數據的管道,消息隊列功能。

b.構建實時的流數據處理程序來變換或處理數據流,數據處理功能。

 

3.詳細介紹

 Kafka目前主要作為一個分布式的發布訂閱式的消息系統使用,下面簡單介紹一下kafka的基本機制

消息傳輸過程:

 

Producer即生產者,向Kafka集群發送消息,在發送消息之前,會對消息進行分類,即Topic,上圖展示了兩個producer發送了分類為topic1的消息,另外一個發送了topic2的消息。

 

Topic即主題,通過對消息指定主題可以將消息分類,消費者可以只關注自己需要的Topic中的消息

 

Consumer即消費者,消費者通過與kafka集群建立長連接的方式,不斷地從集群中拉取消息,然后可以對這些消息進行處理。

 

二、安裝

安裝包下載地址:http://kafka.apache.org/downloads

找到0.11.0.1版本,如圖:

1.下載

wget https://archive.apache.org/dist/kafka/0.11.0.1/kafka_2.11-0.11.0.1.tgz

 

2.解壓

tar -xzvf kafka_2.11-0.11.0.1.tgz

配置說明:

    consumer.properites 消費者配置,這個配置文件用於配置開啟的消費者,此處我們使用默認的即可。

    producer.properties 生產者配置,這個配置文件用於配置開啟的生產者,此處我們使用默認的即可。

  server.properties kafka服務器的配置,此配置文件用來配置kafka服務器,目前僅介紹幾個最基礎的配置。

       a.broker.id 申明當前kafka服務器在集群中的唯一ID,需配置為integer,並且集群中的每一個kafka服務器的id都應是唯一的,我們這里采用默認配置即可。

       b.listeners 申明此kafka服務器需要監聽的端口號,如果是在本機上跑虛擬機運行可以不用配置本項,默認會使用localhost的地址,如果是在遠程服務器上運行則必須配置,

例如:listeners=PLAINTEXT:// 192.168.126.143:9092。並確保服務器的9092端口能夠訪問。

  c.zookeeper.connect 申明kafka所連接的zookeeper的地址 ,需配置為zookeeper的地址,由於本次使用的是kafka高版本中自帶zookeeper,

使用默認配置即可,zookeeper.connect=localhost:2181。

 

3.運行

首先運行zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties

運行成功,顯示如圖:

 

然后運行kafka

bin/kafka-server-start.sh config/server.properties

 運行成功,顯示如圖:

 

三、整合KafKa

1.新建Maven項目導入Maven依賴

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>cn.test</groupId>
  <artifactId>kafka_demo</artifactId>
  <version>0.0.1-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.9.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>1.1.1.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.2</version>
        </dependency>

    </dependencies>
 
    
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
            
   <!-- 指定編譯版本 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
   
        
    
            
    
        
        <finalName>${project.artifactId}</finalName>
            

    </build>

   
</project>

 

2.編寫消息實體

package com.springboot.kafka.bean;


import java.util.Date;

import lombok.Data;
 


@Data
public class Message {
    private Long id;    //id

    private String msg; //消息

    private Date sendTime;  //時間戳

}

 有了lombok,每次編寫實體不必要使用快捷鍵生成seter或geter方法了,代碼看起來更加簡潔了。

 

3.編寫消息發送者(可以理解為生產者,最好聯系詳細介紹中的圖)

package com.springboot.kafka.producer;

import java.util.Date;
import java.util.UUID;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.springboot.kafka.bean.Message;

import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j
public class KafkaSender {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private Gson gson = new GsonBuilder().create();

    //發送消息方法
    public void send() {
        Message message = new Message();
        message.setId(System.currentTimeMillis());
        message.setMsg(UUID.randomUUID().toString());
        message.setSendTime(new Date());
        log.info("+++++++++++++++++++++  message = {}", gson.toJson(message));
        kafkaTemplate.send("zhisheng", gson.toJson(message));
    }
}

 

4.編寫消息接收者(可以理解為消費者)

package com.springboot.kafka.producer;

import java.util.Date;
import java.util.UUID;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.springboot.kafka.bean.Message;

import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j
public class KafkaSender {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private Gson gson = new GsonBuilder().create();

    //發送消息方法
    public void send() {
        Message message = new Message();
        message.setId(System.currentTimeMillis());
        message.setMsg(UUID.randomUUID().toString());
        message.setSendTime(new Date());
        log.info("+++++++++++++++++++++  message = {}", gson.toJson(message));
        kafkaTemplate.send("zhisheng", gson.toJson(message));
    }
}

 

5.編寫啟動類

package com.springboot.kafka;
 


import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;

import com.springboot.kafka.producer.KafkaSender;


@SpringBootApplication
public class KafkaApplication {

    public static void main(String[] args) {

        ConfigurableApplicationContext context = SpringApplication.run(KafkaApplication.class, args);

        KafkaSender sender = context.getBean(KafkaSender.class);

        for (int i = 0; i < 3; i++) {
            //調用消息發送類中的消息發送方法
            sender.send();

            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

 

 

6.編寫application.properties配置文件

#============== kafka ===================
# \u6307\u5B9Akafka \u4EE3\u7406\u5730\u5740\uFF0C\u53EF\u4EE5\u591A\u4E2A
spring.kafka.bootstrap-servers=192.168.126.143:9092

#=============== provider  =======================

spring.kafka.producer.retries=0
# \u6BCF\u6B21\u6279\u91CF\u53D1\u9001\u6D88\u606F\u7684\u6570\u91CF
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432

# \u6307\u5B9A\u6D88\u606Fkey\u548C\u6D88\u606F\u4F53\u7684\u7F16\u89E3\u7801\u65B9\u5F0F
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#=============== consumer  =======================
# \u6307\u5B9A\u9ED8\u8BA4\u6D88\u8D39\u8005group id
spring.kafka.consumer.group-id=test-consumer-group

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100

# \u6307\u5B9A\u6D88\u606Fkey\u548C\u6D88\u606F\u4F53\u7684\u7F16\u89E3\u7801\u65B9\u5F0F
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

 

7.運行結果

 

示例代碼地址:https://github.com/youcong1996/study_simple_demo/tree/kafka_demo

如果按照上述流程沒有達到預計的效果可以git clone到本地。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM