5 OrderBy & Limit
操作符 | 描述 |
---|---|
Order By 批處理 流處理 |
注意: 流處理結果需主要根據 時間屬性 按照升序進行排序。支持使用其他排序屬性。 |
Limit 批處理 |
注意: LIMIT 查詢需要有一個 ORDER BY 字句。 |
6 Top-N
目前僅 Blink 計划器支持 Top-N 。
Top-N 查詢是根據列排序找到N個最大或最小的值。最大值集和最小值集都被視為是一種 Top-N 的查詢。若在批處理或流處理的表中需要顯示出滿足條件的 N 個最底層記錄或最頂層記錄, Top-N 查詢將會十分有用。得到的結果集將可以進行進一步的分析。
Flink 使用 OVER 窗口條件和過濾條件相結合以進行 Top-N 查詢。利用 OVER 窗口的 PARTITION BY
子句的功能,Flink 還支持逐組 Top-N 。 例如,每個類別中實時銷量最高的前五種產品。批處理表和流處理表都支持基於SQL的 Top-N 查詢。 以下是 TOP-N 表達式的語法:
SELECT [column_list] FROM ( SELECT [column_list], ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]] ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum FROM table_name) WHERE rownum <= N [AND conditions]
參數說明:
ROW_NUMBER()
: 根據當前分區內的各行的順序從第一行開始,依次為每一行分配一個唯一且連續的號碼。目前,我們只支持ROW_NUMBER
在 over 窗口函數中使用。未來將會支持RANK()
和DENSE_RANK()
函數。PARTITION BY col1[, col2...]
: 指定分區列,每個分區都將會有一個 Top-N 結果。ORDER BY col1 [asc|desc][, col2 [asc|desc]...]
: 指定排序列,不同列的排序方向可以不一樣。WHERE rownum <= N
: Flink 需要rownum <= N
才能識別一個查詢是否為 Top-N 查詢。 其中, N 代表最大或最小的 N 條記錄會被保留。[AND conditions]
: 在 where 語句中,可以隨意添加其他的查詢條件,但其他條件只允許通過AND
與rownum <= N
結合使用。
流處理模式需注意 TopN 查詢的結果會帶有更新。 Flink SQL 會根據排序鍵對輸入的流進行排序;若 top N 的記錄發生了變化,變化的部分會以撤銷、更新記錄的形式發送到下游。 推薦使用一個支持更新的存儲作為 Top-N 查詢的 sink 。另外,若 top N 記錄需要存儲到外部存儲,則結果表需要擁有相同與 Top-N 查詢相同的唯一鍵。
Top-N 的唯一鍵是分區列和 rownum 列的結合,另外 Top-N 查詢也可以獲得上游的唯一鍵。以下面的任務為例,product_id
是 ShopSales
的唯一鍵,然后 Top-N 的唯一鍵是 [category
, rownum
] 和 [product_id
] 。
下面的樣例描述了如何指定帶有 Top-N 的 SQL 查詢。這個例子的作用是我們上面提到的“查詢每個分類實時銷量最大的五個產品”。
val env = StreamExecutionEnvironment.getExecutionEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) // 讀取外部數據源的 DataStream val ds: DataStream[(String, String, String, Long)] = env.addSource(...) // 注冊名為 “ShopSales” 的 DataStream tableEnv.createTemporaryView("ShopSales", ds, $"product_id", $"category", $"product_name", $"sales") // 選擇每個分類中銷量前5的產品 val result1 = tableEnv.sqlQuery( """ |SELECT * |FROM ( | SELECT *, | ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) as row_num | FROM ShopSales) |WHERE row_num <= 5 """.stripMargin)
7 無排名輸出優化
如上文所描述,rownum
字段會作為唯一鍵的其中一個字段寫到結果表里面,這會導致大量的結果寫出到結果表。比如,當原始結果(名為 product-1001
)從排序第九變化為排序第一時,排名 1-9 的所有結果都會以更新消息的形式發送到結果表。若結果表收到太多的數據,將會成為 SQL 任務的瓶頸。
優化方法是在 Top-N 查詢的外部 SELECT 子句中省略 rownum 字段。由於前N條記錄的數量通常不大,因此消費者可以自己對記錄進行快速排序,因此這是合理的。去掉 rownum 字段后,上述的例子中,只有變化了的記錄( product-1001
)需要發送到下游,從而可以節省大量的對結果表的 IO 操作。
以下的例子描述了如何以這種方式優化上述的 Top-N 查詢:
val env = StreamExecutionEnvironment.getExecutionEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) // 從外部數據源讀取 DataStream val ds: DataStream[(String, String, String, Long)] = env.addSource(...) // 注冊名為 “ShopSales” 的數據源 tableEnv.createTemporaryView("ShopSales", ds, $"product_id", $"category", $"product_name", $"sales") // 選擇每個分類中銷量前5的產品 val result1 = tableEnv.sqlQuery( """ |SELECT product_id, category, product_name, sales -- omit row_num field in the output |FROM ( | SELECT *, | ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) as row_num | FROM ShopSales) |WHERE row_num <= 5 """.stripMargin)
使用流處理模式時需注意 為了使上述查詢輸出可以輸出到外部存儲並且結果正確,外部存儲需要擁有與 Top-N 查詢一致的唯一鍵。在上述的查詢例子中,若 product_id
是查詢的唯一鍵,那么外部表必須要有 product_id
作為其唯一鍵。
8 去重
注意 僅 Blink planner 支持去重。
去重是指對在列的集合內重復的行進行刪除,只保留第一行或最后一行數據。 在某些情況下,上游的 ETL 作業不能實現精確一次的端到端,這將可能導致在故障恢復 時,sink 中有重復的記錄。 由於重復的記錄將影響下游分析作業的正確性(例如,SUM
、COUNT
), 所以在進一步分析之前需要進行數據去重。
與 Top-N 查詢相似,Flink 使用 ROW_NUMBER()
去除重復的記錄。理論上來說,去重是一個特殊的 Top-N 查詢,其中 N 是 1 ,記錄則是以處理時間或事件事件進行排序的。
以下代碼展示了去重語句的語法:
SELECT [column_list] FROM ( SELECT [column_list], ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]] ORDER BY time_attr [asc|desc]) AS rownum FROM table_name) WHERE rownum = 1
參數說明:
ROW_NUMBER()
: 從第一行開始,依次為每一行分配一個唯一且連續的號碼。PARTITION BY col1[, col2...]
: 指定分區的列,例如去重的鍵。ORDER BY time_attr [asc|desc]
: 指定排序的列。所制定的列必須為 時間屬性。目前僅支持 proctime attribute,在未來版本中將會支持 Rowtime atttribute 。升序( ASC )排列指只保留第一行,而降序排列( DESC )則指保留最后一行。WHERE rownum = 1
: Flink 需要rownum = 1
以確定該查詢是否為去重查詢。
以下的例子描述了如何指定 SQL 查詢以在一個流計算表中進行去重操作。
val env = StreamExecutionEnvironment.getExecutionEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) // 從外部數據源讀取 DataStream val ds: DataStream[(String, String, String, Int)] = env.addSource(...) // 注冊名為 “Orders” 的 DataStream tableEnv.createTemporaryView("Orders", ds, $"order_id", $"user", $"product", $"number", $"proctime".proctime) // 由於不應該出現兩個訂單有同一個order_id,所以根據 order_id 去除重復的行,並保留第一行 val result1 = tableEnv.sqlQuery( """ |SELECT order_id, user, product, number |FROM ( | SELECT *, | ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY proctime DESC) as row_num | FROM Orders) |WHERE row_num = 1 """.stripMargin)
9 分組窗口
SQL 查詢的分組窗口是通過 GROUP BY
子句定義的。類似於使用常規 GROUP BY
語句的查詢,窗口分組語句的 GROUP BY
子句中帶有一個窗口函數為每個分組計算出一個結果。以下是批處理表和流處理表支持的分組窗口函數:
分組窗口函數 | 描述 |
---|---|
TUMBLE(time_attr, interval) |
定義一個滾動窗口。滾動窗口把行分配到有固定持續時間( interval )的不重疊的連續窗口。比如,5 分鍾的滾動窗口以 5 分鍾為間隔對行進行分組。滾動窗口可以定義在事件時間(批處理、流處理)或處理時間(流處理)上。 |
HOP(time_attr, interval, interval) |
定義一個跳躍的時間窗口(在 Table API 中稱為滑動窗口)。滑動窗口有一個固定的持續時間( 第二個 interval 參數 )以及一個滑動的間隔(第一個 interval 參數 )。若滑動間隔小於窗口的持續時間,滑動窗口則會出現重疊;因此,行將會被分配到多個窗口中。比如,一個大小為 15 分組的滑動窗口,其滑動間隔為 5 分鍾,將會把每一行數據分配到 3 個 15 分鍾的窗口中。滑動窗口可以定義在事件時間(批處理、流處理)或處理時間(流處理)上。 |
SESSION(time_attr, interval) |
定義一個會話時間窗口。會話時間窗口沒有一個固定的持續時間,但是它們的邊界會根據 interval 所定義的不活躍時間所確定;即一個會話時間窗口在定義的間隔時間內沒有時間出現,該窗口會被關閉。例如時間窗口的間隔時間是 30 分鍾,當其不活躍的時間達到30分鍾后,若觀測到新的記錄,則會啟動一個新的會話時間窗口(否則該行數據會被添加到當前的窗口),且若在 30 分鍾內沒有觀測到新紀錄,這個窗口將會被關閉。會話時間窗口可以使用事件時間(批處理、流處理)或處理時間(流處理)。 |
10 時間屬性
在流處理表中的 SQL 查詢中,分組窗口函數的 time_attr
參數必須引用一個合法的時間屬性,且該屬性需要指定行的處理時間或事件時間。可參考 時間屬性文檔 以了解如何定義時間屬性。
對於批處理的 SQL 查詢,分組窗口函數的 time_attr
參數必須是一個 TIMESTAMP
類型的屬性。
11 選擇分組窗口的開始和結束時間戳
可以使用以下輔助函數選擇組窗口的開始和結束時間戳以及時間屬性:
輔助函數 | 描述 |
---|---|
TUMBLE_START(time_attr, interval) HOP_START(time_attr, interval, interval) SESSION_START(time_attr, interval) |
返回相對應的滾動、滑動和會話窗口范圍內的下界時間戳。 |
TUMBLE_END(time_attr, interval) HOP_END(time_attr, interval, interval) SESSION_END(time_attr, interval) |
返回相對應的滾動、滑動和會話窗口范圍以外的上界時間戳。 注意: 范圍以外的上界時間戳不可以 在隨后基於時間的操作中,作為 行時間屬性 使用,比如 interval join 以及 分組窗口或分組窗口上的聚合。 |
TUMBLE_ROWTIME(time_attr, interval) HOP_ROWTIME(time_attr, interval, interval) SESSION_ROWTIME(time_attr, interval) |
返回相對應的滾動、滑動和會話窗口范圍以內的上界時間戳。 返回的是一個可用於后續需要基於時間的操作的時間屬性(rowtime attribute),比如interval join 以及 分組窗口或分組窗口上的聚合。 |
TUMBLE_PROCTIME(time_attr, interval) HOP_PROCTIME(time_attr, interval, interval) SESSION_PROCTIME(time_attr, interval) |
返回一個可用於后續需要基於時間的操作的 處理時間參數,比如interval join 以及 分組窗口或分組窗口上的聚合. |
注意: 輔助函數必須使用與 GROUP BY
子句中的分組窗口函數完全相同的參數來調用.
以下的例子展示了如何在流處理表中指定使用分組窗口函數的 SQL 查詢。
val env = StreamExecutionEnvironment.getExecutionEnvironment val tableEnv = StreamTableEnvironment.create(env) // 從外部數據源讀取 DataSource val ds: DataStream[(Long, String, Int)] = env.addSource(...) // 計算每日(使用處理時間)的 SUM(amount) tableEnv.createTemporaryView("Orders", ds, $"user", $"product", $"amount", $"proctime".proctime, $"rowtime".rowtime) // 計算每日的 SUM(amount) (使用事件時間) val result1 = tableEnv.sqlQuery( """ |SELECT | user, | TUMBLE_START(rowtime, INTERVAL '1' DAY) as wStart, | SUM(amount) | FROM Orders | GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user """.stripMargin) // 計算每日的 SUM(amount) (使用處理時間) val result2 = tableEnv.sqlQuery( "SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(proctime, INTERVAL '1' DAY), user") // 使用事件時間計算過去24小時中每小時的 SUM(amount) val result3 = tableEnv.sqlQuery( "SELECT product, SUM(amount) FROM Orders GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY), product") // 計算每個以12小時(事件時間)作為不活動時間的會話的 SUM(amount) val result4 = tableEnv.sqlQuery( """ |SELECT | user, | SESSION_START(rowtime, INTERVAL '12' HOUR) AS sStart, | SESSION_END(rowtime, INTERVAL '12' HOUR) AS sEnd, | SUM(amount) | FROM Orders | GROUP BY SESSION(rowtime(), INTERVAL '12' HOUR), user """.stripMargin)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // ingest a DataStream from an external source DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(...); // register the DataStream as table "Orders" tableEnv.createTemporaryView("Orders", ds, $("user"), $("product"), $("amount"), $("proctime").proctime(), $("rowtime").rowtime()); // compute SUM(amount) per day (in event-time) Table result1 = tableEnv.sqlQuery( "SELECT user, " + " TUMBLE_START(rowtime, INTERVAL '1' DAY) as wStart, " + " SUM(amount) FROM Orders " + "GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user"); // compute SUM(amount) per day (in processing-time) Table result2 = tableEnv.sqlQuery( "SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(proctime, INTERVAL '1' DAY), user"); // compute every hour the SUM(amount) of the last 24 hours in event-time Table result3 = tableEnv.sqlQuery( "SELECT product, SUM(amount) FROM Orders GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY), product"); // compute SUM(amount) per session with 12 hour inactivity gap (in event-time) Table result4 = tableEnv.sqlQuery( "SELECT user, " + " SESSION_START(rowtime, INTERVAL '12' HOUR) AS sStart, " + " SESSION_ROWTIME(rowtime, INTERVAL '12' HOUR) AS snd, " + " SUM(amount) " + "FROM Orders " + "GROUP BY SESSION(rowtime, INTERVAL '12' HOUR), user");
12 模式匹配
操作符 | 描述 |
---|---|
MATCH_RECOGNIZE 流處理 |
根據 更多詳情請參考 檢測表中的模式. |