使用Flink實現索引數據到Elasticsearch
使用Flink處理數據時,可以基於Flink提供的批式處理(Batch Processing)和流式處理(Streaming Processing)API來實現,分別能夠滿足不同場景下應用數據的處理。這兩種模式下,輸入處理都被抽象為Source Operator,包含對應輸入數據的處理邏輯;輸出處理都被抽象為Sink Operator,包含了對應輸出數據的處理邏輯。這里,我們只關注輸出的Sink Operator實現。
Flink批式處理模式,運行Flink Batch Job時作用在有界的輸入數據集上,所以Job運行的時間是有時限的,一旦Job運行完成,對應的整個數據處理應用就已經結束,比如,輸入是一個數據文件,或者一個Hive SQL查詢對應的結果集,等等。在批式處理模式下處理數據的輸出時,主要需要實現一個自定義的OutputFormat,然后基於該OutputFormat來構建一個Sink,下面看下OutputFormat接口的定義,如下所示:
|
1
2
3
4
5
6
7
|
@Public
public
interface
OutputFormat<IT>
extends
Serializable {
void
configure(Configuration parameters);
void
open(
int
taskNumber,
int
numTasks)
throws
IOException;
void
writeRecord(IT record)
throws
IOException;
void
close()
throws
IOException;
}
|
上面,configure()方法用來配置一個OutputFormat的一些輸出參數;open()方法用來實現與外部存儲系統建立連接;writeRecord()方法用來實現對Flink Batch Job處理后,將數據記錄輸出到外部存儲系統。開發Batch Job時,通過調用DataSet的output()方法,參數值使用一個OutputFormat的具體實現即可。后面,我們會基於Elasticsearch來實現上面接口中的各個方法。
Flink流式處理模式,運行Flink Streaming Job時一般輸入的數據集為流數據集,也就是說輸入數據元素會持續不斷地進入到Streaming Job的處理過程中,但你仍然可以使用一個HDFS數據文件作為Streaming Job的輸入,即使這樣,一個Flink Streaming Job啟動運行后便會永遠運行下去,除非有意外故障或有計划地操作使其終止。在流式處理模式下處理數據的輸出時,我們需要是實現一個SinkFunction,它指定了如下將流數據處理后的結果,輸出到指定的外部存儲系統中,下面看下SinkFunction的接口定義,如下所示:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
|
@Public
public
interface
SinkFunction<IN>
extends
Function, Serializable {
@Deprecated
default
void
invoke(IN value)
throws
Exception {}
default
void
invoke(IN value, Context context)
throws
Exception {
invoke(value);
}
@Public
interface
Context<T> {
long
currentProcessingTime();
long
currentWatermark();
Long timestamp();
}
}
|
通過上面接口可以看到,需要實現一個invoke()方法,實現該方法來將一個輸入的IN value輸出到外部存儲系統中。一般情況下,對一些主流的外部存儲系統,Flink實現了一下內置(社區貢獻)的SinkFunction,我們只需要配置一下就可以直接使用。而且,對於Streaming Job來說,實現的SinkFunction比較豐富一些,可以減少自己開發的工作量。開發Streaming Job時,通過調用DataStream的addSink()方法,參數是一個SinkFlink的具體實現。
下面,我們分別基於批式處理模式和批式處理模式,分別使用或實現對應組件將Streaming Job和Batch Job的處理結果輸出到Elasticsearch中:
基於Flink DataSteam API實現
在開發基於Flink的應用程序過程中,發現Flink Streaming API對Elasticsearch的支持還是比較好的,比如,如果想要從Kafka消費事件記錄,經過處理最終將數據記錄索引到Elasticsearch 5.x,可以直接在Maven的POM文件中添加如下依賴即可:
|
1
2
3
4
5
|
<
dependency
>
<
groupId
>org.apache.flink</
groupId
>
<
artifactId
>flink-connector-elasticsearch5_2.11</
artifactId
>
<
version
>1.5.3</
version
>
</
dependency
>
|
我們使用Flink Streaming API來實現將流式數據處理后,寫入到Elasticsearch中。其中,輸入數據源是Kafka中的某個Topic;輸出處理結果到lasticsearch中,我們使用使用Transport API的方式來連接Elasticsearch,需要指定Transport地址和端口。具體實現,對應的Scala代碼,如下所示:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
|
def
main(args
:
Array[String])
:
Unit
=
{
// parse input arguments
val
params
=
ParameterTool.fromArgs(args)
if
(params.getNumberOfParameters <
9
) {
val
cmd
=
getClass.getName
println(
"Missing parameters!\n"
+
"Usage: "
+ cmd
+
" --input-topic <topic> "
+
"--es-cluster-name <es cluster name> "
+
"--es-transport-addresses <es address> "
+
"--es-port <es port> "
+
"--es-index <es index> "
+
"--es-type <es type> "
+
"--bootstrap.servers <kafka brokers> "
+
"--zookeeper.connect <zk quorum> "
+
"--group.id <some id> [--prefix <prefix>]"
)
return
}
val
env
=
StreamExecutionEnvironment.getExecutionEnvironment
val
kafkaConsumer
=
new
FlinkKafkaConsumer
010
[String](
params.getRequired(
"input-topic"
),
new
SimpleStringSchema(),
params.getProperties
)
val
dataStream
=
env
.addSource(kafkaConsumer)
.filter(!
_
.isEmpty)
val
esClusterName
=
params.getRequired(
"es-cluster-name"
)
val
esAddresses
=
params.getRequired(
"es-transport-addresses"
)
val
esPort
=
params.getInt(
"es-port"
,
9300
)
val
transportAddresses
=
new
java.util.ArrayList[InetSocketAddress]
val
config
=
new
java.util.HashMap[String, String]
config.put(
"cluster.name"
, esClusterName)
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put(
"bulk.flush.max.actions"
,
"100"
)
esAddresses.split(
","
).foreach(address
=
> {
transportAddresses.add(
new
InetSocketAddress(InetAddress.getByName(address), esPort))
})
val
esIndex
=
params.getRequired(
"es-index"
)
val
esType
=
params.getRequired(
"es-type"
)
val
sink
=
new
ElasticsearchSink(config, transportAddresses,
new
ElasticsearchSinkFunction[String] {
def
createIndexRequest(element
:
String)
:
IndexRequest
=
{
return
Requests.indexRequest()
.index(esIndex)
.`
type
`(esType)
.source(element)
}
override
def
process(t
:
String, runtimeContext
:
RuntimeContext, requestIndexer
:
RequestIndexer)
:
Unit
=
{
requestIndexer.add(createIndexRequest(t))
}
})
dataStream.addSink(sink)
val
jobName
=
getClass.getSimpleName
env.execute(jobName)
}
|
上面有關數據索引到Elasticsearch的處理中, 最核心的就是創建一個ElasticsearchSink,然后通過DataStream的API調用addSink()添加一個Sink,實際是一個SinkFunction的實現,可以參考Flink對應DataStream類的addSink()方法代碼,如下所示:
|
1
2
|
def
addSink(sinkFunction
:
SinkFunction[T])
:
DataStreamSink[T]
=
stream.addSink(sinkFunction)
|
基於Flink DataSet API實現
目前,Flink還沒有在Batch處理模式下實現對應Elasticsearch對應的Connector,需要自己根據需要實現,所以我們基於Flink已經存在的Streaming處理模式下已經實現的Elasticsearch Connector對應的代碼,經過部分修改,可以直接拿來在Batch處理模式下,將數據記錄批量索引到Elasticsearch中。
我們基於Flink 1.6.1版本,以及Elasticsearch 6.3.2版本,並且使用Elasticsearch推薦的High Level REST API來實現(為了復用Flink 1.6.1中對應的Streaming處理模式下的Elasticsearch 6 Connector實現代碼,我們選擇使用該REST Client),需要在Maven的POM文件中添加如下依賴:
|
01
02
03
04
05
06
07
08
09
10
|
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>6.3.2</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>6.3.2</version>
</dependency>
|
我們實現的各個類的類圖及其關系,如下圖所示:
如果熟悉Flink Streaming處理模式下Elasticsearch對應的Connector實現,可以看到上面的很多類都在org.apache.flink.streaming.connectors.elasticsearch包里面存在,其中包括批量向Elasticsearch中索引數據(內部實現了使用BulkProcessor)。上圖中引入的ElasticsearchApiCallBridge,目的是能夠實現對Elasticsearch不同版本的支持,只需要根據Elasticsearch不同版本中不同Client實現,進行一些適配,上層抽象保持不變。
如果需要在Batch處理模式下批量索引數據到Elasticsearch,可以直接使用ElasticsearchOutputFormat即可實現。但是創建ElasticsearchOutputFormat,需要幾個參數:
|
1
2
3
4
5
6
7
8
|
private
ElasticsearchOutputFormat(
Map<String, String> bulkRequestsConfig,
List<HttpHost> httpHosts,
ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
DocWriteRequestFailureHandler failureHandler,
RestClientFactory restClientFactory) {
super
(
new
Elasticsearch6ApiCallBridge(httpHosts, restClientFactory), bulkRequestsConfig, elasticsearchSinkFunction, failureHandler);
}
|
當然,我們可以通過代碼中提供的Builder來非常方便的創建一個ElasticsearchOutputFormat。下面,我們看下我們Flink Batch Job實現邏輯。
- 實現ElasticsearchSinkFunction
我們需要實現ElasticsearchSinkFunction接口,實現一個能夠索引數據到Elasticsearch中的功能,代碼如下所示:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
|
final
ElasticsearchSinkFunction<String> elasticsearchSinkFunction =
new
ElasticsearchSinkFunction<String>() {
@Override
public
void
process(String element, RuntimeContext ctx, RequestIndexer indexer) {
indexer.add(createIndexRequest(element, parameterTool));
}
private
IndexRequest createIndexRequest(String element, ParameterTool parameterTool) {
LOG.info(
"Create index req: "
+ element);
JSONObject o = JSONObject.parseObject(element);
return
Requests.indexRequest()
.index(parameterTool.getRequired(
"es-index"
))
.type(parameterTool.getRequired(
"es-type"
))
.source(o);
}
};
|
上面代碼,主要是把一個將要輸出的數據記錄,通過RequestIndexer來實現索引到Elasticsearch中。
- 讀取Elasticsearch配置參數
配置連接Elasticsearch的參數。從程序輸入的ParameterTool中讀取Elasticsearch相關的配置:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
|
String esHttpHosts = parameterTool.getRequired(
"es-http-hosts"
);
LOG.info(
"Config: esHttpHosts="
+ esHttpHosts);
int
esHttpPort = parameterTool.getInt(
"es-http-port"
,
9200
);
LOG.info(
"Config: esHttpPort="
+ esHttpPort);
final
List<HttpHost> httpHosts = Arrays.asList(esHttpHosts.split(
","
))
.stream()
.map(host ->
new
HttpHost(host, esHttpPort,
"http"
))
.collect(Collectors.toList());
int
bulkFlushMaxSizeMb = parameterTool.getInt(
"bulk-flush-max-size-mb"
,
10
);
int
bulkFlushIntervalMillis = parameterTool.getInt(
"bulk-flush-interval-millis"
,
10
*
1000
);
int
bulkFlushMaxActions = parameterTool.getInt(
"bulk-flush-max-actions"
,
1
);
|
- 創建ElasticsearchOutputFormat
創建一個我們實現的ElasticsearchOutputFormat,代碼片段如下所示:
|
1
2
3
4
5
6
7
8
|
final
ElasticsearchOutputFormat outputFormat =
new
Builder<>(httpHosts, elasticsearchSinkFunction)
.setBulkFlushBackoff(
true
)
.setBulkFlushBackoffRetries(
2
)
.setBulkFlushBackoffType(ElasticsearchApiCallBridge.FlushBackoffType.EXPONENTIAL)
.setBulkFlushMaxSizeMb(bulkFlushMaxSizeMb)
.setBulkFlushInterval(bulkFlushIntervalMillis)
.setBulkFlushMaxActions(bulkFlushMaxActions)
.build();
|
上面很多配置項指定了向Elasticsearch中進行批量寫入的行為,在ElasticsearchOutputFormat內部會進行設置並創建Elasticsearch6BulkProcessorIndexer,優化索引數據處理的性能。
- 實現Batch Job主控制流程
最后我們就可以構建我們的Flink Batch應用程序了,代碼如下所示:
|
1
2
3
4
5
6
7
8
|
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.readTextFile(file)
.filter(line -> !line.isEmpty())
.map(line -> line)
.output(outputFormat);
final
String jobName = ImportHDFSDataToES.
class
.getSimpleName();
env.execute(jobName);
|
我們輸入的HDFS文件中,是一些已經加工好的JSON格式記錄行,這里為了簡單,直接將原始JSON字符串索引到Elasticsearch中,而沒有進行更多其他的處理操作。
有關Flink批式處理模式下,Elasticsearch對應的OutputFormat實現的完整代碼,可以參考這里:
https://github.com/shirdrn/flink-app-jobs/tree/master/src/main/java/org/shirdrn/flink/connector/batch/elasticsearch。
參考鏈接
- https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/elasticsearch.html
- https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/batch/#data-sinks
本文基於署名-非商業性使用-相同方式共享 4.0許可協議發布,歡迎轉載、使用、重新發布,但務必保留文章署名時延軍(包含鏈接:http://shiyanjun.cn),不得用於商業目的,基於本文修改后的作品務必以相同的許可發布。如有任何疑問,請與我聯系。
