順序寫入100條
現在我們如果有大量的文檔(例如10000000萬條文檔)需要寫入es的某條索引中,該怎么辦呢?之前學過的一次插入一條肯定不行:
import time
from elasticsearch import Elasticsearch
es = Elasticsearch()
def timer(func):
def wrapper(*args, **kwargs):
start = time.time()
res = func(*args, **kwargs)
print('共耗時約 {:.2f} 秒'.format(time.time() - start))
return res
return wrapper
@timer
def create_data():
""" 寫入數據 """
for line in range(100):
es.index(index='s2', doc_type='doc', body={'title': line})
if __name__ == '__main__':
create_data() # 執行結果大約耗時 7.79 秒
上例為順序向es的s2
索引(該索引已存在)寫入100條文檔,而且值也僅是數字。卻花費了大約7秒左右,這種速度在大量數據的時候,肯定不行。那怎么辦呢?
批量寫入100條
現在,來介紹一種批量寫入的方式:
import time
from elasticsearch import Elasticsearch
from elasticsearch import helpers
es = Elasticsearch()
def timer(func):
def wrapper(*args, **kwargs):
start = time.time()
res = func(*args, **kwargs)
print('共耗時約 {:.2f} 秒'.format(time.time() - start))
return res
return wrapper
@timer
def create_data():
""" 寫入數據 """
for line in range(100):
es.index(index='s2', doc_type='doc', body={'title': line})
@timer
def batch_data():
""" 批量寫入數據 """
action = [{
"_index": "s2",
"_type": "doc",
"_source": {
"title": i
}
} for i in range(10000000)]
helpers.bulk(es, action)
if __name__ == '__main__':
# create_data()
batch_data() # MemoryError
我們通過elasticsearch模塊導入helper
,通過helper.bulk
來批量處理大量的數據。首先我們將所有的數據定義成字典形式,各字段含義如下:
_index
對應索引名稱,並且該索引必須存在。_type
對應類型名稱。_source
對應的字典內,每一篇文檔的字段和值,可有有多個字段。
首先將每一篇文檔(組成的字典)都整理成一個大的列表,然后,通過helper.bulk(es, action)
將這個列表寫入到es對象中。
然后,這個程序要執行的話——你就要考慮,這個一千萬個元素的列表,是否會把你的內存撐爆(MemoryError
)!很可能還沒到沒到寫入es那一步,卻因為列表過大導致內存錯誤而使寫入程序崩潰!很不幸,我的程序報錯了。下圖是我在生成列表的時候,觀察任務管理器的進程信息,可以發現此時Python消耗了大量的系統資源,而運行es實例的Java虛擬機卻沒什么變動。
解決辦法是什么呢?我們可以分批寫入,比如我們一次生成長度為一萬的列表,再循環着去把一千萬的任務完成。這樣, Python和Java虛擬機達到負載均衡。
下面的示例測試10萬條數據分批寫入的速度:
import time
from elasticsearch import Elasticsearch
from elasticsearch import helpers
es = Elasticsearch()
def timer(func):
def wrapper(*args, **kwargs):
start = time.time()
res = func(*args, **kwargs)
print('共耗時約 {:.2f} 秒'.format(time.time() - start))
return res
return wrapper
@timer
def batch_data():
""" 批量寫入數據 """
# 分批寫
# for i in range(1, 10000001, 10000):
# action = [{
# "_index": "s2",
# "_type": "doc",
# "_source": {
# "title": k
# }
# } for k in range(i, i + 10000)]
# helpers.bulk(es, action)
# 使用生成器
for i in range(1, 100001, 1000):
action = ({
"_index": "s2",
"_type": "doc",
"_source": {
"title": k
}
} for k in range(i, i + 1000))
helpers.bulk(es, action)
if __name__ == '__main__':
# create_data()
batch_data()
注釋的內容是使用列表完成,然后使用生成器完成。結果耗時約93.53 秒。
較勁,我就想一次寫入一千萬條
經過灑家多年臨床經驗發現,程序員為什么掉頭發?都是因為愛較勁!
上面的例子已經不錯了,但是仔細觀察,還是使用了兩次for循環,但是代碼可否優化,答案是可以的,我們直接使用生成器:
import time
from elasticsearch import Elasticsearch
from elasticsearch import helpers
es = Elasticsearch()
def timer(func):
def wrapper(*args, **kwargs):
start = time.time()
res = func(*args, **kwargs)
print('共耗時約 {:.2f} 秒'.format(time.time() - start))
return res
return wrapper
@timer
def gen():
""" 使用生成器批量寫入數據 """
action = ({
"_index": "s2",
"_type": "doc",
"_source": {
"title": i
}
} for i in range(100000))
helpers.bulk(es, action)
if __name__ == '__main__':
# create_data()
# batch_data()
gen()
我們將生成器交給es去處理,這樣,Python的壓力更小了,你要說Java虛擬機不是壓力更大了,無論是分批處理還是使用生成器,虛擬機的壓力都不小,寫入操作本來就耗時嘛!上例測試結果大約是耗時90秒鍾,還行,一千萬的任務還是留給你去測試吧!
歡迎斧正,that's all