Flink Real-Time Processing with Results Written to Elasticsearch (Hands-On)


Reference (original post): https://blog.csdn.net/weixin_44516305/article/details/90258883

 

1 Requirements Analysis

Use Flink to process a real-time data stream and save the processed results to Elasticsearch, applying the IK Analyzer Chinese tokenizer to a designated field in Elasticsearch.

To simulate a streaming input, we define a custom parallel streaming source that generates a Customer object every 10 ms and hands it to Flink for processing.

The results of Flink processing are saved in the type_customer type of the index_customer index in Elasticsearch, and the data in the description field is tokenized with the IK Analyzer Chinese tokenizer.

 

 

2 Flink Real-Time Processing

2.1 Versions

 

  1. Flink: 1.8.0
  2. Elasticsearch: 6.5.4
  3. JDK: 1.8

2.2 Project Setup

Create a Maven project named FlinkElasticsearchDemo in IDEA; the directory structure is shown below:
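The original screenshot is not reproduced here; the following tree is reconstructed from the package declarations in section 2.3:

FlinkElasticsearchDemo
├── pom.xml
└── src/main/java/com/flink
    ├── domain
    │   ├── Customer.java
    │   └── Address.java
    ├── main
    │   └── FlinkToElasticSearchApp.java
    └── source
        └── StreamParallelSource.java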

 

 

 

2.3 Program Code

  1. Add the Flink dependencies and the Flink-to-Elasticsearch connector to pom.xml, as shown below. Note that the flink-connector-elasticsearch6 artifact targets Elasticsearch 6.x and must match the major version of your cluster (6.5.4 here):
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.flink</groupId>
    <artifactId>flink-elasticsearch-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
            <version>1.8.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.56</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Compile with the JDK version declared in the properties above -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

  

2. Create two related entity classes, Customer and Address, to encapsulate the real-time data, as shown below:

package com.flink.domain;

import java.util.Date;

/**
 * Customer entity class.
 */
public class Customer {
    private Long id;
    private String name;
    private Boolean gender;
    private Date birth;
    private Address address;
    private String description;

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Boolean getGender() {
        return gender;
    }

    public void setGender(Boolean gender) {
        this.gender = gender;
    }

    public Date getBirth() {
        return birth;
    }

    public void setBirth(Date birth) {
        this.birth = birth;
    }

    public Address getAddress() {
        return address;
    }

    public void setAddress(Address address) {
        this.address = address;
    }

    public String getDescription() {
        return description;
    }

    public void setDescription(String description) {
        this.description = description;
    }
}

  

package com.flink.domain;

/**
 * Address entity class.
 */
public class Address {
    private Integer id;
    private String province;
    private String city;

    public Address(Integer id, String province, String city) {
        this.id = id;
        this.province = province;
        this.city = city;
    }

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getProvince() {
        return province;
    }

    public void setProvince(String province) {
        this.province = province;
    }

    public String getCity() {
        return city;
    }

    public void setCity(String city) {
        this.city = city;
    }
}

  

 

3. Define a custom Flink source that produces the streaming real-time data, as shown below:

 

package com.flink.source;

import com.flink.domain.Address;
import com.flink.domain.Customer;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import java.util.Date;
import java.util.Random;

/**
 * Custom parallel streaming source.
 */
public class StreamParallelSource implements ParallelSourceFunction<Customer> {

    private boolean isRunning = true;
    private String[] names = new String[5];
    private Address[] addresses = new Address[5];
    private Random random = new Random();
    private Long id = 1L;

    private void init() {
        names[0] = "劉備";
        names[1] = "關羽";
        names[2] = "張飛";
        names[3] = "曹操";
        names[4] = "諸葛亮";

        addresses[0]= new Address(1, "湖北省", "武漢市");
        addresses[1]= new Address(2, "湖北省", "黃岡市");
        addresses[2]= new Address(3, "廣東省", "廣州市");
        addresses[3]= new Address(4, "廣東省", "深圳市");
        addresses[4]= new Address(5, "浙江省", "杭州市");
    }

    /**
     * Generate a Customer object every 10 ms (simulating a real-time data feed).
     */
    @Override
    public void run(SourceContext<Customer> sourceContext) throws Exception {
        init();
        while(isRunning) {
            int nameIndex = random.nextInt(5);
            int addressIndex = random.nextInt(5);

            Customer customer = new Customer();
            customer.setId(id++);
            customer.setName(names[nameIndex]);
            customer.setGender(random.nextBoolean());
            customer.setBirth(new Date());
            customer.setAddress(addresses[addressIndex]);
            // e.g. "劉備來自湖北省武漢市" ("<name> comes from <province><city>")
            customer.setDescription(names[nameIndex] + "來自" + addresses[addressIndex].getProvince() + addresses[addressIndex].getCity());
            /**
             * Emit the record to Flink for processing.
             */
            sourceContext.collect(customer);
            Thread.sleep(10);
        }
    }

    @Override
    public void cancel() {
        this.isRunning = false;
    }
}
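One caveat: the main program below runs this source with parallelism 4, and each parallel instance starts its id counter at 1, so different subtasks emit Customer records with duplicate ids. Since the Elasticsearch sink uses the id as the document _id, later records silently overwrite earlier ones. A minimal sketch of one way around this (not part of the original post; UniqueIdSource and create are hypothetical names) is to offset each subtask's counter by its subtask index:

package com.flink.source;

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

/**
 * Hypothetical variant: each subtask generates a disjoint id sequence
 * (subtask 0 -> 1, 5, 9, ...; subtask 1 -> 2, 6, 10, ...; and so on).
 */
public abstract class UniqueIdSource<T> extends RichParallelSourceFunction<T> {

    private volatile boolean isRunning = true;

    /** Build the record to emit for a given unique id. */
    protected abstract T create(long id);

    @Override
    public void run(SourceContext<T> sourceContext) throws Exception {
        int subtask = getRuntimeContext().getIndexOfThisSubtask();          // 0-based
        int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
        long id = subtask + 1;
        while (isRunning) {
            sourceContext.collect(create(id));
            id += parallelism; // the next id owned by this subtask
            Thread.sleep(10);
        }
    }

    @Override
    public void cancel() {
        this.isRunning = false;
    }
}

Alternatively, since Elasticsearch auto-generates a document _id when none is supplied, dropping the .id(...) call in the sink also avoids overwrites, at the cost of opaque ids.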

 

  

4. Write the Flink main program that processes the stream in real time, as shown below:

package com.flink.main;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.flink.domain.Customer;
import com.flink.source.StreamParallelSource;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.client.Requests;
import java.util.ArrayList;
import java.util.List;

/**
 * Flink main program: process the stream in real time and write the results to Elasticsearch.
 */
public class FlinkToElasticSearchApp {

    public static void main(String[] args) throws Exception {
        /**
         * Get the stream execution environment and set the parallelism.
         */
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        /**
         * Use the custom parallel streaming source as the input.
         */
        DataStream<Customer> source = env.addSource(new StreamParallelSource());

        /**
         * Filter the data.
         */
        DataStream<Customer> filterSource = source.filter(new FilterFunction<Customer>() {
            @Override
            public boolean filter(Customer customer) throws Exception {
                // Drop records whose name is 曹操 and whose address is in 湖北省.
                return !(customer.getName().equals("曹操")
                        && customer.getAddress().getProvince().equals("湖北省"));
            }
        });

        /**
         * Transform the filtered data from Customer objects to JSON.
         */
        DataStream<JSONObject> transSource = filterSource.map(new MapFunction<Customer, JSONObject>() {
            @Override
            public JSONObject map(Customer customer) throws Exception {
                // WriteDateUseDateFormat serializes Date fields as "yyyy-MM-dd HH:mm:ss",
                // matching the birth date format declared in the index mapping (section 3.2).
                String jsonString = JSONObject.toJSONString(customer, SerializerFeature.WriteDateUseDateFormat);
                System.out.println("Processing: " + jsonString);
                return JSONObject.parseObject(jsonString);
            }
        });

        /**
         * Create an ElasticsearchSink.
         */
        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("localhost", 9200, "http"));
        ElasticsearchSink.Builder<JSONObject> esSinkBuilder = new ElasticsearchSink.Builder<JSONObject>(
                httpHosts,
                new ElasticsearchSinkFunction<JSONObject>() {
                    @Override
                    public void process(JSONObject customer, RuntimeContext ctx, RequestIndexer indexer) {
                        // Store each record in the index_customer index under the type_customer
                        // type, using the customer id as the document _id.
                        indexer.add(Requests.indexRequest()
                                .index("index_customer")
                                .type("type_customer")
                                .id(String.valueOf(customer.getLong("id")))
                                .source(customer));
                    }
                }
        );
        // Flush the bulk buffer once 50 index actions have accumulated; for quick
        // local testing, a value of 1 makes each document visible immediately.
        esSinkBuilder.setBulkFlushMaxActions(50);

        /**
         * Write the transformed data to Elasticsearch.
         */
        transSource.addSink(esSinkBuilder.build());

        /**
         * Execute the job.
         */
        env.execute("execute FlinkElasticsearchDemo");
    }

}

  

At this point, the program that uses Flink to process streaming data in real time and save the results to Elasticsearch is complete.

Note: when Flink saves data to Elasticsearch, an index that was not created in advance is created automatically. An auto-created index uses dynamic mapping, whose text fields default to the standard analyzer, which is why section 3 creates the index explicitly with the IK analyzer.
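To inspect the mapping an auto-created index received (assuming Elasticsearch is running on localhost:9200, as in the program above):

curl -XGET "http://localhost:9200/index_customer/_mapping?pretty"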

If you do not need to tokenize any field with the IK Analyzer Chinese tokenizer in Elasticsearch, you can skip section 3 and read section 4 directly.

 

 

3 Elasticsearch Preparation

To tokenize a designated field of a designated index with a Chinese tokenizer, the index must be created in Elasticsearch ahead of time with the tokenizer specified, so first make sure the tokenizer plugin is installed in Elasticsearch.

 

Note: this article uses the elasticsearch-head visualization plugin to operate Elasticsearch.

 

3.1 Installing the IK Analyzer Chinese Tokenizer

This article uses the IK Analyzer Chinese tokenizer on Windows 10. The installation steps are as follows:

 

1 Open a CMD window and change to the bin directory under the Elasticsearch installation directory.
2 Run the following command to download the IK Analyzer build that matches Elasticsearch 6.5.4:

elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v6.5.4/elasticsearch-analysis-ik-6.5.4.zip

  

3 When the download finishes you are prompted whether to install; enter y to install.

 

 

4 After installation there is an analysis-ik directory under the plugins directory of the Elasticsearch installation, which indicates the installation succeeded (running elasticsearch-plugin list should also show the plugin).

 

 

5 Restart Elasticsearch and verify the IK Analyzer installation through the elasticsearch-head plugin: submit an analyze request on the compound-query page; if the response contains the expected word-level tokens, IK Analyzer was installed successfully:
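The original screenshots are not reproduced here; a typical verification request (the sample text is illustrative) looks like this:

POST http://localhost:9200/_analyze
{
    "analyzer": "ik_max_word",
    "text": "湖北省武漢市"
}

If the response lists tokens such as 湖北省 and 武漢市 rather than one token per character, the plugin is working. ik_max_word is the finest-grained IK analyzer; the plugin also provides a coarser-grained ik_smart.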

 

 

3.2 Creating the Index in Elasticsearch

This article saves the filtered, qualifying Customer records to Elasticsearch and needs Chinese tokenization of the description field of Customer, so an index must be created in Elasticsearch first. The index is created through the elasticsearch-head plugin; a successful request returns an acknowledgement.

The JSON body for creating the index_customer index is as follows:

{
    "settings": {
        "index": {
            "number_of_shards": "5",
            "number_of_replicas": "1"
        },
        "analysis":{
            "analyzer":{
                "ik":{
                    "tokenizer": "ik_max_word"
                }
            }
        }
    },
    "mappings": {
        "type_customer": {
            "properties": {
                "id": {
                    "type": "long"
                },
                "name": {
                    "type": "text"
                },
                "gender": {
                    "type": "boolean"
                },
                "birth": {
                    "type": "date",
                    "format": "yyyy-MM-dd HH:mm:ss"
                },
                "address": {
                    "properties": {
                        "id": {
                            "type": "integer"
                        },
                        "province": {
                            "type": "keyword"
                        },
                        "city": {
                            "type": "keyword"
                        }
                    }
                },
                "description": {
                    "type": "text",
                    "analyzer": "ik_max_word",
                    "search_analyzer": "ik_max_word"
                }
            }
        }
    }
}
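If you prefer the command line to elasticsearch-head, the same request can be submitted with curl (assuming Elasticsearch on localhost:9200; the file name index_customer.json holding the JSON above is illustrative):

curl -XPUT "http://localhost:9200/index_customer" -H "Content-Type: application/json" -d @index_customer.json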

  

After successful creation, the new index is visible on the overview page of elasticsearch-head.

 

 

4 Testing Flink Real-Time Processing

After starting Elasticsearch and creating the index, run the FlinkToElasticSearchApp program directly; output like the following in the IDEA console indicates that the Flink program is running and processing data in real time:
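The original console screenshot is not reproduced here; given the println in the map function, each processed record produces a line like the following (all values are illustrative):

Processing: {"address":{"city":"武漢市","id":1,"province":"湖北省"},"birth":"2019-05-16 12:00:00","description":"劉備來自湖北省武漢市","gender":true,"id":1,"name":"劉備"}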

At this point the data in the index_customer index can be viewed in the elasticsearch-head plugin, which shows that the results of Flink's real-time processing are being saved to Elasticsearch as expected.

Because the index_customer index was created with the IK Analyzer Chinese tokenizer on the description field, entering a query into the description search box on the left quickly returns every document whose description contains the query terms.
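An equivalent check from the command line (the search term 湖北 is illustrative) is a query-string search against the description field:

curl -XGET "http://localhost:9200/index_customer/_search?q=description:湖北&pretty"

Because description is analyzed with ik_max_word, this matches any document whose description mentions 湖北, for example 劉備來自湖北省武漢市.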

 

 

Flink writes data to ElasticSearch (detailed ElasticSearch usage guide and pitfall notes)

https://blog.csdn.net/lisongjia123/article/details/81121994

 

Flink writes data to ElasticSearch

https://blog.csdn.net/weixin_44876457/article/details/89398743

