Kafka 跨集群同步方案(轉)


來自:http://tangzhaohui.net/524

Kafka 跨集群同步方案——Kafka內置的MirrorMaker工具

該方案解決Kafka跨集群同步、創建Kafka集群鏡像等相關問題,主要使用Kafka內置的MirrorMaker工具實現。

Kafka鏡像即已有Kafka集群的副本。下圖展示如何使用MirrorMaker工具創建從源Kafka集群(source cluster)到目標Kafka集群(target cluster)的鏡像。該工具通過Kafka consumer從源Kafka集群消費數據,然后通過一個內置的Kafka producer將數據重新推送到目標Kafka集群。

kafka mirror maker

 

 

一、如何創建鏡像

使用MirrorMaker創建鏡像是比較簡單的,搭建好目標Kafka集群后,只需要啟動mirror-maker程序即可。其中,一個或多個consumer配置文件、一個producer配置文件是必須的,whitelist、blacklist是可選的。在consumer的配置中指定源Kafka集群的Zookeeper,在producer的配置中指定目標集群的Zookeeper(或者broker.list)。

例如,你需要創建S集群的鏡像,目標集群T已經搭建好,簡單的做法如下:

1. 創建consumer配置文件:sourceClusterConsumer.config

 2. 創建producer配置文件:targetClusterProducer.config

 3. 創建啟動腳本:start.sh

 4. 執行腳本

執行start.sh通過日志信息查看運行狀況,到目標Kafka集群的log.dir中即可看到同步過來的數據。

二、MirrorMaker的參數說明

執行上面的命令就可以看到各個參數的說明:

MirrorMaker 參數

1. 白名單(whitelist) 黑名單(blacklist)

mirror-maker接受精確指定同步topic的白名單和黑名單。使用java標准的正則表達式,為了方便,逗號(‘,’)被編譯為java正則中的(‘|’)。

2. Producer timeout

為了支持高吞吐量,你最好使用異步的內置producer,並將內置producer設置為阻塞模式(queue.enqueueTimeout.ms=-1)。這樣可以保證數據(messages)不會丟失。否則,異步producer默認的 enqueueTimeout是0,如果producer內部的隊列滿了,數據(messages)會被丟棄,並拋出QueueFullExceptions異常。而對於阻塞模式的producer,如果內部隊列滿了就會一直等待,從而有效的節制內置consumer的消費速度。你可以打開producer的的trace logging,隨時查看內部隊列剩余的量。如果producer的內部隊列長時間處於滿的狀態,這說明對於mirror-maker來說,將消息重新推到目標Kafka集群或者將消息寫入磁盤是瓶頸。

對於kafka的producer同步異步的詳細配置請參考$KAFKA_HOME/config/producer.properties文件。關注其中的producer.type和queue.enqueueTimeout.ms這兩個字段。

3. Producer 重試次數(retries)

如果你在producer的配置中使用broker.list,你可以設置當發布數據失敗時候的重試次數。retry參數只在使用broker.list的時候使用,因為在重試的時候會重新選擇broker。

4. Producer 數量

通過設置—num.producers參數,可以使用一個producer池來提高mirror maker的吞吐量。在接受數據(messages)的broker上的producer是只使用單個線程來處理的。就算你有多個消費流,吞吐量也會在producer處理請求的時候被限制。

5. 消費流(consumption streams)數量

使用—num.streams可以指定consumer的線程數。請注意,如果你啟動多個mirror maker進程,你可能需要看看其在源Kafka集群partitions的分布情況。如果在每個mirror maker進程上的消費流(consumption streams)數量太多,某些消費進程如果不擁有任何分區的消費權限會被置於空閑狀態,主要原因在於consumer的負載均衡算法。

6. 淺迭代(Shallow iteration)與producer壓縮

我們建議在mirror maker的consumer中開啟淺迭代(shallow iteration)。意思就是mirror maker的consumer不對已經壓縮的消息集(message-sets)進行解壓,只是直接將獲取到的消息集數據同步到producer中。

如果你開啟淺迭代(shallow iteration),那么你必須關閉mirror maker中producer的壓縮功能,否則消息集(message-sets)會被重復壓縮。

7. Consumer 和 源Kafka集群(source cluster)的 socket buffer sizes

鏡像經常用在跨集群場景中,你可能希望通過一些配置選項來優化內部集群的通信延遲和特定硬件性能瓶頸。一般來說,你應該對mirror-maker中consumer的socket.buffersize 和源集群broker的socket.send.buffer設定一個高的值。此外,mirror-maker中消費者(consumer)的fetch.size應該設定比socket.buffersize更高的值。注意,套接字緩沖區大小(socket buffer size)是操作系統網絡層的參數。如果你啟用trace級別的日志,你可以檢查實際接收的緩沖區大小(buffer size),以確定是否調整操作系統的網絡層。

三、如何檢驗MirrorMaker運行狀況

Consumer offset checker工具可以用來檢查鏡像對源集群的消費進度。例如:

注意,–zkconnect參數需要指定到源集群的Zookeeper。另外,如果指定topic沒有指定,則打印當前消費者group下所有topic的信息。

參考文獻

  1. http://kafka.apache.org/documentation.html#configuration
  2. https://cwiki.apache.org/confluence/display/KAFKA/Kafka+mirroring+(MirrorMaker)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM