kafka集群搭建及結合springboot使用


1.場景描述

因kafka以前用的不多,只往topic中寫入和讀取過數據,這次剛好又要用到,記錄下kafka集群搭建及結合springboot使用。

2. 解決方案

2.1 簡單介紹

(一)關於kafka,網上的介紹有很多,簡單說就是消息中間件,大數據項目中經常使用,我們項目是用於接收日志流水數據。

(二)關於消息中間件,主要有四個:

(1)ActiveMQ:歷史悠久,以前項目中使用多,現在更新慢,性能相對不高。
(2)RabbitMQ:可靠性高、安全,模式比較多,java使用比較多,每秒十萬級別
(3)Kafka:分布式、高性能、跨語言,性能超高,每秒百萬級別,模式簡單。
(4)RocketMQ:阿里開源的消息中間件,純Java實現,有商業版,收費,導致推廣一般。

(三)kafka與其他三個相比,優勢在於:

(1)性能高,每秒百萬級別;

(2)分布式,高可用,水平擴展。

(四) kafka官網圖

有中文官網,可以詳細看看。

地址:http://kafka.apachecn.org/intro.html

2.2 軟件下載

2.2.1 kakfa下載

地址:http://kafka.apache.org/downloads,下載最新的2.4.1。

2.2.2 zookeeper下載

(1)因為kafka要依賴於zookeeper做調度,kafka中實際自帶的有kafka,但是一般建議使用獨立的zookeeper,方便后續升級及公用。

(2)下載地址:

http://zookeeper.apache.org/,最新的是3.6.0,不過發布不久,建議先跟kafka內置zookeeper保持一致,使用3.5.7版本

2.2.3 下載說明

文件都不大,zk是9m多,kafka是50多兆

2.3 kafka單機部署及集群部署

說明:軟件老王本地弄了三台虛擬機,ip分別為:

192.168.85.158
192.168.85.168
192.168.85.178
2.3.1 單機部署

(1)上傳jar包,就不再新建用戶了,直接在root賬戶下執行,將kafka和zookeeper的tar包上傳到/root/tools目錄下。

(2)解壓

[root@ruanjianlaowang158 tools]# tar -zxvf kafka_2.12-2.4.1.tgz 
[root@ruanjianlaowang158 tools]# tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz  

(3)配置zookeeper及啟動

[root@ruanjianlaowang158 apache-zookeeper-3.5.7-bin]# cd /root/tools/apache-zookeeper-3.5.7-bin
#軟件老王,首先創建個空文件夾,在接下來的配置文件中配置
[root@ruanjianlaowang158 apache-zookeeper-3.5.7-bin]# mkdir data
[root@ruanjianlaowang158 conf]# cd /root/tools/apache-zookeeper-3.5.7-bin/conf
[root@ruanjianlaowang158 conf]# cp zoo_sample.cfg  zoo.cfg 
[root@ruanjianlaowang158 conf]# vi  zoo.cfg 
#單機只改一個值,保存退出。
#dataDir=/tmp/zookeeper
dataDir=/root/tools/apache-zookeeper-3.5.7-bin/data

#啟動zookeeper
[root@ruanjianlaowang158 bin]# cd /root/tools/apache-zookeeper-3.5.7-bin/bin
[root@ruanjianlaowang158 bin]# ./zkServer.sh  start

(4)配置kafka及啟動

[root@ruanjianlaowang158 kafka_2.12-2.4.1]# cd /root/tools/kafka_2.12-2.4.1

#軟件老王,新建個空文件夾
[root@ruanjianlaowang158 kafka_2.12-2.4.1]# mkdir data

#軟件老王,更改配置文件
[root@ruanjianlaowang158 config]# cd /root/tools/kafka_2.12-2.4.1/config
[root@ruanjianlaowang158 config]# vi server.properties 

#需要改3個值
#log.dirs=/tmp/kafka-logs
log.dirs=/root/tools/kafka_2.12-2.4.1/data
#listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://192.168.85.158:9092
#zookeeper.connect=localhost:2181
zookeeper.connect=192.168.85.158:2181

#啟動kafka
[root@ruanjianlaowang158 bin]# cd /root/tools/kafka_2.12-2.4.1/bin
[root@ruanjianlaowang158 bin]# ./zookeeper-server-start.sh ../config/server.properties &

啟動完畢,單機驗證就不驗證了,直接在集群中進行驗證。

2.3.2 集群部署

(1)集群方式,首先把上面的單機模式,再在192.168.85.168和192.168.85.178服務器上先解壓配置一遍。

(2)zookeeper是還是更改zoo.cfg

158,168,178三台服務器一樣:

[root@ruanjianlaowang158 conf]# cd /root/tools/apache-zookeeper-3.5.7-bin/conf

[root@ruanjianlaowang158 conf]# vi zoo.cfg
#其他不變,最后面新加,三行,三台服務器配置一樣,軟件老王
server.1=192.168.85.158:2888:3888
server.2=192.168.85.168:2888:3888
server.3=192.168.85.178:2888:3888

158服務器執行:
echo "1" > /root/tools/apache-zookeeper-3.5.7-bin/data/myid
168服務器執行:
echo "2" > /root/tools/apache-zookeeper-3.5.7-bin/data/myid
178服務器執行:
echo "3" > /root/tools/apache-zookeeper-3.5.7-bin/data/myid

(3)kafka集群配置

[root@ruanjianlaowang158 config]# cd /root/tools/kafka_2.12-2.4.1/config
[root@ruanjianlaowang158 config]# vi server.properties 
#broker.id 三台服務器不一樣,158服務器設置為1,168服務器設置為2,178服務器設置為3 

broker.id=1
#三個服務器配置一樣
zookeeper.connect=192.168.85.158:2181,192.168.85.168:2181,192.168.85.178:2181

Kafka常用Broker配置說明:

配置項 默認值/示例值 說明
broker.id 0 Broker唯一標識
listeners PLAINTEXT://192.168.85.158:9092 監聽信息,PLAINTEXT表示明文傳輸
log.dirs /root/tools/apache-zookeeper-3.5.7-bin/data kafka數據存放地址,可以填寫多個。用","間隔
message.max.bytes message.max.bytes 單個消息長度限制,單位是字節
num.partitions 1 默認分區數
log.flush.interval.messages Long.MaxValue 在數據被寫入到硬盤和消費者可用前最大累積的消息的數量
log.flush.interval.ms Long.MaxValue 在數據被寫入到硬盤前的最大時間
log.flush.scheduler.interval.ms Long.MaxValue 檢查數據是否要寫入到硬盤的時間間隔。
log.retention.hours 24 控制一個log保留時間,單位:小時
zookeeper.connect 192.168.85.158:2181,
192.168.85.168:2181,
192.168.85.178:2181
ZooKeeper服務器地址,多台用","間隔

(4)集群啟動

啟動方式跟單機一樣:

#啟動zookeeper
[root@ruanjianlaowang158 bin]# cd /root/tools/apache-zookeeper-3.5.7-bin/bin
[root@ruanjianlaowang158 bin]# ./zkServer.sh  start

#啟動kafka
[root@ruanjianlaowang158 bin]# cd /root/tools/kafka_2.12-2.4.1/bin
[root@ruanjianlaowang158 bin]# ./zookeeper-server-start.sh ../config/server.properties &

(5)注意點

集群啟動的時候,單機那台服務器(158)可能會報:Kafka:Configured broker.id 2 doesn't match stored broker.id 0 in meta.properties.
方案:在158服務器data中有個文件:meta.properties,文件中的broker.id也需要修改成與server.properties中的broker.id一樣,所以造成了這個問題。

(6)創建個topic,后面springboot項目測試使用。

[root@ruanjianlaowang158 bin]# cd /root/tools/kafka_2.12-2.4.1/bin
[root@ruanjianlaowang158 bin]# ./kafka-topics.sh --create --zookeeper 192.168.85.158:2181,192.168.85.168:2181,192.168.85.178:2181 --replication-factor 3 --partitions 5 --topic aaaa

2.4 結合springboot項目

2.4.1 pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.0.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.itany</groupId>
    <artifactId>kafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

說明:

主要就兩個gav,一個是spring-boot-starter-web,啟動web服務使用;一個是spring-kafka,這個是springboot集成額kafka核心包。

2.4.2 application.yml
spring:
  kafka:
    # 軟件老王,kafka集群服務器地址
    bootstrap-servers: 192.168.85.158:9092,192.168.85.168:9092,192.168.85.178:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: test
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
2.4.3 producer(消息生產者)
@RestController
public class KafkaProducer {
    @Autowired
    private KafkaTemplate template;
 		//軟件老王,topic使用上測試創建的aaaa
    @RequestMapping("/sendMsg")
    public String sendMsg(String topic, String message){
        template.send(topic,message);
        return "success";
    }
}
2.3.4 consumer(消費者)
@Component
public class KafkaConsumer {
   //軟件老王,這里是監控aaaa這個topic,直接打印到idea中,軟件老王
    @KafkaListener(topics = {"aaaa"})
    public void listen(ConsumerRecord record){
        System.out.println(record.topic()+":"+record.value());
    }
}
2.4.5 驗證結果

(1)瀏覽器上輸入

http://localhost:8080/sendMsg?topic=aaaa&message=bbbb

(2)軟件老王的idea控制台打印信息


I’m 「軟件老王」,如果覺得還可以的話,關注下唄,后續更新秒知!歡迎討論區、同名公眾號留言交流!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM