Spark讀取ElasticSearch數據庫三種配置方式及其注意事項


******重點中的重點,這是首先要注意的問題:

就是導入的org.elasticsearch.elasticsearch-spark-20_2.11  Jar包的版本一定要和要讀取的ES數據庫的版本保持一致,

如果比數據庫版本低,會直接報錯,如果高於數據庫的版本,數據的解析會出現問題。

首先配置SparkConf

1         SparkConf conf = new SparkConf()
2                 .setAppName("ElasticSearch-spark")
3                 .setMaster("local[1]")
4                 .set("es.es.index.auto.create", "true")
5 
6                 .set("es.nodes","127.0.0.1")
7                 .set("es.port","9200")
8                 .set("es.nodes.wan.only", "true");

 

第一種讀取方式:

1         SparkSession sparkSession = SparkSession.builder().config(conf).getOrCreate();
2         JavaSparkContext jsc = new JavaSparkContext(sparkSession.sparkContext());//adapter
3         JavaRDD<Map<String, Object>> searchRdd = esRDD(jsc, "index" ).values();
4         for (Map<String, Object> item : searchRdd.collect()) {
5             item.forEach((key, value)->{
6                 System.out.println("search key:" + key + ", search value:" + value);
7             });
8         }

 

第二種讀取方式:

1         JavaSparkContext sc = new JavaSparkContext(conf);
2         JavaPairRDD<String, Map<String, Object>> esRDD = JavaEsSpark.esRDD(sc, "index");
3         System.out.println(esRDD.count());
4         System.out.println(esRDD.collect().toString());
5         for(Tuple2 tuple:esRDD.collect()){
6             System.out.print(tuple._1()+"----------");
7             System.out.println(tuple._2());
8         }

第三種讀取方式:

1         SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
2         Dataset<Row> a  = spark
3                 .read()
4                 .format("es")
5                 .load("index")
6                 ;
7         System.out.println(a.schema());
8         a.show();

以上三種方式都可以返回ES中的數據,可針對需求自行選擇。

記於2019年5月22日20點41分


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM