工作中遇到一個需求,要將舊系統的mongodb數據庫全部遷移至新的系統中。新舊系統的數據結構不一致,舊系統設計的是兩張表,新系統是一張。字段也發生了變化。
1、實現方案
連接mongodb數據庫,逐條讀取數據,並重新組裝。最后寫入新庫和ES。
程序實現並不復雜,但有幾個注意的地方,記錄一下。本文沒有詳細講述具體的模塊使用方法,如果需要就自行百度下。這種工具程序的業務定制化程度很高,無法完成通用的任務,僅供參考。
完整的程序到github下載。下載地址
2、結構
conf 配置文件目錄
logs 日志文件目錄
util 工具目錄
data_migrate.py 入庫文件main
mongo_connect.py mongodb連接類
write_es.py ES連接類
3、使用
修改配置文件后,就可以使用了。里邊的數據結構轉換的代碼按照具體的業務邏輯轉換。
全局配置文件
填寫數據庫配置信息
#讀取數據庫配置
[MongodbR]
host =
port =
user =
password =
db = 數據庫1
db2 =
coll = 集合1
coll2 =
#寫入數據庫配置
[MongodbW]
host =
port =
user =
password =
db =
coll =
日志配置文件
需要修改日志文件路徑,報警級別
[handler_fileHandler]
class=logging.handlers.RotatingFileHandler
level=DEBUG
formatter=fmt
args=('./logs/logdemo.log','a',20000,5,'utf-8')
4、代碼解讀
mongodb
mongodb連接超時問題
通常我們查詢的使用res=coondocinfo.find() 一般來說沒有問題,此時出來的只是游標,而不是數據結果集。所以在處理過程中需要長時間保持數據庫的連接,
但默認的mongodb數據庫連接只有十分鍾,如果讀取大量數據的時候十分鍾顯然不夠用,這項目一年的數據量有上億條。所以需要使用:
with coondocinfo.find({"ch": 1,'it': {"$gte": stime, "$lt": etime}},no_cursor_timeout=True) as cursor 這種方式保持mongodb連接,否則會報錯
with coondocinfo.find({"ch": 1,'it': {"$gte": stime, "$lt": etime}},no_cursor_timeout=True) as cursor:
for i in cursor:
logging.info("正在處理的_id是%s" % i['_id'])
#讀取doctext庫,獲取正文數據,已處理成字符串
try:
content = list(coondoctext.find({'_id' : i['_id']},{'content':1,'_id':0}))[0]['content']
except Exception as e:
logging.error(e)
mongodb字段_ID問題
ES
寫入ES
寫入es的時候需要注意,
id = data['_id']如果是mongodb讀出來的數據會有_id這個字段,而es中也會默認給一個id字段,所以需要將數據結構的id刪除掉。否則會報錯。
del data['_id']
add(index='allchannel-iksmart', body=data,id=str(id))
寫入索引
body = {"name": "long", "age": 11,"height": 111}
add(index=index_name,body=body,id=1)
:param index: 索引名稱
:param body:文檔內容
:param id: 是否指定id,如不指定就會使用生成的字符串
MAIN
stime 查詢起始時間
etime 查詢截止時間
formatdata 主邏輯函數
if __name__ == '__main__':
stime = 1577808000
etime = 1580486400
Data_migrate().formatdata(stime=stime,etime=etime)