使用Flink實現索引數據到Elasticsearch


使用Flink實現索引數據到Elasticsearch

使用Flink處理數據時,可以基於Flink提供的批式處理(Batch Processing)和流式處理(Streaming Processing)API來實現,分別能夠滿足不同場景下應用數據的處理。這兩種模式下,輸入處理都被抽象為Source Operator,包含對應輸入數據的處理邏輯;輸出處理都被抽象為Sink Operator,包含了對應輸出數據的處理邏輯。這里,我們只關注輸出的Sink Operator實現。
Flink批式處理模式,運行Flink Batch Job時作用在有界的輸入數據集上,所以Job運行的時間是有時限的,一旦Job運行完成,對應的整個數據處理應用就已經結束,比如,輸入是一個數據文件,或者一個Hive SQL查詢對應的結果集,等等。在批式處理模式下處理數據的輸出時,主要需要實現一個自定義的OutputFormat,然后基於該OutputFormat來構建一個Sink,下面看下OutputFormat接口的定義,如下所示:

1
2
3
4
5
6
7
@Public
public interface OutputFormat<IT> extends Serializable {
     void configure(Configuration parameters);
     void open( int taskNumber, int numTasks) throws IOException;
     void writeRecord(IT record) throws IOException;
     void close() throws IOException;
}

上面,configure()方法用來配置一個OutputFormat的一些輸出參數;open()方法用來實現與外部存儲系統建立連接;writeRecord()方法用來實現對Flink Batch Job處理后,將數據記錄輸出到外部存儲系統。開發Batch Job時,通過調用DataSet的output()方法,參數值使用一個OutputFormat的具體實現即可。后面,我們會基於Elasticsearch來實現上面接口中的各個方法。
Flink流式處理模式,運行Flink Streaming Job時一般輸入的數據集為流數據集,也就是說輸入數據元素會持續不斷地進入到Streaming Job的處理過程中,但你仍然可以使用一個HDFS數據文件作為Streaming Job的輸入,即使這樣,一個Flink Streaming Job啟動運行后便會永遠運行下去,除非有意外故障或有計划地操作使其終止。在流式處理模式下處理數據的輸出時,我們需要是實現一個SinkFunction,它指定了如下將流數據處理后的結果,輸出到指定的外部存儲系統中,下面看下SinkFunction的接口定義,如下所示:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
@Public
public interface SinkFunction<IN> extends Function, Serializable {
     @Deprecated
     default void invoke(IN value) throws Exception {}
     default void invoke(IN value, Context context) throws Exception {
         invoke(value);
     }
 
     @Public
     interface Context<T> {
         long currentProcessingTime();
         long currentWatermark();
         Long timestamp();
     }
}

通過上面接口可以看到,需要實現一個invoke()方法,實現該方法來將一個輸入的IN value輸出到外部存儲系統中。一般情況下,對一些主流的外部存儲系統,Flink實現了一下內置(社區貢獻)的SinkFunction,我們只需要配置一下就可以直接使用。而且,對於Streaming Job來說,實現的SinkFunction比較豐富一些,可以減少自己開發的工作量。開發Streaming Job時,通過調用DataStream的addSink()方法,參數是一個SinkFlink的具體實現。
下面,我們分別基於批式處理模式和批式處理模式,分別使用或實現對應組件將Streaming Job和Batch Job的處理結果輸出到Elasticsearch中:

基於Flink DataSteam API實現

在開發基於Flink的應用程序過程中,發現Flink Streaming API對Elasticsearch的支持還是比較好的,比如,如果想要從Kafka消費事件記錄,經過處理最終將數據記錄索引到Elasticsearch 5.x,可以直接在Maven的POM文件中添加如下依賴即可:

1
2
3
4
5
< dependency >
    < groupId >org.apache.flink</ groupId >
    < artifactId >flink-connector-elasticsearch5_2.11</ artifactId >
    < version >1.5.3</ version >
  </ dependency >

我們使用Flink Streaming API來實現將流式數據處理后,寫入到Elasticsearch中。其中,輸入數據源是Kafka中的某個Topic;輸出處理結果到lasticsearch中,我們使用使用Transport API的方式來連接Elasticsearch,需要指定Transport地址和端口。具體實現,對應的Scala代碼,如下所示:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
def main(args : Array[String]) : Unit = {
   // parse input arguments
   val params = ParameterTool.fromArgs(args)
 
   if (params.getNumberOfParameters < 9 ) {
     val cmd = getClass.getName
     println( "Missing parameters!\n"
       + "Usage: " + cmd
       + " --input-topic <topic> "
       + "--es-cluster-name <es cluster name> "
       + "--es-transport-addresses <es address> "
       + "--es-port <es port> "
       + "--es-index <es index> "
       + "--es-type <es type> "
       + "--bootstrap.servers <kafka brokers> "
       + "--zookeeper.connect <zk quorum> "
       + "--group.id <some id> [--prefix <prefix>]" )
     return
   }
 
   val env = StreamExecutionEnvironment.getExecutionEnvironment
 
   val kafkaConsumer = new FlinkKafkaConsumer 010 [String](
     params.getRequired( "input-topic" ),
     new SimpleStringSchema(),
     params.getProperties
   )
 
   val dataStream = env
     .addSource(kafkaConsumer)
     .filter(! _ .isEmpty)
 
   val esClusterName = params.getRequired( "es-cluster-name" )
   val esAddresses = params.getRequired( "es-transport-addresses" )
   val esPort = params.getInt( "es-port" , 9300 )
   val transportAddresses = new java.util.ArrayList[InetSocketAddress]
 
   val config = new java.util.HashMap[String, String]
   config.put( "cluster.name" , esClusterName)
   // This instructs the sink to emit after every element, otherwise they would be buffered
   config.put( "bulk.flush.max.actions" , "100" )
 
   esAddresses.split( "," ).foreach(address = > {
     transportAddresses.add( new InetSocketAddress(InetAddress.getByName(address), esPort))
   })
   val esIndex = params.getRequired( "es-index" )
   val esType = params.getRequired( "es-type" )
   val sink = new ElasticsearchSink(config, transportAddresses, new ElasticsearchSinkFunction[String] {
 
     def createIndexRequest(element : String) : IndexRequest = {
       return Requests.indexRequest()
         .index(esIndex)
         .` type `(esType)
         .source(element)
     }
 
     override def process(t : String, runtimeContext : RuntimeContext, requestIndexer : RequestIndexer) : Unit = {
       requestIndexer.add(createIndexRequest(t))
     }
   })
   dataStream.addSink(sink)
 
   val jobName = getClass.getSimpleName
   env.execute(jobName)
}

上面有關數據索引到Elasticsearch的處理中, 最核心的就是創建一個ElasticsearchSink,然后通過DataStream的API調用addSink()添加一個Sink,實際是一個SinkFunction的實現,可以參考Flink對應DataStream類的addSink()方法代碼,如下所示:

1
2
def addSink(sinkFunction : SinkFunction[T]) : DataStreamSink[T] =
   stream.addSink(sinkFunction)

基於Flink DataSet API實現

目前,Flink還沒有在Batch處理模式下實現對應Elasticsearch對應的Connector,需要自己根據需要實現,所以我們基於Flink已經存在的Streaming處理模式下已經實現的Elasticsearch Connector對應的代碼,經過部分修改,可以直接拿來在Batch處理模式下,將數據記錄批量索引到Elasticsearch中。
我們基於Flink 1.6.1版本,以及Elasticsearch 6.3.2版本,並且使用Elasticsearch推薦的High Level REST API來實現(為了復用Flink 1.6.1中對應的Streaming處理模式下的Elasticsearch 6 Connector實現代碼,我們選擇使用該REST Client),需要在Maven的POM文件中添加如下依賴:

01
02
03
04
05
06
07
08
09
10
<dependency>
   <groupId>org.elasticsearch</groupId>
   <artifactId>elasticsearch</artifactId>
   <version>6.3.2</version>
</dependency>
<dependency>
   <groupId>org.elasticsearch.client</groupId>
   <artifactId>elasticsearch-rest-high-level-client</artifactId>
   <version>6.3.2</version>
</dependency>

我們實現的各個類的類圖及其關系,如下圖所示:
Flink-Batch-Connector-Elasticsearch
如果熟悉Flink Streaming處理模式下Elasticsearch對應的Connector實現,可以看到上面的很多類都在org.apache.flink.streaming.connectors.elasticsearch包里面存在,其中包括批量向Elasticsearch中索引數據(內部實現了使用BulkProcessor)。上圖中引入的ElasticsearchApiCallBridge,目的是能夠實現對Elasticsearch不同版本的支持,只需要根據Elasticsearch不同版本中不同Client實現,進行一些適配,上層抽象保持不變。
如果需要在Batch處理模式下批量索引數據到Elasticsearch,可以直接使用ElasticsearchOutputFormat即可實現。但是創建ElasticsearchOutputFormat,需要幾個參數:

1
2
3
4
5
6
7
8
private ElasticsearchOutputFormat(
     Map<String, String> bulkRequestsConfig,
     List<HttpHost> httpHosts,
     ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
     DocWriteRequestFailureHandler failureHandler,
     RestClientFactory restClientFactory) {
   super ( new Elasticsearch6ApiCallBridge(httpHosts, restClientFactory),  bulkRequestsConfig, elasticsearchSinkFunction, failureHandler);
}

當然,我們可以通過代碼中提供的Builder來非常方便的創建一個ElasticsearchOutputFormat。下面,我們看下我們Flink Batch Job實現邏輯。

  • 實現ElasticsearchSinkFunction

我們需要實現ElasticsearchSinkFunction接口,實現一個能夠索引數據到Elasticsearch中的功能,代碼如下所示:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
final ElasticsearchSinkFunction<String> elasticsearchSinkFunction = new ElasticsearchSinkFunction<String>() {
 
    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
      indexer.add(createIndexRequest(element, parameterTool));
    }
 
    private IndexRequest createIndexRequest(String element, ParameterTool parameterTool) {
      LOG.info( "Create index req: " + element);
      JSONObject o = JSONObject.parseObject(element);
      return Requests.indexRequest()
              .index(parameterTool.getRequired( "es-index" ))
              .type(parameterTool.getRequired( "es-type" ))
              .source(o);
    }
  };

上面代碼,主要是把一個將要輸出的數據記錄,通過RequestIndexer來實現索引到Elasticsearch中。

  • 讀取Elasticsearch配置參數

配置連接Elasticsearch的參數。從程序輸入的ParameterTool中讀取Elasticsearch相關的配置:

01
02
03
04
05
06
07
08
09
10
11
12
13
String esHttpHosts = parameterTool.getRequired( "es-http-hosts" );
LOG.info( "Config: esHttpHosts=" + esHttpHosts);
int esHttpPort = parameterTool.getInt( "es-http-port" , 9200 );
LOG.info( "Config: esHttpPort=" + esHttpPort);
 
final List<HttpHost> httpHosts = Arrays.asList(esHttpHosts.split( "," ))
         .stream()
         .map(host -> new HttpHost(host, esHttpPort, "http" ))
         .collect(Collectors.toList());
 
int bulkFlushMaxSizeMb = parameterTool.getInt( "bulk-flush-max-size-mb" , 10 );
int bulkFlushIntervalMillis = parameterTool.getInt( "bulk-flush-interval-millis" , 10 * 1000 );
int bulkFlushMaxActions = parameterTool.getInt( "bulk-flush-max-actions" , 1 );
  • 創建ElasticsearchOutputFormat

創建一個我們實現的ElasticsearchOutputFormat,代碼片段如下所示:

1
2
3
4
5
6
7
8
final ElasticsearchOutputFormat outputFormat = new Builder<>(httpHosts, elasticsearchSinkFunction)
         .setBulkFlushBackoff( true )
         .setBulkFlushBackoffRetries( 2 )
         .setBulkFlushBackoffType(ElasticsearchApiCallBridge.FlushBackoffType.EXPONENTIAL)
         .setBulkFlushMaxSizeMb(bulkFlushMaxSizeMb)
         .setBulkFlushInterval(bulkFlushIntervalMillis)
         .setBulkFlushMaxActions(bulkFlushMaxActions)
         .build();

上面很多配置項指定了向Elasticsearch中進行批量寫入的行為,在ElasticsearchOutputFormat內部會進行設置並創建Elasticsearch6BulkProcessorIndexer,優化索引數據處理的性能。

  • 實現Batch Job主控制流程

最后我們就可以構建我們的Flink Batch應用程序了,代碼如下所示:

1
2
3
4
5
6
7
8
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.readTextFile(file)
     .filter(line -> !line.isEmpty())
     .map(line -> line)
     .output(outputFormat);
 
final String jobName = ImportHDFSDataToES. class .getSimpleName();
env.execute(jobName);

我們輸入的HDFS文件中,是一些已經加工好的JSON格式記錄行,這里為了簡單,直接將原始JSON字符串索引到Elasticsearch中,而沒有進行更多其他的處理操作。

有關Flink批式處理模式下,Elasticsearch對應的OutputFormat實現的完整代碼,可以參考這里:
https://github.com/shirdrn/flink-app-jobs/tree/master/src/main/java/org/shirdrn/flink/connector/batch/elasticsearch

參考鏈接

Creative Commons License

本文基於署名-非商業性使用-相同方式共享 4.0許可協議發布,歡迎轉載、使用、重新發布,但務必保留文章署名時延軍(包含鏈接:http://shiyanjun.cn),不得用於商業目的,基於本文修改后的作品務必以相同的許可發布。如有任何疑問,請與我聯系


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM