當我們使用 Elasticsearch-py 批量插入數據到 ES 的時候,我們常常使用它的 helpers模塊里面的bulk函數。其使用方法如下:
from elasticsearch import helpers, Elasticsearch es = Elasticsearch(xxx) def generator(): datas = [1, 2, 3] for data in datas: yield { '_id': "xxx", '_source': { 'age': data } } helpers.bulk(es, index='xxx', generator(), doc_type='doc',)
但這種方式有一個問題,它默認相當於upsert操作。如果_id 對應的文檔已經在 ES 里面了,那么數據會被更新。如果_id 對應的文檔不在 ES 中,那么就插入。
如果我想實現,不存在就插入,存在就跳過怎么辦?此時就需要在文檔里面添加_op_type指定操作類型為create:
from elasticsearch import helpers, Elasticsearch es = Elasticsearch(xxx) def generator(): datas = [1, 2, 3] for data in datas: yield { '_op_type': 'create', '_id': "xxx", '_source': { 'age': data } } helpers.bulk(es, generator(), index='xxx', doc_type='doc')
此時,如果_id 對應的文檔不在 ES 中,那么就會正常插入,如果ES里面已經有_id對應的數據了,那么就會報錯。由於bulk一次性默認插入500條數據,假設其中有2條數據已經存在了,那么剩下的498條會被正常插入。然后程序報錯退出,告訴你有兩條寫入失敗,因為已經存在。
如果你不想讓程序報錯終止,那么可以增加2個參數:
helpers.bulk(es, generator(), index='xxx', doc_type='doc', raise_on_exception=False, raise_on_error=False)
其中raise_on_exception=False表示在插入數據失敗時,不需要拋出異常。raise_on_error=False表示不拋出BulkIndexError。
