工作中遇到Kafka跨機房傳輸到遠程機房的場景,之前的方案是使用Flume消費后轉發到目標kafka,當topic增多並且數據量變大后,維護性較差且Flume較耗費資源。
一、原理
MirrorMaker 為Kafka 內置的跨集群/機房數據復制工具,二進制包解壓后bin目錄下有kafka-mirror-maker.sh,Mirror Maker啟動后,包含了一組消費者,這些消費者屬於同一個group,並從多個topic上讀取數據,所有的topic均使用該group.id,每個MirrorMaker 進程僅有一個生產者,該生產者將數據發送給目標集群的多個topic;
Kafka MirrorMaker的官方文檔一直沒有更新,因此新版Kafka為MirrorMaker增加的一些參數、特性等在文檔上往往找不到,需要看Kafka MirrorMaker的源碼,Kafka MirrorMaker啟動腳步如下,發現其主類位於kafka.tools.MirrorMaker,尤其是一些參數的解析邏輯和主要的執行流程,會比較有助於我們理解和運維Kafka MirrorMaker;
代碼示例
exec $(dirname $0)/kafka-run-class.sh kafka.tools.MirrorMaker "$@"
MirrorMaker 為每個消費者分配一個線程,消費者從源集群的topic和分區上讀取數據,然后通過公共生產者將數據發送到目標集群上,官方建議盡量讓 MirrorMaker 運行在目標數據中心里,因為長距離的跨機房網絡相對而言更加不可靠,如果發生了網絡分區,數據中心之間斷開了連接,無法連接到集群的消費者要比一個無法連接到集群的生產者要安全得多。
如果消費者無法連接到集群,最多也就是無法消費數據,數據仍然會在 Kafka 集群里保留很長的一段時間,不會有丟失的風險。相反,在發生網絡分區時如果 MirrorMaker 已經讀取了數據,但無法將數據生產到目標集群上,就會造成數據丟失。所以說遠程讀取比遠程生成更加安全。
建議:
-
建議啟動多個kafak-mirror-maker.sh 進程來完成數據同步,這樣就算有進程掛掉,topic的同組消費者可以進行reblance;
-
建議將kafka-mirror-maker.sh進程啟動在目標集群,原因上文有提及;
-
kafak-mirror-maker.sh啟動默認不會后台運行,調用kafka-run-class.sh的啟動內存256M,需要修改一下啟動參數(內存大小、日志);
-
建議對source 集群的whitelist中的topic的消費情況,加實時的積壓量監控;
-
建議producer.properties配置中開啟auto.create.topics.enable=true;
二、使用和配置
-
消費端配置(consumer.properties)
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181 group.id=groupyzg-02 # 選取鏡像數據的起始 即鏡像MirrorMaker啟動后的數據,參數latest,還是鏡像之前的數據,參數earliest auto.offset.reset=largest # 更改分區策略,默認是range,雖然有一定優勢但會導致不公平現象,特別是鏡像大量的主題和分區的時候,0.10版本設置 partition.assignment.strategy=roundrobin
source kafka版本是1.0,配置bootstrap-server指定kafka集群地址,配置方式如下:
bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092 group.id=groupyzg-02 # 選取鏡像數據的起始?即鏡像MirrorMaker啟動后的數據,參數latest,還是鏡像之前的數據,參數earliest auto.offset.reset=latest # 消費者提交心跳周期,默認3000,由於是遠程鏡像,此處設為30秒 heartbeat.interval.ms=30000 # 消費連接超時值,默認10000,由於遠程鏡像,此處設為100秒 session.timeout.ms=100000 # 更改分區策略,默認是range,雖然有一定優勢但會導致不公平現象,特別是鏡像大量的主題和分區的時候 partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor # 單個poll()執行的最大record數,默認是500 max.poll.records=20000 # 讀數據時tcp接收緩沖區大小,默認是65536(64KiB) receive.buffer.bytes=4194304 # 設置每個分區總的大小,默認是1048576 max.partition.fetch.bytes=10485760
-
生產者配置(producer.properties)
bootstrap.servers = 192.168.xxx:9092,192.168.xxx:9092 buffer.memory = 268435456 batch.size = 104857 acks=0 linger.ms=10 max.request.size = 10485760 send.buffer.bytes = 10485760 compression.type=snappy
-
啟動、優化、日志監控
啟動命令kafka-mirror-maker.sh中添加端口約束和啟動內存配置:
export KAFKA_HEAP_OPTS="-Xmx4G -Xms4G" export JMX_PORT="8888" exec $(dirname $0)/kafka-run-class.sh kafka.tools.MirrorMaker"$@"
日志監控:若想輸出日志數據,則使用一下命令啟動,日志數據會保存在kafka/logs/_mirror_maker.out 中;
./kafka-run-class.sh -daemon -name mirror_maker -loggc kafka.tools.MirrorMaker--consumer.config consumer.properties --num.streams 2--producer.config producer.properties --whitelist='testnet'
-
積壓監控:
0.10版本的積壓量監控:
./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker--zookeeper xxxx:21810,xxx:21810,xxx:21810--topic testnet -group testnet-group
1.0版本的積壓量監控:
./kafka-consumer-groups.sh --bootstrap-server xxx:9092--describe --group testnet-group
進程數監控:建議增加mirror-maker的進程數監控,及時發現並啟動掛點進程;
#!/bin/bash
###################
#
# info :5 mins to check last 5mins logs
# add by deploy
# date:20190917
#
###################
#當前時間
sj=`date "+%F %T"`
#當前時間5分鍾前
last_sj=`date "+%F %T" -d '-5 min'`
#定義目錄
runlog=~/kafka_2.11-1.0.0/alarm/run.log
#通知手機號
noticetel="138XXXXXXXX"
province=~/kafka_2.11-1.0.0/alarm/province.cfg
tmplog=~/kafka_2.11-1.0.0/alarm/tmp.log
###短信通知,也可以使用郵箱通知服務
smsnotice(){
info=$@
IFS=","
for i in $noticetel;do
curl -kd xx
#curl -D - -kd xx
done
}
###判斷mirror-maker的進程個數;
province_all=`cat ${province}|wc -l`
mount=`ps -ef|grep -i mirror_maker-gc |wc -l`
ps -ef|grep -i mirror_maker-gc >${tmplog}
echo "the mount of mirror-maker is `expr $mount - 1`!"> $runlog
echo "the mount of province config is $province_all ! ">> $runlog
if [ `expr $mount - 1` -ge $province_all ] ;then
echo "`hostname -i` ----${sj} ---- the mirrormaker is ok!" >> $runlog
else
message="`hostname -i` ----${sj} ----the mount mirror-maker processor `expr $mount - 1` is less than the mount of province_config $province_all, "
echo ${message} >> $runlog
while read line
do
province_name=`echo ${line}|awk -F '|' '{print $1}'`
province_code=`echo ${line}|awk -F '|' '{print $2}'`
mount_two=`cat ${tmplog}|grep -i ${province_code} |wc -l`
if [ $mount_two -ge 1 ] ;then
echo "`hostname -i` ----${sj} ---- the province of ${province_name} is ok!" >> $runlog
else
message_two="${message} the province of [ ${province_name} ] mirror-maker processor is down, please check for it!"
echo ${message_two} >> $runlog
smsnotice ${message_two}
fi
done<${province}
fi
結語
跨機房傳輸是不是很簡單,你學會了嗎?
你那里是怎么實現kafka跨機房傳輸的呢,歡迎留言討論!
