MongoDB -> kafka 高性能實時同步(sync 采集)mongodb數據到kafka解決方案


寫這篇博客的目的

讓更多的人了解 阿里開源的MongoShake可以很好滿足mongodb到kafka高性能高可用實時同步需求(項目地址:https://github.com/alibaba/MongoShake,下載地址:https://github.com/alibaba/MongoShake/releases)。至此博客就結束了,你可以愉快地啃這個項目了。還是一起來看一下官方的描述:

MongoShake is a universal data replication platform based on MongoDB's oplog. Redundant replication and active-active replication are two most important functions. 基於mongodb oplog的集群復制工具,可以滿足遷移和同步的需求,進一步實現災備和多活功能。

沒有標題的標題

哈哈,有興趣聽我啰嗦的可以往下。最近,有個實時增量采集mongodb數據(數據量在每天10億條左右)的需求,需要先調研一下解決方案。我分別百度、google了mongodb kafka sync 同步 采集 實時等 關鍵詞,寫這篇博客的時候排在最前面的當屬kafka-connect(官方有實現https://github.com/mongodb/mongo-kafka,其實也有非官方的實現)那一套方案,我對kafka-connect相對熟悉一點(不熟悉的話估計編譯部署都要花好一段時間),沒測之前就感覺可能不滿足我的采集性能需求,測下來果然也是不滿足需求。后來,也看到了https://github.com/rwynn/route81,編譯部署也較為麻煩,同樣不滿足采集性能需求。我搜索東西的時候一般情況下不會往下翻太多,沒找到所需的,大多會嘗試換關鍵詞(包括中英文)搜搜,這次可能也提醒我下次要多往下找找,說不定有些好東西未必排在最前面幾個

之后在github上搜in:readme mongodb kafka sync,讓我眼前一亮。

github上搜索mongodb、kafka、sync關鍵詞結果

點進去快速讀了一下readme,正是我想要的(后面自己實際測下來確實高性能、高可用,滿足我的需求),官方也提供了MongoShake的性能測試報告

這篇博客不講(也很大可能是筆者技術太渣,無法參透領會(●´ω`●))MongoShake的架構、原理、實現,如何高性能的,如何高可用的等等。就一個目的,希望其他朋友在搜索mongodb kafka時候,MongoShake的解決方案可以排在最前面。

初次使用MongoShake值得注意的地方

數據處理流程

v2.2.1之前的MongoShake版本處理數據的流程:

MongoDB(數據源端,待同步的數據)
-->MongoShake(對應的是collector.linux進程,作用是采集)
-->Kafka(raw格式,未解析的帶有header+body的數據)
-->receiver(對應的是receiver.linux進程,作用是解析,這樣下游組件就能拿到比如解析好的一條一條的json格式的數據)
-->下游組件(拿到mongodb中的數據用於自己的業務處理)

v2.2.1之前MongoShake的版本解析入kafka,需要分別啟collector.linux和receiver.linux進程,而且receiver.linux需要自己根據你的業務邏輯填充完整,然后編譯出來,默認只是把解析出來的數據打個log而已

src/mongoshake/receiver/replayer.go中的代碼如圖:

需要自己填充receiver邏輯的地方

詳情見:https://github.com/alibaba/MongoShake/wiki/FAQ#q-how-to-connect-to-different-tunnel-except-direct

v2.2.1版本MongoShake的collector.conf有一個配置項tunnel.message

# the message format in the tunnel, used when tunnel is kafka.
# "raw": batched raw data format which has good performance but encoded so that users
# should parse it by receiver.
# "json": single oplog format by json.
# "bson": single oplog format by bson.
# 通道數據的類型,只用於kafka和file通道類型。
# raw是默認的類型,其采用聚合的模式進行寫入和
# 讀取,但是由於攜帶了一些控制信息,所以需要專門用receiver進行解析。
# json以json的格式寫入kafka,便於用戶直接讀取。
# bson以bson二進制的格式寫入kafka。
tunnel.message = json
  • 如果選擇的raw格式,那么數據處理流程和上面之前的一致(MongoDB->MongoShake->Kafka->receiver->下游組件)
  • 如果選擇的是jsonbson,處理流程為MongoDB->MongoShake->Kafka->下游組件

v2.2.1版本設置為json處理的優點就是把以前需要由receiver對接的格式,改為直接對接,從而少了一個receiver,也不需要用戶額外開發,降低開源用戶的使用成本。

簡單總結一下就是:
raw格式能夠最大程度的提高性能,但是需要用戶有額外部署receiver的成本。json和bson格式能夠降低用戶部署成本,直接對接kafka即可消費,相對於raw來說,帶來的性能損耗對於大部分用戶是能夠接受的。

高可用部署方案

我用的是v2.2.1版本,高可用部署非常簡單。collector.conf開啟master的選舉即可:

# high availability option.
# enable master election if set true. only one mongoshake can become master
# and do sync, the others will wait and at most one of them become master once
# previous master die. The master information stores in the `mongoshake` db in the source
# database by default.
# 如果開啟主備mongoshake拉取同一個源端,此參數需要開啟。
master_quorum = true

# checkpoint存儲的地址,database表示存儲到MongoDB中,api表示提供http的接口寫入checkpoint。
context.storage = database

同時我checkpoint的存儲地址默認用的是database,會默認存儲在mongoshake這個db中。我們可以查詢到checkpoint記錄的一些信息。

rs0:PRIMARY> use mongoshake
switched to db mongoshake
rs0:PRIMARY> show collections;
ckpt_default
ckpt_default_oplog
election
rs0:PRIMARY> db.election.find()
{ "_id" : ObjectId("5204af979955496907000001"), "pid" : 6545, "host" : "192.168.31.175", "heartbeat" : NumberLong(1582045562) }

我在192.168.31.174,192.168.31.175,192.168.31.176上總共啟了3個MongoShake實例,可以看到現在工作的是192.168.31.175機器上進程。自測過程,高速往mongodb寫入數據,手動kill掉192.168.31.175上的collector進程,等192.168.31.174成為master之后,我又手動kill掉它,最終只保留192.168.31.176上的進程工作,最后統計數據發現,有重采數據現象,猜測有實例還沒來得及checkpoint就被kill掉了。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM