做性能測試時,kafka消息隊列比較長,讓程序自己消費完畢需要等待很長時間。就需要快速清理kafka隊列
清理方式把 這kafak manager + zookeeper + kafka 這些應用情況,復制成一個備份文件夾。當需要清理時,把在使用的kafka +zookeeper 文件夾刪除,在從備份文件還原回去。
這樣就作成幾個腳本
1、清理環境clen_envionment.sh, 這個只執行一次,需要把 kafka 和zookeeper的路徑填寫正確。如果存在備份文件就不需要執行這個腳本,
#要先導出kafka topic
#========zookeeper
#stop zookeeper
/app/zookeeper/bin/zkServer.sh stop;
ps -ef|grep zookeeper |grep -v grep|awk '{print $2}'|xargs kill -9;
ps -aef |grep zookeeper;
#clean zookeeper data
ls -l /app/zookeeper/data/version-2/;
rm -rf /app/zookeeper/data/version-2/;
rm -rf /app/zookeeper/logs/*;
rm -rf /app/zookeeper_backup;
rm -rf /app/zookeeper_org;
sleep 5;
#backup zookeeper file
cp -rp /app/zookeeper/ /app/zookeeper_backup;
cp -rp /app/zookeeper/ /app/zookeeper_org;
ps -aef |grep zookeeper;
# stop kafka
/app/kafka_cluster/bin/kafka-server-stop.sh ;
# 如果還存在,繼續殺死
ps -ef|grep kafka_cluster |grep -v grep|awk '{print $2}'|xargs kill -9;
ps -aef |grep kafka_cluster;
#clean tow log folder
rm -rf /app/kafka_cluster/kafka-logs/*;
rm -rf /app/kafka_cluster/logs/*;
rm -rf /app/kafka_cluster_org/;
rm -rf /app/kafka_cluster_backup/;
sleep 5;
#backup kafka file
cp -rp /app/kafka_cluster/ /app/kafka_cluster_org/;
cp -rp /app/kafka_cluster/ /app/kafka_cluster_backup/;
2、初始化zookeeper腳本
#!/bin/bash
echo "停止zookeeper服務......"
/app/zookeeper/bin/zkServer.sh stop
sleep 3
echo "初始化zookeeper安裝目錄......"
rm -rf /app/zookeeper
cp -rp /app/zookeeper_backup /app/zookeeper
echo "啟動zookeeper服務......"
/app/zookeeper/bin/zkServer.sh start
result=`ps -ef|grep "/app/zookeeper/"|grep -v grep|awk '{print $2}'`
echo -e "\033[42;30m zookeeper服務的進程號為----$result \033[0m"
3、初始化kafka腳本
#!/bin/bash
kafka_cluster="kafka_cluster"
kafka_path="/app/"$kafka_cluster
kafka_backup_path="/app/"$kafka_cluster"_backup"
echo "停止kafka服務....."
$kafka_path/bin/kafka-server-stop.sh
sleep 10
ps -ef|grep $kafka_cluster |grep -v grep|awk '{print $2}'|xargs kill -9
#刪除還原kafak
if [[ -d $kafka_backup_path && ${#kafka_path} -ge 8 ]];then
echo "kafka_backup_path dir is exsit...."
echo "the kafka folder is :$kafka_path , deleting $kafka_path"
#rm -rf $KAFKA_PATH
echo "the kafka backup folder is :$kafka_backup_path , copying file from $kafka_backup_path to $kafka_path"
#cp -rp $KAFKA_BAKE_PATH $KAFKA_PATH
sleep 20
else
echo "kafka_backup_path : $kafka_backup_path is not exist , exit process..."
exit 1
fi
echo "准備啟動kafka服務......"
$kafka_path/bin/kafka-server-start.sh -daemon $kafka_path/config/server.properties
result=`ps -ef|grep $kafka_cluster|grep -v grep|awk '{print $2}'`
echo -e "\033[42;30m kafka服務的進程號為----$result \033[0m"
4、創建topic腳本
#!/bin/bash topic_file="/home/root/topics.txt" ZK_DIR="192.168.53.125:2181,192.168.53.126:2181,192.168.53.127:2181" CMD="/app/kafka_cluster/bin/kafka-topics.sh" for line in `cat $topic_file` do $CMD --create --zookeeper $ZK_DIR --replication-facto 3 --partitions 8 --topic $line done
topic文件如下
[root@fkafka-01:/home/root]$cat topics.txt
test1
test2
如果需要清理 ,執行如下順序步驟
停止kafka-manager > 停止/啟動zookeeper > 停止/啟動 kafka > 啟動kafka-manager >啟動kafka manager
#停止kafka
ps -ef|grep manager |grep -v grep|awk '{print $2}'|xargs kill -9
cd
#恢復zookeeper 並啟動
./init_zk.sh
#恢復kafka 並啟動
cd /app/kafka-manager-1.3.0.7/
#kafka manager啟動
rm RUNNING_PID
nohup bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port=9000 &
#創建topic
./create-topics.sh
