如何使用python將Spark數據寫入ElasticSearch


這里以將Apache的日志寫入到ElasticSearch為例,來演示一下如何使用Python將Spark數據導入到ES中。

實際工作中,由於數據與使用框架或技術的復雜性,數據的寫入變得比較復雜,在這里我們簡單演示一下。

如果使用Scala或Java的話,Spark提供自帶了支持寫入ES的支持庫,但Python不支持。所以首先你需要去這里下載依賴的ES官方開發的依賴包包。

下載完成后,放在本地目錄,以下面命令方式啟動pyspark:

     pyspark --jars elasticsearch-hadoop-6.4.1.jar

 

如果你想pyspark使用Python3,請設置環境變量:

 export PYSPARK_PYTHON=/usr/bin/python3

理解如何寫入ES的關鍵是要明白,ES是一個JSON格式的數據庫,它有一個必須的要求。數據格式必須采用以下格式

 
{ "id: { the rest of your json}}

 

往下會展示如何轉換成這種格式。

解析Apache日志文件

我們將Apache的日志文件讀入,構建Spark RDD。然后我們寫一個parse()函數用正則表達式處理每條日志,提取我們需要的字

rdd = sc.textFile("/home/ubuntu/walker/apache_logs")
regex='^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+)\s?(\S+)?\s?(\S+)?" (\d{3}|-) (\d+|-)\s?"?([^"]*)"?\s?"?([^"]*)?"?$'
 
p=re.compile(regex)

def parse(str):
    s=p.match(str)
    d = {}
    d['ip']=s.group(1)
    d['date']=s.group(4)
    d['operation']=s.group(5)
    d['uri']=s.group(6)
    return d  

 

換句話說,我們剛開始從日志文件讀入RDD的數據類似如下:

['83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET /presentations/logstash-monitorama-2013/images/kibana-search.png HTTP/1.1" 200 203023 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"']

 

然后我們使用map函數轉換每條記錄:

rdd2 = rdd.map(parse)

rdd2.take(1)

[{'date': '17/May/2015:10:05:03 +0000', 'ip': '83.149.9.216', 'operation': 'GET', 'uri': '/presentations/logstash-monitorama-2013/images/kibana-search.png'}]

 

現在看起來像JSON,但並不是JSON字符串,我們需要使用json.dumps將dict對象轉換。

我們同時增加一個doc_id字段作為整個JSON的ID。在配置ES中我們增加如下配置“es.mapping.id”: “doc_id”告訴ES我們將這個字段作為ID。

這里我們使用SHA算法,將這個JSON字符串作為參數,得到一個唯一ID。
計算結果類似如下,可以看到ID是一個很長的SHA數值。

rdd3.take(1)

[('a5b086b04e1cc45fb4a19e2a641bf99ea3a378599ef62ba12563b75c', '{"date": "17/May/2015:10:05:03 +0000", "ip": "83.149.9.216", "operation": "GET", "doc_id": "a5b086b04e1cc45fb4a19e2a641bf99ea3a378599ef62ba12563b75c", "uri": "/presentations/logstash-monitorama-2013/images/kibana-search.png"}')]

 

 

現在我們需要制定ES配置,比較重要的兩項是:

  • “es.resource” : ‘walker/apache’: "walker"是索引,apache是類型,兩者一般合稱索引
  • “es.mapping.id”: “doc_id”: 告訴ES那個字段作為整個文檔的ID,也就是查詢結果中的_id
    其他的配置自己去探索。

然后我們使用saveAsNewAPIHadoopFile()將RDD寫入到ES。這部分代碼對於所有的ES都是一樣的,比較固定,不需要理解每一個細節

es_write_conf = {
        "es.nodes" : "localhost",
        "es.port" : "9200",
        "es.resource" : 'walker/apache',
        "es.input.json": "yes",
        "es.mapping.id": "doc_id"
    }
       
rdd3.saveAsNewAPIHadoopFile(
        path='-',
     outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",       keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf=es_write_conf)

rdd3 = rdd2.map(addID)

def addId(data):
    j=json.dumps(data).encode('ascii', 'ignore')
    data['doc_id'] = hashlib.sha224(j).hexdigest()
    return (data['doc_id'], json.dumps(data))

 

最后我們可以使用curl進行查詢

curl http://localhost:9200s/walker/apache/_search?pretty=true&?q=*


{
        "_index" : "walker",
        "_type" : "apache",
        "_id" : "227e977849bfd5f8d1fca69b04f7a766560745c6cb3712c106d590c2",
        "_score" : 1.0,
        "_source" : {
          "date" : "17/May/2015:10:05:32 +0000",
          "ip" : "91.177.205.119",
          "operation" : "GET",
          "doc_id" : "227e977849bfd5f8d1fca69b04f7a766560745c6cb3712c106d590c2",
          "uri" : "/favicon.ico"
        }

 

如下是所有代碼:

import json
import hashlib
import re

def addId(data):
    j=json.dumps(data).encode('ascii', 'ignore')
    data['doc_id'] = hashlib.sha224(j).hexdigest()
    return (data['doc_id'], json.dumps(data))

def parse(str):
    s=p.match(str)
    d = {}
    d['ip']=s.group(1)
    d['date']=s.group(4)
    d['operation']=s.group(5)
    d['uri']=s.group(6)
    return d    

regex='^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+)\s?(\S+)?\s?(\S+)?" (\d{3}|-) (\d+|-)\s?"?([^"]*)"?\s?"?([^"]*)?"?$'

p=re.compile(regex)

rdd = sc.textFile("/home/ubuntu/walker/apache_logs")

rdd2 = rdd.map(parse)

rdd3 = rdd2.map(addID)

es_write_conf = {
        "es.nodes" : "localhost",
        "es.port" : "9200",
        "es.resource" : 'walker/apache',
        "es.input.json": "yes",
        "es.mapping.id": "doc_id"
    }
     
rdd3.saveAsNewAPIHadoopFile(
        path='-',
     outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",       keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf=es_write_conf)
        

 

也可以這么封裝,其實原理是一樣的

import hashlib
import json
from pyspark import Sparkcontext

def make_md5(line):
    md5_obj=hashlib.md5()
    md5_obj.encode(line)
    return md5_obj.hexdigest()

def parse(line):
    dic={}
    l = line.split('\t')
    doc_id=make_md5(line)
    dic['name']=l[1]
    dic['age'] =l[2]
    dic['doc_id']=doc_id
    return dic   #記得這邊返回的是字典類型的,在寫入es之前要記得dumps

def saveData2es(pdd, es_host, port,index, index_type, key):
    """
    把saprk的運行結果寫入es
    :param pdd: 一個rdd類型的數據
    :param es_host: 要寫es的ip
    :param index: 要寫入數據的索引
    :param index_type: 索引的類型
    :param key: 指定文檔的id,就是要以文檔的那個字段作為_id
    :return:
    """
    #實例es客戶端記得單例模式
    if es.exist.index(index):
        es.index.create(index, 'spo')
    es_write_conf = {
        "es.nodes": es_host,
        "es.port": port,
        "es.resource": index/index_type,
        "es.input.json": "yes",
        "es.mapping.id": key
    }

    (pdd.map(lambda _dic: ('', json.dumps(_dic))))   #這百年是為把這個數據構造成元組格式,如果傳進來的_dic是字典則需要jdumps,如果傳進來之前就已經dumps,這便就不需要dumps了
    .saveAsNewAPIHadoopFile(
        path='-',
        outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat", keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf=es_write_conf)

    )


if __name__ == '__main__':

    #實例化sp對象
    sc=Sparkcontext()
    #文件中的呢內容一行一行用sc的讀取出來
    json_text=sc.textFile('./1.txt')
    #進行轉換
    json_data=json_text.map(lambda line:parse(line))

    saveData2es(json_data,'127.0.01','9200','index_test','index_type','doc_id')

    sc.stop()

 看到了把,面那個例子在寫入es之前加了一個id,返回一個元組格式的,現在這個封裝指定_id就會比較靈活了




鏈接:https://www.jianshu.com/p/c7365b9bda0a


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM