elasticsearch bulk


Background

The company needed to migrate 20 million rows of data from MySQL to Elasticsearch to back a microservice. This article implements the migration with the elasticsearch-py bulk helper, which offers more flexibility than elasticsearch-dump and lets you transform the data along the way.

How the API works

Let's first look at the example given in the official documentation:

POST _bulk
{ "index" : { "_index" : "test", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_id" : "2" } }
{ "create" : { "_index" : "test", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_index" : "test"} }
{ "doc" : {"field2" : "value2"} }

We can try this out in Kibana's Dev Tools console.
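The bulk body is newline-delimited JSON: each action object sits on its own line, followed (except for delete actions) by the source document on the next line, and the whole body must end with a newline. As an illustration, a hand-rolled builder for such a body might look like this (the `bulk_body` helper is a sketch for clarity, not part of any library):

```python
import json

def bulk_body(actions):
    """Serialize (action, source) pairs into the newline-delimited
    JSON body that the _bulk endpoint expects."""
    lines = []
    for action, source in actions:
        lines.append(json.dumps(action))
        if source is not None:  # delete actions carry no source line
            lines.append(json.dumps(source))
    return "\n".join(lines) + "\n"  # the body must end with a newline

body = bulk_body([
    ({"index": {"_index": "test", "_id": "1"}}, {"field1": "value1"}),
    ({"delete": {"_index": "test", "_id": "2"}}, None),
])
```

The client libraries build exactly this kind of payload for you, but seeing it spelled out makes the pairing of action lines and source lines explicit.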

elasticsearch-py

See the elasticsearch-py official documentation. What I actually used here is the es-py interface; an example:

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

es = Elasticsearch()

def gendata():
    mywords = ['foo', 'bar', 'baz']
    for word in mywords:
        yield {
            "_index": "mywords",
            "_type": "document",  # _type is deprecated since Elasticsearch 7
            "doc": {"word": word},  # document body, cf. field1: "value1" above
        }

bulk(es, gendata())

Hands-on

This involves reading the data and choosing a batch size. The general recommendation is 1,000-5,000 documents per batch; if your documents are large, reduce the batch accordingly. A payload of 5-15 MB per request is a good target, and by default a single request cannot exceed 100 MB.
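helpers.bulk exposes these limits through its chunk_size parameter (default 500 documents) and max_chunk_bytes (default 100 MB). To illustrate the idea behind those parameters, a hand-rolled chunker that caps both the document count and the approximate serialized payload size might look like this (the `chunk_actions` function is a sketch, not a library API):

```python
import json

def chunk_actions(actions, max_docs=1000, max_bytes=10 * 1024 * 1024):
    """Group actions into batches capped by document count and by
    approximate serialized size, mirroring what helpers.bulk does
    internally via chunk_size and max_chunk_bytes."""
    batch, size = [], 0
    for action in actions:
        action_bytes = len(json.dumps(action).encode("utf-8"))
        if batch and (len(batch) >= max_docs or size + action_bytes > max_bytes):
            yield batch
            batch, size = [], 0
        batch.append(action)
        size += action_bytes
    if batch:  # flush the final partial batch
        yield batch
```

In practice you would simply pass chunk_size and max_chunk_bytes to helpers.bulk rather than chunking by hand.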

import re

from elasticsearch import Elasticsearch
from elasticsearch import helpers
import pymysql


es = Elasticsearch()
conn = pymysql.connect(host='127.0.0.1', user="root", password="root", database="literature", charset='utf8')


def read(conn,tableName):
    cursor = conn.cursor()
    sql = "show columns from {};".format(tableName)
    cursor.execute(sql)
    columns = [i[0] for i in cursor.fetchall()]

    select = "select * from {};".format(tableName)
    nums = cursor.execute(select)
    for i in range(nums):
        yield {k:v for k,v in zip(columns,cursor.fetchone())}


def bulk_insert(d):
    actions = []
    for i in d:
        _id = i.get('id')
        # data-cleaning logic
        i['autor'] = i.get('autor').split(',')
        i['artkeyword'] = re.sub(r'[\[\]\d]',"",str(i.get('artkeyword',""))).strip(';').split(';')
        i['dateofpublication'] = i.get('dateofpublication').strftime('%Y-%m-%d') # note: datetime values must be converted to strings
        i['dateofsummery'] = i.get('dateofsummery').strftime('%Y-%m-%d %H:%M:%S') # note: datetime values must be converted to strings
        action = {
            "_index":"literature",
            "_type":"_doc",
            "_id":_id,
            }
        action.update(i)
        actions.append(action)
        if len(actions) == 500:  # flush every 500 documents
            helpers.bulk(es,actions)
            actions = []
    if actions:  # flush the remainder
        helpers.bulk(es, actions)


if __name__ == "__main__":
    d = read(conn,"literature_info")
    bulk_insert(d)
    conn.close()


