Elasticsearch批量插入時,存在就不插入


當我們使用 Elasticsearch-py 批量插入數據到 ES 的時候,我們常常使用它的 helpers模塊里面的bulk函數。其使用方法如下:

from elasticsearch import helpers, Elasticsearch

es = Elasticsearch(xxx)

def generator():
    datas = [1, 2, 3]
    for data in datas:
        yield {
            '_id': "xxx",
            '_source': {
                'age': data
            }
        }

helpers.bulk(es,
index='xxx',
generator(),
doc_type='doc',)

但這種方式有一個問題,它默認相當於upsert操作。如果_id 對應的文檔已經在 ES 里面了,那么數據會被更新。如果_id 對應的文檔不在 ES 中,那么就插入。

如果我想實現,不存在就插入,存在就跳過怎么辦?此時就需要在文檔里面添加_op_type指定操作類型為create:

from elasticsearch import helpers, Elasticsearch

es = Elasticsearch(xxx)

def generator():
    datas = [1, 2, 3]
    for data in datas:
        yield {
            '_op_type': 'create',
            '_id': "xxx",
            '_source': {
                'age': data
            }
        }

helpers.bulk(es,
generator(),
index='xxx',
doc_type='doc')

此時,如果_id 對應的文檔不在 ES 中,那么就會正常插入,如果ES里面已經有_id對應的數據了,那么就會報錯。由於bulk一次性默認插入500條數據,假設其中有2條數據已經存在了,那么剩下的498條會被正常插入。然后程序報錯退出,告訴你有兩條寫入失敗,因為已經存在。

如果你不想讓程序報錯終止,那么可以增加2個參數:

helpers.bulk(es,
    generator(),
    index='xxx',
    doc_type='doc',
    raise_on_exception=False,               raise_on_error=False)

其中raise_on_exception=False表示在插入數據失敗時,不需要拋出異常。raise_on_error=False表示不拋出BulkIndexError

 

轉自:https://mp.weixin.qq.com/s?src=11&timestamp=1579108111&ver=2098&signature=ZXtHL4GJONIJr9lN3KD*vHKfeujxkmmrWRnFl3Pfyu0DENxKPlybBsPaIlcjfiy5woHNz-v8oWES6FQP5e8j3yTKJWCL2qLRbCRtWb6NLlHvLjyJvELSPyG0dXhv1sR6&new=1


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM