mongodb數據遷移到新庫並同時寫入ES


工作中遇到一個需求,要將舊系統的mongodb數據庫全部遷移至新的系統中。新舊系統的數據結構不一致,舊系統設計的是兩張表,新系統是一張。字段也發生了變化。

1、實現方案

連接mongodb數據庫,逐條讀取數據,並重新組裝。最后寫入新庫和ES。

程序實現並不復雜,但有幾個注意的地方,記錄一下。本文沒有詳細講述具體的模塊使用方法,如果需要就自行百度下。這種工具程序的業務定制化程度很高,無法完成通用的任務,僅供參考。

完整的程序到github下載。下載地址

2、結構

conf 配置文件目錄
logs 日志文件目錄
util 工具目錄
data_migrate.py  入庫文件main
mongo_connect.py mongodb連接類
write_es.py   ES連接類

3、使用

修改配置文件后,就可以使用了。里邊的數據結構轉換的代碼按照具體的業務邏輯轉換。
全局配置文件

填寫數據庫配置信息

#讀取數據庫配置
[MongodbR]
host =
port =
user =
password =
db =  數據庫1
db2 =
coll =  集合1
coll2 =
#寫入數據庫配置
[MongodbW]
host =
port =
user =
password =
db =
coll =

日志配置文件

需要修改日志文件路徑,報警級別

[handler_fileHandler]
class=logging.handlers.RotatingFileHandler
level=DEBUG
formatter=fmt
args=('./logs/logdemo.log','a',20000,5,'utf-8')

4、代碼解讀

mongodb

mongodb連接超時問題
通常我們查詢的使用res=coondocinfo.find() 一般來說沒有問題,此時出來的只是游標,而不是數據結果集。所以在處理過程中需要長時間保持數據庫的連接,
但默認的mongodb數據庫連接只有十分鍾,如果讀取大量數據的時候十分鍾顯然不夠用,這項目一年的數據量有上億條。所以需要使用:
with coondocinfo.find({"ch": 1,'it': {"$gte": stime, "$lt": etime}},no_cursor_timeout=True) as cursor 這種方式保持mongodb連接,否則會報錯

with coondocinfo.find({"ch": 1,'it': {"$gte": stime, "$lt": etime}},no_cursor_timeout=True) as cursor:
  for i in cursor:
    logging.info("正在處理的_id是%s" % i['_id'])
    #讀取doctext庫,獲取正文數據,已處理成字符串
    try:
      content = list(coondoctext.find({'_id' : i['_id']},{'content':1,'_id':0}))[0]['content']
    except Exception as e:
      logging.error(e)    

mongodb字段_ID問題

ES

寫入ES
寫入es的時候需要注意,

id = data['_id']如果是mongodb讀出來的數據會有_id這個字段,而es中也會默認給一個id字段,所以需要將數據結構的id刪除掉。否則會報錯。
  del data['_id']
  add(index='allchannel-iksmart', body=data,id=str(id))

寫入索引

body = {"name": "long", "age": 11,"height": 111}
add(index=index_name,body=body,id=1)

:param index: 索引名稱
:param body:文檔內容
:param id: 是否指定id,如不指定就會使用生成的字符串

MAIN

stime 查詢起始時間
etime 查詢截止時間
formatdata 主邏輯函數

if __name__ == '__main__':
  stime = 1577808000
  etime = 1580486400
  Data_migrate().formatdata(stime=stime,etime=etime)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM