Example: Writing Data from Flink to Elasticsearch


Versions: Flink 1.11, Elasticsearch 7.9

1. Add the Maven dependencies

        <!-- elasticsearch connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch7_2.11</artifactId>
            <version>1.11.0</version>
        </dependency>

        <!-- jackson: ObjectMapper lives in jackson-databind, which pulls in jackson-core -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.11.1</version>
        </dependency>

2. Configure the Builder

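        // Imports used by this snippet (@SneakyThrows requires Lombok on the classpath)
        import java.util.ArrayList;
        import java.util.List;
        import java.util.Map;
        import com.fasterxml.jackson.databind.ObjectMapper;
        import lombok.SneakyThrows;
        import org.apache.flink.api.common.functions.RuntimeContext;
        import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
        import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
        import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler;
        import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
        import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory;
        import org.apache.http.Header;
        import org.apache.http.HttpHost;
        import org.apache.http.message.BasicHeader;
        import org.elasticsearch.action.index.IndexRequest;
        import org.elasticsearch.client.Requests;

        // Elasticsearch nodes the sink connects to
        List<HttpHost> elsearchHosts = new ArrayList<>();
        elsearchHosts.add(new HttpHost("192.168.32.36", 9200, "http"));
        elsearchHosts.add(new HttpHost("192.168.32.37", 9200, "http"));
        elsearchHosts.add(new HttpHost("192.168.32.38", 9200, "http"));

        ObjectMapper mapper = new ObjectMapper(); // Jackson ObjectMapper

        // ResultCollector is the type of the objects you want to store; replace it with your own type
        ElasticsearchSink.Builder<ResultCollector> esSinkBuilder = new ElasticsearchSink.Builder<>(
                elsearchHosts,
                new ElasticsearchSinkFunction<ResultCollector>() {

                    private static final long serialVersionUID = -6797861015704600807L;

                    public IndexRequest createIndexRequest(ResultCollector collector) throws Exception {
                        return Requests.indexRequest()
                                .index("flink-test-index") // set the index
                                .id(collector.getId()) // set the document ID
                                // Pay special attention here: the source must be passed as a Map
                                .source(mapper.readValue(mapper.writeValueAsString(collector), Map.class));
                    }

                    @SneakyThrows
                    @Override
                    public void process(ResultCollector collector, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                        requestIndexer.add(createIndexRequest(collector));
                    }
                }
        );

        // Flush after every element; without this setting, requests are buffered
        esSinkBuilder.setBulkFlushMaxActions(1);
        // Retry requests that Elasticsearch rejected because its queue was full
        esSinkBuilder.setFailureHandler(new RetryRejectedExecutionFailureHandler());
        // Send a JSON Content-Type header with every REST request
        esSinkBuilder.setRestClientFactory((RestClientFactory) restClientBuilder -> {
            Header[] headers = new BasicHeader[]{new BasicHeader("Content-Type", "application/json")};
            restClientBuilder.setDefaultHeaders(headers);
        });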
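
The snippet above obtains the Map by serializing the object to JSON and parsing it back. As a rough alternative, the Map can be built by hand. The sketch below is only an illustration: `ResultCollectorSketch`, its `name` and `count` fields, and the `toSource()` method are hypothetical stand-ins, since the article does not show the real `ResultCollector` class.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the article's ResultCollector type.
// The fields and toSource() are assumptions made for illustration only.
final class ResultCollectorSketch {
    private final String id;
    private final String name;
    private final long count;

    ResultCollectorSketch(String id, String name, long count) {
        this.id = id;
        this.name = name;
        this.count = count;
    }

    String getId() {
        return id;
    }

    // Builds the document body as a Map keyed by field name,
    // which is the shape handed to .source(...) in step 2.
    Map<String, Object> toSource() {
        Map<String, Object> source = new LinkedHashMap<>();
        source.put("id", id);
        source.put("name", name);
        source.put("count", count);
        return source;
    }

    public static void main(String[] args) {
        ResultCollectorSketch r = new ResultCollectorSketch("a-1", "clicks", 3L);
        System.out.println(r.toSource()); // prints {id=a-1, name=clicks, count=3}
    }
}
```

With a method like this on your own type, the `.source(...)` call in step 2 could take `collector.toSource()` directly and skip the serialize-then-parse round trip.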

3. Add the sink to the stream

        // stream is the DataStream<ResultCollector> produced by your job
        stream.addSink(esSinkBuilder.build());
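
Once the sink is attached, `setBulkFlushMaxActions(1)` from step 2 decides how often buffered requests are sent. The toy model below shows the idea of a count-based flush threshold. It is not the connector's implementation: `CountingBuffer` and all of its members are hypothetical names, and the real sink also has size-based and interval-based triggers that are not modeled here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model of a count-based flush threshold (hypothetical, not Flink code).
final class CountingBuffer<T> {
    private final int maxActions;
    private final Consumer<List<T>> flusher;
    private final List<T> pending = new ArrayList<>();
    private int flushCount = 0;

    CountingBuffer(int maxActions, Consumer<List<T>> flusher) {
        if (maxActions < 1) {
            throw new IllegalArgumentException("maxActions must be >= 1");
        }
        this.maxActions = maxActions;
        this.flusher = flusher;
    }

    // Buffers one item and flushes as soon as the threshold is reached.
    void add(T item) {
        pending.add(item);
        if (pending.size() >= maxActions) {
            flush();
        }
    }

    // Hands a copy of the pending items to the flusher and clears the buffer.
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        flusher.accept(new ArrayList<>(pending));
        pending.clear();
        flushCount++;
    }

    int flushCount() {
        return flushCount;
    }

    int pendingSize() {
        return pending.size();
    }
}
```

With a threshold of 1, every `add` triggers a flush, which makes test output visible immediately. A larger threshold sends fewer, bigger batches, which is usually the better trade-off under real load.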

 Reference: https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/elasticsearch.html

