shell腳本並發數據到kafka topic
需求:
每秒發送大量數據到kafka,驗證下游系統性能,數據中的時間戳要求為當前時間,可以之間采集系統當前時間替換文件中舊的時間戳,保證每次發送的數據都為最新時間。
利用kafka自帶的腳本,將待發數據寫入文件中,然后通過讀取文件 方式,將數據批量發送到kafka task節點
#在kafka master 節點用戶home 目錄下創建data 目錄
[root@node1 home]# mkdir data
#進入data目錄
[root@node1 home]# cd data/
#將准備好的數據放入data目錄中
[root@node1 data]# ls
batch1-1000.log batch2-1000.log batch-send.sh
#編寫batch-send.sh 腳本
[root@node1 data]# vi batch-send.sh
shell content:
#!/bin/bash
#響應Ctrl+C中斷
trap 'onCtrlC' INT
function onCtrlC () {
echo 'Ctrl+C is captured'
exit 1
}
#kafka及data所在目錄
dataPath=/home/data
kafkaPath=/usr/local/kafka
#broker list
brokerlist=192.168.1.10:9092,192.168.1.11:9092,192.168.1.12:9092
#kafka的topic,這里設置發送到2個kafka topic
topic1=test-topic1
topic2=test-topic2
#消息總數,可根據需要配置,實際數量是*num后的數,num取決於數據文件中的數據量
totalNum=100
#循環計時參數
batchNum=10
#開始時間
start=$(date +%s)
#打印循環開始時間
echo $(date)
for ((i=1; i<=${totalNum}; i ++))
do
{
#取余數
modVal=$(( ${i} % ${batchNum} ))
#如果循環計數達到設置參數,就發送一批次消息
if [ ${modVal} = 0 ] ; then
#在控制台顯示進度,*num,取決於數據文件中的數據量
echo “$(( ${i}*100 )) of $(( ${totalNum}*100 )) sent”
# 獲取每次循環發送的當前時間
current=`date "+%Y-%m-%d %H:%M:%S"`
# 將時間轉為UTC 時間,精確到秒
timeStamp=`date -d "$current" +%s`
#將current轉換為時間戳,精確到毫秒,10#`date "+%N"是為了避免循環過程中出現error:value too great for base
currentTimeStamp=$((timeStamp*1000+10#`date "+%N"`/1000000))
#每次循環發送數據前,都將數據文件中的recvTs 參數改為當前時間,放在前台執行,由於是順序執行,需要等待文件修改完后,才會下一步執行發送文件數據給kafka,每個文件中1000條數據
sed -i 's/\("recvTs":\)[0-9]*\(,\)/\1'$currentTimeStamp',/g' ${dataPath}/batch1-1000.log ${dataPath}/batch2-1000.log
#批量發送消息,並且將控制台返回的提示符重定向到/dev/null,&放到后台並發執行
cat ${dataPath}/batch1-1000.log | ${kafkaPath}/bin/kafka-console-producer.sh --broker-list ${brokerlist} --sync --topic ${topic1} | > /dev/null &
cat ${dataPath}/batch1-1000.log | ${kafkaPath}/bin/kafka-console-producer.sh --broker-list ${brokerlist} --sync --topic ${topic1} | > /dev/null &
fi
}
#每個for循環等待0.1秒,batchNum=10,即每秒執行一次數據發送,
sleep 0.1
done
#腳本結束時間,並統計腳本執行時間,打印到控制台
end=$(date +%s)
echo $(date)
echo $(( $end - $start ))
腳本執行效果
[root@node1 data]# ./batch-send.sh
Sat Jan 30 16:15:53 CST 2021
“1000 of 10000 sent”
“2000 of 10000 sent”
“3000 of 10000 sent”
“4000 of 10000 sent”
“5000 of 10000 sent”
“6000 of 10000 sent”
“7000 of 10000 sent”
“8000 of 10000 sent”
“9000 of 10000 sent”
“10000 of 10000 sent”
Sat Jan 30 16:16:04 CST 2021
11
[root@node1 data]#
這可以通過修改文件中的數據量和循環計時參數,實現每秒發送10000 甚至100000 條數據到kafka。