由於ES集群在拉取數據時可以提供過濾功能,因此在采用ES集群作為spark運算時的數據來源時,
根據過濾條件在拉取的源頭就可以過濾了(ES提供過濾),就不必像從hdfs那樣必須全部加載進spark的內存根據filter算子過濾,費時費力。
代碼:
import org.apache.spark.{SparkConf, SparkContext} import org.elasticsearch.spark._ object Spark2Elasticsearch { def main(args: Array[String]): Unit = { val conf =new SparkConf().setAppName("Spark2ES").setMaster("local[2]") conf.set("es.nodes","hadoop1,hadoop2,hadoop3") conf.set("es.port","9200") conf.set("es.index.auto.create","true") val sc =new SparkContext(conf) val query:String =s"""{ "query" : { "match_all" : {} }, "filter" : { "term" : { "price" : 50.55 } } }""" val rdd = sc.esRDD("store", query) println(rdd.collect().toBuffer) } }
運行結果:
采坑點:
那個sc.esRDD方法其實是ES提供的jar包里的一個隱試轉換,在import org.elasticsearch.spark._這個包下,
配置mavin依賴時注意spark的配套版本,本文1.6的spark依賴如下: