Reference (original blog): https://blog.csdn.net/weixin_44516305/article/details/90258883
1 Requirements Analysis
Use Flink to process a real-time data stream and save the processed results to Elasticsearch, where the IK Analyzer Chinese tokenizer is applied to a designated field.
To simulate streaming input, we define a custom parallel streaming source that generates a Customer object every 10 ms and hands it to Flink for processing.
The results produced by Flink are stored in Elasticsearch in the index_customer index under the type_customer type, and the description field is tokenized with the IK Analyzer Chinese tokenizer.
2 Real-Time Processing with Flink
2.1 Versions
- Flink: 1.8.0
- Elasticsearch: 6.5.4
- JDK: 1.8
2.2 Project Setup
Use IDEA to create a Maven project named FlinkElasticsearchDemo.

2.3 Program Code
1. Add the Flink dependencies and the Flink-Elasticsearch connector to pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.flink</groupId>
    <artifactId>flink-elasticsearch-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
            <version>1.8.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.56</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- compile the sources with Java 1.8 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
2. Create two related entity classes, Customer and Address, to wrap the real-time data:
package com.flink.domain;

import java.util.Date;

/**
 * Customer entity class.
 */
public class Customer {
    private Long id;
    private String name;
    private Boolean gender;
    private Date birth;
    private Address address;
    private String description;

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Boolean getGender() {
        return gender;
    }

    public void setGender(Boolean gender) {
        this.gender = gender;
    }

    public Date getBirth() {
        return birth;
    }

    public void setBirth(Date birth) {
        this.birth = birth;
    }

    public Address getAddress() {
        return address;
    }

    public void setAddress(Address address) {
        this.address = address;
    }

    public String getDescription() {
        return description;
    }

    public void setDescription(String description) {
        this.description = description;
    }
}
package com.flink.domain;

/**
 * Address entity class.
 */
public class Address {
    private Integer id;
    private String province;
    private String city;

    public Address(Integer id, String province, String city) {
        this.id = id;
        this.province = province;
        this.city = city;
    }

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getProvince() {
        return province;
    }

    public void setProvince(String province) {
        this.province = province;
    }

    public String getCity() {
        return city;
    }

    public void setCity(String city) {
        this.city = city;
    }
}
3. Define a custom Flink source that produces the streaming data:
package com.flink.source;

import com.flink.domain.Address;
import com.flink.domain.Customer;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;

import java.util.Date;
import java.util.Random;

/**
 * Custom parallel streaming source.
 */
public class StreamParallelSource implements ParallelSourceFunction<Customer> {
    // volatile so the cancel() call, made from another thread, is visible to run()
    private volatile boolean isRunning = true;
    private String[] names = new String[5];
    private Address[] addresses = new Address[5];
    private Random random = new Random();
    // note: each parallel subtask keeps its own counter, so with parallelism > 1
    // the generated ids repeat across subtasks and, since the id is used as the
    // Elasticsearch document id, later documents overwrite earlier ones
    private Long id = 1L;

    public void init() {
        names[0] = "劉備";
        names[1] = "關羽";
        names[2] = "張飛";
        names[3] = "曹操";
        names[4] = "諸葛亮";
        addresses[0] = new Address(1, "湖北省", "武漢市");
        addresses[1] = new Address(2, "湖北省", "黃岡市");
        addresses[2] = new Address(3, "廣東省", "廣州市");
        addresses[3] = new Address(4, "廣東省", "深圳市");
        addresses[4] = new Address(5, "浙江省", "杭州市");
    }

    /**
     * Generates one Customer object every 10 ms (simulating real-time data).
     */
    @Override
    public void run(SourceContext<Customer> sourceContext) throws Exception {
        init();
        while (isRunning) {
            int nameIndex = random.nextInt(5);
            int addressIndex = random.nextInt(5);
            Customer customer = new Customer();
            customer.setId(id++);
            customer.setName(names[nameIndex]);
            customer.setGender(random.nextBoolean());
            customer.setBirth(new Date());
            customer.setAddress(addresses[addressIndex]);
            customer.setDescription(names[nameIndex] + "來自" + addresses[addressIndex].getProvince() + addresses[addressIndex].getCity());
            // hand the generated record to Flink for processing
            sourceContext.collect(customer);
            Thread.sleep(10);
        }
    }

    @Override
    public void cancel() {
        this.isRunning = false;
    }
}
4. Write the main program that processes the stream with Flink and writes the results to Elasticsearch:
package com.flink.main;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.flink.domain.Customer;
import com.flink.source.StreamParallelSource;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.client.Requests;

import java.util.ArrayList;
import java.util.List;

/**
 * Main program: real-time processing with Flink, writing the results to Elasticsearch.
 */
public class FlinkToElasticSearchApp {
    public static void main(String[] args) throws Exception {
        // get the stream execution environment and set the parallelism
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        // use the custom parallel streaming source as the data source
        DataStream<Customer> source = env.addSource(new StreamParallelSource());

        // filter the data: drop records where the name is 曹操 and the province is 湖北省
        DataStream<Customer> filterSource = source.filter(new FilterFunction<Customer>() {
            @Override
            public boolean filter(Customer customer) throws Exception {
                if (customer.getName().equals("曹操") && customer.getAddress().getProvince().equals("湖北省")) {
                    return false;
                }
                return true;
            }
        });

        // transform the filtered records into JSON objects
        DataStream<JSONObject> transSource = filterSource.map(new MapFunction<Customer, JSONObject>() {
            @Override
            public JSONObject map(Customer customer) throws Exception {
                String jsonString = JSONObject.toJSONString(customer, SerializerFeature.WriteDateUseDateFormat);
                System.out.println("Processing: " + jsonString);
                return JSONObject.parseObject(jsonString);
            }
        });

        // build an ElasticsearchSink
        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("localhost", 9200, "http"));
        ElasticsearchSink.Builder<JSONObject> esSinkBuilder = new ElasticsearchSink.Builder<JSONObject>(
                httpHosts,
                new ElasticsearchSinkFunction<JSONObject>() {
                    @Override
                    public void process(JSONObject customer, RuntimeContext ctx, RequestIndexer indexer) {
                        // store each record in the index_customer index under the type_customer type,
                        // using the customer id as the document id
                        indexer.add(Requests.indexRequest()
                                .index("index_customer")
                                .type("type_customer")
                                .id(String.valueOf(customer.getLong("id")))
                                .source(customer));
                    }
                }
        );
        // flush a bulk request after every 50 buffered actions
        esSinkBuilder.setBulkFlushMaxActions(50);

        // write the transformed data to Elasticsearch
        transSource.addSink(esSinkBuilder.build());

        // execute the job
        env.execute("execute FlinkElasticsearchDemo");
    }
}
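The sink above points at localhost:9200, so before running the job it is worth confirming that Elasticsearch is reachable on that address (a quick sanity check, assuming the default local setup used throughout this article):
curl -X GET "http://localhost:9200"
If Elasticsearch is up, this returns a small JSON document with the cluster name and version.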
This completes the program that uses Flink to process the stream in real time and save the results to Elasticsearch.
Note: when Flink writes to Elasticsearch, if the target index does not already exist, Elasticsearch creates it automatically with a dynamically generated mapping.
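If you rely on this auto-creation, you can inspect the mapping Elasticsearch generated with a single REST call (again assuming the default localhost:9200 endpoint):
curl -X GET "http://localhost:9200/index_customer/_mapping?pretty"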
If you do not need the IK Analyzer Chinese tokenizer on any field, you can skip Section 3 and go straight to Section 4.
3 Preparing Elasticsearch
To tokenize a specific field of an index with a Chinese tokenizer, the index must be created in advance with the tokenizer specified, so first make sure the tokenizer plugin is installed in Elasticsearch.
Note: this article uses the elasticsearch-head visualization plugin to operate Elasticsearch.
3.1 Install the IK Analyzer Chinese Tokenizer
This article uses the IK Analyzer Chinese tokenizer on Windows 10; the installation steps are as follows:
1. Open a CMD window and change to the bin directory under the Elasticsearch installation directory.
2. Run the following command to download the IK Analyzer build matching Elasticsearch 6.5.4:
elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v6.5.4/elasticsearch-analysis-ik-6.5.4.zip
3. When the download finishes you are prompted to confirm the installation; enter y to proceed.
4. After installation completes, an analysis-ik directory appears under the plugins directory of the Elasticsearch installation, which confirms the plugin is installed.
5. Restart Elasticsearch and use the elasticsearch-head plugin to verify the installation: submit an analysis request on the compound query page, and if tokenized results are returned, the IK Analyzer is working.
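As an alternative to the head plugin, the _analyze API can verify the tokenizer from the command line (a sketch assuming the default localhost:9200 endpoint; adjust the quoting to your shell):
curl -X POST "http://localhost:9200/_analyze?pretty" -H "Content-Type: application/json" -d "{\"analyzer\": \"ik_max_word\", \"text\": \"湖北省武漢市\"}"
If the plugin is installed, the response lists the Chinese tokens produced by ik_max_word.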

3.2 Create the Index in Elasticsearch
This article stores the filtered Customer records in Elasticsearch and tokenizes the description field with the Chinese tokenizer, so the index has to be created in advance; this can be done through the elasticsearch-head plugin.
The JSON body for creating the index_customer index is as follows:
{
    "settings": {
        "index": {
            "number_of_shards": "5",
            "number_of_replicas": "1"
        },
        "analysis": {
            "analyzer": {
                "ik": {
                    "tokenizer": "ik_max_word"
                }
            }
        }
    },
    "mappings": {
        "type_customer": {
            "properties": {
                "id": {
                    "type": "long"
                },
                "name": {
                    "type": "text"
                },
                "gender": {
                    "type": "boolean"
                },
                "birth": {
                    "type": "date",
                    "format": "yyyy-MM-dd HH:mm:ss"
                },
                "address": {
                    "properties": {
                        "id": {
                            "type": "integer"
                        },
                        "province": {
                            "type": "keyword"
                        },
                        "city": {
                            "type": "keyword"
                        }
                    }
                },
                "description": {
                    "type": "text",
                    "analyzer": "ik_max_word",
                    "search_analyzer": "ik_max_word"
                }
            }
        }
    }
}
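If you prefer the command line to the head plugin, the same index can be created by sending the JSON above with curl (assuming it is saved to a local file, here hypothetically named index_customer.json):
curl -X PUT "http://localhost:9200/index_customer" -H "Content-Type: application/json" -d @index_customer.json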
創建成功后在概覽頁面可以查看到如下信息:

4 Testing the Flink Job
After Elasticsearch is running and the index has been created, run the FlinkToElasticSearchApp program. The IDEA console prints the records being processed, which shows that the Flink job is running and processing the stream in real time.
The index_customer index can then be browsed in the elasticsearch-head plugin, confirming that the results of Flink's real-time processing have been saved to Elasticsearch.
Because the index_customer index was created with the IK Analyzer tokenizer on the description field, entering a query term in the description field box on the left quickly returns all documents whose description contains that term.
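Outside the head plugin, the same search is a match query on the description field; because the field was indexed with ik_max_word, a term such as 湖北 matches documents whose description mentions 湖北省 (a sketch against the default localhost endpoint):
curl -X GET "http://localhost:9200/index_customer/_search?pretty" -H "Content-Type: application/json" -d "{\"query\": {\"match\": {\"description\": \"湖北\"}}}"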