數據分析python技能之es數據提取


https://www.jianshu.com/p/3c17561691a5

 

Elasticsearch在NoSQL和時間序列的數據存儲中占的比重越來越大。

Elasticsearch 公司的產品棧非常全面,打通數據采集,傳遞,存儲,展示,而且部署簡單快速,半天時間就可以搭建一套完整的POC出來。

 
 

目前大數據當道,數據的結構變化越來越快,越來越多的公司把原始數據存儲在ES中,數據經過二次處理后在存儲的mysql等結構化的數據庫中。

作為數據分析師,平時和ES打交道的時間越來越多,除了對ES的查詢語法熟悉之外,還需要會使用python從ES中提取自己想要的數據。

這里記錄的便是基於es的python客戶端來從es中提取超過10000條記錄的方法。

默認ES 查詢返回的記錄數為10000,當然這個數字可以通過修改ES的配置來變大或者變小。但是作為數據分析師,一般不會有ES修改配置的權限。


import json from elasticsearch import Elasticsearch hosts = [] es = Elasticsearch(hosts=hosts) indices = ['indice0', 'indice1'] # Initialize the scroll page = es.search( index=','.join(indices), doc_type='demo', scroll='2m', search_type='scan', size=1000, q='user_id:123 AND type:user' # 填寫 Kibana 搜索欄里的 Lucene 查詢語法字符串 ) sid = page['_scroll_id'] scroll_size = page['hits']['total'] print 'total scroll_size: ', scroll_size l = [] # Start scrolling while scroll_size > 0: print "Scrolling..." page = es.scroll(scroll_id=sid, scroll='2m') # Update the scroll ID sid = page['_scroll_id'] # Get the number of results that we returned in the last scroll scroll_size = len(page['hits']['hits']) print "scroll size: " + str(scroll_size) # Do something with the obtained page docs = page['hits']['hits'] l += [x['_source'] for x in docs] print 'total docs: ', len(l) file_path = 'demo.json' with open(file_path, 'wb') as f: json.dump(l, f, indent=2) 

可以對比打印出來的doc數量與scroll size便可以檢查是否全部記錄都提取出來了。最后將數據存儲到json文件中。

基於ES提供的python 客戶端的方式可以提取的數量不要超過100萬行,否則很容易超時失敗。應該跟底層的http庫有關系。

要從一個Index中提取超過千萬行的數據,最佳實踐是基於Java的客戶端或者ES提供的Hadoop庫,或者使用Python自己構造http請求,處理錯誤信息。


本系列文章均為實際工作中遇到的場景,以此記錄下來,共同進步,更愉悅的工作。



作者:接地氣的大仙兒
鏈接:https://www.jianshu.com/p/3c17561691a5
來源:簡書
著作權歸作者所有。商業轉載請聯系作者獲得授權,非商業轉載請注明出處。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM