Syncing MongoDB data to Elasticsearch with Monstache (with high availability)

Requirements & Problem Statement

  • We need to sync MongoDB data to Elasticsearch in real time (including data changes). After evaluating AWS DMS and Monstache, we tentatively chose the Monstache tool for data synchronization.

What is Monstache?

  • Monstache is a sync daemon written in Go that continuously indexes MongoDB collections into Elasticsearch, tailing the oplog or using change streams to propagate inserts, updates, and deletes.

Practice

  • Monstache is driven by a configuration file with a rich set of options.

    Monstache is started with a configuration file, here named config.toml, as follows:

    # connection settings
    # print detailed information including request traces
    # enable verbose logging; keep this at the top of the file, otherwise log output does not reach the log files
    verbose = true
    # connect to MongoDB using the following URL
    # MongoDB connection URL; the source must be a MongoDB cluster (replica set)
    mongo-url = "mongodb://*:27021"
    #"mongodb://root:<your_mongodb_password>@dds-bp1aadcc629******.mongodb.rds.aliyuncs.com:3717"
    # connect to the Elasticsearch REST API at the following node URLs
    # Elasticsearch connection URLs
    elasticsearch-urls = ["http://localhost:9200"]
    
    # frequently required settings
    # if you need to seed an index from a collection and not just listen and sync changes events
    # you can copy entire collections or views from MongoDB to Elasticsearch
    # MongoDB collections to read, in the form database.collection
    direct-read-namespaces = ["db.posts"]
    
    # if you want to use MongoDB change streams instead of legacy oplog tailing use change-stream-namespaces
    # change streams require at least MongoDB API 3.6+
    # if you have MongoDB 4+ you can listen for changes to an entire database or entire deployment
    # in this case you usually don't need regexes in your config to filter collections unless you target the deployment.
    # to listen to an entire db use only the database name.  For a deployment use an empty string.
    #change-stream-namespaces = ["mydb.col"]
    
    # additional settings
    
    # if you don't want to listen for changes to all collections in MongoDB but only a few
    # e.g. only listen for inserts, updates, deletes, and drops from mydb.mycollection
    # this setting does not initiate a copy, it is only a filter on the change event listener
    #namespace-regex = '^db\.posts$'
    # compress requests to Elasticsearch
    #gzip = true
    # generate indexing statistics
    #stats = true
    # index statistics into Elasticsearch
    #index-stats = true
    # use the following PEM file for connections to MongoDB
    #mongo-pem-file = "/path/to/mongoCert.pem"
    # disable PEM validation
    #mongo-validate-pem-file = false
    # use the following user name for Elasticsearch basic auth
    elasticsearch-user = "elastic"
    # use the following password for Elasticsearch basic auth
    #elasticsearch-password = "<your_es_password>"
    # use 8 go routines concurrently pushing documents to Elasticsearch
    # maximum number of goroutines Monstache uses to push documents to Elasticsearch (default 4)
    elasticsearch-max-conns = 8
    # use the following PEM file to connections to Elasticsearch
    #elasticsearch-pem-file = "/path/to/elasticCert.pem"
    # validate connections to Elasticsearch
    #elastic-validate-pem-file = true
    # propagate dropped collections in MongoDB as index deletes in Elasticsearch
    # i.e. whether dropping a collection or database in MongoDB also deletes the matching index in Elasticsearch
    dropped-collections = false
    # propagate dropped databases in MongoDB as index deletes in Elasticsearch
    dropped-databases = false
    # do not start processing at the beginning of the MongoDB oplog
    # if you set the replay to true you may see version conflict messages
    # in the log if you had synced previously. This just means that you are replaying old docs which are already
    # in Elasticsearch with a newer version. Elasticsearch is preventing the old docs from overwriting new ones.
    #replay = false
    # resume processing from a timestamp saved in a previous run
    resume = true
    # do not validate that progress timestamps have been saved
    #resume-write-unsafe = false
    # override the name under which resume state is saved
    #resume-name = "default"
    # resume strategy: 0 = timestamps (the default), 1 = change-stream tokens
    # tokens work with MongoDB API 3.6+ while timestamps work only with MongoDB API 4.0+
    resume-strategy = 0
    # exclude documents whose namespace matches the following pattern
    #namespace-exclude-regex = '^mydb\.ignorecollection$'
    # turn on indexing of GridFS file content
    #index-files = true
    # turn on search result highlighting of GridFS content
    #file-highlighting = true
    # index GridFS files inserted into the following collections
    #file-namespaces = ["users.fs.files"]
    # enable clustering mode
    # cluster name; processes sharing this name coordinate, which is central to high-availability mode
    cluster-name = 'dev'
    # worker mode: names of the workers that share the load
    #workers = ["Tom", "Dick", "Harry"]
    # do not exit after full-sync, rather continue tailing the oplog
    #exit-after-direct-reads = false
    namespace-regex = '^db\.(posts|\$cmd)$'
    
    [[mapping]]
    namespace = "db.posts"
    index = "posts"
    
    # logging to files is essential in production; Monstache logs to stdout by default,
    # so point it at dedicated log files here (another lesson learned the hard way!)
    #[logs]
    #info = "/var/monstache/log/info.log"
    #warn = "/var/monstache/log/warn.log"
    #error = "/var/monstache/log/error.log"
    #trace = "/var/monstache/log/trace.log"
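The namespace-regex filter in the config above decides which change events are kept. A minimal sketch of how that pattern behaves (plain Python `re`, not monstache code; `db.$cmd` is where MongoDB records drop commands in the oplog):

```python
import re

# Same pattern as in the config above: matches the posts collection plus db.$cmd.
NAMESPACE_REGEX = re.compile(r'^db\.(posts|\$cmd)$')

def passes_filter(namespace: str) -> bool:
    """Return True if a change event for this namespace would be kept."""
    return NAMESPACE_REGEX.match(namespace) is not None

print(passes_filter("db.posts"))      # True
print(passes_filter("db.$cmd"))       # True
print(passes_filter("db.users"))      # False
print(passes_filter("otherdb.posts")) # False
```

Without the `\$cmd` alternative, collection drops would pass unnoticed even with dropped-collections enabled.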
    

    Start it with monstache -cluster-name dev -f config.toml; monstache ships as a precompiled binary.

  • Now write a document into MongoDB and then query Elasticsearch: the new document appears in the index. In practice, the other MongoDB document operations (updates, deletes, and so on) are synced to Elasticsearch in the same way.

  • Monstache high availability: plain mode and multi-worker mode. In both modes, cluster-name must be enabled in the configuration file: cluster-name = "your-monstache-cluster-name".

    1. Plain mode

    How it works: "When cluster-name is given monstache will enter a high availability mode. Processes with cluster name set to the same value will coordinate. Only one of the processes in a cluster will sync changes. The other processes will be in a paused state. If the process which is syncing changes goes down for some reason one of the processes in paused state will take control and start syncing. See the section high availability for more information." In short: within a cluster, only one process syncs data while the others stay paused; if the syncing process dies, one of the paused processes is promoted and takes over. Documentation: https://rwynn.github.io/monstache-site/config/

    Run:
    monstache -cluster-name dev -f config.toml

    Running this command twice in separate terminals starts two monstache processes: one is actively syncing while the other is paused.

    Killing the active process confirms the failover: the paused process switches to the active (syncing) state.
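    The failover behaviour above can be sketched as a toy simulation (hypothetical code for illustration, not monstache internals): processes sharing a cluster name coordinate so that only one is active, and killing the active one promotes a paused peer.

```python
class Cluster:
    """Toy model of monstache cluster coordination: at most one
    process per cluster-name is actively syncing; the rest are paused."""

    def __init__(self, name):
        self.name = name
        self.processes = []  # first entry is the active process

    def start(self, pid):
        self.processes.append(pid)
        # Only the first process to join becomes active; later joiners pause.
        return "syncing" if self.processes[0] == pid else "paused"

    def kill(self, pid):
        self.processes.remove(pid)
        # A paused process (if any) is promoted to active.
        return self.processes[0] if self.processes else None

cluster = Cluster("dev")
print(cluster.start("proc-1"))  # syncing
print(cluster.start("proc-2"))  # paused
print(cluster.kill("proc-1"))   # proc-2 (promoted to active)
```

The real coordination happens through documents monstache writes to MongoDB, but the observable behaviour matches this model.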

    2. Multi-worker mode

    How it works: "workers - You can run multiple monstache processes and distribute the work between them. First configure the names of all the workers in a shared config.toml file. You can run monstache in high availability mode by starting multiple processes with the same value for cluster-name. Each process will join a cluster which works together to ensure that a monstache process is always syncing to Elasticsearch." In short: multiple workers cooperate, and every worker under the same cluster name syncs data; none of them is paused. If two processes share both the cluster name and the worker name, one is active and the other paused; when the active one dies, the paused process with the same name is promoted to active. You cannot start a process with a worker name that is not in the workers list. Documentation: https://rwynn.github.io/monstache-site/advanced/#high-availability

    Prerequisite: declare the workers in the configuration file: workers = ["Tom", "Dick", "Harry"]

    Run:
    monstache -cluster-name HA -worker Tom -f config.toml
    monstache -cluster-name HA -worker Dick -f config.toml
    monstache -cluster-name HA -worker Harry -f config.toml

Verification: write 10,000 documents into MongoDB at once. Monstache hashes each document id and hands it to one of the workers. With all three workers started, each worker syncs a comparable share of the data, and querying Elasticsearch confirms that all 10,000 documents have been synced.
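The claim that document ids are hashed across workers can be illustrated with a small sketch (the hash function and worker names here are assumptions for illustration; monstache's actual partitioning scheme differs in detail):

```python
import hashlib

# Must match the workers list declared in config.toml.
WORKERS = ["Tom", "Dick", "Harry"]

def assign_worker(doc_id, workers=WORKERS):
    """Deterministically map a document id to one worker, so every
    process agrees on which worker indexes which document."""
    digest = hashlib.md5(doc_id.encode("utf-8")).digest()
    return workers[int.from_bytes(digest[:4], "big") % len(workers)]

# Distribute 10,000 synthetic ids and count each worker's share.
counts = {w: 0 for w in WORKERS}
for i in range(10_000):
    counts[assign_worker(f"doc-{i}")] += 1

print(counts)  # roughly even split across the three workers
```

Because the assignment is a pure function of the document id, no coordination is needed per document: every worker independently computes the same mapping and only indexes its own share.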

Comparing Monstache's plain mode and multi-worker mode

1. Plain mode

Advantage: simpler to deploy.

Drawback: slower processing, because only a single worker runs; however, you can configure as many goroutines as you want to consume the data (elasticsearch-max-conns), which largely offsets the multi-worker advantage.

2. Multi-worker mode

Advantage: near real-time sync. Multiple workers run concurrently, and each worker can additionally use several goroutines to consume data, so overall concurrency is higher.

Drawback: more cumbersome to deploy.

Conclusion: the two modes differ little in sync time in practice (plain mode synced 10,000 documents in about 1.5 seconds in local testing), and plain mode is simpler to deploy, so we ultimately chose plain mode.

For deploying monstache on EKS, see: https://www.cnblogs.com/agopher/p/15704633.html

References:

Official documentation: https://rwynn.github.io/monstache-site/advanced/#high-availability

Hands-on guide: https://help.aliyun.com/document_detail/171650.html#title-8gf-qh2-3qj

