Kafka集群部署指南


一、前言

1、Kafka簡介

Kafka是一個開源的分布式消息引擎/消息中間件,同時Kafka也是一個流處理平台。Kakfa支持以發布/訂閱的方式在應用間傳遞消息,同時並基於消息功能添加了Kafka Connect、Kafka Streams以支持連接其他系統的數據(ElasticsearchHadoop等)

Kafka最核心的最成熟的還是他的消息引擎,所以Kafka大部分應用場景還是用來作為消息隊列削峰平谷。另外,Kafka也是目前性能最好的消息中間件。

2、Kafka架構

kafka架構圖

在Kafka集群(Cluster)中,一個Kafka節點就是一個Broker,消息由Topic來承載,可以存儲在1個或多個Partition中。發布消息的應用為Producer、消費消息的應用為Consumer,多個Consumer可以促成Consumer Group共同消費一個Topic中的消息。

概念/對象 簡單說明
Broker Kafka節點
Topic 主題,用來承載消息
Partition 分區,用於主題分片存儲
Producer 生產者,向主題發布消息的應用
Consumer 消費者,從主題訂閱消息的應用
Consumer Group 消費者組,由多個消費者組成

3、准備工作

1、Kafka服務器

准備3台CentOS服務器,並配置好靜態IP、主機名

服務器名 IP 說明
kafka01 192.168.88.51 Kafka節點1
kafka02 192.168.88.52 Kafka節點2
kafka03 192.168.88.53 Kafka節點3

軟件版本說明

說明
Linux Server CentOS 7
Kafka 2.3.0

2、ZooKeeper集群

Kakfa集群需要依賴ZooKeeper存儲Broker、Topic等信息,這里我們部署三台ZK

服務器名 IP 說明
zk01 192.168.88.21 ZooKeeper節點
zk02 192.168.88.22 ZooKeeper節點
zk03 192.168.88.23 ZooKeeper節點

部署過程參考:https://ken.io/note/zookeeper...

二、部署過程

1、應用&數據目錄

#創建應用目錄 mkdir /usr/kafka #創建Kafka數據目錄 mkdir /kafka mkdir /kafka/logs chmod 777 -R /kafka

2、下載&解壓

Kafka官方下載地址:https://kafka.apache.org/down...
這次我下載的是2.3.0版本

#創建並進入下載目錄 mkdir /home/downloads cd /home/downloads #下載安裝包 wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.3.0/kafka_2.12-2.3.0.tgz #解壓到應用目錄 tar -zvxf kafka_2.12-2.3.0.tgz -C /usr/kafka
kafka_2.12-2.3.0.tgz 其中2.12是Scala編譯器的版本,2.3.0才是Kafka的版本

3、Kafka節點配置

#進入應用目錄 cd /usr/kafka/kafka_2.12-2.3.0/ #修改配置文件 vi config/server.properties 

通用配置

配置日志目錄、指定ZooKeeper服務器

# A comma separated list of directories under which to store log files log.dirs=/kafka/logs  # root directory for all kafka znodes. zookeeper.connect=192.168.88.21:2181,192.168.88.22:2181,192.168.88.23:2181

分節點配置

  • Kafka01
broker.id=0

#listeners=PLAINTEXT://:9092 listeners=PLAINTEXT://192.168.88.51:9092 
  • Kafka02
broker.id=1

#listeners=PLAINTEXT://:9092 listeners=PLAINTEXT://192.168.88.52:9092 
  • Kafka03
broker.id=2

#listeners=PLAINTEXT://:9092 listeners=PLAINTEXT://192.168.88.53:9092 

4、防火牆配置

#開放端口 firewall-cmd --add-port=9092/tcp --permanent #重新加載防火牆配置 firewall-cmd --reload

5、啟動Kafka

#進入kafka根目錄 cd /usr/kafka/kafka_2.12-2.3.0/ #啟動 /bin/kafka-server-start.sh config/server.properties & #啟動成功輸出示例(最后幾行) [2019-06-26 21:48:57,183] INFO Kafka commitId: fc1aaa116b661c8a (org.apache.kafka.common.utils.AppInfoParser) [2019-06-26 21:48:57,183] INFO Kafka startTimeMs: 1561531737175 (org.apache.kafka.common.utils.AppInfoParser) [2019-06-26 21:48:57,185] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)

三、Kafka測試

1、創建Topic

在kafka01(Broker)上創建測試Tpoic:test-ken-io,這里我們指定了3個副本、1個分區

bin/kafka-topics.sh --create --bootstrap-server 192.168.88.51:9092 --replication-factor 3 --partitions 1 --topic test-ken-io

Topic在kafka01上創建后也會同步到集群中另外兩個Broker:kafka02、kafka03

2、查看Topic

我們可以通過命令列出指定Broker的

bin/kafka-topics.sh --list --bootstrap-server 192.168.88.52:9092

3、發送消息

這里我們向Broker(id=0)的Topic=test-ken-io發送消息

bin/kafka-console-producer.sh --broker-list  192.168.88.51:9092  --topic test-ken-io #消息內容 > test by ken.io

4、消費消息

在Kafka02上消費Broker03的消息

bin/kafka-console-consumer.sh --bootstrap-server 192.168.88.53:9092 --topic test-ken-io --from-beginning

在Kafka03上消費Broker02的消息

bin/kafka-console-consumer.sh --bootstrap-server 192.168.88.52:9092 --topic test-ken-io --from-beginning

然后均能收到消息

test by ken.io

這是因為這兩個消費消息的命令是建立了兩個不同的Consumer
如果我們啟動Consumer指定Consumer Group Id就可以作為一個消費組協同工,1個消息同時只會被一個Consumer消費到

bin/kafka-console-consumer.sh --bootstrap-server 192.168.88.53:9092 --topic test-ken-io --from-beginning --group testgroup_ken bin/kafka-console-consumer.sh --bootstrap-server 192.168.88.52:9092 --topic test-ken-io --from-beginning --group testgroup_ken

四、備注

1、Kafka常用配置項說明

Kafka常用Broker配置說明:

配置項 默認值/示例值 說明
broker.id 0 Broker唯一標識
listeners PLAINTEXT://192.168.88.53:9092 監聽信息,PLAINTEXT表示明文傳輸
log.dirs kafka/logs kafka數據存放地址,可以填寫多個。用","間隔
message.max.bytes message.max.bytes 單個消息長度限制,單位是字節
num.partitions 1 默認分區數
log.flush.interval.messages Long.MaxValue 在數據被寫入到硬盤和消費者可用前最大累積的消息的數量
log.flush.interval.ms Long.MaxValue 在數據被寫入到硬盤前的最大時間
log.flush.scheduler.interval.ms Long.MaxValue 檢查數據是否要寫入到硬盤的時間間隔。
log.retention.hours 24 控制一個log保留時間,單位:小時
zookeeper.connect 192.168.88.21:2181 ZooKeeper服務器地址,多台用","間隔

2、附錄


本文首發於我的獨立博客:https://ken.io/note/kafka-cluster-deploy-guide


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM