kafka詳解(02) - kafka_2.11-2.4.1安裝部署
環境准備
下載安裝包
官網下載地址:https://kafka.apache.org/downloads.html
2.4.1版本下載地址:https://archive.apache.org/dist/kafka/2.4.1/kafka_2.11-2.4.1.tgz
Kafka安裝包版本使用scala_2.11的版本,即kafka_2.11-kafka版本號.tgz的安裝包,經測試scala_2.12的版本在增加topic時非常的慢,具體原因未定位, scala_2.13版本的未測試過
集群規划
hadoop102 |
hadoop103 |
hadoop104 |
zk |
zk |
zk |
kafka |
kafka |
kafka |
Zookeeper環境
Kafka集群需要依賴zookeeper環境,zookeeper安裝請參考文檔《Zookeeper詳解(02) - zookeeper安裝部署-單機模式-集群模式》
集群部署
上傳解壓
將安裝包kafka_2.11-2.4.1.tgz上傳到服務器的/opt/software並解壓到指定目錄
[hadoop@hadoop102 software]$ tar -zxvf kafka_2.11-2.4.1.tgz -C /opt/module/
修改解壓后的文件夾名稱
[hadoop@hadoop102 software]$ cd /opt/module/
[hadoop@hadoop102 module]$ mv kafka_2.11-2.4.1/ kafka
在/opt/module/kafka目錄下創建logs文件夾
[hadoop@hadoop102 module]$ cd kafka/
[hadoop@hadoop102 kafka]$ mkdir logs
修改配置文件
[hadoop@hadoop102 kafka]$ cd config/
[hadoop@hadoop102 config]$ vi server.properties
修改以下內容:
#broker的全局唯一編號,不能重復
broker.id=0
#刪除topic功能使能,當前版本此配置默認為true,已從配置文件移除
delete.topic.enable=true
#處理網絡請求的線程數量
num.network.threads=3
#用來處理磁盤IO的線程數量
num.io.threads=8
#發送套接字的緩沖區大小
socket.send.buffer.bytes=102400
#接收套接字的緩沖區大小
socket.receive.buffer.bytes=102400
#請求套接字的緩沖區大小
socket.request.max.bytes=104857600
#kafka運行日志存放的路徑
log.dirs=/opt/module/kafka/logs
#topic在當前broker上的分區個數
num.partitions=1
#用來恢復和清理data下數據的線程數量
num.recovery.threads.per.data.dir=1
#segment文件保留的最長時間,超時將被刪除
log.retention.hours=168
#配置連接Zookeeper集群地址,zookeeper的host必須要用主機名或hosts中配置的映射,不能直接使用ip地址,直接使用ip會出現無法連接的現象
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181
分發安裝包
[hadoop@hadoop102 module]$ scp -r kafka/ hadoop103:/opt/module/
[hadoop@hadoop102 module]$ scp -r kafka/ hadoop104:/opt/module/
配置環境變量(可選)
sudo vim /etc/profile
#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
立即生效
source /etc/profile
修改hadoop103和hadoop104上的broker.id
分別在hadoop103和hadoop104上修改配置文件/opt/module/kafka/config/server.properties中的broker.id=1、broker.id=2
集群中每個節點的broker.id不得重復
集群啟停
先啟動Zookeeper集群
[hadoop@hadoop102 module]$ myzookeeper.sh start
依次在hadoop102、hadoop103、hadoop104節點上啟動kafka
[hadoop@hadoop102 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties
[hadoop@hadoop103 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties
[hadoop@hadoop104 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties
查看進程
[hadoop@hadoop102 kafka]$ jpsall
=============== hadoop102 ===============
1237 QuorumPeerMain
1610 Kafka
=============== hadoop103 ===============
1236 QuorumPeerMain
1612 Kafka
=============== hadoop104 ===============
1603 Kafka
1236 QuorumPeerMain
停止kafka集群
[hadoop@hadoop102 kafka]$ bin/kafka-server-stop.sh stop
[hadoop@hadoop103 kafka]$ bin/kafka-server-stop.sh stop
[hadoop@hadoop104 kafka]$ bin/kafka-server-stop.sh stop
kafka群起腳本
[hadoop@hadoop102 kafka]$ vi /home/hadoop/bin/mykafka.sh
添加如下內容
#!/bin/bash
if [ $# -lt 1 ]
then
echo "Input Args Error....."
exit
fi
for i in hadoop102 hadoop103 hadoop104
do
case $1 in
start)
echo "==================START $i KAFKA==================="
ssh $i /opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties
;;
stop)
echo "==================STOP $i KAFKA==================="
ssh $i /opt/module/kafka/bin/kafka-server-stop.sh stop
;;
*)
echo "Input Args Error....."
exit
;;
esac
done
添加可執行權限
[hadoop@hadoop102 kafka]$ chmod +x /home/hadoop/bin/mykafka.sh
使用腳本啟動kafka集群
[hadoop@hadoop102 kafka]$ mykafka.sh start
使用腳本停止kafka集群
[hadoop@hadoop102 kafka]$ mykafka.sh stop
Kafka基礎命令行操作
1)創建topic
[hadoop@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --create --replication-factor 3 --partitions 1 --topic test
選項說明:
--topic 定義topic名
--replication-factor 定義副本數
--partitions 定義分區數
2)查看當前服務器中的所有topic
[hadoop@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --list
3)向topic中發送消息
[hadoop@hadoop102 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic test
>hello kafka
>hello hadoop
4)從指定topic中消費消息
從此刻最新發送到消息開始消費
[hadoop@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic test
加--from-beginning:會把主題中現有的所有的數據都讀取出來
[hadoop@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic test
5)查看某個Topic的詳情
[hadoop@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --describe –-topic test
6)修改分區數
bin/kafka-topics.sh --zookeeper hadoop102:2181 --alter –-topic test --partitions 6
7)刪除topic
[hadoop@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --delete --topic test