Kafka MirrorMaker跨集群同步工具


來自:https://www.cnblogs.com/sunxucool/p/3913131.html

Kafka 跨集群同步方案——Kafka內置的MirrorMaker工具

由於公司機房搬遷,對於kafka數據的遷移使用kafka內置的MirrorMaker工具

Kafka的鏡像功能可以維護現有Kafka集群的副本。下圖顯示了如何使用MirrorMaker工具將源Kafka群集鏡像到目標(鏡像)Kafka群集。該工具使用Kafka使用者來使用來自源群集的消息,並使用嵌入式Kafka生成器將這些消息重新發布到本地(目標)群集。
還有關於MirrorMaker的文檔,網址為 https://kafka.apache.org/documentation.html#basic_ops_mirror_maker。

一、如何創建鏡像

設置鏡像很簡單 ,源集群存在,創建一個目標集群, 在啟動目標集群后啟動鏡像制作器進程即可。鏡像制作者至少需要一個或多個消費者配置,生產者配置以及白名單或黑名單。您需要將使用者指向源集群的ZooKeeper,將生成器指向鏡像集群的ZooKeeper(或使用broker.list參數)。

bin/kafka-mirror-maker.sh --consumer.config sourceCluster1Consumer.config --consumer.config sourceCluster2Consumer.config --num.streams 2 --producer.config targetClusterProducer.config --whitelist=".*"

1、創建sourceCluster1Consumer.config文件

zk.connect=szk1:2181,szk2:2181,szk3:2181groupid=test-mirror-consumer-group

2、創建targetClusterProducer.config

zk.connect=tzk1:2181,tzk2:2181

3、創建啟動文件:start.sh

$KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config sourceClusterConsumer.config --num.streams 2 --producer.config targetClusterProducer.config --whitelist=".*"

4、執行腳本

執行start.sh通過日志信息查看運行狀況,到目標Kafka集群的log.dir中即可看到同步過來的數據。

二、MirrorMaker參數說明

$KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.MirrorMaker --help

1. 白名單(whitelist) 黑名單(blacklist)
mirror-maker接受精確指定同步topic的白名單和黑名單。使用java標准的正則表達式,為了方便,逗號(‘,’)被編譯為java正則中的(‘|’)。
2. Producer timeout
為了支持高吞吐量,你最好使用異步的內置producer,並將內置producer設置為阻塞模式(queue.enqueueTimeout.ms=-1)。這樣可以保證數據(messages)不會丟失。否則,異步producer默認的 enqueueTimeout是0,如果producer內部的隊列滿了,數據(messages)會被丟棄,並拋出QueueFullExceptions異常。而對於阻塞模式的producer,如果內部隊列滿了就會一直等待,從而有效的節制內置consumer的消費速度。你可以打開producer的的trace logging,隨時查看內部隊列剩余的量。如果producer的內部隊列長時間處於滿的狀態,這說明對於mirror-maker來說,將消息重新推到目標Kafka集群或者將消息寫入磁盤是瓶頸。
對於kafka的producer同步異步的詳細配置請參考$KAFKA_HOME/config/producer.properties文件。關注其中的producer.type和queue.enqueueTimeout.ms這兩個字段。
3. Producer 重試次數(retries)
如果你在producer的配置中使用broker.list,你可以設置當發布數據失敗時候的重試次數。retry參數只在使用broker.list的時候使用,因為在重試的時候會重新選擇broker。
4. Producer 數量
通過設置—num.producers參數,可以使用一個producer池來提高mirror maker的吞吐量。在接受數據(messages)的broker上的producer是只使用單個線程來處理的。就算你有多個消費流,吞吐量也會在producer處理請求的時候被限制。
5. 消費流(consumption streams)數量
使用—num.streams可以指定consumer的線程數。請注意,如果你啟動多個mirror maker進程,你可能需要看看其在源Kafka集群partitions的分布情況。如果在每個mirror maker進程上的消費流(consumption streams)數量太多,某些消費進程如果不擁有任何分區的消費權限會被置於空閑狀態,主要原因在於consumer的負載均衡算法。
6. 淺迭代(Shallow iteration)與producer壓縮
我們建議在mirror maker的consumer中開啟淺迭代(shallow iteration)。意思就是mirror maker的consumer不對已經壓縮的消息集(message-sets)進行解壓,只是直接將獲取到的消息集數據同步到producer中。
如果你開啟淺迭代(shallow iteration),那么你必須關閉mirror maker中producer的壓縮功能,否則消息集(message-sets)會被重復壓縮。
7. Consumer 和 源Kafka集群(source cluster)的 socket buffer sizes
鏡像經常用在跨集群場景中,你可能希望通過一些配置選項來優化內部集群的通信延遲和特定硬件性能瓶頸。一般來說,你應該對mirror-maker中consumer的socket.buffersize 和源集群broker的socket.send.buffer設定一個高的值。此外,mirror-maker中消費者(consumer)的fetch.size應該設定比socket.buffersize更高的值。注意,套接字緩沖區大小(socket buffer size)是操作系統網絡層的參數。如果你啟用trace級別的日志,你可以檢查實際接收的緩沖區大小(buffer size),以確定是否調整操作系統的網絡層。

三、如何檢驗MirrorMaker運行狀況

Consumer offset checker工具可以用來檢查鏡像對源集群的消費進度。例如:

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group KafkaMirror --zkconnect localhost:2181 --topic test-topicKafkaMirror,topic1,0-0 (Group,Topic,BrokerId-PartitionId)            Owner = KafkaMirror_jkoshy-ld-1320972386342-beb4bfc9-0  Consumer offset = 561154288                  = 561,154,288 (0.52G)         Log size = 2231392259                  = 2,231,392,259 (2.08G)     Consumer lag = 1670237971                  = 1,670,237,971 (1.56G)BROKER INFO0 -> 127.0.0.1:9092

注意,–zkconnect參數需要指定到源集群的Zookeeper。另外,如果指定topic沒有指定,則打印當前消費者group下所有topic的信息。

四、注意事項

1)whitelist和blacklist支持正則表達式。比如需要包含兩個topic可以這樣寫,--whitelist 'A|B' or --whitelist 'A,B' ,或者想遷移所有topic可以這樣寫 --whitelist '*'
2)注意在遷移之前創建好相關topic以及規划好partition數量。
3)老版本和新版本遷移主要考慮consumer和producer的兼容性
4)如果允許的話,建議將MirrorMaker部署在目標集群內,這是因為如果一旦發生網絡分區,消費者與源集群斷開連接比生產者與目標集群斷開連接要安全。如果消費者斷開連接,那么只是當前讀取不到數據,但是數據仍然在源集群內,並不會丟失;而生產者斷開連接,MirrorMaker便生產不了數據,如果MirrorMaker本身處理不當,可能會丟失數據。
5)開始之前配置好限流,防止影響原來集群的正常工作。


參考文獻:
http://kafka.apache.org/documentation.html#configuratio
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+mirroring+(MirrorMaker)

  

  

  

  

  

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM