Exporting Elasticsearch data with Python urllib2 fails with "urllib2.HTTPError: HTTP Error 500: Internal Server Error"


0. Business scenario

Export all values of one field of a given ES index to a file.

 

1. A brief survey of ES data export methods

These are the export methods I found; additions are welcome:

  • ES official API: the snapshot and restore module

The snapshot and restore module allows to create snapshots of individual indices or an entire cluster into a remote repository like shared file system, S3, or HDFS. These snapshots are great for backups because they can be restored relatively quickly but they are not archival because they can only be restored to versions of Elasticsearch that can read the index.

   In short, it images an ES cluster (or individual indices) and restores it quickly. It cannot restrict the output to a single field, so it does not fit this requirement and I did not look further. Interested readers can see Elasticsearch Reference [5.0] » Modules » Snapshot And Restore.

  • ES Java API:

   Although Java is the language I use most, running a Java program on Linux for a one-off export is a hassle. For those interested, here is a link to a Java API example for bulk import and export with ES: elasticsearch使用Java API批量數據導入和導出

  • ES Python API:

   Back to the task at hand: the top Google result for "elasticsearch導出數據" (exporting Elasticsearch data) is a Python script, at lein-wang/elasticsearch_migrate:

#!/usr/bin/python
#coding:utf-8
'''
    Export and Import ElasticSearch Data.
    Simple Example At __main__
    @author: wgzh159@163.com
    @modifier: lzkhit@163.com
    @note:  data consistency is not checked here; please verify it yourself
'''


import json
import os
import sys
import time
import urllib2

# Python 2 hack: make UTF-8 the default encoding for implicit str/unicode conversions
reload(sys)
sys.setdefaultencoding('utf-8')

class exportEsData():
    size = 10000
    def __init__(self, url,index,type,target_index):
        self.url = url+"/"+index+"/"+type+"/_search"
        self.index = index
        self.type = type
        self.target_index = target_index # index name used for the output file, replacing the original index
        self.file_name = self.target_index+"_"+self.type+".json"
    def exportData(self):
        print("export data begin...\n")
        begin = time.time()
        try:
            os.remove(self.file_name)   # start with a clean output file
        except OSError:
            os.mknod(self.file_name)    # file did not exist yet; create it
        msg = urllib2.urlopen(self.url).read()
        #print(msg)
        obj = json.loads(msg)
        num = obj["hits"]["total"]
        start = 0
        end = num/self.size+1 # number of size-document pages to read (Python 2 integer division)
        while(start<end):
            try:
                # page through the hits with from/size pagination
                msg = urllib2.urlopen(self.url+"?from="+str(start*self.size)+"&size="+str(self.size)).read()
                self.writeFile(msg)
                start=start+1
            except urllib2.HTTPError, e:
                print 'There was an error with the request'
                print e
                break
        print(start)
        print("export data end!!!\n total consuming time:"+str(time.time()-begin)+"s")
    def writeFile(self,msg):
        obj = json.loads(msg)
        vals = obj["hits"]["hits"]
        cnt = 0
        f = open(self.file_name,"a")
        try:
            for val in vals:
                val_json = val["_source"]["content"]
                f.write(str(val_json)+"\n")
                cnt += 1
        finally:
            print(cnt)   # documents written for this page
            f.flush()
            f.close()

class importEsData():
    def __init__(self,url,index,type):
        self.url = url
        self.index = index
        self.type = type
        self.file_name = self.index+"_"+self.type+".json"
    def importData(self):
        print("import data begin...\n")
        begin = time.time()
        f = open(self.file_name,"r")
        try:
            data = f.read()
            # Pitfall: the bulk API expects newline-delimited JSON, i.e. each
            # action line and source line terminated by \n
            self.post(data)
        finally:
            f.close()
        print("import data end!!!\n total consuming time:"+str(time.time()-begin)+"s")
    def post(self,data):
        print data
        print self.url
        req = urllib2.Request(self.url,data)
        r = urllib2.urlopen(req)
        response = r.read()
        print response
        r.close()

if __name__ == '__main__':
    '''
        Export Data
        e.g.
                            URL                  index       type        target_index
        exportEsData("http://10.100.142.60:9200","watchdog","mexception","watchdog").exportData()

        export file name: watchdog_mexception.json
    '''
    exportEsData("http://88.88.88.88:9200","mtnews","articles","corpus").exportData()
    
    '''
        Import Data

        *import file name: watchdog_test.json    (important)
                    the part before "_" is the elasticsearch index
                    the part after "_" is the elasticsearch type
        e.g.
                            URL                    index        type
        importEsData("http://10.100.142.60:9200","watchdog","test").importData()
    '''
    #importEsData("http://10.100.142.60:9200","watchdog","test").importData()
    #importEsData("http://127.0.0.1:9200/_bulk","chat","CHAT").importData()
    #importEsData("http://127.0.0.1:9200/_bulk","chat","TOPIC").importData()

3. The problem

Everything was in place, but running the script raised:

"urllib2.HTTPError: HTTP Error 500: Internal Server Error"

Worse, judging by the script's doc-count output, no matter what bulk size I tried (10/50/100/500/1000/5000/10000), the export always stalled at exactly the 10000th document, after which urllib2 threw the exception above.
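
The failure is easy to reproduce outside the script. A minimal sketch (host and index as in the script above; with size=1000 the eleventh page starts at from=10000, and that is the request that fails):

import urllib2

url = "http://88.88.88.88:9200/mtnews/articles/_search?from=10000&size=1000"
try:
    urllib2.urlopen(url).read()
except urllib2.HTTPError, e:
    print e   # prints: HTTP Error 500: Internal Server Error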

My colleague Huang suggested two possible causes:

  • The bulk rate is unbalanced: we produce faster than the server can consume, exceeding the ES server's TPS (from experience he recommends keeping each bulk between 5 and 15 MB)
  • A server-side problem; the logs need to be checked

To test the first cause, I added a sleep inside the while loop and reduced the bulk size to lower the TPS hitting ES, roughly as sketched below, but the HTTP 500 still appeared at the 10000th document, so this was not the issue.
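
For the record, the throttling attempt amounted to the change below in exportData's while loop (the one-second pause is an arbitrary choice):

        while(start<end):
            try:
                msg = urllib2.urlopen(self.url+"?from="+str(start*self.size)+"&size="+str(self.size)).read()
                self.writeFile(msg)
                start=start+1
                time.sleep(1)   # pause between pages to stay under the server's TPS
            except urllib2.HTTPError, e:
                print 'There was an error with the request'
                print e
                break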

That left the second cause, which meant logging in to the ES host and reading its log.

The log contained the following:

Caused by: QueryPhaseExecutionException[Result window is too large, from + size must be less than or equal to: [10000] but was [11000].
See the scroll api for a more efficient way to request large data sets. This limit can be set by changing the [index.max_result_window]
index level parameter.]

This also explains why every bulk size stalled at the 10000th document: the script pages with from/size, and ES rejects any request where from + size exceeds index.max_result_window (10000 by default). With size=1000, the page starting at from=10000 asks for 10000 + 1000 = 11000, exactly the [11000] in the log.

As the article "urllib2中HTTP狀態碼含義" puts it:

"5XX status codes mean the server has detected an error on its own side and cannot continue processing the request"

So it really was a server-side problem.

4. The fix

With the problem located, the fix is straightforward; see "ES報錯Result window is too large問題處理" for reference.

The affected index needs the following setting:

curl -XPUT http://88.88.88.88:9200/mtnews/_settings -d '{ "index" : { "max_result_window" : 10000000}}'

This raises the index.max_result_window index-level setting named in the log (the default is 10000).
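
Note that raising index.max_result_window this far trades server-side heap for convenience: every from/size page still has to materialize from + size hits on the data nodes. The log's own suggestion, the scroll API, avoids the limit entirely. Below is a minimal scroll-based export sketch in the same urllib2 style; the 1m keep-alive, the 1000-document batch size, and the output file name are assumptions, not values from this article:

#!/usr/bin/python
# Minimal sketch: export all documents of an index with the scroll API,
# which has no from+size depth limit. The keep-alive (1m), batch size (1000)
# and output file name are assumed values.
import json
import urllib2

es = "http://88.88.88.88:9200"
url = es + "/mtnews/articles/_search?scroll=1m"
body = '{"size": 1000, "query": {"match_all": {}}}'

rsp = json.loads(urllib2.urlopen(urllib2.Request(url, body)).read())
scroll_id = rsp["_scroll_id"]
f = open("corpus_articles.json", "a")
try:
    while True:
        hits = rsp["hits"]["hits"]
        if not hits:
            break   # scroll exhausted: every document has been read
        for val in hits:
            f.write(str(val["_source"]["content"]) + "\n")
        # ask for the next batch; the scroll id may change between batches
        next_body = '{"scroll": "1m", "scroll_id": "' + scroll_id + '"}'
        rsp = json.loads(urllib2.urlopen(urllib2.Request(es + "/_search/scroll", next_body)).read())
        scroll_id = rsp["_scroll_id"]
finally:
    f.close()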

5. Lessons learned

  • When something breaks, read the logs right away; it saves a lot of time :)

