https://www.jianshu.com/p/3c17561691a5
Elasticsearch在NoSQL和时间序列的数据存储中占的比重越来越大。
Elasticsearch 公司的产品栈非常全面,打通数据采集,传递,存储,展示,而且部署简单快速,半天时间就可以搭建一套完整的POC出来。

目前大数据当道,数据的结构变化越来越快,越来越多的公司把原始数据存储在ES中,数据经过二次处理后在存储的mysql等结构化的数据库中。
作为数据分析师,平时和ES打交道的时间越来越多,除了对ES的查询语法熟悉之外,还需要会使用python从ES中提取自己想要的数据。
这里记录的便是基于es的python客户端来从es中提取超过10000条记录的方法。
默认ES 查询返回的记录数为10000,当然这个数字可以通过修改ES的配置来变大或者变小。但是作为数据分析师,一般不会有ES修改配置的权限。
import json from elasticsearch import Elasticsearch hosts = [] es = Elasticsearch(hosts=hosts) indices = ['indice0', 'indice1'] # Initialize the scroll page = es.search( index=','.join(indices), doc_type='demo', scroll='2m', search_type='scan', size=1000, q='user_id:123 AND type:user' # 填写 Kibana 搜索栏里的 Lucene 查询语法字符串 ) sid = page['_scroll_id'] scroll_size = page['hits']['total'] print 'total scroll_size: ', scroll_size l = [] # Start scrolling while scroll_size > 0: print "Scrolling..." page = es.scroll(scroll_id=sid, scroll='2m') # Update the scroll ID sid = page['_scroll_id'] # Get the number of results that we returned in the last scroll scroll_size = len(page['hits']['hits']) print "scroll size: " + str(scroll_size) # Do something with the obtained page docs = page['hits']['hits'] l += [x['_source'] for x in docs] print 'total docs: ', len(l) file_path = 'demo.json' with open(file_path, 'wb') as f: json.dump(l, f, indent=2)
可以对比打印出来的doc数量与scroll size便可以检查是否全部记录都提取出来了。最后将数据存储到json文件中。
基于ES提供的python 客户端的方式可以提取的数量不要超过100万行,否则很容易超时失败。应该跟底层的http库有关系。
要从一个Index中提取超过千万行的数据,最佳实践是基于Java的客户端或者ES提供的Hadoop库,或者使用Python自己构造http请求,处理错误信息。
本系列文章均为实际工作中遇到的场景,以此记录下来,共同进步,更愉悦的工作。
作者:接地气的大仙儿
链接:https://www.jianshu.com/p/3c17561691a5
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。