Writing to Elasticsearch from Flink
Introduction
This post covers how to configure Flink to write to Elasticsearch (ES) in practice.
Flink version: 1.13.2
GitHub repo: https://github.com/dahai1996/mdw-flink-quickstart
Writing to ES
Add the dependency
<!--es-->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
    <version>${flink.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
        </exclusion>
        <!-- <exclusion>-->
        <!--     <groupId>com.fasterxml.jackson.core</groupId>-->
        <!--     <artifactId>jackson-core</artifactId>-->
        <!-- </exclusion>-->
    </exclusions>
</dependency>
Note: the logging artifact is excluded to avoid a conflict that would prevent log output from appearing.
Basic usage
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost(ip, port, "http"));

ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
        httpHosts,
        new ESSink()
);

/* The flush parameters must be set */
// maximum number of actions to buffer before flushing
esSinkBuilder.setBulkFlushMaxActions(10);
// maximum size of buffered data before flushing, in MB
esSinkBuilder.setBulkFlushMaxSizeMb(5);
// interval after which to flush, regardless of the number or size of buffered actions
esSinkBuilder.setBulkFlushInterval(5000L);

// attach the sink to the data stream
dataStream.addSink(esSinkBuilder.build());
Note: ESSink is the concrete implementation of how records are written to ES; its skeleton looks like this:
public static class ESSink implements ElasticsearchSinkFunction<String>
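A minimal sketch of what such an implementation might look like, following the usual ElasticsearchSinkFunction pattern (the index and type names "my-index" and "my-type" are placeholders, not from the repo):

```java
public static class ESSink implements ElasticsearchSinkFunction<String> {
    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        // wrap the record in a Map and build an IndexRequest for it;
        // "my-index" / "my-type" are placeholder names
        Map<String, String> json = new HashMap<>();
        json.put("data", element);
        IndexRequest request = Requests.indexRequest()
                .index("my-index")
                .type("my-type") // the type is still required in ES 6
                .source(json);
        // hand the request to the sink's internal bulk processor
        indexer.add(request);
    }
}
```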
Wrap this in a small class so sinks can be created quickly later on:
public class SinkEs<T> {
    public List<HttpHost> httpHosts = new ArrayList<>(1);
    public ElasticsearchSink.Builder<T> esSinkBuilder;

    /**
     * Build an ES sink function.
     *
     * @param runEnv enum holding the addresses of the target environment
     * @param elasticsearchSinkFunction logic that converts a single record into ES requests
     */
    public SinkEs(RunEnv runEnv, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
        httpHosts.add(new HttpHost(runEnv.getEsHost(), runEnv.getEsPort(), "http"));
        esSinkBuilder = new ElasticsearchSink.Builder<T>(
                httpHosts,
                elasticsearchSinkFunction
        );
        esSinkBuilder.setBulkFlushMaxActions(1);
        esSinkBuilder.setBulkFlushMaxSizeMb(1);
        esSinkBuilder.setBulkFlushInterval(5000L);
        esSinkBuilder.setRestClientFactory(new RestClientFactory() {
            @Override
            public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
                restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                    @Override
                    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {
                        // cap the connection keep-alive time so dead long-lived connections get replaced
                        httpAsyncClientBuilder.setKeepAliveStrategy(new ConnectionKeepAliveStrategy() {
                            @Override
                            public long getKeepAliveDuration(HttpResponse httpResponse, HttpContext httpContext) {
                                return Duration.ofMinutes(5).toMillis();
                            }
                        });
                        return httpAsyncClientBuilder;
                    }
                });
            }
        });
    }

    /**
     * Build an ES sink function.
     *
     * @param runEnv enum holding the addresses of the target environment
     * @param elasticsearchSinkFunction logic that converts a single record into ES requests
     * @param bulkFlushMaxActions maximum number of actions to buffer before flushing
     * @param bulkFlushMaxSizeMb maximum size of buffered data before flushing, in MB
     * @param bulkFlushInterval interval after which to flush, regardless of the number or size of buffered actions
     */
    public SinkEs(RunEnv runEnv, ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
                  int bulkFlushMaxActions, int bulkFlushMaxSizeMb, Long bulkFlushInterval) {
        httpHosts.add(new HttpHost(runEnv.getEsHost(), runEnv.getEsPort(), "http"));
        esSinkBuilder = new ElasticsearchSink.Builder<T>(
                httpHosts,
                elasticsearchSinkFunction
        );
        esSinkBuilder.setBulkFlushMaxActions(bulkFlushMaxActions);
        esSinkBuilder.setBulkFlushMaxSizeMb(bulkFlushMaxSizeMb);
        esSinkBuilder.setBulkFlushInterval(bulkFlushInterval);
    }

    public ElasticsearchSink<T> getSink() {
        return esSinkBuilder.build();
    }
}
An ES sink can then be created quickly:
SinkFunction<String> sinkEs = new SinkEs<>(
        uat,
        new ElasticsearchSinkFunction<String>() {
            @Override
            public void process(String s, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                // build requests from s and add them via requestIndexer
            }
        },
        1,
        5,
        5000L)
        .getSink();
Note: the uat variable here is an enum holding the addresses of each environment; see the GitHub code for details.
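For reference, a hypothetical sketch of what such a RunEnv enum might look like (the names and host/port values below are placeholders; the real one lives in the GitHub repo):

```java
// Hypothetical sketch of the RunEnv enum; hosts and ports are placeholder values.
enum RunEnv {
    uat("es-uat.example.com", 9200),   // placeholder UAT address
    prod("es-prod.example.com", 9200); // placeholder production address

    private final String esHost;
    private final int esPort;

    RunEnv(String esHost, int esPort) {
        this.esHost = esHost;
        this.esPort = esPort;
    }

    public String getEsHost() { return esHost; }
    public int getEsPort() { return esPort; }
}
```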
Note 2: the setRestClientFactory() call above exists for the following reason:
the ES client opens a long-lived connection with an unlimited keep-alive time and reuses it for subsequent requests to the server.
If that connection dies, the client still tries to use it, and requests start failing.
The code above therefore caps the connection keep-alive time.
I no longer remember which blog I found this in.
