Flink基礎(二十六):FLINK-SQL語法(二)DQL(二)查詢語句(二)操作符(一)


1 Scan、Projection 與 Filter

操作符 描述
Scan / Select / As
批處理 流處理
SELECT * FROM Orders SELECT a, c AS d FROM Orders
Where / Filter
批處理 流處理
SELECT * FROM Orders WHERE b = 'red' SELECT * FROM Orders WHERE a % 2 = 0
用戶定義標量函數(Scalar UDF)
批處理 流處理

自定義函數必須事先注冊到 TableEnvironment 中。 可閱讀 自定義函數文檔 以獲得如何指定和注冊自定義函數的詳細信息。

SELECT PRETTY_PRINT(user) FROM Orders

2 聚合

操作符 描述
GroupBy 聚合
批處理 流處理
結果更新

注意: GroupBy 在流處理表中會產生更新結果(updating result)。詳情請閱讀 動態表流概念 。

SELECT a, SUM(b) as d FROM Orders GROUP BY a
GroupBy 窗口聚合
批處理 流處理

使用分組窗口對每個組進行計算並得到一個結果行。詳情請閱讀 分組窗口 章節

SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user
Over Window aggregation
流處理

注意: 所有的聚合必須定義到同一個窗口中,即相同的分區、排序和區間。當前僅支持 PRECEDING (無界或有界) 到 CURRENT ROW 范圍內的窗口、FOLLOWING 所描述的區間並未支持,ORDER BY 必須指定於單個的時間屬性

SELECT COUNT(amount) OVER ( PARTITION BY user ORDER BY proctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) FROM Orders SELECT COUNT(amount) OVER w, SUM(amount) OVER w FROM Orders WINDOW w AS ( PARTITION BY user ORDER BY proctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) 
Distinct
批處理 流處理
結果更新
SELECT DISTINCT users FROM Orders

注意: 對於流處理查詢,根據不同字段的數量,計算查詢結果所需的狀態可能會無限增長。請提供具有有效保留間隔的查詢配置,以防止出現過多的狀態。請閱讀 查詢配置 以獲取詳細的信息

Grouping sets, Rollup, Cube
批處理 流處理 結果更新
SELECT SUM(amount) FROM Orders GROUP BY GROUPING SETS ((user), (product))

Note: 流式 Grouping sets、Rollup 以及 Cube 只在 Blink planner 中支持。

Having
批處理 流處理
SELECT SUM(amount) FROM Orders GROUP BY users HAVING SUM(amount) > 50
用戶自定義聚合函數 (UDAGG)
批處理 流處理

UDAGG 必須注冊到 TableEnvironment. 參考自定義函數文檔 以了解如何指定和注冊 UDAGG 。

SELECT MyAggregate(amount) FROM Orders GROUP BY users

3 Joins

操作符 描述
Inner Equi-join
批處理 流處理

目前僅支持 equi-join ,即 join 的聯合條件至少擁有一個相等謂詞。不支持任何 cross join 和 theta join。

注意: Join 的順序沒有進行優化,join 會按照 FROM 中所定義的順序依次執行。請確保 join 所指定的表在順序執行中不會產生不支持的 cross join (笛卡兒積)以至查詢失敗。

SELECT * FROM Orders INNER JOIN Product ON Orders.productId = Product.id

注意: 流查詢中可能會因為不同行的輸入數量導致計算結果的狀態無限增長。請提供具有有效保留間隔的查詢配置,以防止出現過多的狀態。詳情請參考 查詢配置 頁面.

Outer Equi-join
批處理 流處理 結果更新

目前僅支持 equi-join ,即 join 的聯合條件至少擁有一個相等謂詞。不支持任何 cross join 和 theta join。

注意: Join 的順序沒有進行優化,join 會按照 FROM 中所定義的順序依次執行。請確保 join 所指定的表在順序執行中不會產生不支持的 cross join (笛卡兒積)以至查詢失敗。

SELECT * FROM Orders LEFT JOIN Product ON Orders.productId = Product.id SELECT * FROM Orders RIGHT JOIN Product ON Orders.productId = Product.id SELECT * FROM Orders FULL OUTER JOIN Product ON Orders.productId = Product.id

注意: 流查詢中可能會因為不同行的輸入數量導致計算結果的狀態無限增長。請提供具有有效保留間隔的查詢配置,以防止出現過多的狀態。詳情請參考 查詢配置 頁面.

Interval Join
批處理 流處理

注意:Interval join (時間區間關聯)是常規 join 的子集,可以使用流的方式進行處理。

Interval join需要至少一個 equi-join 謂詞和一個限制了雙方時間的 join 條件。例如使用兩個適當的范圍謂詞(<, <=, >=, >),一個 BETWEEN 謂詞或一個比較兩個輸入表中相同類型的 時間屬性 (即處理時間和事件時間)的相等謂詞

比如,以下謂詞是合法的 interval join 條件:

  • ltime = rtime
  • ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
  • ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND
SELECT * FROM Orders o, Shipments s WHERE o.id = s.orderId AND o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime
以上示例中,所有在收到后四小時內發貨的 order 會與他們相關的 shipment 進行 join。
Expanding arrays into a relation
批處理 流處理

目前尚未支持非嵌套的 WITH ORDINALITY 。

SELECT users, tag FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)
Join 表函數 (UDTF)
批處理 流處理

將表與表函數的結果進行 join 操作。左表(outer)中的每一行將會與調用表函數所產生的所有結果中相關聯行進行 join 。

用戶自定義表函數( User-defined table functions,UDTFs ) 在執行前必須先注冊。請參考 UDF 文檔 以獲取更多關於指定和注冊UDF的信息

Inner Join

若表函數返回了空結果,左表(outer)的行將會被刪除。

SELECT users, tag FROM Orders, LATERAL TABLE(unnest_udtf(tags)) AS t(tag)

Left Outer Join

若表函數返回了空結果,將會保留相對應的外部行並用空值填充結果。

SELECT users, tag FROM Orders LEFT JOIN LATERAL TABLE(unnest_udtf(tags)) AS t(tag) ON TRUE

注意: 當前僅支持文本常量 TRUE 作為針對橫向表的左外部聯接的謂詞。

Join Temporal Table Function
流處理

Temporal Tables 是跟隨時間變化而變化的表。

Temporal Table Function 提供訪問 Temporal Tables 在某一時間點的狀態的能力。 Join Temporal Table Function 的語法與 Join Table Function 一致。

注意: 目前僅支持在 Temporal Tables 上的 inner join 。

假如 Rates 是一個 Temporal Table Function, join 可以使用 SQL 進行如下的表達:

SELECT o_amount, r_rate FROM Orders, LATERAL TABLE (Rates(o_proctime)) WHERE r_currency = o_currency

請查看 Temporal Tables 概念描述 以了解詳細信息。

Join Temporal Tables
批處理 流處理

Temporal Tables 是隨時間變化而變化的表。 Temporal Table 提供訪問指定時間點的 temporal table 版本的功能。

僅支持帶有處理時間的 temporal tables 的 inner 和 left join。

下述示例中,假設 LatestRates 是一個根據最新的 rates 物化的 Temporal Table 。

SELECT o.amout, o.currency, r.rate, o.amount * r.rate FROM Orders AS o JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r ON r.currency = o.currency

請閱讀 Temporal Tables 概念描述以了解詳細信息。

僅 Blink planner 支持。

4 集合操作

操作符 描述
Union
批處理
SELECT * FROM ( (SELECT user FROM Orders WHERE a % 2 = 0) UNION (SELECT user FROM Orders WHERE b = 0) )
UnionAll
批處理 流處理
SELECT * FROM ( (SELECT user FROM Orders WHERE a % 2 = 0) UNION ALL (SELECT user FROM Orders WHERE b = 0) )
Intersect / Except
批處理
SELECT * FROM ( (SELECT user FROM Orders WHERE a % 2 = 0) INTERSECT (SELECT user FROM Orders WHERE b = 0) )
SELECT * FROM ( (SELECT user FROM Orders WHERE a % 2 = 0) EXCEPT (SELECT user FROM Orders WHERE b = 0) )
In
批處理 流處理

若表達式在給定的表子查詢中存在,則返回 true 。子查詢表必須由單個列構成,且該列的數據類型需與表達式保持一致。

SELECT user, amount FROM Orders WHERE product IN ( SELECT product FROM NewProducts )

注意: 在流查詢中,這一操作將會被重寫為 join 和 group 操作。該查詢所需要的狀態可能會由於不同的輸入行數而導致無限增長。請在查詢配置中提合理的保留間隔以避免產生狀態過大。請閱讀 查詢配置 以了解詳細信息

Exists
批處理 流處理

若子查詢的結果多於一行,將返回 true 。僅支持可以被通過 join 和 group 重寫的操作。

SELECT user, amount FROM Orders WHERE product EXISTS ( SELECT product FROM NewProducts )

注意: 在流查詢中,這一操作將會被重寫為 join 和 group 操作。該查詢所需要的狀態可能會由於不同的輸入行數而導致無限增長。請在查詢配置中提合理的保留間隔以避免產生狀態過大。請閱讀 查詢配置 以了解詳細信息

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM