Reference (original blog): https://blog.csdn.net/weixin_44516305/article/details/90258883
1 Requirements Analysis
Use Flink to process a real-time data stream and save the processed results to Elasticsearch, where the IK Analyzer Chinese tokenizer is applied to a specified field.
To simulate streaming data, a custom parallel streaming source generates a Customer object every 10 ms and hands it to Flink for processing.
The results processed by Flink are stored in Elasticsearch in the index_customer index under the type_customer type, and the data in the description field is tokenized with the IK Analyzer Chinese tokenizer.
2 Real-Time Processing with Flink
2.1 Versions
- Flink: 1.8.0
- Elasticsearch: 6.5.4
- JDK: 1.8
2.2 Creating the Project
Create a Maven project named FlinkElasticsearchDemo in IDEA.
2.3 Code
1. In pom.xml, add the dependencies for Flink and the Flink Elasticsearch connector, as shown below:
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.flink</groupId>
    <artifactId>flink-elasticsearch-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.8.0</version>
        </dependency>
        <!-- Flink connector for Elasticsearch 6.x -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
            <version>1.8.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.56</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- compile the project as Java 1.8 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
```
2. Create two related entity classes, Customer and Address, to encapsulate the real-time data:
```java
package com.flink.domain;

import java.util.Date;

/**
 * Customer entity
 */
public class Customer {

    private Long id;
    private String name;
    private Boolean gender;
    private Date birth;
    private Address address;
    private String description;

    public Long getId() { return id; }
    public void setId(Long id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public Boolean getGender() { return gender; }
    public void setGender(Boolean gender) { this.gender = gender; }
    public Date getBirth() { return birth; }
    public void setBirth(Date birth) { this.birth = birth; }
    public Address getAddress() { return address; }
    public void setAddress(Address address) { this.address = address; }
    public String getDescription() { return description; }
    public void setDescription(String description) { this.description = description; }
}
```
```java
package com.flink.domain;

/**
 * Address entity
 */
public class Address {

    private Integer id;
    private String province;
    private String city;

    // no-arg constructor so Flink can treat the class as a POJO
    public Address() {
    }

    public Address(Integer id, String province, String city) {
        this.id = id;
        this.province = province;
        this.city = city;
    }

    public Integer getId() { return id; }
    public void setId(Integer id) { this.id = id; }
    public String getProvince() { return province; }
    public void setProvince(String province) { this.province = province; }
    public String getCity() { return city; }
    public void setCity(String city) { this.city = city; }
}
```
3. Define a custom Flink source that produces the streaming real-time data:
```java
package com.flink.source;

import com.flink.domain.Address;
import com.flink.domain.Customer;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;

import java.util.Date;
import java.util.Random;

/**
 * Custom parallel streaming source
 */
public class StreamParallelSource implements ParallelSourceFunction<Customer> {

    // volatile so that cancel(), called from another thread, is visible to run()
    private volatile boolean isRunning = true;

    private String[] names = new String[5];
    private Address[] addresses = new Address[5];
    private Random random = new Random();
    private Long id = 1L;

    public void init() {
        names[0] = "劉備";
        names[1] = "關羽";
        names[2] = "張飛";
        names[3] = "曹操";
        names[4] = "諸葛亮";
        addresses[0] = new Address(1, "湖北省", "武漢市");
        addresses[1] = new Address(2, "湖北省", "黃岡市");
        addresses[2] = new Address(3, "廣東省", "廣州市");
        addresses[3] = new Address(4, "廣東省", "深圳市");
        addresses[4] = new Address(5, "浙江省", "杭州市");
    }

    /**
     * Emit one Customer object every 10 ms (simulating real-time data)
     */
    @Override
    public void run(SourceContext<Customer> sourceContext) throws Exception {
        init();
        while (isRunning) {
            int nameIndex = random.nextInt(5);
            int addressIndex = random.nextInt(5);
            Customer customer = new Customer();
            customer.setId(id++);
            customer.setName(names[nameIndex]);
            customer.setGender(random.nextBoolean());
            customer.setBirth(new Date());
            customer.setAddress(addresses[addressIndex]);
            customer.setDescription(names[nameIndex] + "來自"
                    + addresses[addressIndex].getProvince()
                    + addresses[addressIndex].getCity());
            // hand the generated record to Flink for processing
            sourceContext.collect(customer);
            Thread.sleep(10);
        }
    }

    @Override
    public void cancel() {
        this.isRunning = false;
    }
}
```
4. Write the main program that processes the stream with Flink and writes the results to Elasticsearch:
```java
package com.flink.main;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.flink.domain.Customer;
import com.flink.source.StreamParallelSource;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.client.Requests;

import java.util.ArrayList;
import java.util.List;

/**
 * Main program: process the stream with Flink and write the results to Elasticsearch
 */
public class FlinkToElasticSearchApp {

    public static void main(String[] args) throws Exception {
        // obtain the stream execution environment and set the parallelism
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        // use the custom parallel streaming source as the data source
        DataStream<Customer> source = env.addSource(new StreamParallelSource());

        // filter the data: drop records for "曹操" from "湖北省"
        DataStream<Customer> filterSource = source.filter(new FilterFunction<Customer>() {
            @Override
            public boolean filter(Customer customer) throws Exception {
                if (customer.getName().equals("曹操")
                        && customer.getAddress().getProvince().equals("湖北省")) {
                    return false;
                }
                return true;
            }
        });

        // convert the filtered records to JSON objects
        DataStream<JSONObject> transSource = filterSource.map(new MapFunction<Customer, JSONObject>() {
            @Override
            public JSONObject map(Customer customer) throws Exception {
                String jsonString = JSONObject.toJSONString(customer,
                        SerializerFeature.WriteDateUseDateFormat);
                System.out.println("Processing: " + jsonString);
                return JSONObject.parseObject(jsonString);
            }
        });

        // build an ElasticsearchSink
        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("localhost", 9200, "http"));
        ElasticsearchSink.Builder<JSONObject> esSinkBuilder = new ElasticsearchSink.Builder<JSONObject>(
                httpHosts,
                new ElasticsearchSinkFunction<JSONObject>() {
                    @Override
                    public void process(JSONObject customer, RuntimeContext ctx, RequestIndexer indexer) {
                        // store each record in the index_customer index under the type_customer type
                        indexer.add(Requests.indexRequest()
                                .index("index_customer")
                                .type("type_customer")
                                .id(String.valueOf(customer.getLong("id")))
                                .source(customer));
                    }
                }
        );
        // flush the bulk request buffer after every 50 actions
        esSinkBuilder.setBulkFlushMaxActions(50);

        // write the converted data to Elasticsearch
        transSource.addSink(esSinkBuilder.build());

        // execute the job
        env.execute("execute FlinkElasticsearchDemo");
    }
}
```
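The sink can be tuned further. A minimal sketch, assuming the esSinkBuilder from the code above; the interval and retry values are arbitrary examples, and RetryRejectedExecutionFailureHandler is the handler shipped with the connector:

```java
import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler;

// also flush buffered requests on a time interval, so records still reach
// Elasticsearch when fewer than 50 actions are sitting in the buffer
esSinkBuilder.setBulkFlushInterval(5000L);

// retry failed bulk requests with backoff before giving up
esSinkBuilder.setBulkFlushBackoff(true);
esSinkBuilder.setBulkFlushBackoffRetries(3);
esSinkBuilder.setBulkFlushBackoffDelay(1000L);

// re-queue actions that Elasticsearch rejected because its bulk queue was full
esSinkBuilder.setFailureHandler(new RetryRejectedExecutionFailureHandler());
```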
This completes the program that processes the stream with Flink in real time and saves the results to Elasticsearch.
Note: when Flink writes to Elasticsearch, if an index with the target name does not already exist, Elasticsearch creates it automatically.
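An automatically created index gets dynamically inferred field mappings, which do not use the IK analyzer. You can inspect what was generated with a request against the mapping endpoint, for example with curl:

```
curl -X GET "http://localhost:9200/index_customer/_mapping?pretty"
```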
If you do not need to tokenize specific fields with the IK Analyzer in Elasticsearch, skip Section 3 and read Section 4 directly.
3 Elasticsearch Preparation
To apply a Chinese tokenizer to specific fields of an index, the index must be created in Elasticsearch with the analyzer specified, which in turn requires the analyzer plugin to be installed in Elasticsearch first.
Note: this article operates Elasticsearch through the elasticsearch-head visualization plugin.
3.1 Installing the IK Analyzer Chinese Tokenizer
This article uses the IK Analyzer Chinese tokenizer on Windows 10. The installation steps are as follows:
1. Open a CMD window and change to the bin directory under the Elasticsearch installation directory.
2. Run the following command to download and install the IK Analyzer release that matches Elasticsearch 6.5.4:

```
elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v6.5.4/elasticsearch-analysis-ik-6.5.4.zip
```

3. When the download completes, you are prompted to confirm the installation; enter y to install.
4. After installation there is an analysis-ik directory under the plugins directory of the Elasticsearch installation, which indicates the plugin was installed successfully.
5. Restart Elasticsearch and use the elasticsearch-head plugin to check that the IK Analyzer works: submit an analyze request on the compound-query page; if the response shows the expected tokenization, the IK Analyzer was installed successfully.
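For reference, such a verification is a standard _analyze call. A sketch of a body you could POST to http://localhost:9200/_analyze (the sample sentence is arbitrary):

```json
{
  "analyzer": "ik_max_word",
  "text": "中華人民共和國國歌"
}
```

If IK is installed, the response lists one token entry per word produced by ik_max_word, rather than splitting the text into single characters.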
3.2 Creating the Index in Elasticsearch
The goal is to store the filtered Customer records in Elasticsearch and tokenize the description field with a Chinese tokenizer, so an index has to be created in Elasticsearch first. Create it through the elasticsearch-head plugin; if the request succeeds, the response confirms the index was created.
The JSON body used to create the index_customer index is shown below:
{ "settings": { "index": { "number_of_shards": "5", "number_of_replicas": "1" }, "analysis":{ "analyzer":{ "ik":{ "tokenizer": "ik_max_word" } } } }, "mappings": { "type_customer": { "properties": { "id": { "type": "long" }, "name": { "type": "text" }, "gender": { "type": "boolean" }, "birth": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss" }, "address": { "properties": { "id": { "type": "integer" }, "province": { "type": "keyword" }, "city": { "type": "keyword" } } }, "description": { "type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_max_word" } } } } }
Once the index is created, its details are visible on the overview page.
4 Testing the Flink Real-Time Pipeline
After Elasticsearch is running and the index has been created, run the FlinkToElasticSearchApp program directly; the "Processing: …" lines printed in the IDEA console show that the Flink job is running and processing data in real time.
In the elasticsearch-head plugin you can then browse the records accumulating in the index_customer index, which confirms that the results of Flink's real-time processing are being saved to Elasticsearch.
Because the index_customer index was created with the IK Analyzer on the description field, entering a search term in the description query box on the left quickly returns all records whose description contains that term.
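The same search can also be issued directly against the _search endpoint. A sketch of a match-query body POSTed to http://localhost:9200/index_customer/type_customer/_search (the term 湖北 is just an example):

```json
{
  "query": {
    "match": {
      "description": "湖北"
    }
  }
}
```

Because description was indexed with ik_max_word, this matches every document whose description contains 湖北 as a token, e.g. descriptions mentioning 湖北省.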