Writing to Elasticsearch from Flink



Introduction

This post walks through how to configure Flink to write to Elasticsearch in a real project.

Flink version: 1.13.2

GitHub repository: https://github.com/dahai1996/mdw-flink-quickstart


Writing to ES

Add the dependency

    <!--es-->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
        <version>${flink.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-to-slf4j</artifactId>
            </exclusion>
            <!--<exclusion>-->
            <!--    <groupId>com.fasterxml.jackson.core</groupId>-->
            <!--    <artifactId>jackson-core</artifactId>-->
            <!--</exclusion>-->
        </exclusions>
    </dependency>

Note: the logging artifact is excluded to avoid a dependency conflict that prevents logs from being written.

Basic usage

List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost(host, port, "http"));
ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
        httpHosts,
        new ESSink()
);
/*     The flush parameters must be set     */
// Maximum number of buffered actions before flushing
esSinkBuilder.setBulkFlushMaxActions(10);
// Maximum size of buffered data (in MB) before flushing
esSinkBuilder.setBulkFlushMaxSizeMb(5);
// Interval at which to flush regardless of the number or size of buffered actions
esSinkBuilder.setBulkFlushInterval(5000L);

// Add the sink to the data stream
dataStream.addSink(esSinkBuilder.build());

Note: ESSink is the concrete implementation of how records are written to ES; its skeleton looks roughly like this:

public static class ESSink implements ElasticsearchSinkFunction<String> 
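The post doesn't show the body of ESSink. A minimal sketch, assuming each incoming record is already a JSON string (the index name "my-index" is a placeholder, not from the original), could look like:

```java
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.xcontent.XContentType;

public static class ESSink implements ElasticsearchSinkFunction<String> {
    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        // Assumed: the element is already a JSON document.
        IndexRequest request = Requests.indexRequest()
                .index("my-index")   // placeholder index name
                .type("_doc")        // the ES6 connector still requires a type
                .source(element, XContentType.JSON);
        indexer.add(request);
    }
}
```

The sink batches these requests according to the bulk-flush settings shown above; one `process` call produces one bulk action.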

Wrapping this in a class makes sinks quick to create later:

public class SinkEs<T> {
    public List<HttpHost> httpHosts = new ArrayList<>(1);
    public ElasticsearchSink.Builder<T> esSinkBuilder;

    /**
     * Creates an ES sink with default flush settings.
     * @param runEnv enum holding the addresses of the execution environment
     * @param elasticsearchSinkFunction logic that converts a single record into an ES request
     */
    public SinkEs(RunEnv runEnv, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
        httpHosts.add(new HttpHost(runEnv.getEsHost(), runEnv.getEsPort(), "http"));
        esSinkBuilder = new ElasticsearchSink.Builder<T>(
                httpHosts,
                elasticsearchSinkFunction
        );
        esSinkBuilder.setBulkFlushMaxActions(1);
        esSinkBuilder.setBulkFlushMaxSizeMb(1);
        esSinkBuilder.setBulkFlushInterval(5000L);
        esSinkBuilder.setRestClientFactory(new RestClientFactory() {
            @Override
            public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
                restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                    @Override
                    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {
                        httpAsyncClientBuilder.setKeepAliveStrategy(new ConnectionKeepAliveStrategy() {
                            @Override
                            public long getKeepAliveDuration(HttpResponse httpResponse, HttpContext httpContext) {
                                return Duration.ofMinutes(5).toMillis();
                            }
                        });
                        return httpAsyncClientBuilder;
                    }
                });
            }
        });
    }

    /**
     * Creates an ES sink with explicit flush settings.
     * @param runEnv enum holding the addresses of the execution environment
     * @param elasticsearchSinkFunction logic that converts a single record into an ES request
     * @param bulkFlushMaxActions maximum number of buffered actions before flushing
     * @param bulkFlushMaxSizeMb maximum size of buffered data (in MB) before flushing
     * @param bulkFlushInterval interval at which to flush regardless of the number or size of buffered actions
     */
    public SinkEs(RunEnv runEnv, ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
                  int bulkFlushMaxActions, int bulkFlushMaxSizeMb, Long bulkFlushInterval) {
        httpHosts.add(new HttpHost(runEnv.getEsHost(), runEnv.getEsPort(), "http"));
        esSinkBuilder = new ElasticsearchSink.Builder<T>(
                httpHosts,
                elasticsearchSinkFunction
        );
        esSinkBuilder.setBulkFlushMaxActions(bulkFlushMaxActions);
        esSinkBuilder.setBulkFlushMaxSizeMb(bulkFlushMaxSizeMb);
        esSinkBuilder.setBulkFlushInterval(bulkFlushInterval);
    }


    public ElasticsearchSink<T> getSink() {
        return esSinkBuilder.build();
    }
}

With this in place, an ES sink can be created quickly:

SinkFunction<String> sinkEs = new SinkEs<>(
                uat,
                new ElasticsearchSinkFunction<String>() {
                    @Override
                    public void process(String s, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                        // conversion logic: build a request from s and add it to requestIndexer
                    }
                },
                1,
                5,
                5000L)
                .getSink();

Note: the uat variable here is an enum holding the addresses of each environment; see the GitHub repository for details.

Note 2: the code passed to setRestClientFactory() exists for the following reason:

The ES client creates a long-lived connection with an unlimited keep-alive time and reuses it for subsequent requests.
If that connection dies, the client will still try to send requests over it, which fails with errors.
The code above therefore sets an explicit keep-alive duration on the connection.
(I no longer remember which blog post this solution came from.)

