Writing Data to Elasticsearch with Apache Spark


Elasticsearch is a Lucene-based search server. It provides a distributed, multi-tenant full-text search engine exposed through a RESTful web interface. As an enterprise-grade search engine designed for cloud environments, it offers near-real-time search and is stable, reliable, fast, and easy to install and use.

This article does not cover Elasticsearch concepts, installation, or deployment; instead it goes straight to how to write data into Elasticsearch with Apache Spark. The library used here is elasticsearch-hadoop, which has shipped built-in support for Apache Spark since version 2.1. Before using elasticsearch-hadoop, we need to add the dependency:

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-hadoop</artifactId>
    <version>2.3.4</version>
</dependency>

For convenience, we operate on Elasticsearch directly from the spark-shell. Before doing so, add the following settings to the $SPARK_HOME/conf/spark-defaults.conf file:

spark.es.nodes  www.iteblog.com
spark.es.port  9200

Here spark.es.nodes specifies the list of Elasticsearch nodes to connect to; you do not need to list every node in your cluster. spark.es.port is the HTTP port to use. The spark. prefix is required because Spark only picks up properties that start with spark. when loading configuration from files or the command line; elasticsearch-hadoop then strips the spark. prefix, leaving the es.* settings it understands.

If you are writing the code in a standalone application instead, you can set the Elasticsearch-related parameters before initializing the SparkContext, as follows:

import org.apache.spark.SparkConf

val conf = new SparkConf().setAppName("iteblog").setMaster(master)
conf.set("es.nodes", "www.iteblog.com")
conf.set("es.port", "9200")
conf.set("es.index.auto.create", "true")

Before writing any data, import the org.elasticsearch.spark._ package; this enriches every RDD with a saveToEs method. Below I will walk through writing different kinds of data into Elasticsearch.

Writing Map objects to Elasticsearch

scala> import org.elasticsearch.spark._
import org.elasticsearch.spark._

scala> val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
numbers: scala.collection.immutable.Map[String,Int] = Map(one -> 1, two -> 2, three -> 3)

scala> val airports = Map("OTP" -> "Otopeni", "SFO" -> "San Fran")
airports: scala.collection.immutable.Map[String,String] = Map(OTP -> Otopeni, SFO -> San Fran)

scala> sc.makeRDD(Seq(numbers, airports)).saveToEs("iteblog/docs")

The snippet above creates two Map objects and writes them to Elasticsearch. In the saveToEs argument, iteblog is the index and docs is the type. We can then inspect the properties of the iteblog index via the following URL:

curl -XGET :9200/iteblog
 
{
     "iteblog" : {
         "aliases" : { },
         "mappings" : {
             "docs" : {
                 "properties" : {
                     "SFO" : {
                         "type" : "string"
                     },
                     "arrival" : {
                         "type" : "string"
                     },
                     "one" : {
                         "type" : "long"
                     },
                     "three" : {
                         "type" : "long"
                     },
                     "two" : {
                         "type" : "long"
                     }
                 }
             }
         },
         "settings" : {
             "index" : {
                 "creation_date" : "1470805957888" ,
                 "uuid" : "HNIcGZ69Tf6qX3XVccwKUg" ,
                 "number_of_replicas" : "1" ,
                 "number_of_shards" : "5" ,
                 "version" : {
                     "created" : "2030499"
                 }
             }
         },
         "warmers" : { }
     }
}

We can also retrieve all documents with the following URL:

:9200/iteblog/docs/_search
 
{
     "took" : 2 ,
     "timed_out" : false ,
     "_shards" : {
         "total" : 5 ,
         "successful" : 5 ,
         "failed" : 0
     },
     "hits" : {
         "total" : 2 ,
         "max_score" : 1 ,
         "hits" : [
             {
                 "_index" : "iteblog" ,
                 "_type" : "docs" ,
                 "_id" : "AVZy3d5sJfxPRwCjtWM9" ,
                 "_score" : 1 ,
                 "_source" : {
                     "one" : 1 ,
                     "two" : 2 ,
                     "three" : 3
                 }
             },
             {
                 "_index" : "iteblog" ,
                 "_type" : "docs" ,
                 "_id" : "AVZy3d5sJfxPRwCjtWM-" ,
                 "_score" : 1 ,
                 "_source" : {
                     "arrival" : "Otopeni" ,
                     "SFO" : "San Fran"
                 }
             }
         ]
     }
}

Writing case class objects to Elasticsearch

We can also write Scala case class objects to Elasticsearch; in Java you can write JavaBean objects. For example:

scala> case class Trip(departure: String, arrival: String)
defined class Trip

scala> val upcomingTrip = Trip("OTP", "SFO")
upcomingTrip: Trip = Trip(OTP,SFO)

scala> val lastWeekTrip = Trip("MUC", "OTP")
lastWeekTrip: Trip = Trip(MUC,OTP)

scala> val rdd = sc.makeRDD(Seq(upcomingTrip, lastWeekTrip))
rdd: org.apache.spark.rdd.RDD[Trip] = ParallelCollectionRDD[1] at makeRDD at <console>:37

scala> rdd.saveToEs("iteblog/class")

The snippet above writes upcomingTrip and lastWeekTrip into the index iteblog, under the type class. So far, RDDs have gained the saveToEs method through an implicit conversion. elasticsearch-hadoop also provides an explicit API for writing RDDs to Elasticsearch, as follows:

scala> import org.elasticsearch.spark.rdd.EsSpark
import org.elasticsearch.spark.rdd.EsSpark

scala> val rdd = sc.makeRDD(Seq(upcomingTrip, lastWeekTrip))
rdd: org.apache.spark.rdd.RDD[Trip] = ParallelCollectionRDD[0] at makeRDD at <console>:34

scala> EsSpark.saveToEs(rdd, "spark/docs")

Writing JSON strings to Elasticsearch

We can also write JSON strings directly to Elasticsearch, as follows:

scala> val json1 = """{"id" : 1, "blog" : "www.iteblog.com", "weixin" : "iteblog_hadoop"}"""
json1: String = {"id" : 1, "blog" : "www.iteblog.com", "weixin" : "iteblog_hadoop"}

scala> val json2 = """{"id" : 2, "blog" : "books.iteblog.com", "weixin" : "iteblog_hadoop"}"""
json2: String = {"id" : 2, "blog" : "books.iteblog.com", "weixin" : "iteblog_hadoop"}

scala> sc.makeRDD(Seq(json1, json2)).saveJsonToEs("iteblog/json")

Dynamically setting the document type

In many scenarios a single job carries data of several kinds, and we would like one write to route each record to its own type: book records all go into the book type, while CD records all go into the cd type. Happily, elasticsearch-hadoop provides exactly this capability, as follows:

scala> val game = Map("media_type" -> "game", "title" -> "FF VI", "year" -> "1994")
game: scala.collection.immutable.Map[String,String] = Map(media_type -> game, title -> FF VI, year -> 1994)

scala> val book = Map("media_type" -> "book", "title" -> "Harry Potter", "year" -> "2010")
book: scala.collection.immutable.Map[String,String] = Map(media_type -> book, title -> Harry Potter, year -> 2010)

scala> val cd = Map("media_type" -> "music", "title" -> "Surfing With The Alien")
cd: scala.collection.immutable.Map[String,String] = Map(media_type -> music, title -> Surfing With The Alien)

scala> sc.makeRDD(Seq(game, book, cd)).saveToEs("iteblog/{media_type}")

The type is set via the {media_type} pattern: at write time the connector reads this field from each record and routes the record to the matching type.
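The same field-pattern syntax also works in the index part of the resource string, so each media type can land in its own index. A minimal sketch reusing the game, book and cd maps above (the iteblog- index prefix and the doc type name here are made up for illustration):

```scala
// Route each record to its own index, e.g. iteblog-game, iteblog-book, iteblog-music;
// the {media_type} placeholder is resolved per record at write time.
sc.makeRDD(Seq(game, book, cd)).saveToEs("iteblog-{media_type}/doc")
```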

Custom document IDs

In Elasticsearch, the combination _index/_type/_id uniquely identifies a document. If we do not specify an id, Elasticsearch automatically generates a unique one for us; the auto-generated IDs are 20 characters long, for example:

{
     "_index" : "iteblog" ,
     "_type" : "docs" ,
     "_id" : "AVZy3d5sJfxPRwCjtWM-" ,
     "_score" : 1 ,
     "_source" : {
         "arrival" : "Otopeni" ,
         "SFO" : "San Fran"
     }
}

Obviously such a long string carries no meaning and is hard to remember or work with. However, we can specify the id manually when inserting data, as follows:

scala> val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
otp: scala.collection.immutable.Map[String,String] = Map(iata -> OTP, name -> Otopeni)

scala> val muc = Map("iata" -> "MUC", "name" -> "Munich")
muc: scala.collection.immutable.Map[String,String] = Map(iata -> MUC, name -> Munich)

scala> val sfo = Map("iata" -> "SFO", "name" -> "San Fran")
sfo: scala.collection.immutable.Map[String,String] = Map(iata -> SFO, name -> San Fran)

scala> val airportsRDD = sc.makeRDD(Seq((1, otp), (2, muc), (3, sfo)))

scala> airportsRDD.saveToEsWithMeta("iteblog/2015")

The Seq((1, otp), (2, muc), (3, sfo)) above assigns each object an id: 1, 2 and 3 respectively. You can then fetch the otp object via the URL /iteblog/2015/1. We can also specify the id in the following way:
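With explicit IDs the documents become directly addressable. A sketch of that lookup, following the same elided-host curl style used earlier in this article:

```
curl -XGET :9200/iteblog/2015/1
```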

scala> val json1 = """{"id" : 1, "blog" : "www.iteblog.com", "weixin" : "iteblog_hadoop"}"""
json1: String = {"id" : 1, "blog" : "www.iteblog.com", "weixin" : "iteblog_hadoop"}

scala> val json2 = """{"id" : 2, "blog" : "books.iteblog.com", "weixin" : "iteblog_hadoop"}"""
json2: String = {"id" : 2, "blog" : "books.iteblog.com", "weixin" : "iteblog_hadoop"}

scala> val rdd = sc.makeRDD(Seq(json1, json2))

scala> EsSpark.saveToEs(rdd, "iteblog/docs", Map("es.mapping.id" -> "id"))

Here the es.mapping.id setting maps the id field of each object to the record's document id.
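es.mapping.id can be combined with other es.mapping.* settings; for instance, elasticsearch-hadoop's es.mapping.exclude setting can keep the id field out of the stored _source so it is not duplicated inside the document. A sketch under that assumption (the iteblog/airports resource name is made up for illustration):

```scala
// Use the "iata" field as the document _id and exclude it from the indexed _source
val airports = sc.makeRDD(Seq(Map("iata" -> "OTP", "name" -> "Otopeni")))
airports.saveToEs("iteblog/airports",
  Map("es.mapping.id" -> "iata", "es.mapping.exclude" -> "iata"))
```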

Customizing record metadata

We can even customize each record's metadata when writing data, as follows:

scala> import org.elasticsearch.spark.rdd.Metadata._
import org.elasticsearch.spark.rdd.Metadata._

scala> val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
otp: scala.collection.immutable.Map[String,String] = Map(iata -> OTP, name -> Otopeni)

scala> val muc = Map("iata" -> "MUC", "name" -> "Munich")
muc: scala.collection.immutable.Map[String,String] = Map(iata -> MUC, name -> Munich)

scala> val sfo = Map("iata" -> "SFO", "name" -> "San Fran")
sfo: scala.collection.immutable.Map[String,String] = Map(iata -> SFO, name -> San Fran)

scala> val otpMeta = Map(ID -> 1, TTL -> "3h")

scala> val mucMeta = Map(ID -> 2, VERSION -> "23")

scala> val sfoMeta = Map(ID -> 3)

scala> val airportsRDD = sc.makeRDD(Seq((otpMeta, otp), (mucMeta, muc), (sfoMeta, sfo)))

scala> airportsRDD.saveToEsWithMeta("iteblog/2015")

The snippet above sets different metadata for otp, muc and sfo, which is very useful in many scenarios.

