Flink Table & SQL 概述
背景
我們在前面的課時中講過 Flink 的分層模型,Flink 自身提供了不同級別的抽象來支持我們開發流式或者批量處理程序,下圖描述了 Flink 支持的 4 種不同級別的抽象。
Table API 和 SQL 處於最頂端,是 Flink 提供的高級 API 操作。Flink SQL 是 Flink 實時計算為簡化計算模型,降低用戶使用實時計算門檻而設計的一套符合標准 SQL 語義的開發語言。
Flink 在編程模型上提供了 DataStream 和 DataSet 兩套 API,並沒有做到事實上的批流統一,因為用戶和開發者還是開發了兩套代碼。正是因為 Flink Table & SQL 的加入,可以說 Flink 在某種程度上做到了事實上的批流一體。
原理
你之前可能都了解過 Hive,在離線計算場景下 Hive 幾乎扛起了離線數據處理的半壁江山。它的底層對 SQL 的解析用到了 Apache Calcite,Flink 同樣把 SQL 的解析、優化和執行教給了 Calcite。
下圖是一張經典的 Flink Table & SQL 實現原理圖,可以看到 Calcite 在整個架構中處於絕對核心地位。
從圖中可以看到無論是批查詢 SQL 還是流式查詢 SQL,都會經過對應的轉換器 Parser 轉換成為節點樹 SQLNode tree,然后生成邏輯執行計划 Logical Plan,邏輯執行計划在經過優化后生成真正可以執行的物理執行計划,交給 DataSet 或者 DataStream 的 API 去執行。
在這里不對 Calcite 的原理過度展開,有興趣的可以直接在官網上學習。
一個完整的 Flink Table & SQL Job 也是由 Source、Transformation、Sink 構成:
Source 部分來源於外部數據源,我們經常用的有 Kafka、MySQL 等;
Transformation 部分則是 Flink Table & SQL 支持的常用 SQL 算子,比如簡單的 Select、Groupby 等,當然在這里也有更為復雜的多流 Join、流與維表的 Join 等;
Sink 部分是指的結果存儲比如 MySQL、HBase 或 Kakfa 等。
動態表
與傳統的表 SQL 查詢相比,Flink Table & SQL 在處理流數據時會時時刻刻處於動態的數據變化中,所以便有了一個動態表的概念。
動態表的查詢與靜態表一樣,但是,在查詢動態表的時候,SQL 會做連續查詢,不會終止。
我們舉個簡單的例子,Flink 程序接受一個 Kafka 流作為輸入,Kafka 中為用戶的購買記錄:
首先,Kafka 的消息會被源源不斷的解析成一張不斷增長的動態表,我們在動態表上執行的 SQL 會不斷生成新的動態表作為結果表。
Flink Table & SQL 算子和內置函數
我們在講解 Flink Table & SQL 所支持的常用算子前,需要說明一點,Flink 自從 0.9 版本開始支持 Table & SQL 功能一直處於完善開發中,且在不斷進行迭代。
我們在官網中也可以看到這樣的提示:
Please note that the Table API and SQL are not yet feature complete and are being actively developed. Not all operations are supported by every combination of [Table API, SQL] and [stream, batch] input.
Flink Table & SQL 的開發一直在進行中,並沒有支持所有場景下的計算邏輯。從我個人實踐角度來講,在使用原生的 Flink Table & SQL 時,務必查詢官網當前版本對 Table & SQL 的支持程度,盡量選擇場景明確,邏輯不是極其復雜的場景。
常用算子
目前 Flink SQL 支持的語法主要如下:
query: values
| { select
| selectWithoutFrom | query UNION [ ALL ] query | query EXCEPT query | query INTERSECT query } [ ORDER BY orderItem [, orderItem ]* ] [ LIMIT { count | ALL } ]
[ OFFSET start { ROW | ROWS } ]
[ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY] orderItem: expression [ ASC | DESC ]
select: SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } FROM tableExpression [ WHERE booleanExpression ]
[ GROUP BY { groupItem [, groupItem ]* } ] [ HAVING booleanExpression ]
[ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ] selectWithoutFrom: SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } projectItem: expression [ [ AS ] columnAlias ] | tableAlias . * tableExpression: tableReference [, tableReference ]*
| tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ] joinCondition: ON booleanExpression | USING '(' column [, column ]* ')' tableReference: tablePrimary [ matchRecognize ]
[ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ] tablePrimary: [ TABLE ] [ [ catalogName . ] schemaName . ] tableName | LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')'
| UNNEST '(' expression ')'
values: VALUES expression [, expression ]* groupItem: expression | '(' ')'
| '(' expression [, expression ]* ')'
| CUBE '(' expression [, expression ]* ')'
| ROLLUP '(' expression [, expression ]* ')'
| GROUPING SETS '(' groupItem [, groupItem ]* ')' windowRef: windowName | windowSpec windowSpec: [ windowName ]
'('
[ ORDER BY orderItem [, orderItem ]* ] [ PARTITION BY expression [, expression ]* ] [ RANGE numericOrIntervalExpression {PRECEDING} | ROWS numericExpression {PRECEDING} ]
')' ...
可以看到 Flink SQL 和傳統的 SQL 一樣,支持了包含查詢、連接、聚合等場景,另外還支持了包括窗口、排序等場景。下面我就以最常用的算子來做詳細的講解。
SELECT/AS/WHERE
SELECT、WHERE 和傳統 SQL 用法一樣,用於篩選和過濾數據,同時適用於 DataStream 和 DataSet。
SELECT * FROM Table; SELECT name,age FROM Table;
GROUP BY / DISTINCT/HAVING
GROUP BY 用於進行分組操作,DISTINCT 用於結果去重。
HAVING 和傳統 SQL 一樣,可以用來在聚合函數之后進行篩選。
SELECT DISTINCT name FROM Table; SELECT name, SUM(score) as TotalScore FROM Table GROUP BY name; SELECT name, SUM(score) as TotalScore FROM Table GROUP BY name HAVING
SUM(score) > 300;
JOIN
JOIN 可以用於把來自兩個表的數據聯合起來形成結果表,目前 Flink 的 Join 只支持等值連接。Flink 支持的 JOIN 類型包括:
JOIN - INNER JOIN
LEFT JOIN - LEFT OUTER JOIN
RIGHT JOIN - RIGHT OUTER JOIN
FULL JOIN - FULL OUTER JOIN
例如,用用戶表和商品表進行關聯:
SELECT *
FROM User LEFT JOIN Product ON User.name = Product.buyer SELECT *
FROM User RIGHT JOIN Product ON User.name = Product.buyer SELECT *
FROM User FULL OUTER JOIN Product ON User.name = Product.buyer
LEFT JOIN、RIGHT JOIN 、FULL JOIN 相與我們傳統 SQL 中含義一樣。
WINDOW
根據窗口數據划分的不同,目前 Apache Flink 有如下 3 種:
滾動窗口,窗口數據有固定的大小,窗口中的數據不會疊加;
滑動窗口,窗口數據有固定大小,並且有生成間隔;
會話窗口,窗口數據沒有固定的大小,根據用戶傳入的參數進行划分,窗口數據無疊加;
滾動窗口
滾動窗口的特點是:有固定大小、窗口中的數據不會重疊,如下圖所示:
滾動窗口的語法:
SELECT
[gk], [TUMBLE_START(timeCol, size)], [TUMBLE_END(timeCol, size)], agg1(col1), ... aggn(colN) FROM Tab1 GROUP BY [gk], TUMBLE(timeCol, size)
舉例說明,我們需要計算每個用戶每天的訂單數量:
SELECT user, TUMBLE_START(timeLine, INTERVAL '1' DAY) as winStart, SUM(amount) FROM Orders GROUP BY TUMBLE(timeLine, INTERVAL '1' DAY), user;
其中,TUMBLE_START 和 TUMBLE_END 代表窗口的開始時間和窗口的結束時間,TUMBLE (timeLine, INTERVAL '1' DAY) 中的 timeLine 代表時間字段所在的列,INTERVAL '1' DAY 表示時間間隔為一天。
滑動窗口
滑動窗口有固定的大小,與滾動窗口不同的是滑動窗口可以通過 slide 參數控制滑動窗口的創建頻率。需要注意的是,多個滑動窗口可能會發生數據重疊,具體語義如下:
滑動窗口的語法與滾動窗口相比,只多了一個 slide 參數:
SELECT
[gk], [HOP_START(timeCol, slide, size)] , [HOP_END(timeCol, slide, size)], agg1(col1), ... aggN(colN) FROM Tab1 GROUP BY [gk], HOP(timeCol, slide, size)
例如,我們要每間隔一小時計算一次過去 24 小時內每個商品的銷量:
SELECT product, SUM(amount) FROM Orders GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY), product
上述案例中的 INTERVAL '1' HOUR 代表滑動窗口生成的時間間隔。
會話窗口
會話窗口定義了一個非活動時間,假如在指定的時間間隔內沒有出現事件或消息,則會話窗口關閉。
會話窗口的語法如下:
SELECT
[gk], SESSION_START(timeCol, gap) AS winStart, SESSION_END(timeCol, gap) AS winEnd, agg1(col1), ... aggn(colN) FROM Tab1 GROUP BY [gk], SESSION(timeCol, gap)
舉例,我們需要計算每個用戶過去 1 小時內的訂單量:
SELECT user, SESSION_START(rowtime, INTERVAL '1' HOUR) AS sStart, SESSION_ROWTIME(rowtime, INTERVAL '1' HOUR) AS sEnd, SUM(amount) FROM Orders GROUP BY SESSION(rowtime, INTERVAL '1' HOUR), user
內置函數
Flink 中還有大量的內置函數,我們可以直接使用,將內置函數分類如下:
比較函數
邏輯函數
算術函數
字符串處理函數
時間函數
Flink Table & SQL 案例
上面分別介紹了 Flink Table & SQL 的原理和支持的算子,我們模擬一個實時的數據流,然后講解 SQL JOIN 的用法。
之前利用 Flink 提供的自定義 Source 功能來實現一個自定義的實時數據源,具體實現如下:
package wyh.tableApi; import org.apache.flink.streaming.api.functions.source.SourceFunction; import wyh.datastreamingApi.Item; import java.util.ArrayList; import java.util.Random; class MyStreamingSourceTable implements SourceFunction<Item> { private boolean isRunning = true; /** * 重寫run方法產生一個源源不斷的數據發送源 * @param ctx * @throws Exception */ @Override public void run(SourceContext<Item> ctx) throws Exception { while(isRunning){ Item item = generateItem(); ctx.collect(item); //每秒產生一條數據 Thread.sleep(1000); } } @Override public void cancel() { isRunning = false; } //隨機產生一條商品數據 private Item generateItem(){ int i = new Random().nextInt(1000); ArrayList<String> list = new ArrayList<>(); list.add("HAT"); list.add("TIE"); list.add("SHOE"); Item item = new Item(); item.setName(list.get(new Random().nextInt(3))); item.setId(i); return item; } }
我們把實時的商品數據流進行分流,分成 even 和 odd 兩個流進行 JOIN,條件是名稱相同,最后,把兩個流的 JOIN 結果輸出。
package wyh.tableApi; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.streaming.api.collector.selector.OutputSelector; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.SplitStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.java.StreamTableEnvironment; import wyh.datastreamingApi.Item; import java.util.ArrayList; public class StreamingDemoTable { public static void main(String[] args) throws Exception{ //BlinkPlanner 對SQl進行了一些優化 比如去重,取TopN EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); //流處理環境 StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); //支持table sql環境 StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); SingleOutputStreamOperator<Item> source = bsEnv.addSource(new MyStreamingSourceTable()).map(new MapFunction<Item, Item>() { @Override public Item map(Item item) throws Exception { return item; } }); DataStream<Item> splitAll = source.split(new OutputSelector<Item>() { @Override public Iterable<String> select(Item item) { //將實時商品流分成even和odd兩個流 ArrayList<String> output = new ArrayList<>(); if (item.getId() % 2 == 0) { output.add("even"); } else { output.add("odd"); } return output; } }); //將兩個流篩選出來 DataStream<Item> evenSelect = ((SplitStream<Item>) splitAll).select("even"); DataStream<Item> oddSelect = ((SplitStream<Item>) splitAll).select("odd"); //把這兩個流在我們的Flink環境中注冊為臨時表 bsTableEnv.createTemporaryView("evenTable",evenSelect,"name,id"); bsTableEnv.createTemporaryView("oddTable",oddSelect,"name,id"); Table quertTable = bsTableEnv.sqlQuery("select a.id,a.name,b.id,b.name from evenTable as a join oddTable as b on a.name=b.name"); quertTable.printSchema(); bsTableEnv.toRetractStream(quertTable,TypeInformation.of(new TypeHint<Tuple4<Integer,String,Integer,String>>() {})).print(); bsEnv.execute("streaming sql job"); } }
直接右鍵運行,在控制台可以看到輸出: