mongodb 增量同步之 MongoShake(1)


概要:

目的:增量同步mongodb 的數據(mongo需集群,或者副本集模式)

        1.官網文檔

         2 基本安裝,啟動,監控

         3.小試牛刀,最佳實踐

0.官網文檔:

github: https://github.com/alibaba/MongoShake/

releases: https://github.com/alibaba/MongoShake/releases?spm=a2c4e.10696291.0.0.7aac19a4ZhZPfe

This is a brief introduction of Mongo-Shake, please visit english wiki or chinese wiki if you want to see more details including architecture, data flow, performance test, business showcase and so on.

1.基本安裝: https://help.aliyun.com/document_detail/122621.html?spm=a2o8d.corp_prod_req_detail.0.0.3b1d23d3Cjcmsa

操作步驟:  最新版:https://github.com/alibaba/MongoShake/releases

1.執行如下命令下載MongoShake程序,並重命名為mongoshake.tar.gz
wget "http://docs-aliyun.cn-hangzhou.oss.aliyun-inc.com/assets/attach/196977/jp_ja/1608863913991/mongo-shake-v2.4.16.tar.gz" -O mongoshake.tar.gz
2.解壓: tar zxvf mongoshake.tar.gz
3.執行vi collector.conf命令,修改MongoShake的配置文件collector.conf,涉及的主要參數說明如: https://developer.aliyun.com/article/719704
主要參數:
mongo_urls :
源端MongoDB實例的ConnectionStringURI格式連接地址。 tunnel.address: 目標端MongoDB實例的ConnectionStringURI格式連接地址。 sync_mode : 數據同步的方式,取值: all:執行全量數據同步和增量數據同步。 full:僅執行全量數據同步。 incr:僅執行增量數據同步
4.執行下述命令啟動同步任務,並打印日志信息。
./collector.linux -conf=collector.conf -verbose
最新版(2.6.5):
./collector.linux -conf=collector.conf -verbose 2
5.觀察打印的日志信息,當出現如下日志時,即代表全量數據同步已完成,並進入增量數據同步模式。
ngoshake/collector.(*ReplicationCoordinator).Run:80) finish full sync, start incr sync with timestamp: fullBeginTs[1560994443], fullFinishTs[1560994737]

2.監控MongoShake狀態

增量數據同步開始后,您可以再開啟一個命令行窗口,通過如下命令來監控MongoShake。
 ./mongoshake-stat --port=9100

參數 說明
logs_get/sec 每秒獲取的oplog數量。
logs_repl/sec 每秒執行重放操作的oplog數量。
logs_success/sec 每秒成功執行重放操作的oplog數量。
lsn.time 最后發送oplog的時間。
lsn_ack.time 目標端確認寫入的時間。
lsn_ckpt.time CheckPoint持久化的時間。
now.time 當前時間。
replset 源數據庫的副本集名稱。

 


3.MongoShake最佳實踐,詳見:https://developer.aliyun.com/article/719704

  1. 從MongoDB副本集同步到MongoDB副本集
  2. 從MongoDB副本集同步到MongoDB集群版
  3. 從MongoDB集群版同步到MongoDB集群版
  4. 從MongoDB副本集同步到kafka通道
  5. 雲上MongoDB副本集的雙向同步

1. 從MongoDB副本集同步到MongoDB副本集

假設源端是三副本:10.1.1.1:1001, 10.2.2.2:2002, 10.3.3.3:3003,目的端也是三副本:10.5.5.5:5005, 10.6.6.6:6006, 10.7.7.7:7007。同步模式是全量+增量同步。
則用戶需要修改以下幾個參數:

mongo_urls = mongodb://username:password@10.1.1.1:1001,10.2.2.2:2002,10.3.3.3:3003 #源端連接串信息,逗號分隔不同的mongod sync_mode = all # all 表示全量+增量,full表示僅全量,incr表示僅增量 tunnel.address = mongodb://username:password@10.5.5.5:5005, 10.6.6.6:6006, 10.7.7.7:7007 #目的端連接串信息,逗號分隔不同的mongod incr_sync.mongo_fetch_method = oplog # 如果希望以change stream拉取,該值需要配置change_stream,支持>=4.0.1版本。

2. 從MongoDB副本集同步到MongoDB集群版

假設源同樣是三副本:10.1.1.1:1001, 10.2.2.2:2002, 10.3.3.3:3003,目的端是sharding,有多個mongos:20.1.1.1:2021, 20.2.2.2:2022, 20.3.3.3:3033。

mongo_urls = mongodb://username:password@10.1.1.1:1001,10.2.2.2:2002,10.3.3.3:3003 #源端連接串信息,逗號分隔不同的mongod sync_mode = all # all 表示全量+增量,document表示僅全量,oplog表示僅增量 tunnel.address = mongodb://username:password@20.1.1.1:2021;20.2.2.2:2022;20.3.3.3:3033 #目的端連接串信息,分號分割不同的mongos。也可以只配置部分,配置多個mongos可以做負載均衡寫入。 incr_sync.mongo_fetch_method = oplog # 如果希望以change stream拉取,該值需要配置change_stream,支持>=4.0.1版本。

3. 從MongoDB集群版同步到MongoDB集群版

假設源是2節點:節點1是10.1.1.1:1001, 10.1.1.2:2002, 10.1.1.3:3003;節點2是10.2.2.1:1001, 10.2.2.2:2002, 10.2.2.3:3003,mongos:10.1.1.10:1010。目的端是sharding,有多個mongos:20.1.1.1:2021, 20.2.2.2:2022, 20.3.3.3:3033。

mongo_urls = mongodb://username1:password1@10.1.1.1:1001,10.1.1.2:2002,10.1.1.3:3003;mongodb://username2:password2@10.2.2.1:1001,10.2.2.2:2002,10.2.2.3:3003 #源端連接串信息,逗號分隔同一個shard不同的mongod,分號分隔不同的shard。 mongo_cs_url = mongodb://username1:password1@10.5.5.5:5555,10.5.5.6:5556 # 如果源端是sharding,此處需要配置源端sharding的cs的地址 mongo_s_url = mongodb://username_s:password_s@10.1.1.10:1010 # 如果希望采用change stream拉取,則還需要配置mongos的地址,一個就夠了 sync_mode = all # all 表示全量+增量,document表示僅全量,oplog表示僅增量 tunnel.address = mongodb://username:password@20.1.1.1:2021;20.2.2.2:2022;20.3.3.3:3033 #目的端連接串信息,分號分割不同的mongos。 incr_sync.mongo_fetch_method = oplog # 如果希望以change stream拉取,該值需要配置change_stream,支持>=4.0.1版本。

4. 從MongoDB副本集同步到kafka通道

假設源同樣是三副本:10.1.1.1:1001, 10.2.2.2:2002, 10.3.3.3:3003,目的kafka是50.1.1.1:6379,topic是test。

mongo_urls = mongodb://username:password@10.1.1.1:1001,10.2.2.2:2002,10.3.3.3:3003 #源端連接串信息,逗號分隔不同的mongod sync_mode = incr # 如果目的端不是mongodb,僅支持增量同步模式 tunnel = kafka tunnel.address = mongoshake_test@192.168.18.129:9092 #topic@ip:port incr_sync.mongo_fetch_method = oplog # 如果希望以change stream拉取,該值需要配置change_stream,支持>=4.0.1版本。

5. 雲上MongoDB副本集的雙向同步

雲上副本集的雙向同步可以參考副本集的單向同步,但是需要注意的有以下幾點,假設A和B之間雙向同步:

  1. 需要搭建2個mongoshake,一個從A到B,另一個從B到A
  2. 兩條mongoshake不能同時用全量+增量(all)模式,正常應該是一個庫為空(假設B),另一個有數據。那么從A到B先發起一次全量+增量同步,等待全量同步完畢以后,再從B到A發起一次增量同步。
  3. 雙向同步需要依賴gid的開啟,這個可以聯系燭昭(通過售后聯系),開啟gid將會重啟實例造成秒級別閃斷。
  4. gid用於記錄數據的產生地,比如從A產生的數據導入到B以后,不會被再導入回A,這樣就不會產生環形復制。需要注意的是,這個gid只能用於增量,這也是第2條為什么一個方向通道是全量+增量,另一個方向通道需要搭建增量的原因。
  5. 雲下開源的mongodb不能使用雙向同步,因為gid的修改是在內核里面,所以開源不支持。
  6. sharding同樣也支持雙向同步

6. 部分配置文件參數解釋

v2.4開始的參數請參考github wiki配置參數說明,下面是2.2及之前的參數。具體請查看配置文件的注釋,此處只做簡單解釋

  • mongo_urls: 源mongodb的連接地址
  • mongo_connect_mode: 源端連接的模式,有幾種模式可選:從seconary拉取;從primary拉取;secondary優先拉取;單節點拉取
  • sync_mode: sync模式,有幾種模式可選:全量,增量,全量+增量
  • http_profile: 提供restful接口,用戶可以查看一些內部運行情況,也可以對接監控。
  • system_profile: profile端口,可以查看進程運行的堆棧情況。
  • log: log日志相關參數。
  • filter.namespace.black: 黑名單過濾。黑名單內的庫表不會被同步,剩下的同步。
  • filter.namespace.white: 白名單過濾。白名單內的庫表會被同步,剩下的過濾掉。黑白名單最多只能配置一個,不配置會同步所有庫表。
  • filter.pass.special.db: 有些特別的庫表會被過濾,如admin,local, config庫,如果一定要開啟,可以在這里進行配置。
  • oplog.gids: 用於雲上雙向同步。
  • shard_key: 內部對數據多線程的哈希方式,默認collection表示按表級別進行哈希。
  • worker: 增量階段並發寫入的線程數,如果增量階段性能不夠,可以提高這個配置。
  • worker內部相關配置: worker.batch_queue_size, adaptive.batching_max_size, fetcher.buffer_capacity, 關於內部隊列的相關配置,具體請參考github wiki文檔。
  • worker.oplog_compressor: 壓縮模式,如果是非direct模式開啟這個可以減少網絡傳輸的開銷。
  • tunnel.address: 目的端對接的地址。
  • context.storage: checkpoint存儲的位置,database表示把數據存入MongoDB,api表示把數據存入用戶自己提供的http接口。
  • context.storage.url: checkpoint寫入到哪個MongoDB,如果源是sharding,此處配置cs地址,checkpoint會寫入admin庫;如果是副本集,不配置,會默認寫入源庫,配置則寫入配置的庫里面。
  • context.address: checkpoint寫入的表的名字。
  • context.start_position: checkpoint啟動開始拉取的增量時間位點。如果本身checkpoint已經存在(參考上述context的位置),那么則按照context信息進行拉取,如果不存在,則按照這個位點進行增量拉取。
  • master_quorum: 如果以主備模式拉取同一個源,則這個參數需要啟用。
  • transform.namespace: 命名空間的轉換,a.b:c.d表示把源端a庫下面的c表同步到目的端c庫下面的d表。
  • replayer.dml_only: 默認不同步DDL,false表示同步DDL。DDL包括建表,刪庫,建索引等語句。
  • replayer.executor.upsert: 目的端如果update語句對應的主鍵id不存在,是否將update語句更改為insert語句。
  • replayer.executor.insert_on_dup_update: 目的端如果insert語句對應的主鍵id已經存在,是否將insert語句更改為update語句。
  • replayer.conflict_write_to: 對於寫入沖突的情況,是否需要記錄沖突的文檔。
  • replayer.durable: 測試選項,false表示取消寫入,只用於拉取調試。
  • replayer.collection_parallel: 全量同步按表並發的並發度。
  • replayer.document_parallel: 全量同步同一個表內並發寫入的線程數。
  • replayer.document_batch_size: 全量同步一次性batch的大小。
  • replayer.collection_drop: 如果目的庫表存在,是否先刪除目的庫再進行同步。

 



免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM