ElasticSearch is a Lucene-based search server. It provides a distributed, multi-tenant full-text search engine behind a RESTful web interface. It is an enterprise-grade search engine designed for the cloud: it offers near-real-time search and is stable, reliable, fast, and easy to install and use.
This post does not cover ElasticSearch concepts, installation, or deployment; instead it goes straight to writing data into ElasticSearch with Apache Spark. The library used here is elasticsearch-hadoop, which has shipped built-in support for Apache Spark since version 2.1. Before using elasticsearch-hadoop, we need to add the dependency:
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-hadoop</artifactId>
    <version>2.3.4</version>
</dependency>
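If you build with sbt rather than Maven, the same coordinates can be declared roughly as follows (a minimal sketch reusing the groupId, artifactId and version from the snippet above):

// build.sbt -- same coordinates as the Maven dependency above
libraryDependencies += "org.elasticsearch" % "elasticsearch-hadoop" % "2.3.4"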
For convenience, the examples below work against ElasticSearch directly from spark-shell. Before starting, add the following settings to the $SPARK_HOME/conf/spark-default.conf file:
spark.es.nodes  www.iteblog.com
spark.es.port   9200
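The same properties can equally be passed on the command line when launching the shell (a sketch; it assumes the elasticsearch-hadoop jar has been made available to the shell, here via --jars):

# equivalent to the spark-default.conf entries above
bin/spark-shell --jars elasticsearch-hadoop-2.3.4.jar \
  --conf spark.es.nodes=www.iteblog.com \
  --conf spark.es.port=9200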
Here spark.es.nodes lists the ElasticSearch nodes to connect to; you do not need to list every node in the cluster. spark.es.port is the HTTP port to use. The spark. prefix is needed because Spark only picks up properties that start with spark. from configuration files or the command line; elasticsearch-hadoop strips that prefix before reading the setting.
If you write your code in a standalone application instead, you can set the ElasticSearch-related parameters before initializing the SparkContext, as follows:
import org.apache.spark.SparkConf

val conf = new SparkConf().setAppName("iteblog").setMaster(master)
conf.set("es.nodes", "www.iteblog.com")
conf.set("es.port", "9200")
conf.set("es.index.auto.create", "true")
Before writing any data, import the org.elasticsearch.spark._ package; this gives every RDD a saveToEs method. The sections below walk through writing different kinds of data to ElasticSearch.
Writing Map objects to ElasticSearch
scala> import org.elasticsearch.spark._
import org.elasticsearch.spark._

scala> val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
numbers: scala.collection.immutable.Map[String,Int] = Map(one -> 1, two -> 2, three -> 3)

scala> val airports = Map("OTP" -> "Otopeni", "SFO" -> "San Fran")
airports: scala.collection.immutable.Map[String,String] = Map(OTP -> Otopeni, SFO -> San Fran)

scala> sc.makeRDD(Seq(numbers, airports)).saveToEs("iteblog/docs")
The snippet above creates two Map objects and writes them to ElasticSearch. In the saveToEs argument, iteblog is the index and docs is the type. We can then inspect the properties of the iteblog index with the following URL:
curl -XGET :9200/iteblog
{
  "iteblog" : {
    "aliases" : { },
    "mappings" : {
      "docs" : {
        "properties" : {
          "SFO" : {
            "type" : "string"
          },
          "arrival" : {
            "type" : "string"
          },
          "one" : {
            "type" : "long"
          },
          "three" : {
            "type" : "long"
          },
          "two" : {
            "type" : "long"
          }
        }
      }
    },
    "settings" : {
      "index" : {
        "creation_date" : "1470805957888",
        "uuid" : "HNIcGZ69Tf6qX3XVccwKUg",
        "number_of_replicas" : "1",
        "number_of_shards" : "5",
        "version" : {
          "created" : "2030499"
        }
      }
    },
    "warmers" : { }
  }
}
All of the documents can also be retrieved with the following search URL:
:9200/iteblog/docs/_search
{
  "took" : 2,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 2,
    "max_score" : 1,
    "hits" : [ {
      "_index" : "iteblog",
      "_type" : "docs",
      "_id" : "AVZy3d5sJfxPRwCjtWM9",
      "_score" : 1,
      "_source" : {
        "one" : 1,
        "two" : 2,
        "three" : 3
      }
    }, {
      "_index" : "iteblog",
      "_type" : "docs",
      "_id" : "AVZy3d5sJfxPRwCjtWM-",
      "_score" : 1,
      "_source" : {
        "arrival" : "Otopeni",
        "SFO" : "San Fran"
      }
    } ]
  }
}
Writing case class objects to ElasticSearch
We can also write Scala case class objects to ElasticSearch (in Java you can write JavaBean objects), as follows:
scala> case class Trip(departure: String, arrival: String)
defined class Trip

scala> val upcomingTrip = Trip("OTP", "SFO")
upcomingTrip: Trip = Trip(OTP,SFO)

scala> val lastWeekTrip = Trip("MUC", "OTP")
lastWeekTrip: Trip = Trip(MUC,OTP)

scala> val rdd = sc.makeRDD(Seq(upcomingTrip, lastWeekTrip))
rdd: org.apache.spark.rdd.RDD[Trip] = ParallelCollectionRDD[1] at makeRDD at <console>:37

scala> rdd.saveToEs("iteblog/class")
The snippet above writes upcomingTrip and lastWeekTrip to the _index named iteblog under the type class. So far saveToEs has been made available on RDDs through implicit conversions; elasticsearch-hadoop also provides an explicit API for writing an RDD to ElasticSearch, as follows:
scala> import org.elasticsearch.spark.rdd.EsSpark
import org.elasticsearch.spark.rdd.EsSpark

scala> val rdd = sc.makeRDD(Seq(upcomingTrip, lastWeekTrip))
rdd: org.apache.spark.rdd.RDD[Trip] = ParallelCollectionRDD[0] at makeRDD at <console>:34

scala> EsSpark.saveToEs(rdd, "spark/docs")
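For reference, the pieces above can be combined into a standalone application. The following is only a minimal sketch (the local[2] master and the EsWriteExample object name are placeholders, not from the original post); it reuses the es.* settings shown at the beginning:

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark.rdd.EsSpark

// placeholder case class, same shape as the Trip class used in the shell session
case class Trip(departure: String, arrival: String)

object EsWriteExample {
  def main(args: Array[String]): Unit = {
    // same es.* settings as in the spark-shell examples above
    val conf = new SparkConf().setAppName("iteblog").setMaster("local[2]")
    conf.set("es.nodes", "www.iteblog.com")
    conf.set("es.port", "9200")
    conf.set("es.index.auto.create", "true")

    val sc = new SparkContext(conf)
    val rdd = sc.makeRDD(Seq(Trip("OTP", "SFO"), Trip("MUC", "OTP")))
    EsSpark.saveToEs(rdd, "iteblog/class")   // resource is "index/type"
    sc.stop()
  }
}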
Writing JSON strings to ElasticSearch
JSON strings can be written to ElasticSearch directly, as follows:
scala> val json1 = """{"id" : 1, "blog" : "www.iteblog.com", "weixin" : "iteblog_hadoop"}"""
json1: String = {"id" : 1, "blog" : "www.iteblog.com", "weixin" : "iteblog_hadoop"}

scala> val json2 = """{"id" : 2, "blog" : "books.iteblog.com", "weixin" : "iteblog_hadoop"}"""
json2: String = {"id" : 2, "blog" : "books.iteblog.com", "weixin" : "iteblog_hadoop"}

scala> sc.makeRDD(Seq(json1, json2)).saveJsonToEs("iteblog/json")
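As with saveToEs, the JSON variant also has an explicit form on EsSpark (a sketch, assuming the json1/json2 values and the EsSpark import from earlier):

// explicit equivalent of the implicit saveJsonToEs call above
EsSpark.saveJsonToEs(sc.makeRDD(Seq(json1, json2)), "iteblog/json")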
Dynamically choosing the type on insert
In many scenarios a single job handles data of several kinds, and we would like one pass to route each record to the right type: book records into the book type, cd records into the cd type, and so on. Fortunately, elasticsearch-hadoop provides exactly this, as follows:
scala> val game = Map("media_type" -> "game", "title" -> "FF VI", "year" -> "1994")
game: scala.collection.immutable.Map[String,String] = Map(media_type -> game, title -> FF VI, year -> 1994)

scala> val book = Map("media_type" -> "book", "title" -> "Harry Potter", "year" -> "2010")
book: scala.collection.immutable.Map[String,String] = Map(media_type -> book, title -> Harry Potter, year -> 2010)

scala> val cd = Map("media_type" -> "music", "title" -> "Surfing With The Alien")
cd: scala.collection.immutable.Map[String,String] = Map(media_type -> music, title -> Surfing With The Alien)

scala> sc.makeRDD(Seq(game, book, cd)).saveToEs("iteblog/{media_type}")
The type is given by the {media_type} pattern, which is resolved per document at write time, so each record ends up in the type that matches its own media_type field.
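The same field-based pattern can also be used in the index part of the resource string (a hedged sketch of elasticsearch-hadoop's multi-resource writes; the iteblog- index prefix here is made up for illustration):

// route each document to its own index, e.g. iteblog-game, iteblog-book, iteblog-music
sc.makeRDD(Seq(game, book, cd)).saveToEs("iteblog-{media_type}/docs")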
Custom document ids
In ElasticSearch, the combination of _index/_type/_id uniquely identifies a document. If we do not supply an id, ElasticSearch generates one for us; the auto-generated ids are 20 characters long, as shown below:
{
  "_index" : "iteblog",
  "_type" : "docs",
  "_id" : "AVZy3d5sJfxPRwCjtWM-",
  "_score" : 1,
  "_source" : {
    "arrival" : "Otopeni",
    "SFO" : "San Fran"
  }
}
Obviously a string this long carries no meaning and is hard to remember or use. We can instead set the id by hand when inserting the data, as follows:
scala> val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
otp: scala.collection.immutable.Map[String,String] = Map(iata -> OTP, name -> Otopeni)

scala> val muc = Map("iata" -> "MUC", "name" -> "Munich")
muc: scala.collection.immutable.Map[String,String] = Map(iata -> MUC, name -> Munich)

scala> val sfo = Map("iata" -> "SFO", "name" -> "San Fran")
sfo: scala.collection.immutable.Map[String,String] = Map(iata -> SFO, name -> San Fran)

scala> val airportsRDD = sc.makeRDD(Seq((1, otp), (2, muc), (3, sfo)))

scala> airportsRDD.saveToEsWithMeta("iteblog/2015")
The Seq((1, otp), (2, muc), (3, sfo)) above assigns the ids 1, 2 and 3 to the three documents, so the otp document can afterwards be retrieved through the /iteblog/2015/1 URL.
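For example (a hedged curl sketch; the host and port are simply the es.nodes and es.port values configured at the beginning of this post):

# fetch the document that was stored with id 1
curl -XGET www.iteblog.com:9200/iteblog/2015/1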
The id can also be specified in the following way:
scala> val json1 = """{"id" : 1, "blog" : "www.iteblog.com", "weixin" : "iteblog_hadoop"}"""
json1: String = {"id" : 1, "blog" : "www.iteblog.com", "weixin" : "iteblog_hadoop"}

scala> val json2 = """{"id" : 2, "blog" : "books.iteblog.com", "weixin" : "iteblog_hadoop"}"""
json2: String = {"id" : 2, "blog" : "books.iteblog.com", "weixin" : "iteblog_hadoop"}

scala> val rdd = sc.makeRDD(Seq(json1, json2))

scala> EsSpark.saveToEs(rdd, "iteblog/docs", Map("es.mapping.id" -> "id"))
Here the es.mapping.id option maps the id field of each record to its document _id.
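The same mapping can also be set once on the SparkConf instead of per call (a sketch, assuming the conf object created near the beginning of this post):

// every subsequent saveToEs call will then take the document _id from the "id" field
conf.set("es.mapping.id", "id")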
Custom record metadata
We can even customize each record's metadata when writing, as follows:
scala> import org.elasticsearch.spark.rdd.Metadata._
import org.elasticsearch.spark.rdd.Metadata._

scala> val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
otp: scala.collection.immutable.Map[String,String] = Map(iata -> OTP, name -> Otopeni)

scala> val muc = Map("iata" -> "MUC", "name" -> "Munich")
muc: scala.collection.immutable.Map[String,String] = Map(iata -> MUC, name -> Munich)

scala> val sfo = Map("iata" -> "SFO", "name" -> "San Fran")
sfo: scala.collection.immutable.Map[String,String] = Map(iata -> SFO, name -> San Fran)

scala> val otpMeta = Map(ID -> 1, TTL -> "3h")

scala> val mucMeta = Map(ID -> 2, VERSION -> "23")

scala> val sfoMeta = Map(ID -> 3)

scala> val airportsRDD = sc.makeRDD(Seq((otpMeta, otp), (mucMeta, muc), (sfoMeta, sfo)))

scala> airportsRDD.saveToEsWithMeta("iteblog/2015")
The snippet above sets different metadata (ID, TTL, VERSION) for otp, muc and sfo respectively, which is very handy in quite a few scenarios.