MongoDB-Elasticsearch 實時數據導入


時間  2017-09-18

 

5 ways to synchronize data from MongoDb to ElasticSearch

https://www.linkedin.com/pulse/5-way-sync-data-from-mongodb-es-kai-hao

 

Elastic search(ES) is a pop-star for recording and analyzing data, and Mongodb is a famous NoSQL database for storing and querying data. 
With our web infrastructure improving, how can we export data from mongodb to ES for searching or analyzing purposes? 
There are 5 possible solutions recommended for your choice.

1.synchronized by web server

We can use Mongoosastic module for storing-in-both-sides purpose when we use Nodejs as a web server container. When one document needs to be stored, 
Mongoosastic can commit the changes to both mongo and ES. As the chart below: Here is the reference link: Mongoosastic. The advantage is that data can be stored in both mongo and ES simultaneously, and the downside is that overhead may be caused in CUD operation efficiency.
 And inconsistent data might be generated when one type of the db store failed. And the server framework is not flexible enough for db migrating.

2.Manually loading data from Mongo to ES

Transporter tool is a good choice to synchronize data once you want to export mongo data to another ES server. Transporter also can export data from or to other type of data store. Reference link is: Transporter.

It's important to know that the transporter synchronizing only once. When the job is done, the transporter comes to its end. 3. Plugin for ES There is a plugin for ES named "elasticsearch-river-mongodb", and was widely used in ES 1.x, but now river mechanism for ES 2.x is deprecated. Reference link is elasticsearch-river-mongodb. 4. JDBC input plugin for logstash We can take advantage of buffering , inputting, outputting and filtering abilities from logstash by adding a mongo input and ES output plugin to get this job done. JDBC input plugin is one of the choices, but it needs JDBC driver support. As I know there is no well-supported-free JDBC driver for mongo. Some trial versions can be found in Unity or Simba. Reference link is : JDBC Plugin for Logstash 5. Mongo-ES connector mongo-connector is a real-time sync service as a package of python. It creates a pipeline from a mongodb cluster to one or more target systems. 
It needs mongo to run in replica-set mode, sync data in mongo to the target then tails the mongo oplog. 
It needs a package named "elastic2_doc_manager" to write data to ES. Process chart below:

Reference link is : github or python.

To recapitulate it,  it is a must to remember: mongo replica set, an opened port and IP for ES, using elastic2_doc_manager if you use ES 2.x. At present, I am not yet ready with any official support in Beats. it will be in the future. So that's all, 5 ways to mongo-ES-sync.

===============================================》》》》》》 

 

 

MongoDB-Elasticsearch 實時數據導入

https://zhuanlan.zhihu.com/p/26906652

 

搜索功能是App必不可少的一部分,我們使用目前比較流行的Elasticsearch進行全文檢索。我們的數據主要存儲在MongoDB中,如何將這些數據導入到Elasticsearch中,並能一直保持同步呢?做法大致分為兩種:

  1. 在應用層操作,在讀寫MongoDB的同時讀寫Elasticsearch,比如mongoosastic,需要修改已有的業務代碼。
  2. 與業務無關,通過讀取MongoDB的replica oplog,將MongoDB產生的操作在Elasticsearch上replay,來實現單向同步

 

為了減少老代碼修改成本,我們選擇了第二種方案,使用mongo-connector來進行數據同步。然而用着用着我們發現mongo-connector有一些問題:

  1. 有些數據需要關聯查詢,但是mongo-connector並不支持parent-child模型(其實有一個fork是支持的,但已經落后主分支一個版本,並且合進主分支的希望渺茫)。
  2. mongo-connector支持斷點續傳,但是恢復速度非常緩慢。
  3. mongo-connector可以設置每次處理的文檔數量,但坑爹的地方在於,到不了設置的數字,它始終不會寫入。比如,MongoDB一個表只有100個文檔,但是設置了batch的size為1000,於是那100個文檔這輩子也同步不到Elasticsearch中了。
  4. mongo-connector不會限速,直接把Elasticsearch寫炸了,但它不會管,接着寫,而且中間丟掉的數據就算后面有oplog里面有update操作,也沒辦法恢復,會報出404錯誤。
  5. 在MongoDB里面存了一張meta表,在Elasticsearch里面也存了一個meta索引,里面存了大量的timestamp,直接使Elasticsearch文檔總數翻倍。
(以上mongo-connector的缺點,如有誹謗,或許是我們不會用,懇請斧正。)

於是我們開始尋找更好用的工具,卻發現沒有好用的工具:

  1. Elasticsearch Rivers,曾經的官方同步工具。但該項目早已廢棄。
  2. Transporter,IBM旗下的Compose公司出品的同步工具。也不支持parent-child relationship,並且項目進度緩慢。
  3. elasticsearch-hadoop,先導到hadoop,再導到Elasticsearch。高射炮打蚊子,繞一大圈,不經濟。

沒辦法,只好自己用TypeScript寫一個,取名為mongo-es。

現已開源至  github ,並發布到了  npm ,歡迎大家多多試用,多挑(ti)毛(xu)病(qiu)。 

mongo-es導入數據分為兩個階段:

  1. Scan:掃描整個MongoDB的collection,每條文檔都插入到Elasticsearch對應的index里面。使用Bulk API,進行批量寫入。在掃描開始前記錄當前的時間點,供第二階段使用。
  2. Tail:從剛才記錄的時間點,或一個指定的時間點開始,將MongoDB的oplog在Elasticsearch上進行replay。使用RxJS的bufferWithTimeOrCount函數,既能批量寫入,又能保證同步延遲不會很長(一般是一秒左右)。

mongo-es比mongo-connector進步的地方有:

  1. 支持parent-child relationship,可以處理需要join的數據。
  2. 可以逆序Scan,先導入最新的數據,這對於出錯后重建索引快速恢復非常有用。
  3. 無需在兩邊存儲多余元數據,只記錄oplog的timestamp。只要程序掛的時間不太長,oplog里面還有這個timestamp,就能恢復。
  4. 遇到缺失文檔自動恢復。當因為不可控因素(如網絡原因),導致某個本應已經同步了的文檔在Elasticsearch中不存在。這時如果oplog里面遇到一個對該文檔的update操作,mongo-connector無法處理,打印出404錯誤。遇到這種情況時,mongo-es會回到MongoDB中,讀取到這個文檔,進行更新。
  5. 有限速功能,能夠限制每秒鍾讀取的文檔數量,避免把Elasticsearch壓垮。

當然了,mongo-connector是一個更加通用的程序,可以把文檔導到更多的地方。mongo-es只是把MongoDB的數據導入到Elasticsearch中,這樣比較未免有些不公平,但就在MongoDB到Elasticsearch這個使用場景下,還是好用不少的。

\開發過程中踩過的坑:

  1. Scan階段使用stream,方便控制讀取速度。Tail階段使用cursor,配合noCursorTimeout參數,避免長時間沒有oplog時的超時錯誤。Tail階段如果用stream,即使是設置了noCursorTimeout,超時了也會報錯。
  2. 對於操作是update的oplog,oplog里面有可能是一個完整的文檔,這時候直接就可以寫入。也有可能是$set或$unset操作,這時候要去Elasticsearch里面取到舊的,完整的文檔,在內存里執行update后再寫入回去。最好不要直接讀MongoDB,以減少MongoDB負擔。
  3. 在內存中執行update時,也要檢查變化的字段是否屬於我們需要的字段。如果變化的都不是需要的字段,可以忽略這次update操作,如果變化的字段不在我們需要的范圍內,則應排除,以減少寫入次數。
  4. 有_parent的文檔是不能直接用_id訪問到的,因為它的routing是_parent,必須指定_parent的值才行。對於操作是update的oplog,我們只能拿到_id,拿不到_parent對應的字段,所以這時要用es.search代替es.get,訪問每個分片,才能拿到文檔。
  5. Timestamp在js代碼里表示時low在前,high在后。在mongo shell里面是反過來的。
  6. Bulk API傳入的body長度不能為0,遇到0的情況要跳過,否則會報錯。

 

=============================================>>>>>>>>>> 

=====================================》》》》》》》》》》》

 

mongo-connector實現MongoDB與elasticsearch實時同步深入詳解

http://blog.csdn.net/laoyang360/article/details/51842822 

 

 

引言:

 

驗證表明:mongo-connector工具支持MongoDB與ES之間的實時增insert、刪delete、改update操作。 
對於歷史數據,mongo-connector工具不能同步到ES中,根因是本身工具不支持(初步界定),還是沒有這種場景,待查(進一步研究后再更新)。

1. mongo-connector 地址:

https://github.com/mongodb-labs/mongo-connector

2、 mongo-connector 工具簡介

mongo-connector工具創建一個從MongoDB簇到一個或多個目標系統的管道,目標系統包括:Solr,Elasticsearch,或MongoDB簇。 
該工具在MongoDB與目標系統間同步數據,並跟蹤MongoDB的oplog,保持操作與MongoDB的實時同步。 
該工具已經在python2.6,2.7,3.3+下進行驗證。 
mongo-connector工具是基於python開發的實時同步服務工具。它要求mongo運行在replica-set模式,且需要 elastic2_doc_manager將數據寫入ES。 
這里寫圖片描述

3、 elastic2-doc-manager 工具簡介

這是Elastic2.x版本的文檔管理器。對應Elastic1.x版本需要使用 elastic-doc-manager。

4、ES與MongoDB同步步驟:

(1)安裝 mongo-connector。

pip install mongo-connector

坑:用上面命令在Company內網可能會出現如下錯誤信息,

 

  Retrying (Retry(total=0, connect=None, read=None, redirect=None)) after connection broken by 'ConnectTimeoutError(<pip._vendor.requests.packages.urllib3.connection.HTTPConnection object at 0x03742DF0>, 'Connection to mirrors.aliyun.com timed out. (connect timeout=15)')': /pypi/simple/mongo-connector/ Could not find a version that satisfies the requirement mongo-connector[elastic5] (from versions: ) No matching distribution found for mongo-connector[elastic5]
經調查后,需要配置pip的代理和鏡像(如果install網速特別慢的話)

 

注:pip為安裝python后可以用到的命令 <

xxx.xxx.x.xx為內網代理ip

 

 

pip install --proxy http://xxx.xxx.x.xx:8000 --index http://mirrors.aliyun.com/pypi/simple/ mongo-connector[elastic5] --trusted-host mirrors.aliyun.com

參考文檔

mongo-connector 2.5.1:https://pypi.python.org/pypi/mongo-connector/

<<

mongodb-labs/mongo-connector / Usage with Elasticsearch:

https://github.com/mongodb-labs/mongo-connector/wiki/Usage-with-Elasticsearch#installation

mongodb-labs/elastic2-doc-manager:

https://github.com/mongodb-labs/elastic2-doc-manager

>>

python pip設置代理:http://blog.csdn.net/dangerousroy/article/details/52924116

Python pip 國內鏡像大全及使用辦法:http://blog.csdn.net/testcs_dn/article/details/54374849

 

(2)安裝 elastic2-doc-manager。

pip install elastic2-doc-manager

注意: 
如果不安裝(2)直接進入(3)、(4)則會報錯:

[root@5b9dbaaa148a bin]# mongo-connector -m 10.8.5.99:27017 -t 10.8.5.101:9200 -d elastic2_doc_manager Logging to mongo-connector.log. Exception in thread Thread-1Traceback (most recent call last)File "/usr/lib64/python2.6/threading.py", line 532, in __bootstrap_inner self.run()

(3)mongo端啟動

MongoDB 必須開啟復制集,如果已經開啟請忽略這一步:

如需開啟復制集設置,參考如下步驟

Windows搭建MongoDB分片以及復制集:

http://blog.csdn.net/liangxw1/article/details/78031293

 

  • 【驗證】初始化副本集的配置
  • 28
  • 29
rs0:PRIMARY> rs.status() { "set" : "rs0", "date" : ISODate("2016-07-05T08:50:55.272Z"), "myState" : 1, "term" : NumberLong(1), "heartbeatIntervalMillis" : NumberLong(2000), "members" : [ { "_id" : 0, "name" : "b48eafd69929:27017", "health" : 1, "state" : 1, "stateStr" : "PRIMARY", "uptime" : 115, "optime" : { "ts" : Timestamp(1467708606, 1), "t" : NumberLong(1) }, "optimeDate" : ISODate("2016-07-05T08:50:06Z"), "infoMessage" : "could not find member to sync from", "electionTime" : Timestamp(1467708605, 2), "electionDate" : ISODate("2016-07-05T08:50:05Z"), "configVersion" : 1, "self" : true } ], "ok" : 1 }

(4)ES端同步操作

  • 1
  • 2
[root@5b9dbaaa148a bin]# mongo-connector -m 10.8.5.99:27017 -t 10.8.5.101:9200 -d elastic2_doc_manager Logging to mongo-connector.log.

參數含義: 
-m: mongodb的地址與端口,端口默認為27017。 
-t:ES的地址與端口,端口默認為9200。 
-d:doc manager的名稱,2.x版本為: elastic2-doc-manager。

5、ES與MongoDB Insert插入操作的同步驗證

(1)Mongo端插入數據操作:

#Mongo創建數據庫(對應ES的Index) rs0:PRIMARY> use zhang_index switched to db zhang_index #Mongo中插入數據(其中col_02對應ES中的Type) rs0:PRIMARY> db.col_02.insert({name:"laoluo", birth:"1964-03-21", sex:"man", company:"chuizi"}); WriteResult({ "nInserted" : 1 }) rs0:PRIMARY> db.col_02.insert({name:"renzhengfei", birth:"1954-03-21", sex:"man", company:"huawei"});

(2)Es端檢索驗證

  •  
[root@5b9dbaaa148a test_log]# curl -XGET http://10.8.5.101:9200/zhang_index/col_02/_search?pretty { "took" 4, "timed_out" false, "_shards" : { "total" 8, "successful" 8, "failed" 0 }, "hits" : { "total" 2, "max_score" 1.0, "hits" : [ { "_index" "zhang_index", "_type" "col_02", "_id" "577b7d8ceb8e3dc2d1db12a9", "_score" 1.0, "_source" : { "company" "huawei", "name" "renzhengfei", "birth" "1954-03-21", "sex" "man" } }, { "_index" "zhang_index", "_type" "col_02", "_id" "577b7d4aeb8e3dc2d1db12a7", "_score" 1.0, "_source" : { "company" "chuizi", "name" "laoluo", "birth" "1964-03-21", "sex" "man" } } ] } }

6、 ES與MongoDB Update更新操作的同步驗證

(1)MongoDB的更新update操作

rs0:PRIMARY> db.col_02.update({'name':'laoluo'}, {$set:{'name':'luoyonghao'}}) WriteResult({ "nMatched" 1, "nUpserted" 0, "nModified" 1 }) rs0:PRIMARY> rs0:PRIMARY> db.col_02.find().pretty() { "_id" ObjectId("577b7d4aeb8e3dc2d1db12a7"), "name" "luoyonghao", "birth" "1964-03-21", "sex" "man", "company" "chuizi" } { "_id" ObjectId("577b7d8ceb8e3dc2d1db12a9"), "name" "renzhengfei", "birth" "1954-03-21", "sex" "man", "company" "huawei" }

(2)Es端檢索更新后結果

  •  
[root@5b9dbaaa148a test_log]# curl -XGET http://10.8.5.101:9200/zhang_index/col_02/_search?pretty { "took" 1, "timed_out" false, "_shards" : { "total" 8, "successful" 8, "failed" 0 }, "hits" : { "total" 2, "max_score" 1.0, "hits" : [ { "_index" "zhang_index", "_type" "col_02", "_id" "577b7d8ceb8e3dc2d1db12a9", "_score" 1.0, "_source" : { "company" "huawei", "name" "renzhengfei", "birth" "1954-03-21", "sex" "man" } }, { "_index" "zhang_index", "_type" "col_02", "_id" "577b7d4aeb8e3dc2d1db12a7", "_score" 1.0, "_source" : { "company" "chuizi", "name" "luoyonghao", "birth" "1964-03-21", "sex" "man" } } ] } }

7、 ES與MongoDB delete刪除操作的同步驗證

(1) MongoDB的刪除delete操作

rs0:PRIMARY> db.col_02.remove({'name':'renzhengfei'}) WriteResult({ "nRemoved" 1 }) rs0:PRIMARY> db.col_02.find() { "_id" ObjectId("577b7d4aeb8e3dc2d1db12a7"), "name" "luoyonghao", "birth" "1964-03-21", "sex" "man", "company" "chuizi" } rs0:PRIMARY> db.col_02.find().pretty() { "_id" ObjectId("577b7d4aeb8e3dc2d1db12a7"), "name" "luoyonghao", "birth" "1964-03-21", "sex" "man", "company" "chuizi" }

(2)ES端檢索刪除后結果

結果表明,MongoDB刪除的內容,ES端已經同步刪除。

  •  
[root@5b9dbaaa148a test_log]# curl -XGET http://10.8.5.101:9200/zhang_index/col_02/_search?pretty { "took" 2, "timed_out" false, "_shards" : { "total" 8, "successful" 8, "failed" 0 }, "hits" : { "total" 1, "max_score" 1.0, "hits" : [ { "_index" "zhang_index", "_type" "col_02", "_id" "577b7d4aeb8e3dc2d1db12a7", "_score" 1.0, "_source" : { "company" "chuizi", "name" "luoyonghao", "birth" "1964-03-21", "sex" "man" } } ] } }

這里寫圖片描述

參見詳細介紹:

https://docs.mongodb.com/manual/tutorial/deploy-replica-set/

Mongo與ES同步的5種方式:

https://www.linkedin.com/pulse/5-way-sync-data-from-mongodb-es-kai-hao

常見Bug:

How to setup a MongoDB replica set for the connector? 
https://docs.mongodb.com/manual/tutorial/deploy-replica-set/

 

使用Mongo Connector和Elasticsearch實現模糊匹配 http://www.csdn.net/article/2014-09-02/2821485-how-to-perform-fuzzy-matching-with-mongo-connector? 

 

 
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM