Kafka集群部署
部署Kafka之前必須先部署好zookeeper
1. zookeeper分布式安裝部署
1.1 集群規划
在master、slave01和slave02三個節點上部署Zookeeper。
1.2 解壓安裝
(1)進入存放zookeeper安裝包目錄,解壓Zookeeper安裝包到/opt/module/目錄下
tar -zxvf zookeeper-3.4.5.tar.gz -C /opt/module/
(2)解壓后文件名修改為zookeeper
mv zookeeper-3.4.5 zookeeper
1.3 設置zookeeper環境變量
命令:
vi /root/.bash_profile
加入下面內容:
export ZOOKEEPER_HOME=/opt/module/zookeeper
export PATH=$PATH:$ZOOKEEPER_HOME/bin
使設置立即生效:
source /root/.bash_profile
復制環境變量到slave01、slave01節點
slave01節點:
scp -r /root/.bash_profile root@slave01:/root/.bash_profile
slave02節點:
scp -r /root/.bash_profile root@slave02:/root/.bash_profile
1.4 配置服務器編號
(1)在/opt/module/zookeeper/這個目錄下創建zkData
mkdir zkData
(2)在/opt/module/zookeeper/zkData目錄下創建一個myid的文件
touch myid
(3)編輯myid文件
vi myid
在文件中添加與server對應的編號:
2
1.5 配置zoo.cfg文件
(1)重命名/opt/module/zookeeper/conf這個目錄下的zoo_sample.cfg為zoo.cfg
mv zoo_sample.cfg zoo.cfg
(2)打開zoo.cfg文件
vi zoo.cfg
修改數據存儲路徑配置
dataDir=/opt/module/zookeeper/zkData
增加如下配置
server.2=master:2888:3888
server.3=slave01:2888:3888
server.4=slave02:2888:3888
(3)復制配置好的zookeeper到其他節點上
slave01節點:
scp -r /opt/module/zookeeper/ root@slave01:/opt/module/zookeeper/
slave02節點:
scp -r /opt/module/zookeeper/ root@slave02:/opt/module/zookeeper/
並分別在slave01、slave02上修改myid文件中內容為3、4
1.6 集群操作
(1)分別啟動三個節點的Zookeeper
master節點:
[root@master zookeeper]# bin/zkServer.sh start
slave01節點:
[root@slave01 zookeeper]# bin/zkServer.sh start
slave02節點:
[root@slave02 zookeeper]# bin/zkServer.sh start
(2)查看進程是否啟動
jps
(3)查看三個節點的狀態
master節點 :
[root@master zookeeper]# bin/zkServer.sh status
slave01節點:
[root@slave01 zookeeper]# bin/zkServer.sh status
slave02節點:
[root@slave02 zookeeper]# bin/zkServer.sh status
(4)啟動客戶端:
[root@master zookeeper]# bin/zkCli.sh
(5)退出客戶端:
[zk: localhost:2181(CONNECTED) 0] quit
(6)停止Zookeeper
[root@master zookeeper]# bin/zkServer.sh stop
2. Kafka組件的安裝與部署
2.1 Kafka下載地址
http://kafka.apache.org/downloads.html
這里的Kafka用的版本是:kafka_2.11-0.11.0.2.tgz
2.2 集群規划
在master、slave01和slave02三個節點上部署Kafka。
2.3 Kafka集群部署
(1)進入存放Kafka安裝包目錄,解壓安裝包到指定路徑
tar -zxvf kafka_2.11-0.11.0.2.tgz -C /opt/module/
修改解壓后的文件名稱
mv kafka_2.11-0.11.0.2 kafka
(2)配置環境變量
命令:
vi /root/.bash_profile
加入下面內容:
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
使設置立即生效:
source /root/.bash_profile
復制環境變量到slave01、slave01節點
slave01節點:
scp -r /root/.bash_profile root@slave01:/root/.bash_profile
slave02節點:
scp -r /root/.bash_profile root@slave02:/root/.bash_profile
(2)在/opt/module/kafka目錄下創建logs文件夾
mkdir logs
(3)修改配置文件
進入/opt/module/kafka/config/目錄,修改server.properties文件
[root@master config]# vi server.properties
輸入以下內容(里面等號前面的內容已存在,只需修改后面的內容即可):
#broker的全局唯一編號,不能重復
broker.id=0
#刪除topic功能使能
delete.topic.enable=true
#處理網絡請求的線程數量
num.network.threads=3
#用來處理磁盤IO的現成數量
num.io.threads=8
#發送套接字的緩沖區大小
socket.send.buffer.bytes=102400
#接收套接字的緩沖區大小
socket.receive.buffer.bytes=102400
#請求套接字的緩沖區大小
socket.request.max.bytes=104857600
#kafka運行日志存放的路徑
log.dirs=/opt/module/kafka/logs
#topic在當前broker上的分區個數
num.partitions=1
#用來恢復和清理data下數據的線程數量
num.recovery.threads.per.data.dir=1
#segment文件保留的最長時間,超時將被刪除
log.retention.hours=168
#配置連接Zookeeper集群地址
zookeeper.connect=master:2181,master:2181,master:2181
(5)復制配置好的Kafka到其他節點上
slave01節點:
scp -r /opt/module/kafka/ root@slave01:/opt/module/kafka/
slave02節點:
scp -r /opt/module/kafka/ root@slave02:/opt/module/kafka/
(6)分別在slave01和slave02上修改配置文件/opt/module/kafka/config/server.properties中的broker.id
slave01:broker.id=1
slave02:broker.id=2
注:broker.id不得重復
(7)啟動集群
依次在master、slave01、slave02節點上啟動kafka
[root@master kafka]# bin/kafka-server-start.sh -daemon config/server.properties
(8)關閉集群命令
[root@master kafka]# bin/kafka-server-stop.sh
2.4 啟動后,查看zookeeper集群鏈接情況
(1)進入zookeeper目錄下
啟動zookeeper客戶端:
[root@master zookeeper]# bin/zkCli.sh
查看
[zk: localhost:2181(CONNECTED) 0] ls /
查看連接情況
[zk: localhost:2181(CONNECTED) 1] ls /brokers/ids
如圖所示
2.5 Kafka命令行操作
(1)查看當前服務器中的所有topic
[root@master kafka]# bin/kafka-topics.sh --zookeeper master:2181 --list
(2)任意一台機器創建topic
[root@master kafka]# bin/kafka-topics.sh --zookeeper master:2181 --create --replication-factor 1 --partitions 1 --topic test
## 說明
master:2181:這是zookeeper服務器名+端口號
test:是topic的名字
如圖所示

(3)使用任意一台Kafka服務器做生產者(這里使用的是master節點)
[root@master kafka]# bin/kafka-console-producer.sh --broker-list 192.168.1.110:9092 --topic test
(4)使用其他兩台Kafka消費
slave01節點:
[root@slave01 kafka]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.111:9092 --topic test --from-beginning
slave02節點:
[root@slave02 kafka]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.112:9092 --topic test --from-beginning
3. 刪除kafka的topic
登錄zookeeper客戶端
bin/zkCli.sh
找到topic所在的目錄
ls /brokers/topics
找到要刪除的topic執行如下命令
rmr /brokers/topics/topic名稱