https://www.jianshu.com/p/3c17561691a5
Elasticsearch在NoSQL和時間序列的數據存儲中占的比重越來越大。
Elasticsearch 公司的產品棧非常全面,打通數據采集,傳遞,存儲,展示,而且部署簡單快速,半天時間就可以搭建一套完整的POC出來。

目前大數據當道,數據的結構變化越來越快,越來越多的公司把原始數據存儲在ES中,數據經過二次處理后在存儲的mysql等結構化的數據庫中。
作為數據分析師,平時和ES打交道的時間越來越多,除了對ES的查詢語法熟悉之外,還需要會使用python從ES中提取自己想要的數據。
這里記錄的便是基於es的python客戶端來從es中提取超過10000條記錄的方法。
默認ES 查詢返回的記錄數為10000,當然這個數字可以通過修改ES的配置來變大或者變小。但是作為數據分析師,一般不會有ES修改配置的權限。
import json from elasticsearch import Elasticsearch hosts = [] es = Elasticsearch(hosts=hosts) indices = ['indice0', 'indice1'] # Initialize the scroll page = es.search( index=','.join(indices), doc_type='demo', scroll='2m', search_type='scan', size=1000, q='user_id:123 AND type:user' # 填寫 Kibana 搜索欄里的 Lucene 查詢語法字符串 ) sid = page['_scroll_id'] scroll_size = page['hits']['total'] print 'total scroll_size: ', scroll_size l = [] # Start scrolling while scroll_size > 0: print "Scrolling..." page = es.scroll(scroll_id=sid, scroll='2m') # Update the scroll ID sid = page['_scroll_id'] # Get the number of results that we returned in the last scroll scroll_size = len(page['hits']['hits']) print "scroll size: " + str(scroll_size) # Do something with the obtained page docs = page['hits']['hits'] l += [x['_source'] for x in docs] print 'total docs: ', len(l) file_path = 'demo.json' with open(file_path, 'wb') as f: json.dump(l, f, indent=2)
可以對比打印出來的doc數量與scroll size便可以檢查是否全部記錄都提取出來了。最后將數據存儲到json文件中。
基於ES提供的python 客戶端的方式可以提取的數量不要超過100萬行,否則很容易超時失敗。應該跟底層的http庫有關系。
要從一個Index中提取超過千萬行的數據,最佳實踐是基於Java的客戶端或者ES提供的Hadoop庫,或者使用Python自己構造http請求,處理錯誤信息。
本系列文章均為實際工作中遇到的場景,以此記錄下來,共同進步,更愉悅的工作。
作者:接地氣的大仙兒
鏈接:https://www.jianshu.com/p/3c17561691a5
來源:簡書
著作權歸作者所有。商業轉載請聯系作者獲得授權,非商業轉載請注明出處。