Kafka跨集群遷移方案MirrorMaker原理、使用以及性能調優實踐


 

序言
Kakfa MirrorMaker是Kafka 官方提供的跨數據中心的流數據同步方案。其實現原理,其實就是通過從Source Cluster消費消息然后將消息生產到Target Cluster,即普通的消息生產和消費。用戶只要通過簡單的consumer配置和producer配置,然后啟動Mirror,就可以實現准實時的數據同步。

1. Kafka MirrorMaker基本特性
Kafka Mirror的基本特性有:

在Target Cluster沒有對應的Topic的時候,Kafka MirrorMaker會自動為我們在Target Cluster上創建一個一模一樣(Topic Name、分區數量、副本數量)一模一樣的topic。如果Target Cluster存在相同的Topic則不進行創建,並且,MirrorMaker運行Source Cluster和Target Cluster的Topic的分區數量和副本數量不同。
同我們使用Kafka API創建KafkaConsumer一樣,Kafka MirrorMaker允許我們指定多個Topic。比如,TopicA|TopicB|TopicC。在這里,|其實是正則匹配符,MirrorMaker也兼容使用逗號進行分隔。
多線程支持。MirrorMaker會在每一個線程上創建一個Consumer對象,如果性能允許,建議多創建一些線程
多進程任意橫向擴展,前提是這些進程的consumerGroup相同。無論是多進程還是多線程,都是由Kafka ConsumerGroup的設計帶來的任意橫向擴展性,具體的分區分派,即具體的TopicPartition會分派給Group中的哪個Topic負責,是Kafka自動完成的,Consumer無需關心。
我們使用Kafka MirrorMaker完成遠程的AWS(Source Cluster)上的Kafka信息同步到公司的計算集群(Target Cluster)。由於我們的大數據集群只有一個統一的出口IP,外網訪問我們的內網服務器必須通過nginx轉發,因此為了降低復雜度,決定使用“拉”模式而不是“推”模式,即,Kafka MirrorMaker部署在我們內網集群(Target Cluster),它負責從遠程的Source Cluster(AWS)的Kafka 上拉取數據,然后生產到本地的Kafka。
Kafka MirrorMaker的官方文檔一直沒有更新,因此新版Kafka為MirrorMaker增加的一些參數、特性等在文檔上往往找不到,需要看Kafka MirrorMaker的源碼。Kafka MirrorMaker的主類位於kafka.tools.MirrorMaker,尤其是一些參數的解析邏輯和主要的執行流程,會比較有助於我們理解、調試和優化Kafka MirrorMaker。

 

這是我啟動Kakfa MirrorMaker 的命令:

nohup ./bin/kafka-mirror-maker.sh --new.consumer --consumer.config config/mirror-consumer.properties --num.streams 40 --producer.config config/mirror-producer.properties --whitelist 'ABTestMsg|AppColdStartMsg|BackPayMsg|WebMsg|GoldOpenMsg|BoCaiMsg' &

mirror-consumer.properties配置文件如下:

#新版consumer擯棄了對zookeeper的依賴,使用bootstrap.servers告訴consumer kafka server的位置
bootstrap.servers=ip-188-33-33-31.eu-central-1.compute.internal:9092,ip-188-33-33-32.eu-central-1.compute.internal:9092,ip-188-33-33-33.eu-central-1.compute.internal:9092

#如果使用舊版Consumer,則使用zookeeper.connect
#zookeeper.connect=ip-188-33-33-31.eu-central-1.compute.internal:2181,ip-188-33-33-32.eu-central-1.compute.internal:2181,ip-188-33-33-33.eu-central-1.compute.internal:2181
1.compute.internal:2181
#change the default 40000 to 50000
request.timeout.ms=50000

#hange default heartbeat interval from 3000 to 15000
heartbeat.interval.ms=30000

#change default session timeout from 30000 to 40000
session.timeout.ms=40000
#consumer group id
group.id=africaBetMirrorGroupTest
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
#restrict the max poll records from 2147483647 to 200000
max.poll.records=20000
#set receive buffer from default 64kB to 512kb
receive.buffer.bytes=524288

#set max amount of data per partition to override default 1048576
max.partition.fetch.bytes=5248576
#consumer timeout
#consumer.timeout.ms=5000

mirror-producer.properties的配置文件如下:

bootstrap.servers=10.120.241.146:9092,10.120.241.82:9092,10.120.241.110:9092

# name of the partitioner class for partitioning events; default partition spreads data randomly
#partitioner.class=

# specifies whether the messages are sent asynchronously (async) or synchronously (sync)
producer.type=sync

# specify the compression codec for all data generated: none, gzip, snappy, lz4.
# the old config values work as well: 0, 1, 2, 3 for none, gzip, snappy, lz4, respectively
compression.codec=none
# message encoder
serializer.class=kafka.serializer.DefaultEncoder

同時,我使用kafka-consumer-groups.sh循環監控消費延遲:

bin/kafka-consumer-groups.sh --bootstrap-server ip-188-33-33-31.eu-central-1.compute.internal:9092,ip-188-33-33-32.eu-central-1.compute.internal:9092,ip-188-33-33-33.eu-central-1.compute.internal:9092 --describe --group africaBetMirrorGroupTest --new-consumer

當我們使用new KafkaConsumer進行消息消費,要想通過kafka-consumer-groups.sh獲取整個group的offset、lag延遲信息,也必須加上–new-consumer,告知kafka-consumer-groups.sh,這個group的消費者使用的是new kafka consumer,即group中所有consumer的信息保存在了Kafka上的一個名字叫做__consumer_offsets的特殊topic上,而不是保存在zookeeper上。我在使用kafka-consumer-groups.sh的時候就不知道還需要添加--new-consumer,結果我啟動了MirrorMaker以后,感覺消息在消費,但是就是在zookeeper的/consumer/ids/上找不到group的任何信息。后來在stack overflow上問了別人才知道。

3. 負載不均衡原因診斷以及問題解決
在我的另外一篇博客《Kafka為Consumer分派分區:RangeAssignor和RoundRobinAssignor》中,介紹了Kafka內置的分區分派策略:RangeAssignor和RoundRobinAssignor。由於RangeAssignor是早期版本的Kafka的唯一的分區分派策略,因此,默認不配置的情況下,Kafka使用RangeAssignor進行分區分派,但是,在MirrorMaker的使用場景下,RoundRobinAssignor更有利於均勻的分區分派。甚至在KAFKA-3831中有人建議直接將MirrorMaker的默認分區分派策略改為RoundRobinAssignor。那么,它們到底有什么區別呢?我們先來看兩種策略下的分區分派結果。在我的實驗場景下,有6個topic:ABTestMsg|AppColdStartMsg|BackPayMsg|WebMsg|GoldOpenMsg|BoCaiMsg,每個topic有兩個分區。由於MirrorMaker所在的服務器性能良好,我設置--num.streams 40,即單台MirrorMaker會用40個線程,創建40個獨立的Consumer進行消息消費,兩個MirrorMaker加起來80個線程,80個並行Consumer。由於總共只有6 * 2=12個TopicPartition,因此最多也只有12個Consumer會被分派到分區,其余Consumer空閑。
我們來看基於RangeAssignor分派策略,運行kafka-consumer-groups.sh觀察到的分區分派的結果:

TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
ABTestMsg 0 780000 820038 49938 africaBetMirrorGroupTest-4-cf330e66-1319-4925-9605-46545df13453/114.113.198.126 africaBetMirrorGroupTest-0
ABTestMsg 1 774988 820038 55000 africaBetMirrorGroupTest-19-c77523e7-7b87-472b-9a26-cd902888944d/114.113.198.126 africaBetMirrorGroupTest-1
AppColdStartMsg 0 774000 820039 55938 africaBetMirrorGroupTest-19-674d8ad4-39d2-40cc-ae97-f4be9c1bb154/114.113.198.126 africaBetMirrorGroupTest-0
AppColdStartMsg 1 774100 820045 56038 africaBetMirrorGroupTest-15-91c67bf8-0c1c-42ac-97f0-5369794c2d1b/114.113.198.126 africaBetMirrorGroupTest-1
BackPayMsg 0 780000 820038 49938 africaBetMirrorGroupTest-4-cf330e66-1319-4925-9605-46545df13453/114.113.198.126 africaBetMirrorGroupTest-0
BackPayMsg 1 774988 820038 55000 africaBetMirrorGroupTest-19-c77523e7-7b87-472b-9a26-cd902888944d/114.113.198.126 africaBetMirrorGroupTest-1
WebMsg 0 774000 820039 55938 africaBetMirrorGroupTest-19-674d8ad4-39d2-40cc-ae97-f4be9c1bb154/114.113.198.126 africaBetMirrorGroupTest-0
WebMsg 1 774100 820045 56038 africaBetMirrorGroupTest-15-91c67bf8-0c1c-42ac-97f0-5369794c2d1b/114.113.198.126 africaBetMirrorGroupTest-1
GoldOpenMsg 0 780000 820038 49938 africaBetMirrorGroupTest-4-cf330e66-1319-4925-9605-46545df13453/114.113.198.126 africaBetMirrorGroupTest-0
GoldOpenMsg 1 774988 820038 55000 africaBetMirrorGroupTest-19-c77523e7-7b87-472b-9a26-cd902888944d/114.113.198.126 africaBetMirrorGroupTest-1
BoCaiMsg 0 774000 820039 55938 africaBetMirrorGroupTest-19-674d8ad4-39d2-40cc-ae97-f4be9c1bb154/114.113.198.126 africaBetMirrorGroupTest-0
BoCaiMsg 1 774100 820045 56038 africaBetMirrorGroupTest-15-91c67bf8-0c1c-42ac-97f0-5369794c2d1b/114.113.198.126 africaBetMirrorGroupTest-1
- - - - - africaBetMirrorGroupTest-6-ae373364-2ae2-42b8-8a74-683557e315bf/114.113.198.126 africaBetMirrorGroupTest-6
- - - - - africaBetMirrorGroupTest-9-0e346b46-1a2c-46a2-a2da-d977402f5c5d/114.113.198.126 africaBetMirrorGroupTest-9
- - - - - africaBetMirrorGroupTest-7-f0ae9f31-33e6-4ddd-beac-236fb7cf20d5/114.113.198.126 africaBetMirrorGroupTest-7
- - - - - africaBetMirrorGroupTest-7-e2a9e905-57c1-40a6-a7f3-4aefd4f1a30a/114.113.198.126 africaBetMirrorGroupTest-7
- - - - - africaBetMirrorGroupTest-8-480a2ef5-907c-48ed-be1f-33450903ec72/114.113.198.126 africaBetMirrorGroupTest-8
- - - - - africaBetMirrorGroupTest-8-4206bc08-58a5-488a-b756-672fb4eee6e0/114.113.198.126 africaBetMirrorGroupTest-8
.....后續更多空閑consumer省略不顯示

當沒有在mirror-consumer.properties 中配置分區分派策略,即使用默認的RangeAssignor的時候,我們發現,盡管我們每一個MirrorMaker有40個Consumer,整個Group中有80個Consumer,但是,一共6 * 2 = 12個TopicPartition竟然全部聚集在2-3個Consumer上,顯然,這完全浪費了並行特性,被分配到一個consumer上的多個TopicPartition只能串行消費。

因此,通過partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor顯式指定分區分派策略為RoundRobinAssignor,重啟MirrorMaker,重新通過kafka-consumer-groups.sh 命令觀察分區分派和消費延遲結果:

TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
ABTestMsg 0 819079 820038 959 africaBetMirrorGroupTest-4-cf330e66-1319-4925-9605-46545df13453/114.113.198.126 africaBetMirrorGroupTest-1
ABTestMsg 1 818373 820038 1665 africaBetMirrorGroupTest-19-c77523e7-7b87-472b-9a26-cd902888944d/114.113.198.126 africaBetMirrorGroupTest-5
AppColdStartMsg 0 818700 818907 1338 africaBetMirrorGroupTest-19-674d8ad4-39d2-40cc-ae97-f4be9c1bb154/114.113.198.126 africaBetMirrorGroupTest-20
AppColdStartMsg 1 818901 820045 1132 africaBetMirrorGroupTest-15-91c67bf8-0c1c-42ac-97f0-5369794c2d1b/114.113.198.126 africaBetMirrorGroupTest-18
BackPayMsg 0 819032 820038 959 africaBetMirrorGroupTest-4-cf330e66-1319-4925-9605-46545df13453/114.113.198.126 africaBetMirrorGroupTest-5
BackPayMsg 1 818343 820038 1638 africaBetMirrorGroupTest-19-c77523e7-7b87-472b-9a26-cd902888944d/114.113.198.126 africaBetMirrorGroupTest-8
WebMsg 0 818710 818907 1328 africaBetMirrorGroupTest-19-674d8ad4-39d2-40cc-ae97-f4be9c1bb154/114.113.198.126 africaBetMirrorGroupTest-7
WebMsg 1 818921 820045 1134 africaBetMirrorGroupTest-15-91c67bf8-0c1c-42ac-97f0-5369794c2d1b/114.113.198.126 africaBetMirrorGroupTest-9
GoldOpenMsg 0 819032 820038 959 africaBetMirrorGroupTest-4-cf330e66-1319-4925-9605-46545df13453/114.113.198.126 africaBetMirrorGroupTest-12
GoldOpenMsg 1 818343 820038 1638 africaBetMirrorGroupTest-19-c77523e7-7b87-472b-9a26-cd902888944d/114.113.198.126 africaBetMirrorGroupTest-14
BoCaiMsg 0 818710 818907 1322 africaBetMirrorGroupTest-19-674d8ad4-39d2-40cc-ae97-f4be9c1bb154/114.113.198.126 africaBetMirrorGroupTest-14
BoCaiMsg 1 818921 820045 1189 africaBetMirrorGroupTest-15-91c67bf8-0c1c-42ac-97f0-5369794c2d1b/114.113.198.126 africaBetMirrorGroupTest-117
- - - - - africaBetMirrorGroupTest-6-ae373364-2ae2-42b8-8a74-683557e315bf/114.113.198.126 africaBetMirrorGroupTest-6
- - - - - africaBetMirrorGroupTest-9-0e346b46-1a2c-46a2-a2da-d977402f5c5d/114.113.198.126 africaBetMirrorGroupTest-9
- - - - - africaBetMirrorGroupTest-7-f0ae9f31-33e6-4ddd-beac-236fb7cf20d5/114.113.198.126 africaBetMirrorGroupTest-7
- - - - - africaBetMirrorGroupTest-7-e2a9e905-57c1-40a6-a7f3-4aefd4f1a30a/114.113.198.126 africaBetMirrorGroupTest-7
- - - - - africaBetMirrorGroupTest-8-480a2ef5-907c-48ed-be1f-33450903ec72/114.113.198.126 africaBetMirrorGroupTest-8
- - - - - africaBetMirrorGroupTest-8-4206bc08-58a5-488a-b756-672fb4eee6e0/114.113.198.126 africaBetMirrorGroupTest-8
.....后續更多空閑consumer省略不顯示

對比RangeAssingor,消息延遲明顯減輕,而且,12個TopicPartition被均勻分配到了不同的consumer上,即單個Consumer只負責一個TopicPartition的消息消費,不同的TopicPartition之間實現了完全並行。
之所以出現以上不同,原因在於兩個分區分派方式的策略不同:

RangeAssingor:先對所有Consumer進行排序,然后對Topic逐個進行分區分派。用以上Topic作為例子:
對所有的Consumer進行排序,排序后的結果為Consumer-0,Consumer-1,Consumer-2 ....Consumer-79
對ABTestMsg進行分區分派:
ABTestMsg-0分配給Consumer-0
ABTestMsg-1分配各Consumer-1

對AppColdStartMsg進行分區分派:
AppColdStartMsg-0分配各Consumer-0
AppColdStartMsg-1分配各Consumer-1

#后續TopicParition的分派以此類推

可見,RangeAssingor 會導致多個TopicPartition被分派在少量分區上面。
- RoundRobinAssignor:與RangeAssignor最大的區別,是不再逐個Topic進行分區分派,而是先將Group中的所有TopicPartition平鋪展開,再一次性對他們進行一輪分區分派。

將Group中的所有TopicPartition展開,展開結果為:

ABTestMsg-0,ABTestMsg-1,AppColdStartMsg-0,AppColdStartMsg-1,BackPayMsg-0,BackPayMsg-1,WebMsg-0,WebMsg-1,GoldOpenMsg-0,GoldOpenMsg-1,BoCaiMsg-0,BoCaiMsg-1

對所有的Consumer進行排序,排序后的結果為Consumer-0,Consumer-1,Consumer-2 ,Consumer-79。

開始講平鋪的TopicPartition進行分區分派

ABTestMsg-0分配給Consumer-0
ABTestMsg-1分配給Consumer-1
AppColdStartMsg-0分配給Consumer-2
AppColdStartMsg-1分配給Consumer-3
BackPayMsg-0分配給Consumer-4
BackPayMsg-1分配給Consumer-5


#后續TopicParition的分派以此類推

由此可見,RoundRobinAssignor平鋪式的分區分派算法是讓我們的Kafka MirrorMaker能夠無重疊地將TopicParition分派給Consumer的原因。

4. 本身網絡帶寬限制問題
網絡帶寬本身也會限制Kafka Mirror的吞吐量。進行壓測的時候,我分別在我們的在線環境和測試環境分別運行Kafka MirrorMaker,均選擇兩台服務器運行MirrorMaker,但是在線環境是實體機環境,單台機器通過SCP方式拷貝Source Cluster上的大文件,平均吞吐量是600KB-1.5MB之間,但是測試環境的機器是同一個host主機上的多台虛擬機,SCP吞吐量是100KB以下。經過測試,測試環境消息積壓會逐漸增多,在線環境持續積壓,但是積壓一直保持穩定。這種穩定積壓是由於每次poll()的間隙新產生的消息量,屬於正常現象。

5. 適當配置單次poll的消息總量和單次poll()的消息大小
通過Kafka MirrorMaker運行時指定的consumer配置文件(在我的環境中為$MIRROR_HOME/config/mirror-consumer.properties)來配置consumer。其中,通過以下配置,可以控制單次poll()的消息體量(數量和總體大小)
max.poll.records:單次poll()操作最多消費的消息總量,這里說的poll是單個consumer而言的。無論過大過小,都會發生問題:

如果設置得過小,則消息傳輸率降低,大量的頭信息會占用較大的網絡帶寬;-
如果設置得過大,則會產生一個非常難以判斷原因同時又會影響整個group中所有消息的消費的重要問題:rebalance。看過kafka代碼的話可以看到,每次poll()請求都會順帶向遠程server發送心跳信息,遠程GroupCoordinator會根據這個心跳信息判斷consumer的活性。如果超過指定時間(heartbeat.interval.ms)沒有收到對應Consumer的心跳,則GroupCoordinator會判定這個Server已經掛掉,因此將這個Consumer負責的partition分派給其它Consumer,即觸發rebalance。rebalance操作的影響范圍是整個Group,即Group中所有的Consumer全部暫停消費直到Rebalance完成。而且,TopicPartition越長,這個過程會越長。其實,一個正常消費的環境,應該是任何時候都不應該發生rebalance的(一個新的Consumer的正常加入也會引起Rebalance,這種情況除外)。雖然Kafka本身是非常穩定的,但是還是應該盡量避免rebalance的發生。在某些極端情況下觸發一些bug,rebalance可能永遠停不下來了。。。如果單次max.poll.records消費太多消息,這些消息produce到Target Cluster的時間可能會較長,從而可能觸發Rebalance。
6. 惡劣網絡環境下增加超時時間配置
在不穩定的網絡環境下,應該增加部分超時時間配置,如request.timeout.ms或者session.timeout.ms,一方面可以避免頻繁的超時導致大量不必要的重試操作,同時,通過增加如上文所講,通過增加heartbeat.interval.ms時間,可以避免不必要的rebalance操作。當然,在網絡環境良好的情況下,上述配置可以適當減小以增加Kafka Server對MirrorMaker出現異常情況下的更加及時的響應。

總之,Kafka MirrorMaker作為跨數據中心的Kafka數據同步方案,絕對無法允許數據丟失以及數據的傳輸速度低於生產速度導致數據越積累越多。因此,唯有進行充分的壓測和精准的性能調優,才能綜合網絡環境、服務器性能,將Kafka MirrorMaker的性能發揮到最大。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM