0. Business Scenario
Export all values of one field from one ES index into a file.
1. Overview of ES Data Export Methods
These are the main approaches I found for exporting ES data; additions are welcome:
- The official ES API: the snapshot and restore module
The snapshot and restore module allows to create snapshots of individual indices or an entire cluster into a remote repository like shared file system, S3, or HDFS. These snapshots are great for backups because they can be restored relatively quickly but they are not archival because they can only be restored to versions of Elasticsearch that can read the index.
In short, it is a tool for imaging an ES cluster and restoring it quickly. It cannot satisfy this task's requirement of exporting a single field, so I did not pursue it further; interested readers can look into it on their own.
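For completeness, registering a repository and snapshotting one index looks roughly like the following. This is a minimal sketch in the same Python 2 / urllib2 style as the script below; the repository name "my_backup" and the path "/mnt/es_backups" are made up for illustration:

```python
import json
import urllib2

# Sketch only: "my_backup" and "/mnt/es_backups" are hypothetical, and fs
# repositories require the path to be whitelisted via path.repo on the nodes.
def es_put(url, body):
    req = urllib2.Request(url, json.dumps(body))
    req.get_method = lambda: 'PUT'  # urllib2 defaults to GET/POST, force PUT
    return urllib2.urlopen(req).read()

# 1) register a shared-filesystem snapshot repository
print es_put("http://88.88.88.88:9200/_snapshot/my_backup",
             {"type": "fs", "settings": {"location": "/mnt/es_backups"}})
# 2) snapshot a single index into it
print es_put("http://88.88.88.88:9200/_snapshot/my_backup/snapshot_1?wait_for_completion=true",
             {"indices": "mtnews"})
```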
- The ES Java API:
Although Java is the language I use most, running a one-off Java program on Linux is cumbersome. For those interested, here is a link on bulk import and export with the Java API: elasticsearch使用Java API批量數據導入和導出
- The ES Python API:
Back to the task at hand: the top Google hit for "elasticsearch導出數據" ("elasticsearch export data") is a Python script, available at lein-wang/elasticsearch_migrate:
```python
#!/usr/bin/python
# coding:utf-8
'''
Export and Import ElasticSearch Data.
Simple example at __main__.
@author: wgzh159@163.com
@modifier: lzkhit@163.com
@note: data consistency is not checked here, please verify it yourself
'''
import json
import os
import sys
import time
import urllib2

reload(sys)
sys.setdefaultencoding('utf-8')


class exportEsData():
    size = 10000

    def __init__(self, url, index, type, target_index):
        self.url = url + "/" + index + "/" + type + "/_search"
        self.index = index
        self.type = type
        self.target_index = target_index  # replaces the original index in the output file name
        self.file_name = self.target_index + "_" + self.type + ".json"

    def exportData(self):
        print("export data begin...\n")
        begin = time.time()
        try:
            os.remove(self.file_name)
        except OSError:
            os.mknod(self.file_name)  # create the file if it does not exist yet
        msg = urllib2.urlopen(self.url).read()
        obj = json.loads(msg)
        num = obj["hits"]["total"]
        start = 0
        end = num / self.size + 1  # number of bulks, `size` docs per bulk
        while start < end:
            try:
                msg = urllib2.urlopen(self.url + "?from=" + str(start * self.size)
                                      + "&size=" + str(self.size)).read()
                self.writeFile(msg)
                start = start + 1
            except urllib2.HTTPError, e:
                print 'There was an error with the request'
                print e
                break
            print(start)
        print("export data end!!!\n total consuming time:" + str(time.time() - begin) + "s")

    def writeFile(self, msg):
        obj = json.loads(msg)
        vals = obj["hits"]["hits"]
        cnt = 0
        f = open(self.file_name, "a")
        try:
            for val in vals:
                val_json = val["_source"]["content"]
                f.write(str(val_json) + "\n")
                cnt += 1
        finally:
            print(cnt)
            f.flush()
            f.close()


class importEsData():
    def __init__(self, url, index, type):
        self.url = url
        self.index = index
        self.type = type
        self.file_name = self.index + "_" + self.type + ".json"

    def importData(self):
        print("import data begin...\n")
        begin = time.time()
        try:
            s = os.path.getsize(self.file_name)
            f = open(self.file_name, "r")
            data = f.read(s)
            # pitfall: the bulk API expects a newline-delimited payload
            self.post(data)
        finally:
            f.close()
        print("import data end!!!\n total consuming time:" + str(time.time() - begin) + "s")

    def post(self, data):
        print data
        print self.url
        req = urllib2.Request(self.url, data)
        r = urllib2.urlopen(req)
        response = r.read()
        print response
        r.close()


if __name__ == '__main__':
    '''
    Export data, e.g.
                 URL                        index       type
    exportEsData("http://10.100.142.60:9200", "watchdog", "mexception").exportData()
    export file name: watchdog_mexception.json
    '''
    exportEsData("http://88.88.88.88:9200", "mtnews", "articles", "corpus").exportData()
    '''
    Import data
    *import file name: watchdog_test.json (important)
    the part before "_" is the elasticsearch index,
    the part after "_" is the elasticsearch type
    e.g.
                 URL                        index     type
    importEsData("http://10.100.142.60:9200", "watchdog", "test").importData()
    '''
    #importEsData("http://10.100.142.60:9200","watchdog","test").importData()
    #importEsData("http://127.0.0.1:9200/_bulk","chat","CHAT").importData()
    #importEsData("http://127.0.0.1:9200/_bulk","chat","TOPIC").importData()
```
2. Problems Encountered
With everything in place, running the script hit a problem:
"urllib2.HTTPError: HTTP Error 500: Internal Server Error"
Moreover, judging by the script's doc-count output, no matter how the bulk size was varied (I tried 10/50/100/500/1000/5000/10000), the export always stalled at the 10000th document, at which point urllib2 threw the exception.
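The failure is easy to reproduce outside the script; a minimal sketch against the same endpoint:

```python
import urllib2

# Any page whose from + size crosses 10000 gets HTTP 500 from this cluster:
url = "http://88.88.88.88:9200/mtnews/articles/_search?from=10000&size=1000"
try:
    urllib2.urlopen(url)
except urllib2.HTTPError, e:
    print e  # prints: HTTP Error 500: Internal Server Error
```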
My colleague Huang suggested two possible causes:
- The bulk rate was unthrottled: requests were produced faster than they could be consumed, exceeding the TPS the ES server could handle (from experience, Huang suggested 5-15 MB per bulk as the sweet spot)
- A server-side problem, which would require checking the logs
To test the first cause, I added a sleep inside the while loop and reduced the bulk size to lower the query rate on ES, but the HTTP 500 still appeared at the 10000th document, so that was a dead end.
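The throttled attempt looked roughly like the following sketch (not the exact change; the bulk size and pause are illustrative, and write_hits stands in for the script's writeFile logic):

```python
import json
import time
import urllib2

def export_throttled(search_url, write_hits, size=500, pause=0.5):
    # Smaller pages plus a sleep between requests, to cap the load on ES.
    # This still fails with HTTP 500 once from + size passes 10000.
    start = 0
    while True:
        url = search_url + "?from=" + str(start * size) + "&size=" + str(size)
        hits = json.loads(urllib2.urlopen(url).read())["hits"]["hits"]
        if not hits:
            break
        write_hits(hits)   # stand-in for the script's file-writing logic
        start += 1
        time.sleep(pause)  # back off before requesting the next page
```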
That left the second cause, which meant logging in to the ES host and inspecting the logs. They contained the following:
```
Caused by: QueryPhaseExecutionException[Result window is too large, from + size must be less than or equal to: [10000] but was [11000].
See the scroll api for a more efficient way to request large data sets. This limit can be set by changing the [index.max_result_window]
index level parameter.]
```
As the article urllib2中HTTP狀態碼含義 (on the meaning of HTTP status codes in urllib2) puts it, "5XX response codes indicate that the server has detected an error of its own and cannot continue processing the request." So this was indeed a server-side problem.
3. The Fix
With the problem pinpointed, the fix follows directly; see also the post ES報錯Result window is too large問題處理.
The index in question needs its settings updated as follows:
```bash
curl -XPUT http://88.88.88.88:9200/mtnews/_settings -d '{
  "index": { "max_result_window": 10000000 }
}'
```
This raises the index.max_result_window setting named in the log (the default is 10000). The limit exists because deep from + size paging is expensive for the cluster, which is also why the log recommends the scroll API instead.
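For large exports, the scroll API the error message points at avoids the window limit entirely. Below is a minimal sketch in the same Python 2 / urllib2 style, assuming an ES 2.x-style scroll endpoint; the function name and parameters are mine, not the original script's:

```python
import json
import urllib2

def scroll_export(base_url, index, doc_type, out_path, size=1000):
    # Open a scroll context with a 1-minute keep-alive per round trip.
    url = (base_url + "/" + index + "/" + doc_type +
           "/_search?scroll=1m&size=" + str(size))
    rsp = json.loads(urllib2.urlopen(url).read())
    f = open(out_path, "a")
    try:
        while rsp["hits"]["hits"]:
            for hit in rsp["hits"]["hits"]:
                f.write(str(hit["_source"]["content"]) + "\n")
            # Fetch the next batch using the scroll_id from the last response.
            req = urllib2.Request(base_url + "/_search/scroll",
                                  json.dumps({"scroll": "1m",
                                              "scroll_id": rsp["_scroll_id"]}))
            rsp = json.loads(urllib2.urlopen(req).read())
    finally:
        f.close()

# e.g. scroll_export("http://88.88.88.88:9200", "mtnews", "articles",
#                    "corpus_articles.json")
```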
4. Lessons Learned
- When a problem shows up, check the logs right away; it saves a lot of time.