Flink DataSet中的數據寫入到ElasticSearch(高級篇)


如果我們去閱讀一下Flink是如何將DataStream中的數據寫入ElasticSearch,你會發現其就是在使用bulk API。我順便將這個類改寫成支持將DataSet寫入到ElasticSearch中了,也就是實現了ElasticSearchOutputFormat,目前代碼我開源到GitHub:https://github.com/397090770/flink-elasticsearch2-connector。編譯的包已經上傳到Maven中央倉庫(可以參見《如何發布Jar包到Maven中央倉庫》),這也就意味着你可以直接在pom.xml文件中使用我那個ElasticSearchOutputFormat

< dependency >
        < groupId >com.iteblog</ groupId >
        < artifactId >flink-elasticsearch2-connector</ artifactId >
        < version >1.0.2</ version >
</ dependency >

在Scala中使用

import scala.collection.JavaConversions. _
val config = Map( "bulk.flush.max.actions" -> "1000" , "cluster.name" -> "elasticsearch" )
val hosts = "www.iteblog.com"
 
val transports = hosts.split( "," ).map(host = > new InetSocketAddress(InetAddress.getByName(host), 9300 )).toList
 
val data : DataSet[String] = ....
data.output( new ElasticSearchOutputFormat(config, transports, new ElasticsearchSinkFunction[String] {
       def createIndexRequest(element : String) : IndexRequest = {
         Requests.indexRequest.index( "iteblog" ).` type `( "info" ).source(element)
       }
 
       override def process(element : String, ctx : RuntimeContext, indexer : RequestIndexer) {
         indexer.add(createIndexRequest(element))
       }
}))

在Java中使用

Map<String, String> config = new HashMap<>();
config.put( "bulk.flush.max.actions" , "1000" );
config.put( "cluster.name" , "elasticsearch" );
 
String hosts = "www.iteblog.com" ;
 
List<InetSocketAddress> list = Lists.newArrayList();
for (String host : hosts.split( "," )) {
     list.add( new InetSocketAddress(InetAddress.getByName(host), 9300 ));
}
 
DataSet<String> data  = ....;
 
data.output( new ElasticSearchOutputFormat<>(config, list, new ElasticsearchSinkFunction<String>() {
     @Override
     public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
         indexer.add(createIndexRequest(element));
     }
 
     private IndexRequest createIndexRequest(String element) {
         return Requests.indexRequest().index( "iteblog" ).type( "info" ).source(element);
     }
}));

我在同樣的環境下測試了寫入1,172,235條數據到ElasticSearch中,這次我只使用了20s不到!可見效率提高不少啊。

這個ElasticSearchOutputFormat目前只支持寫入到Elasticsearch: 2.x.x,后面我會再寫個支持ElasticSearch 1.x.x的版本。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM