hadoop3.22,flink1.13,kafka2.7.0,zookeeper3.6.3等安裝包……
下載地址:
鏈接:https://pan.baidu.com/s/1eHKkQw2ymnfq2p1XIzOCjw
提取碼:i29n
其他集群搭建:
1.准備工作
三台虛擬機分別為:Flink1,Flink2,Flink3
配置好了java環境與免密登錄以及映射。
按照下面的配置以及上面的組件版本,已經搭建成功,可以放心往下看。
2.kafka集群搭建
2.1解壓安裝包
使用xftp上傳安裝包並進入相應目錄解壓
tar -zvxf kafka_2.12-2.7.0.tgz -C /opt/module
2.2配置環境變量
此步驟三台虛擬機均需要
vim /etc/profile export KAFKA_HOME=/opt/module/kafka_2.12-2.7.0 export PATH=$PATH:$KAFKA_HOME/bin
使環境變量生效
source /etc/profile
2.3創建存放消息的文件夾
cd /opt/module/kafka_2.12-2.7.0 mkdir logs
2.4修改配置文件
cd /opt/module/kafka_2.12-2.7.0/config vim server.properties
修改或增加以下地方:
broker.id=0 delete.topic.enable=true listeners=PLAINTEXT://Flink1:9092 log.dirs=/opt/module/kafka_2.12-2.7.0/logs zookeeper.connect=Flink1:2181,Flink2:2181,Flink3:2181 unclean.leader.election.enable=false
broder.id:broder的唯一標識
delete.topic.enable:是否可以刪除主題
listener:該參數用於客戶端和broker進行連接,用來綁定私網ip,如果想綁定公網ip,應該設置的是:advertised.listeners這個參數。
log.dirs:kafka消息持久化的路徑
zookeeper.connect:zk節點,這里注意,應該配置全部的zk節點
unclean.leader.election.enable:消息同步落后太多的副本可不可以進行leader選舉
2.5同步到其他機器並修改配置文件
此處用scp指令,配置了免密就不用輸入密碼
scp -r kafka_2.12-2.7.0/ root@Flink2:/opt/module scp -r kafka_2.12-2.7.0/ root@Flink3:/opt/module
修改Flink2的server.properties:
broker.id=1
listeners=PLAINTEXT://Flink2:9092
修改Flink2的server.properties:
broker.id=2
listeners=PLAINTEXT://Flink3:9092
2.6編寫批量操作腳本
使用Flink1,進入/opt/module/kafka_2.12-2.7.0/bin,編寫腳本
啟動kafka集群:
vim start-kafka-cluster.sh
#!/bin/bash brokers="Flink1 Flink2 Flink3" KAFKA_HOME="/opt/module/kafka_2.12-2.7.0" KAFKA_NAME="kafka_2.12-2.7.0" echo "INFO : Begin to start kafka cluster ..." for broker in $brokers do echo "INFO : Starting ${KAFKA_NAME} on ${broker} ..." ssh ${broker} -C "source /etc/profile; sh ${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties" if [[ $? -eq 0 ]]; then echo "INFO:[${broker}] Start successfully" fi done echo "INFO:Kafka cluster starts successfully !"
關閉kafka集群:
vim stop-kafka-cluster.sh
#!/bin/bash brokers="Flink1 Flink2 Flink3" KAFKA_HOME="/opt/module/kafka_2.12-2.7.0" KAFKA_NAME="kafka_2.12-2.7.0" echo "INFO : Begin to stop kafka cluster ..." for broker in $brokers do echo "INFO : Shut down ${KAFKA_NAME} on ${broker} ..." ssh ${broker} "source /etc/profile;bash ${KAFKA_HOME}/bin/kafka-server-stop.sh" if [[ $? -ne 0 ]]; then echo "INFO : Shut down ${KAFKA_NAME} on ${broker} is down" fi done echo "INFO : kafka cluster shut down completed!"
記得修改主機名稱。
2.7啟動kafka集群
start-kafka-cluster.sh