Flink學習(四) Flink Table & SQL 實現wordcount Java版本


Flink Table & SQL WordCount
Flink SQL 是 Flink 實時計算為簡化計算模型,降低用戶使用實時計算門檻而設計的一套符合標准 SQL 語義的開發語言。

一個完整的 Flink SQL 編寫的程序包括如下三部分。

Source Operator:是對外部數據源的抽象, 目前 Apache Flink 內置了很多常用的數據源實現,比如 MySQL、Kafka 等。
Transformation Operators:算子操作主要完成比如查詢、聚合操作等,目前 Flink SQL 支持了 Union、Join、Projection、Difference、Intersection 及 window 等大多數傳統數據庫支持的操作。
Sink Operator:是對外結果表的抽象,目前 Apache Flink 也內置了很多常用的結果表的抽象,比如 Kafka Sink 等。
我們也是通過用一個最經典的 WordCount 程序作為入門,上面已經通過 DataSet/DataStream API 開發,那么實現同樣的 WordCount 功能, Flink Table & SQL 核心只需要一行代碼:

//省略掉初始化環境等公共代碼
SELECT word, COUNT(word) FROM table GROUP BY word;

首先,整個工程中我們 pom 中的依賴如下圖所示:

<dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-java</artifactId>
         <version>1.10.0</version>
     </dependency>
<dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-streaming-java_2.11
         <version>1.10.0</version>
</dependency>
<dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-table-api-java-bridge_2.11</artifactId>
         <version>1.10.0</version>
</dependency>
<dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-table-planner-blink_2.11</artifactId>
         <version>1.10.0</version>
</dependency>
<dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-table-planner_2.11</artifactId>
         <version>1.10.0</version>
</dependency>
     <dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
         <version>1.10.0</version>
</dependency>

 

第一步,創建上下文環境:

ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);

 

第二步,讀取一行模擬數據作為輸入:

String words = "hello flink hello lagou";
String[] split = words.split("\\W+");
ArrayList<WC> list = new ArrayList<>();

for(String word : split){
    WC wc = new WC(word,1);
    list.add(wc);
}
DataSet<WC> input = fbEnv.fromCollection(list);

 

第三步,注冊成表,執行 SQL,然后輸出:

//DataSet 轉sql, 指定字段名
Table table = fbTableEnv.fromDataSet(input, "word,frequency");
table.printSchema();

//注冊為一個表
fbTableEnv.createTemporaryView("WordCount", table);

Table table02 = fbTableEnv.sqlQuery("select word as word, sum(frequency) as frequency from WordCount GROUP BY word");

//將表轉換DataSet
DataSet<WC> ds3  = fbTableEnv.toDataSet(table02, WC.class);
ds3.printToErr();

 

整體代碼結構如下:

 
         
package wyh.tableApi;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;

import java.util.ArrayList;

public class WCTableApi {
public static void main(String[] args) {
ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();

BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);

String words="hello flink hello shujia";
String[] split = words.split("\\W+");

ArrayList<WC> list = new ArrayList<>();

for (String word : split) {
WC wc = new WC(word, 1L);
list.add(wc);
}


// DataSet<WC> input = fbEnv.fromCollection(list);
DataSource<WC> input = fbEnv.fromCollection(list);

Table table = fbTableEnv.fromDataSet(input, "word,frequency");
table.printSchema();

fbTableEnv.createTemporaryView("wordcount",table);

Table table1 = fbTableEnv.sqlQuery("select word,sum(frequency) as frequency from wordcount group by word");

DataSet<WC> ds3 = fbTableEnv.toDataSet(table1, WC.class);

try {
ds3.printToErr();
} catch (Exception e) {
e.printStackTrace();
}


}

public static class WC{
public String word;
public Long frequency;

public WC() {
}

public WC(String word, Long frequency) {
this.word = word;
this.frequency = frequency;
}

@Override
public String toString() {
return "WC{" +
"word='" + word + '\'' +
", frequency=" + frequency +
'}';
}
}
}
 

 

我們直接運行該程序,在控制台可以看到輸出結果:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM