代碼:
import json


def trans_form(data_tuple):
    """Convert one record read from ES into a plain dict.

    :param data_tuple: ``(doc_id, json_string)`` tuple as produced by
        ``newAPIHadoopRDD`` when ``es.output.json`` is enabled.
    :return: the document parsed into a dict.
    """
    # Element 0 is the ES _id; element 1 is the raw JSON document.
    _, raw_json = data_tuple
    return json.loads(raw_json)


def get_es_conf(es_hot, es_port, index, type_, query_dic):
    """Build the Hadoop-connector configuration for reading from Elasticsearch.

    :param es_hot: ES host address (parameter name kept for backward
        compatibility; it is a typo for ``es_host``).
    :param es_port: ES port — must be a string, the connector rejects ints.
    :param index: index to read from.
    :param type_: document type within the index.
    :param query_dic: query body as a dict; any non-dict value falls back to
        a match_all query.
    :return: dict of elasticsearch-hadoop connector settings.
    """
    # Fall back to match_all unless the caller supplied a real dict query.
    if not isinstance(query_dic, dict):
        query_dic = {"query": {"match_all": {}}}
    return {
        "es.nodes": es_hot,
        "es.port": es_port,  # must be a string
        "es.resource": "{}/{}".format(index, type_),
        # BUG FIX: the documented setting is "es.output.json" — the original
        # "es.out.json" is silently ignored, so values arrived as
        # LinkedMapWritable instead of the JSON strings trans_form expects.
        "es.output.json": "yes",
        "es.query": json.dumps(query_dic),
    }


def read_data_from_es(sc, es_hot, es_port, index, type_, query_dic):
    """Read an RDD of ``(doc_id, json_string)`` tuples from Elasticsearch.

    :param sc: an active SparkContext.
    :param es_hot: ES host address (see :func:`get_es_conf`).
    :param es_port: ES port as a string.
    :param index: index to read from.
    :param type_: document type within the index.
    :param query_dic: query body as a dict, or anything else for match_all.
    :return: the raw RDD produced by the es-hadoop input format.
    """
    es_read_conf = get_es_conf(es_hot, es_port, index, type_, query_dic)
    es_rdd = sc.newAPIHadoopRDD(
        inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf=es_read_conf,
    )
    return es_rdd


if __name__ == '__main__':
    # pyspark is imported here (not at module top) so the pure helper
    # functions above stay importable/testable without a Spark install.
    from pyspark import SparkConf
    from pyspark.sql import SparkSession

    conf = SparkConf()
    # BUG FIXES vs original:
    #  - SparkSession must not be instantiated directly; use the
    #    SparkSession.builder classmethod chain.
    #  - builder.config() takes the SparkConf via the `conf=` keyword —
    #    passing it positionally binds it to the `key` parameter and fails.
    spark = SparkSession.builder.config(conf=conf).appName('test').getOrCreate()
    # BUG FIX: the attribute is `sparkContext`, not `SparkContext`.
    sc = spark.sparkContext
    es_host = '127.0.0.1'
    es_port = '9200'
    index = 'test'
    type_name = 'result'
    query = {"query": {"match_all": {}}}
    es_rdd = read_data_from_es(sc, es_host, es_port, index, type_name, query)
    # Records come back as (_id, json) tuples; parse them and drop empties.
    hdd = es_rdd.map(trans_form).filter(lambda x: x)