Flink SQL 是 Flink 實時計算為簡化計算模型,降低用戶使用實時計算門檻而設計的一套符合標准 SQL 語義的開發語言。
自 2015 年開始,阿里巴巴開始調研開源流計算引擎,最終決定基於 Flink 打造新一代計算引擎,針對 Flink 存在的不足進行優化和改進,並且在 2019 年初將最終代碼開源,也就是我們熟知的 Blink。Blink 在原來的 Flink 基礎上最顯著的一個貢獻就是 Flink SQL 的實現。
Flink SQL 是面向用戶的 API 層,在我們傳統的流式計算領域,比如 Storm、Spark Streaming 都會提供一些 Function 或者 Datastream API,用戶通過 Java 或 Scala 寫業務邏輯,這種方式雖然靈活,但有一些不足,比如具備一定門檻且調優較難,隨着版本的不斷更新,API 也出現了很多不兼容的地方。
在這個背景下,毫無疑問,SQL 就成了我們最佳選擇,之所以選擇將 SQL 作為核心 API,是因為其具有幾個非常重要的特點:
-
SQL 屬於設定式語言,用戶只要表達清楚需求即可,不需要了解具體做法;
-
SQL 可優化,內置多種查詢優化器,這些查詢優化器可為 SQL 翻譯出最優執行計划;
-
SQL 易於理解,不同行業和領域的人都懂,學習成本較低;
-
SQL 非常穩定,在數據庫 30 多年的歷史中,SQL 本身變化較少;
-
流與批的統一,Flink 底層 Runtime 本身就是一個流與批統一的引擎,而 SQL 可以做到 API 層的流與批統一。
1. Flink SQL 常用算子
SELECT:
SELECT 用於從 DataSet/DataStream 中選擇數據,用於篩選出某些列。
示例:
SELECT * FROM Table;
// 取出表中的所有列
SELECT name,age FROM Table;
// 取出表中 name 和 age 兩列
與此同時 SELECT 語句中可以使用函數和別名,例如我們上面提到的 WordCount 中:
SELECT word, COUNT(word) FROM table GROUP BY word;
WHERE:
WHERE 用於從數據集/流中過濾數據,與 SELECT 一起使用,用於根據某些條件對關系做水平分割,即選擇符合條件的記錄。
示例:
SELECT name,age FROM Table where name LIKE ‘% 小明 %’;
SELECT * FROM Table WHERE age = 20;
WHERE 是從原數據中進行過濾,那么在 WHERE 條件中,Flink SQL 同樣支持 =、<、>、<>、>=、<=
,以及 AND、OR
等表達式的組合,最終滿足過濾條件的數據會被選擇出來。並且 WHERE 可以結合 IN、NOT IN 聯合使用。舉個例子:
SELECT name, age
FROM Table
WHERE name IN (SELECT name FROM Table2)
DISTINCT:
DISTINCT 用於從數據集/流中去重根據 SELECT 的結果進行去重。
示例:
SELECT DISTINCT name FROM Table;
對於流式查詢,計算查詢結果所需的 State 可能會無限增長,用戶需要自己控制查詢的狀態范圍,以防止狀態過大。
GROUP BY:
GROUP BY 是對數據進行分組操作。例如我們需要計算成績明細表中,每個學生的總分。
示例:
SELECT name, SUM(score) as TotalScore FROM Table GROUP BY name;
UNION 和 UNION ALL:
UNION 用於將兩個結果集合並起來,要求兩個結果集字段完全一致,包括字段類型、字段順序。不同於 UNION ALL 的是,UNION 會對結果數據去重。
示例:
SELECT * FROM T1 UNION (ALL) SELECT * FROM T2;
JOIN:
JOIN 用於把來自兩個表的數據聯合起來形成結果表,Flink 支持的 JOIN 類型包括:
JOIN - INNER JOIN
LEFT JOIN - LEFT OUTER JOIN
RIGHT JOIN - RIGHT OUTER JOIN
FULL JOIN - FULL OUTER JOIN
這里的 JOIN 的語義和我們在關系型數據庫中使用的 JOIN 語義一致。
示例:
JOIN(將訂單表數據和商品表進行關聯)
SELECT * FROM Orders INNER JOIN Product ON Orders.productId = Product.id
LEFT JOIN 與 JOIN 的區別是當右表沒有與左邊相 JOIN 的數據時候,右邊對應的字段補 NULL 輸出,RIGHT JOIN 相當於 LEFT JOIN 左右兩個表交互一下位置。FULL JOIN 相當於 RIGHT JOIN 和 LEFT JOIN 之后進行 UNION ALL 操作。
示例:
SELECT * FROM Orders LEFT JOIN Product ON Orders.productId = Product.id
SELECT * FROM Orders RIGHT JOIN Product ON Orders.productId = Product.id
SELECT * FROM Orders FULL OUTER JOIN Product ON Orders.productId = Product.id
Group Window:
根據窗口數據划分的不同,目前 Apache Flink 有如下 3 種 Bounded Window:
Tumble,滾動窗口,窗口數據有固定的大小,窗口數據無疊加;
Hop,滑動窗口,窗口數據有固定大小,並且有固定的窗口重建頻率,窗口數據有疊加;
Session,會話窗口,窗口數據沒有固定的大小,根據窗口數據活躍程度划分窗口,窗口數據無疊加。
Tumble Window:
Tumble 滾動窗口有固定大小,窗口數據不重疊,具體語義如下:
Tumble 滾動窗口對應的語法如下:
SELECT
[gk],
[TUMBLE_START(timeCol, size)],
[TUMBLE_END(timeCol, size)],
agg1(col1),
...
aggn(colN)
FROM Tab1
GROUP BY [gk], TUMBLE(timeCol, size)
其中:
[gk] 決定了是否需要按照字段進行聚合;
TUMBLE_START 代表窗口開始時間;
TUMBLE_END 代表窗口結束時間;
timeCol 是流表中表示時間字段;
size 表示窗口的大小,如 秒、分鍾、小時、天。
舉個例子,假如我們要計算每個人每天的訂單量,按照 user 進行聚合分組:
SELECT user,
TUMBLE_START(rowtime, INTERVAL ‘1’ DAY) as wStart,
SUM(amount)
FROM Orders
GROUP BY TUMBLE(rowtime, INTERVAL ‘1’ DAY), user;
Hop Window:
Hop 滑動窗口和滾動窗口類似,窗口有固定的 size,與滾動窗口不同的是滑動窗口可以通過 slide 參數控制滑動窗口的新建頻率。因此當 slide 值小於窗口 size 的值的時候多個滑動窗口會重疊,具體語義如下:
Hop 滑動窗口對應語法如下:
SELECT
[gk],
[HOP_START(timeCol, slide, size)] ,
[HOP_END(timeCol, slide, size)],
agg1(col1),
...
aggN(colN)
FROM Tab1
GROUP BY [gk], HOP(timeCol, slide, size)
每次字段的意思和 Tumble 窗口類似:
[gk] 決定了是否需要按照字段進行聚合;
HOP_START 表示窗口開始時間;
HOP_END 表示窗口結束時間;
timeCol 表示流表中表示時間字段;
slide 表示每次窗口滑動的大小;
size 表示整個窗口的大小,如 秒、分鍾、小時、天。
舉例說明,我們要每過一小時計算一次過去 24 小時內每個商品的銷量:
SELECT product,
SUM(amount)
FROM Orders
GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY), product
Session Window:
會話時間窗口沒有固定的持續時間,但它們的界限由 interval 不活動時間定義,即如果在定義的間隙期間沒有出現事件,則會話窗口關閉。
Seeeion 會話窗口對應語法如下:
SELECT
[gk],
SESSION_START(timeCol, gap) AS winStart,
SESSION_END(timeCol, gap) AS winEnd,
agg1(col1),
...
aggn(colN)
FROM Tab1
GROUP BY [gk], SESSION(timeCol, gap)
[gk] 決定了是否需要按照字段進行聚合;
SESSION_START 表示窗口開始時間;
SESSION_END 表示窗口結束時間;
timeCol 表示流表中表示時間字段;
gap 表示窗口數據非活躍周期的時長。
例如,我們需要計算每個用戶訪問時間 12 小時內的訂單量:
SELECT user,
SESSION_START(rowtime, INTERVAL ‘12’ HOUR) AS sStart,
SESSION_ROWTIME(rowtime, INTERVAL ‘12’ HOUR) AS sEnd,
SUM(amount)
FROM Orders
GROUP BY SESSION(rowtime, INTERVAL ‘12’ HOUR), user
Table API 和 SQL 捆綁在 flink-table Maven 工件中。必須將以下依賴項添加到你的項目才能使用 Table API 和 SQL:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
另外,你需要為 Flink 的 Scala 批處理或流式 API 添加依賴項。對於批量查詢,您需要添加:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
2. Flink SQL 實戰案例
1) 批數據 SQL
用法:
- 構建 Table 運行環境
- 將 DataSet 注冊為一張表
- 使用 Table 運行環境的 sqlQuery 方法來執行 SQL 語句
示例:使用 Flink SQL 統計用戶消費訂單的總金額、最大金額、最小金額、訂單總數。
訂單 id | 用戶名 | 訂單日期 | 消費金額 |
---|---|---|---|
1 | Zhangsan | 2018-10-20 15:30 | 358.5 |
測試數據(訂單 ID、用戶名、訂單日期、訂單金額):
Order(1, "zhangsan", "2018-10-20 15:30", 358.5),
Order(2, "zhangsan", "2018-10-20 16:30", 131.5),
Order(3, "lisi", "2018-10-20 16:30", 127.5),
Order(4, "lisi", "2018-10-20 16:30", 328.5),
Order(5, "lisi", "2018-10-20 16:30", 432.5),
Order(6, "zhaoliu", "2018-10-20 22:30", 451.0),
Order(7, "zhaoliu", "2018-10-20 22:30", 362.0),
Order(8, "zhaoliu", "2018-10-20 22:30", 364.0),
Order(9, "zhaoliu", "2018-10-20 22:30", 341.0)
步驟:
- 獲取一個批處理運行環境
- 獲取一個 Table 運行環境
- 創建一個樣例類 Order 用來映射數據(訂單名、用戶名、訂單日期、訂單金額)
- 基於本地 Order 集合創建一個 DataSet source
- 使用 Table 運行環境將 DataSet 注冊為一張表
- 使用 SQL 語句來操作數據(統計用戶消費訂單的總金額、最大金額、最小金額、訂單總數)
- 使用 TableEnv.toDataSet 將 Table 轉換為 DataSet
- 打印測試
示例代碼:
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.scala.BatchTableEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.types.Row
/**
* 使用Flink SQL統計用戶消費訂單的總金額、最大金額、最小金額、訂單總數。
*/
object BatchFlinkSqlDemo {
//3. 創建一個樣例類 Order 用來映射數據(訂單名、用戶名、訂單日期、訂單金額)
case class Order(id:Int, userName:String, createTime:String, money:Double)
def main(args: Array[String]): Unit = {
/**
* 實現思路:
* 1. 獲取一個批處理運行環境
* 2. 獲取一個Table運行環境
* 3. 創建一個樣例類 Order 用來映射數據(訂單名、用戶名、訂單日期、訂單金額)
* 4. 基於本地 Order 集合創建一個DataSet source
* 5. 使用Table運行環境將DataSet注冊為一張表
* 6. 使用SQL語句來操作數據(統計用戶消費訂單的總金額、最大金額、最小金額、訂單總數)
* 7. 使用TableEnv.toDataSet將Table轉換為DataSet
* 8. 打印測試
*/
//1. 獲取一個批處理運行環境
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
//2. 獲取一個Table運行環境
val tabEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)
//4. 基於本地 Order 集合創建一個DataSet source
val orderDataSet: DataSet[Order] = env.fromElements(
Order(1, "zhangsan", "2018-10-20 15:30", 358.5),
Order(2, "zhangsan", "2018-10-20 16:30", 131.5),
Order(3, "lisi", "2018-10-20 16:30", 127.5),
Order(4, "lisi", "2018-10-20 16:30", 328.5),
Order(5, "lisi", "2018-10-20 16:30", 432.5),
Order(6, "zhaoliu", "2018-10-20 22:30", 451.0),
Order(7, "zhaoliu", "2018-10-20 22:30", 362.0),
Order(8, "zhaoliu", "2018-10-20 22:30", 364.0),
Order(9, "zhaoliu", "2018-10-20 22:30", 341.0)
)
//5. 使用Table運行環境將DataSet注冊為一張表
tabEnv.registerDataSet("t_order", orderDataSet)
//6. 使用SQL語句來操作數據(統計用戶消費訂單的總金額、最大金額、最小金額、訂單總數)
//用戶消費訂單的總金額、最大金額、最小金額、訂單總數。
val sql =
"""
| select
| userName,
| sum(money) totalMoney,
| max(money) maxMoney,
| min(money) minMoney,
| count(1) totalCount
| from t_order
| group by userName
|""".stripMargin //在scala中stripMargin默認是“|”作為多行連接符
//7. 使用TableEnv.toDataSet將Table轉換為DataSet
val table: Table = tabEnv.sqlQuery(sql)
table.printSchema()
tabEnv.toDataSet[Row](table).print()
}
}
2) 流數據 SQL
流處理中也可以支持 SQL。但是需要注意以下幾點:
- 要使用流處理的 SQL,必須要添加水印時間
- 使用 registerDataStream 注冊表的時候,使用 ' 來指定字段
- 注冊表的時候,必須要指定一個 rowtime,否則無法在 SQL 中使用窗口
- 必須要導入 import org.apache.flink.table.api.scala._ 隱式參數
- SQL 中使用 trumble(時間列名, interval '時間' sencond) 來進行定義窗口
示例:使用 Flink SQL 來統計 5 秒內 用戶的 訂單總數、訂單的最大金額、訂單的最小金額。
步驟
- 獲取流處理運行環境
- 獲取 Table 運行環境
- 設置處理時間為 EventTime
- 創建一個訂單樣例類 Order ,包含四個字段(訂單 ID、用戶 ID、訂單金額、時間戳)
- 創建一個自定義數據源
- 使用 for 循環生成 1000 個訂單
- 隨機生成訂單 ID(UUID)
- 隨機生成用戶 ID(0-2)
- 隨機生成訂單金額(0-100)
- 時間戳為當前系統時間
- 每隔 1 秒生成一個訂單
- 添加水印,允許延遲 2 秒
- 導入 import org.apache.flink.table.api.scala._ 隱式參數
- 使用 registerDataStream 注冊表,並分別指定字段,還要指定 rowtime 字段
- 編寫 SQL 語句統計用戶訂單總數、最大金額、最小金額
分組時要使用 tumble(時間列, interval '窗口時間' second) 來創建窗口 - 使用 tableEnv.sqlQuery 執行 sql 語句
- 將 SQL 的執行結果轉換成 DataStream 再打印出來
- 啟動流處理程序
示例代碼:
import java.util.UUID
import java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.types.Row
import scala.util.Random
/**
* 需求:
* 使用Flink SQL來統計5秒內 用戶的 訂單總數、訂單的最大金額、訂單的最小金額
*
* timestamp是關鍵字不能作為字段的名字(關鍵字不能作為字段名字)
*/
object StreamFlinkSqlDemo {
/**
* 1. 獲取流處理運行環境
* 2. 獲取Table運行環境
* 3. 設置處理時間為 EventTime
* 4. 創建一個訂單樣例類 Order ,包含四個字段(訂單ID、用戶ID、訂單金額、時間戳)
* 5. 創建一個自定義數據源
* 使用for循環生成1000個訂單
* 隨機生成訂單ID(UUID)
* 隨機生成用戶ID(0-2)
* 隨機生成訂單金額(0-100)
* 時間戳為當前系統時間
* 每隔1秒生成一個訂單
* 6. 添加水印,允許延遲2秒
* 7. 導入 import org.apache.flink.table.api.scala._ 隱式參數
* 8. 使用 registerDataStream 注冊表,並分別指定字段,還要指定rowtime字段
* 9. 編寫SQL語句統計用戶訂單總數、最大金額、最小金額
* 分組時要使用 tumble(時間列, interval '窗口時間' second) 來創建窗口
* 10. 使用 tableEnv.sqlQuery 執行sql語句
* 11. 將SQL的執行結果轉換成DataStream再打印出來
* 12. 啟動流處理程序
*/
// 3. 創建一個訂單樣例類`Order`,包含四個字段(訂單ID、用戶ID、訂單金額、時間戳)
case class Order(orderId:String, userId:Int, money:Long, createTime:Long)
def main(args: Array[String]): Unit = {
// 1. 創建流處理運行環境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// 2. 設置處理時間為`EventTime`
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//獲取table的運行環境
val tableEnv = TableEnvironment.getTableEnvironment(env)
// 4. 創建一個自定義數據源
val orderDataStream = env.addSource(new RichSourceFunction[Order] {
var isRunning = true
override def run(ctx: SourceFunction.SourceContext[Order]): Unit = {
// - 隨機生成訂單ID(UUID)
// - 隨機生成用戶ID(0-2)
// - 隨機生成訂單金額(0-100)
// - 時間戳為當前系統時間
// - 每隔1秒生成一個訂單
for (i <- 0 until 1000 if isRunning) {
val order = Order(UUID.randomUUID().toString, Random.nextInt(3), Random.nextInt(101),
System.currentTimeMillis())
TimeUnit.SECONDS.sleep(1)
ctx.collect(order)
}
}
override def cancel(): Unit = { isRunning = false }
})
// 5. 添加水印,允許延遲2秒
val watermarkDataStream = orderDataStream.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor[Order](Time.seconds(2)) {
override def extractTimestamp(element: Order): Long = {
val eventTime = element.createTime
eventTime
}
}
)
// 6. 導入`import org.apache.flink.table.api.scala._`隱式參數
// 7. 使用`registerDataStream`注冊表,並分別指定字段,還要指定rowtime字段
import org.apache.flink.table.api.scala._
tableEnv.registerDataStream("t_order", watermarkDataStream, 'orderId, 'userId, 'money,'createTime.rowtime)
// 8. 編寫SQL語句統計用戶訂單總數、最大金額、最小金額
// - 分組時要使用`tumble(時間列, interval '窗口時間' second)`來創建窗口
val sql =
"""
|select
| userId,
| count(1) as totalCount,
| max(money) as maxMoney,
| min(money) as minMoney
| from
| t_order
| group by
| tumble(createTime, interval '5' second),
| userId
""".stripMargin
// 9. 使用`tableEnv.sqlQuery`執行sql語句
val table: Table = tableEnv.sqlQuery(sql)
// 10. 將SQL的執行結果轉換成DataStream再打印出來
table.toRetractStream[Row].print()
env.execute("StreamSQLApp")
}
}