******重點中的重點,這是首先要注意的問題:
就是導入的org.elasticsearch.elasticsearch-spark-20_2.11 Jar包的版本一定要和要讀取的ES數據庫的版本保持一致,
如果比數據庫版本低,會直接報錯,如果高於數據庫的版本,數據的解析會出現問題。
首先配置SparkConf
1 SparkConf conf = new SparkConf() 2 .setAppName("ElasticSearch-spark") 3 .setMaster("local[1]") 4 .set("es.es.index.auto.create", "true") 5 6 .set("es.nodes","127.0.0.1") 7 .set("es.port","9200") 8 .set("es.nodes.wan.only", "true");
第一種讀取方式:
1 SparkSession sparkSession = SparkSession.builder().config(conf).getOrCreate(); 2 JavaSparkContext jsc = new JavaSparkContext(sparkSession.sparkContext());//adapter 3 JavaRDD<Map<String, Object>> searchRdd = esRDD(jsc, "index" ).values(); 4 for (Map<String, Object> item : searchRdd.collect()) { 5 item.forEach((key, value)->{ 6 System.out.println("search key:" + key + ", search value:" + value); 7 }); 8 }
第二種讀取方式:
1 JavaSparkContext sc = new JavaSparkContext(conf); 2 JavaPairRDD<String, Map<String, Object>> esRDD = JavaEsSpark.esRDD(sc, "index"); 3 System.out.println(esRDD.count()); 4 System.out.println(esRDD.collect().toString()); 5 for(Tuple2 tuple:esRDD.collect()){ 6 System.out.print(tuple._1()+"----------"); 7 System.out.println(tuple._2()); 8 }
第三種讀取方式:
1 SparkSession spark = SparkSession.builder().config(conf).getOrCreate(); 2 Dataset<Row> a = spark 3 .read() 4 .format("es") 5 .load("index") 6 ; 7 System.out.println(a.schema()); 8 a.show();
以上三種方式都可以返回ES中的數據,可針對需求自行選擇。
記於2019年5月22日20點41分
