ElasticSearch is a Lucene-based search server. It provides a distributed, multi-tenant full-text search engine exposed through a RESTful web interface. As an enterprise-grade search engine designed for cloud environments, it delivers near-real-time search and is stable, reliable, fast, and easy to install and use.
This article does not cover ElasticSearch concepts, installation, or deployment; it goes straight to how to write data into ElasticSearch with Apache Spark. The library used here is elasticsearch-hadoop, which has shipped built-in support for Apache Spark since version 2.1. Before using elasticsearch-hadoop, we need to add the dependency:
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-hadoop</artifactId>
    <version>2.3.4</version>
</dependency>
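If you build with sbt rather than Maven, the equivalent dependency should look roughly like the sketch below; the coordinates mirror the Maven snippet above, and you should adjust the version to whatever you actually use:

// build.sbt -- sbt equivalent of the Maven dependency above (assumed setup)
libraryDependencies += "org.elasticsearch" % "elasticsearch-hadoop" % "2.3.4"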
For convenience, we will work with ElasticSearch directly from the spark-shell. Before doing so, add the following settings to the $SPARK_HOME/conf/spark-defaults.conf file:

spark.es.nodes  www.iteblog.com
spark.es.port   9200
Here spark.es.nodes specifies the list of ElasticSearch nodes to connect to; you do not need to list every machine in your cluster. spark.es.port is the HTTP port to use. The spark. prefix is required because Spark only picks up properties starting with spark. when it loads configuration from a file or from the command line; elasticsearch-hadoop strips the spark. prefix off automatically.
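Equivalently, you can skip editing spark-defaults.conf and pass the same spark.-prefixed properties on the command line when launching the shell; a minimal sketch, assuming the same host and port as above:

# launch spark-shell with the ES settings passed via --conf (assumed host/port)
spark-shell --conf spark.es.nodes=www.iteblog.com --conf spark.es.port=9200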
If you are writing a standalone application instead, you can set the ElasticSearch-related parameters before initializing the SparkContext, as follows:
import org.apache.spark.SparkConf

val conf = new SparkConf().setAppName("iteblog").setMaster(master)
conf.set("es.nodes", "www.iteblog.com")
conf.set("es.port", "9200")
conf.set("es.index.auto.create", "true")
Before writing any data, import the org.elasticsearch.spark._ package; this gives every RDD a saveToEs method. Below I walk through writing several different kinds of data into ElasticSearch.
Writing Map objects to ElasticSearch
scala> import org.elasticsearch.spark._
import org.elasticsearch.spark._

scala> val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
numbers: scala.collection.immutable.Map[String,Int] = Map(one -> 1, two -> 2, three -> 3)

scala> val airports = Map("OTP" -> "Otopeni", "SFO" -> "San Fran")
airports: scala.collection.immutable.Map[String,String] = Map(OTP -> Otopeni, SFO -> San Fran)

scala> sc.makeRDD(Seq(numbers, airports)).saveToEs("iteblog/docs")
The code above creates two Map objects and writes them to ElasticSearch. In the argument to saveToEs, iteblog is the index and docs is the type. We can then inspect the properties of the iteblog index with the following URL:
curl -XGET http://www.iteblog.com:9200/iteblog
{
    "iteblog": {
        "aliases": { },
        "mappings": {
            "docs": {
                "properties": {
                    "SFO": { "type": "string" },
                    "arrival": { "type": "string" },
                    "one": { "type": "long" },
                    "three": { "type": "long" },
                    "two": { "type": "long" }
                }
            }
        },
        "settings": {
            "index": {
                "creation_date": "1470805957888",
                "uuid": "HNIcGZ69Tf6qX3XVccwKUg",
                "number_of_replicas": "1",
                "number_of_shards": "5",
                "version": { "created": "2030499" }
            }
        },
        "warmers": { }
    }
}
We can also retrieve all the documents with the following URL:

http://www.iteblog.com:9200/iteblog/docs/_search
{
    "took": 2,
    "timed_out": false,
    "_shards": {
        "total": 5,
        "successful": 5,
        "failed": 0
    },
    "hits": {
        "total": 2,
        "max_score": 1,
        "hits": [
            {
                "_index": "iteblog",
                "_type": "docs",
                "_id": "AVZy3d5sJfxPRwCjtWM9",
                "_score": 1,
                "_source": {
                    "one": 1,
                    "two": 2,
                    "three": 3
                }
            },
            {
                "_index": "iteblog",
                "_type": "docs",
                "_id": "AVZy3d5sJfxPRwCjtWM-",
                "_score": 1,
                "_source": {
                    "arrival": "Otopeni",
                    "SFO": "San Fran"
                }
            }
        ]
    }
}
Writing case class objects to ElasticSearch
We can also write Scala case class objects to ElasticSearch (in Java you can write JavaBean objects), as follows:
scala> case class Trip(departure: String, arrival: String)
defined class Trip

scala> val upcomingTrip = Trip("OTP", "SFO")
upcomingTrip: Trip = Trip(OTP,SFO)

scala> val lastWeekTrip = Trip("MUC", "OTP")
lastWeekTrip: Trip = Trip(MUC,OTP)

scala> val rdd = sc.makeRDD(Seq(upcomingTrip, lastWeekTrip))
rdd: org.apache.spark.rdd.RDD[Trip] = ParallelCollectionRDD[1] at makeRDD at <console>:37

scala> rdd.saveToEs("iteblog/class")
The snippet above writes upcomingTrip and lastWeekTrip into the _index named iteblog, under the type class. So far saveToEs has been made available on RDDs through implicit conversions; elasticsearch-hadoop also provides an explicit API for writing RDDs into ElasticSearch, as follows:
scala> import org.elasticsearch.spark.rdd.EsSpark
import org.elasticsearch.spark.rdd.EsSpark

scala> val rdd = sc.makeRDD(Seq(upcomingTrip, lastWeekTrip))
rdd: org.apache.spark.rdd.RDD[Trip] = ParallelCollectionRDD[0] at makeRDD at <console>:34

scala> EsSpark.saveToEs(rdd, "spark/docs")
Writing JSON strings to ElasticSearch
We can also write JSON strings directly into ElasticSearch, as follows:
scala> val json1 = """{"id" : 1, "blog" : "www.iteblog.com", "weixin" : "iteblog_hadoop"}"""
json1: String = {"id" : 1, "blog" : "www.iteblog.com", "weixin" : "iteblog_hadoop"}

scala> val json2 = """{"id" : 2, "blog" : "books.iteblog.com", "weixin" : "iteblog_hadoop"}"""
json2: String = {"id" : 2, "blog" : "books.iteblog.com", "weixin" : "iteblog_hadoop"}

scala> sc.makeRDD(Seq(json1, json2)).saveJsonToEs("iteblog/json")
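If you prefer the explicit API introduced earlier, EsSpark also exposes a JSON variant; a minimal sketch, reusing the json1 and json2 strings defined above:

// explicit counterpart of rdd.saveJsonToEs, writing to the same iteblog/json resource
import org.elasticsearch.spark.rdd.EsSpark
EsSpark.saveJsonToEs(sc.makeRDD(Seq(json1, json2)), "iteblog/json")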
Setting the target type dynamically
In many scenarios a single job carries several kinds of data, and we want to write them to different types in one pass: everything describing a book goes into the type book, while everything describing a CD goes into the type cd. Happily, elasticsearch-hadoop provides exactly this capability, as follows:
scala> val game = Map("media_type" -> "game", "title" -> "FF VI", "year" -> "1994")
game: scala.collection.immutable.Map[String,String] = Map(media_type -> game, title -> FF VI, year -> 1994)

scala> val book = Map("media_type" -> "book", "title" -> "Harry Potter", "year" -> "2010")
book: scala.collection.immutable.Map[String,String] = Map(media_type -> book, title -> Harry Potter, year -> 2010)

scala> val cd = Map("media_type" -> "music", "title" -> "Surfing With The Alien")
cd: scala.collection.immutable.Map[String,String] = Map(media_type -> music, title -> Surfing With The Alien)

scala> sc.makeRDD(Seq(game, book, cd)).saveToEs("iteblog/{media_type}")
The type is set through the {media_type} pattern, which is resolved for each record at write time, so records with different media_type values end up in different types.
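The same pattern syntax should also work in the index part of the resource string, so records can be routed to different indices as well; a hedged sketch, where the iteblog_{media_type} index names are made up for illustration:

// route each record to its own index based on its media_type field (illustrative index names)
sc.makeRDD(Seq(game, book, cd)).saveToEs("iteblog_{media_type}/docs")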
Custom document IDs
In ElasticSearch, the combination _index/_type/_id uniquely identifies a document. If we do not specify an ID, ElasticSearch generates a unique one for us automatically; such auto-generated IDs are 20 characters long, for example:
{
    "_index": "iteblog",
    "_type": "docs",
    "_id": "AVZy3d5sJfxPRwCjtWM-",
    "_score": 1,
    "_source": {
        "arrival": "Otopeni",
        "SFO": "San Fran"
    }
}
Obviously such a long string carries no meaning and is hard for us to remember or use. Fortunately, we can specify the ID manually when inserting data, as follows:
scala> val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
otp: scala.collection.immutable.Map[String,String] = Map(iata -> OTP, name -> Otopeni)

scala> val muc = Map("iata" -> "MUC", "name" -> "Munich")
muc: scala.collection.immutable.Map[String,String] = Map(iata -> MUC, name -> Munich)

scala> val sfo = Map("iata" -> "SFO", "name" -> "San Fran")
sfo: scala.collection.immutable.Map[String,String] = Map(iata -> SFO, name -> San Fran)

scala> val airportsRDD = sc.makeRDD(Seq((1, otp), (2, muc), (3, sfo)))

scala> airportsRDD.saveToEsWithMeta("iteblog/2015")
In Seq((1, otp), (2, muc), (3, sfo)) above, each tuple assigns an ID to its object: 1, 2 and 3 respectively. You can then look up the otp object through the URL /iteblog/2015/1, as sketched below.
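For example, assuming the same host and port configured earlier, the document written for otp could be fetched with curl:

curl -XGET http://www.iteblog.com:9200/iteblog/2015/1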
We can also specify the ID in the following way:
scala> val json1 = """{"id" : 1, "blog" : "www.iteblog.com", "weixin" : "iteblog_hadoop"}"""
json1: String = {"id" : 1, "blog" : "www.iteblog.com", "weixin" : "iteblog_hadoop"}

scala> val json2 = """{"id" : 2, "blog" : "books.iteblog.com", "weixin" : "iteblog_hadoop"}"""
json2: String = {"id" : 2, "blog" : "books.iteblog.com", "weixin" : "iteblog_hadoop"}

scala> val rdd = sc.makeRDD(Seq(json1, json2))

scala> EsSpark.saveToEs(rdd, "iteblog/docs", Map("es.mapping.id" -> "id"))
Here the es.mapping.id parameter tells elasticsearch-hadoop to use the id field of each object as the document ID of the corresponding record.
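The same option can also be passed to the implicit saveToEs method instead of going through EsSpark; a minimal sketch, assuming the rdd of JSON strings defined above:

// implicit variant of the EsSpark call above: same resource and same mapping option
rdd.saveToEs("iteblog/docs", Map("es.mapping.id" -> "id"))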
Customizing record metadata
We can even customize each record's metadata when writing, as follows:
scala> import org.elasticsearch.spark.rdd.Metadata._
import org.elasticsearch.spark.rdd.Metadata._

scala> val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
otp: scala.collection.immutable.Map[String,String] = Map(iata -> OTP, name -> Otopeni)

scala> val muc = Map("iata" -> "MUC", "name" -> "Munich")
muc: scala.collection.immutable.Map[String,String] = Map(iata -> MUC, name -> Munich)

scala> val sfo = Map("iata" -> "SFO", "name" -> "San Fran")
sfo: scala.collection.immutable.Map[String,String] = Map(iata -> SFO, name -> San Fran)

scala> val otpMeta = Map(ID -> 1, TTL -> "3h")

scala> val mucMeta = Map(ID -> 2, VERSION -> "23")

scala> val sfoMeta = Map(ID -> 3)

scala> val airportsRDD = sc.makeRDD(Seq((otpMeta, otp), (mucMeta, muc), (sfoMeta, sfo)))

scala> airportsRDD.saveToEsWithMeta("iteblog/2015")
The snippet above attaches different metadata (ID, TTL, VERSION) to otp, muc and sfo respectively, which is very useful in many scenarios.