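If you read how Flink writes the data in a DataStream to ElasticSearch, you will find that it simply uses the bulk API. While I was at it, I adapted that class so that a DataSet can be written to ElasticSearch as well; in other words, I implemented ElasticSearchOutputFormat. The code is open source on GitHub: https://github.com/397090770/flink-elasticsearch2-connector. The compiled package has been uploaded to the Maven Central repository (see "How to Publish a Jar to the Maven Central Repository"), which means you can use my ElasticSearchOutputFormat directly by adding it to your pom.xml file: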
    <dependency>
        <groupId>com.iteblog</groupId>
        <artifactId>flink-elasticsearch2-connector</artifactId>
        <version>1.0.2</version>
    </dependency>
Using it in Scala
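    import java.net.{InetAddress, InetSocketAddress}

    import org.apache.flink.api.common.functions.RuntimeContext
    import org.apache.flink.api.scala._
    import org.elasticsearch.action.index.IndexRequest
    import org.elasticsearch.client.Requests

    // Implicitly converts the Scala Map and List below to their Java counterparts.
    import scala.collection.JavaConversions._

    // Also import ElasticSearchOutputFormat, ElasticsearchSinkFunction and
    // RequestIndexer; their packages are not shown here.

    val config = Map(
      "bulk.flush.max.actions" -> "1000",
      "cluster.name" -> "elasticsearch"
    )

    // Comma-separated list of Elasticsearch hosts; 9300 is the transport port.
    val hosts = "www.iteblog.com"
    val transports = hosts.split(",").map(host => new InetSocketAddress(InetAddress.getByName(host), 9300)).toList

    val data: DataSet[String] = ....

    data.output(new ElasticSearchOutputFormat(config, transports, new ElasticsearchSinkFunction[String] {
      // Builds the index request for one element: index "iteblog", type "info".
      def createIndexRequest(element: String): IndexRequest = {
        Requests.indexRequest.index("iteblog").`type`("info").source(element)
      }

      override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
        indexer.add(createIndexRequest(element))
      }
    }))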
Using it in Java
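    import java.net.InetAddress;
    import java.net.InetSocketAddress;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.flink.api.common.functions.RuntimeContext;
    import org.apache.flink.api.java.DataSet;
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.client.Requests;

    // Also import Lists, ElasticSearchOutputFormat, ElasticsearchSinkFunction and
    // RequestIndexer; their packages are not shown here.

    // The statements below belong inside a method that declares
    // `throws UnknownHostException` (thrown by InetAddress.getByName).
    Map<String, String> config = new HashMap<>();
    config.put("bulk.flush.max.actions", "1000");
    config.put("cluster.name", "elasticsearch");

    // Comma-separated list of Elasticsearch hosts; 9300 is the transport port.
    String hosts = "www.iteblog.com";
    List<InetSocketAddress> list = Lists.newArrayList();
    for (String host : hosts.split(",")) {
        list.add(new InetSocketAddress(InetAddress.getByName(host), 9300));
    }

    DataSet<String> data = ....;

    data.output(new ElasticSearchOutputFormat<>(config, list, new ElasticsearchSinkFunction<String>() {
        @Override
        public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
            indexer.add(createIndexRequest(element));
        }

        // Builds the index request for one element: index "iteblog", type "info".
        private IndexRequest createIndexRequest(String element) {
            return Requests.indexRequest().index("iteblog").type("info").source(element);
        }
    }));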
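In the same environment, I tested writing 1,172,235 records to ElasticSearch, and this time it took less than 20s, which works out to more than about 58,000 records per second. Clearly the efficiency has improved a great deal.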
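To give a rough feel for why batching helps, here is a minimal sketch of the buffering idea behind a setting like "bulk.flush.max.actions": items pile up in memory and are handed over as one batch once the threshold is reached. This is only an illustration under my own assumptions, not the connector's or Elasticsearch's actual implementation; the class name BulkBufferSketch and the sender callback are hypothetical and merely stand in for "send one bulk request".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical illustration only: buffers items and hands them over in batches. */
final class BulkBufferSketch<T> {
    private final int maxActions;            // plays the role of "bulk.flush.max.actions"
    private final Consumer<List<T>> sender;  // stands in for sending one bulk request
    private final List<T> pending = new ArrayList<>();

    BulkBufferSketch(int maxActions, Consumer<List<T>> sender) {
        if (maxActions <= 0) {
            throw new IllegalArgumentException("maxActions must be positive");
        }
        this.maxActions = maxActions;
        this.sender = sender;
    }

    /** Buffers one item and flushes as soon as the threshold is reached. */
    void add(T item) {
        pending.add(item);
        if (pending.size() >= maxActions) {
            flush();
        }
    }

    /** Sends whatever is buffered as a single batch; does nothing when empty. */
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        sender.accept(new ArrayList<>(pending)); // copy so the receiver owns its batch
        pending.clear();
    }

    public static void main(String[] args) {
        List<Integer> batchSizes = new ArrayList<>();
        BulkBufferSketch<String> buffer =
                new BulkBufferSketch<>(1000, batch -> batchSizes.add(batch.size()));
        for (int i = 0; i < 2500; i++) {
            buffer.add("{\"id\":" + i + "}");
        }
        buffer.flush(); // push out the final partial batch
        System.out.println(batchSizes); // prints [1000, 1000, 500]
    }
}
```

With a threshold of 1000, the 2,500 items above go out in three batches instead of 2,500 separate calls, which is the kind of saving a bulk write relies on.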
For now, this ElasticSearchOutputFormat only supports writing to Elasticsearch 2.x.x; later I will write another version that supports ElasticSearch 1.x.x.