本文翻譯自官網:SQL https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html
這是Flink 支持的 數據定義語言(DDL) 和數據操縱語言的完整列表。
查詢
SQL查詢使用TableEnvironment 的 sqlQuery() 方法指定。這個方法返回一個表作為SQL查詢的結果。這個表可以在后續的 SQL 和 Table API 中查詢,可以轉換為 DataSet 和 DataStream ,或寫到 TableSink 中。SQL和Table API查詢可以無縫混合,可以進行整體優化並將其轉換為單個程序。
為了在 SQL 查詢中使用一個表,它必須在 TableEnvironment 中注冊。表可以通過 TableSource, Table, CREATE TABLE statement, DataStream, or DataSet 注冊。或者,用戶還可以在TableEnvironment中注冊外部目錄以指定數據源的位置。
為方便起見,Table.toString()自動在其TableEnvironment中以唯一名稱注冊該表並返回該名稱。 因此,可以將Table對象直接內聯到SQL查詢中(通過字符串連接),如下面的示例所示。
注意:Flink的SQL支持尚未完成。 包含不受支持的SQL功能的查詢會導致TableException。 以下各節列出了批處理表和流表上SQL的受支持功能。
指定查詢
以下示例顯示如何在已注冊和內聯表上指定SQL查詢。
val env = StreamExecutionEnvironment.getExecutionEnvironment val tableEnv = StreamTableEnvironment.create(env) // read a DataStream from an external source val ds: DataStream[(Long, String, Integer)] = env.addSource(...) // SQL query with an inlined (unregistered) table val table = ds.toTable(tableEnv, 'user, 'product, 'amount) val result = tableEnv.sqlQuery( s"SELECT SUM(amount) FROM $table WHERE product LIKE '%Rubber%'") // SQL query with a registered table // register the DataStream under the name "Orders" tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount) // run a SQL query on the Table and retrieve the result as a new Table val result2 = tableEnv.sqlQuery( "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'") // SQL update with a registered table // create and register a TableSink val csvSink: CsvTableSink = new CsvTableSink("/path/to/file", ...) val fieldNames: Array[String] = Array("product", "amount") val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT) tableEnv.registerTableSink("RubberOrders", fieldNames, fieldTypes, csvSink) // run a SQL update query on the Table and emit the result to the TableSink tableEnv.sqlUpdate( "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
Supported Syntax
Flink使用支持標准ANSI SQL的Apache Calcite解析SQL。 Flink不支持DDL語句。
以下BNF語法描述了批處理和流查詢中支持的SQL功能的超集。 “操作”部分顯示了受支持功能的示例,並指示僅批處理或流查詢支持哪些功能。
insert: INSERT INTO tableReference query query: values | { select | selectWithoutFrom | query UNION [ ALL ] query | query EXCEPT query | query INTERSECT query } [ ORDER BY orderItem [, orderItem ]* ] [ LIMIT { count | ALL } ] [ OFFSET start { ROW | ROWS } ] [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY] orderItem: expression [ ASC | DESC ] select: SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } FROM tableExpression [ WHERE booleanExpression ] [ GROUP BY { groupItem [, groupItem ]* } ] [ HAVING booleanExpression ] [ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ] selectWithoutFrom: SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } projectItem: expression [ [ AS ] columnAlias ] | tableAlias . * tableExpression: tableReference [, tableReference ]* | tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ] joinCondition: ON booleanExpression | USING '(' column [, column ]* ')' tableReference: tablePrimary [ matchRecognize ] [ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ] tablePrimary: [ TABLE ] [ [ catalogName . ] schemaName . ] tableName | LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')' | UNNEST '(' expression ')' values: VALUES expression [, expression ]* groupItem: expression | '(' ')' | '(' expression [, expression ]* ')' | CUBE '(' expression [, expression ]* ')' | ROLLUP '(' expression [, expression ]* ')' | GROUPING SETS '(' groupItem [, groupItem ]* ')' windowRef: windowName | windowSpec windowSpec: [ windowName ] '(' [ ORDER BY orderItem [, orderItem ]* ] [ PARTITION BY expression [, expression ]* ] [ RANGE numericOrIntervalExpression {PRECEDING} | ROWS numericExpression {PRECEDING} ] ')' matchRecognize: MATCH_RECOGNIZE '(' [ PARTITION BY expression [, expression ]* ] [ ORDER BY orderItem [, orderItem ]* ] [ MEASURES measureColumn [, measureColumn ]* ] [ ONE ROW PER MATCH ] [ AFTER MATCH ( SKIP TO NEXT ROW | SKIP PAST LAST ROW | SKIP TO FIRST variable | SKIP TO LAST variable | SKIP TO variable ) ] PATTERN '(' pattern ')' [ WITHIN intervalLiteral ] DEFINE variable AS condition [, variable AS condition ]* ')' measureColumn: expression AS alias pattern: patternTerm [ '|' patternTerm ]* patternTerm: patternFactor [ patternFactor ]* patternFactor: variable [ patternQuantifier ] patternQuantifier: '*' | '*?' | '+' | '+?' | '?' | '??' | '{' { [ minRepeat ], [ maxRepeat ] } '}' ['?'] | '{' repeat '}'
Flink SQL對類似於Java的標識符(表,屬性,函數名稱)使用詞法策略:
- 不管是否引用標識符,都保留標識符的大小寫。
- 之后,標識符區分大小寫。
- 與Java不同,反引號允許標識符包含非字母數字字符(例如
"SELECT a AS `my field` FROM t")。
字符串文字必須用單引號引起來(例如SELECT 'Hello World')。復制單引號以進行轉義(例如SELECT 'It''s me.')。字符串文字中支持Unicode字符。如果需要明確的unicode代碼點,請使用以下語法:
- 使用反斜杠(
\)作為轉義字符(默認):SELECT U&'\263A' - 使用自定義轉義字符:
SELECT U&'#263A' UESCAPE '#'
Operations
Show and Use
| Operation | Description |
|---|---|
| Show Batch Streaming |
Show all catalogs Show all databases in the current catalog Show all tables in the current database in the current catalog |
| Use Batch Streaming |
Set current catalog for the session Set current database of the current catalog for the session |
Scan, Projection, and Filter
| Operation | Description |
|---|---|
| Scan / Select / As Batch Streaming |
|
| Where / Filter Batch Streaming |
|
| User-defined Scalar Functions (Scalar UDF) Batch Streaming |
UDFs 必須在TableEnvironment 中注冊了. See the UDF documentation 有關如何指定和注冊標量UDF的詳細信息. |
Aggregations
| Operation | Description |
|---|---|
| GroupBy Aggregation Batch Streaming Result Updating |
Note: 流表上的GroupBy產生更新結果. See the Dynamic Tables Streaming Concepts page for details. |
| GroupBy Window Aggregation Batch Streaming |
使用分組窗口可為每個組計算一個結果行。 有關更多詳細信息,請參見Group Windows 部分。 |
| Over Window aggregation Streaming |
Note: 必須在同一窗口(即相同的分區,排序和范圍)上定義所有聚合。 當前,僅支持PRECEDING(無邊界和有界)到CURRENT ROW范圍的窗口。 目前尚不支持帶有FOLLOWING的范圍。 必須在單個時間屬性上指定ORDER BY |
| Distinct Batch Streaming Result Updating |
Note: 對於流查詢,根據不同字段的數量,計算查詢結果所需的狀態可能會無限增長。 請提供具有有效保留間隔的查詢配置,以防止出現過多的狀態。 有關詳細信息,請參見查詢配置。. |
| Grouping sets, Rollup, Cube Batch |
|
| Having Batch Streaming |
|
| User-defined Aggregate Functions (UDAGG) Batch Streaming |
UDAGG必須在TableEnvironment中注冊。 有關如何指定和注冊UDAGG的詳細信息,請參見UDF文檔。 |
Joins
| Operation | Description |
|---|---|
| Inner Equi-join Batch Streaming |
當前,僅支持等值連接,即具有至少一個具有相等謂詞的聯合條件的連接。 不支持任意交叉或 theta 連接 (自連接). Note: 連接順序未優化。 表按照在FROM子句中指定的順序進行連接。 確保以不產生交叉聯接(笛卡爾乘積)的順序指定表,該順序不被支持並會導致查詢失敗. Note: 對於流查詢,根據不同輸入行的數量,計算查詢結果所需的狀態可能會無限增長。 請提供具有有效保留間隔的查詢配置,以防止出現過多的狀態。 有關詳細信息,請參見查詢配置。 |
| Outer Equi-join Batch StreamingResult Updating |
當前,僅支持等值連接,即具有至少一個具有相等謂詞的聯合條件的連接。 不支持任意交叉或 theta 連接 (自連接). Note: 連接順序未優化。 表按照在FROM子句中指定的順序進行連接。 確保以不產生交叉聯接(笛卡爾乘積)的順序指定表,該順序不被支持並會導致查詢失敗. Note: 對於流查詢,根據不同輸入行的數量,計算查詢結果所需的狀態可能會無限增長。 請提供具有有效保留間隔的查詢配置,以防止出現過多的狀態。 有關詳細信息,請參見查詢配置。 |
| Time-windowed Join Batch Streaming |
Note: 時間窗口連接是常規連接的子集,可以以流方式處理. 時間窗連接需要至少一個等聯接謂詞和在兩側限制時間的連接條件。兩個輸入表可以通過兩個適當的范圍謂詞(<,<=,> =,>),BETWEEN謂詞或比較相同類型的時間屬性 (即處理時間或事件時間)的單個相等謂詞來定義這樣的條件 . 例如,以下謂詞是有效的窗口連接條件:
如果訂單在收到訂單后四個小時內發貨,則上面的示例會將所有訂單與其相應的發貨合並在一起. |
| Expanding arrays into a relation Batch Streaming |
目前尚不支持使用ORDINALITY取消嵌套. |
| Join with Table Function (UDTF) Batch Streaming |
用表函數的結果連接表。 左(外)表的每一行都與表函數的相應調用產生的所有行連接在一起. 必須先注冊用戶定義的表函數(UDTF)。 有關如何指定和注冊UDTF的詳細信息,請參見UDF文檔。 Inner Join 如果左表(外部)的表函數調用返回空結果,則會刪除該表的左行. Left Outer Join 如果表函數調用返回空結果,則保留對應的外部行,並用空值填充結果. Note: 當前,僅支持將文字TRUE作為針對橫向表的左外部連接的謂詞。 |
| Join with Temporal Table Function Streaming |
時態表是跟蹤隨時間變化的表. 時態表功能提供對特定時間點時態表狀態的訪問。 使用時態表函數聯接表的語法與使用表函數聯接的語法相同. Note: 當前僅支持帶有時態表的內部聯接. 假設Rates是一個時態表函數,則聯接可以用SQL表示如下: 有關更多信息,請檢查更詳細的時態表概念描述. |
| Join with Temporal Table Batch Streaming |
時態表是跟蹤隨時間變化的表。 臨時表提供對特定時間點的時態表版本的訪問. 僅支持帶有處理時間時態表的內部聯接和左聯接. 下面的示例假定LatestRates是一個以最新速率物化的時態表. 有關更多信息,請檢查更詳細的時態表概念描述. 只有Blink planner 支持. |
Set Operations
| Operation | Description |
|---|---|
| Union Batch |
|
| UnionAll Batch Streaming |
|
| Intersect / Except Batch |
|
| In Batch Streaming |
如果給定表子查詢中存在表達式,則返回true。 子查詢表必須由一列組成。 此列的數據類型必須與表達式相同. Note: 對於流查詢,該操作將被重寫為join and group操作。 根據不同輸入行的數量,計算查詢結果所需的狀態可能會無限增長。 請提供具有有效保留間隔的查詢配置,以防止出現過多的狀態。 有關詳細信息,請參見查詢配置。. |
| Exists Batch Streaming |
如果子查詢返回至少一行,則返回true。 僅在可以在聯接和組操作中重寫操作時才受支持. Note: 對於流查詢,該操作將被重寫為join and group操作。 根據不同輸入行的數量,計算查詢結果所需的狀態可能會無限增長。 請提供具有有效保留間隔的查詢配置,以防止出現過多的狀態。 有關詳細信息,請參見查詢配置。. |
OrderBy & Limit
| Operation | Description |
|---|---|
| Order By Batch Streaming |
Note: 流查詢的結果必須主要按升序時間屬性排序。 支持其他排序屬性. |
| Limit Batch |
Note: LIMIT子句需要ORDER BY子句. |
Top-N
注意:僅Blink planner 僅支持Top-N。
Top-N 查詢要求按列排序的N個最小值或最大值。 最小和最大值集都被認為是Top-N查詢。 在需要只顯示批處理/流表中的N個最底層記錄或N個最頂層記錄的情況下,Top-N查詢很有用。 此結果集可用於進一步分析。
Flink使用OVER窗口子句和過濾條件的組合來表示Top-N查詢。 借助OVER window PARTITION BY子句的強大功能,Flink還支持每組Top-N。 例如,每個類別中實時銷量最高的前五種產品。 批處理表和流表上的SQL支持Top-N查詢。
下面顯示了TOP-N語句的語法:
SELECT [column_list] FROM ( SELECT [column_list], ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]] ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum FROM table_name) WHERE rownum <= N [AND conditions]
參數含義:
ROW_NUMBER(): 根據分區中各行的順序,為每一行分配一個唯一的順序號(從1開始)。 目前,我們僅支持ROW_NUMBER作為窗口函數。 將來,我們將支持RANK()和DENSE_RANK().PARTITION BY col1[, col2...]: 指定分區列。 每個分區都有一個Top-N結果.ORDER BY col1 [asc|desc][, col2 [asc|desc]...]: 指定排序列。 不同列上的排序方向可以不同.WHERE rownum <= N: Flink將rownum <= N識別為該查詢是Top-N查詢。 N代表將保留N個最小或最大記錄.[AND conditions]: 可以在where子句中添加其他條件,但是其他條件只能與AND結合使用rownum <= N.
流模式下的注意:TopN查詢會更新結果。 Flink SQL將根據順序鍵對輸入數據流進行排序,因此,如果前N條記錄已更改,則更改后的記錄將作為撤消/更新記錄發送到下游。 建議使用支持更新的存儲作為Top-N查詢的接收器。 此外,如果需要將前N條記錄存儲在外部存儲中,則結果表應具有與前N條查詢相同的唯一鍵。
Top-N查詢的唯一鍵是分區列和rownum列的組合。 Top-N查詢還可以導出上游的唯一鍵。 以下面的工作為例,假設product_id是ShopSales的唯一鍵,那么Top-N查詢的唯一鍵是[category,rownum]和[product_id]。
以下示例顯示如何在流表上使用Top-N指定SQL查詢。 這是我們上面提到的“每個類別中實時銷量最高的前五種產品”的示例。
val env = StreamExecutionEnvironment.getExecutionEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) // read a DataStream from an external source val ds: DataStream[(String, String, String, Long)] = env.addSource(...) // register the DataStream under the name "ShopSales" tableEnv.registerDataStream("ShopSales", ds, 'product_id, 'category, 'product_name, 'sales) // select top-5 products per category which have the maximum sales. val result1 = tableEnv.sqlQuery( """ |SELECT * |FROM ( | SELECT *, | ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) as row_num | FROM ShopSales) |WHERE row_num <= 5 """.stripMargin)
沒有排名輸出優化
如上所述,rownum字段將作為唯一鍵的一個字段寫入結果表,這可能導致許多記錄被寫入結果表。 例如,當更新排名9的記錄(例如product-1001)並將其排名升級到1時,排名1到9的所有記錄將作為更新消息輸出到結果表。 如果結果表接收到太多數據,它會成為SQL作業的瓶頸。
優化方法是在Top-N查詢的外部SELECT子句中省略rownum字段。 這是合理的,因為前N條記錄的數量通常不大,因此消費者可以自己對記錄進行快速排序。 如果沒有rownum字段,則在上面的示例中,僅需要將已更改的記錄(product-1001)發送到下游,這可以減少結果表的大量IO。
下面的示例顯示如何以這種方式優化上面的Top-N示例:
val env = StreamExecutionEnvironment.getExecutionEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) // read a DataStream from an external source val ds: DataStream[(String, String, String, Long)] = env.addSource(...) // register the DataStream under the name "ShopSales" tableEnv.registerDataStream("ShopSales", ds, 'product_id, 'category, 'product_name, 'sales) // select top-5 products per category which have the maximum sales. val result1 = tableEnv.sqlQuery( """ |SELECT product_id, category, product_name, sales -- omit row_num field in the output |FROM ( | SELECT *, | ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) as row_num | FROM ShopSales) |WHERE row_num <= 5 """.stripMargin)
流傳輸模式中的注意事項:為了將以上查詢輸出到外部存儲並獲得正確的結果,外部存儲必須具有與Top-N查詢相同的唯一鍵。 在上面的示例查詢中,如果product_id是查詢的唯一鍵,則外部表也應將product_id作為唯一鍵。
重復數據刪除
注意:重復數據刪除僅在Blink planner 中受支持。
重復數據刪除是指刪除在一組列上重復的行,僅保留第一個或最后一個。 在某些情況下,上游ETL作業不是一次精確的端到端,這可能導致在故障轉移的情況下,接收器中有重復的記錄。 但是,重復的記錄將影響下游分析作業的正確性(例如SUM,COUNT)。 因此,在進一步分析之前需要進行重復數據刪除。
Flink使用ROW_NUMBER()刪除重復項,就像Top-N查詢一樣。 從理論上講,重復數據刪除是Top-N的一種特殊情況,其中N為1,並按處理時間或事件時間排序。
下面顯示了重復數據刪除語句的語法:
SELECT [column_list] FROM ( SELECT [column_list], ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]] ORDER BY time_attr [asc|desc]) AS rownum FROM table_name) WHERE rownum = 1
參數含義:
ROW_NUMBER(): 從每一行開始,為每一行分配一個唯一的順序號.PARTITION BY col1[, col2...]: 指定分區列,即重復數據刪除鍵.ORDER BY time_attr [asc|desc]: 指定排序列,它必須是時間屬性。 當前僅支持proctime屬性。 將來將支持行時間屬性。 通過ASC排序意味着保留第一行,通過DESC排序意味着保留最后一行.WHERE rownum = 1: Flink要求rownum = 1才能識別此查詢是重復數據刪除.
以下示例說明如何在流表上指定帶有重復數據刪除功能的SQL查詢。
val env = StreamExecutionEnvironment.getExecutionEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) // read a DataStream from an external source val ds: DataStream[(String, String, String, Int)] = env.addSource(...) // register the DataStream under the name "Orders" tableEnv.registerDataStream("Orders", ds, 'order_id, 'user, 'product, 'number, 'proctime.proctime) // remove duplicate rows on order_id and keep the first occurrence row, // because there shouldn't be two orders with the same order_id. val result1 = tableEnv.sqlQuery( """ |SELECT order_id, user, product, number |FROM ( | SELECT *, | ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY proctime DESC) as row_num | FROM Orders) |WHERE row_num = 1 """.stripMargin)
Insert
| Operation | Description |
|---|---|
| Insert Into Batch Streaming |
輸出表必須在TableEnvironment中注冊(請參閱注冊TableSink)。 此外,已注冊表的結構必須與查詢的結構匹配. |
Group Windows
分組窗口在SQL查詢的GROUP BY子句中定義。 就像帶有常規GROUP BY子句的查詢一樣,帶有GROUP BY子句(包括分組窗口函數)的查詢每個組只計算一個結果行。 批處理表和流表上的SQL支持以下分組窗口功能。
| Group Window Function | Description |
|---|---|
TUMBLE(time_attr, interval) |
定義滾動時間窗口。 滾動時間窗口將行分配給具有固定持續時間(間隔)的非重疊連續窗口。 例如,5分鍾的滾動窗口以5分鍾為間隔對行進行分組。 可以在事件時間(流+批處理)或處理時間(流)上定義滾動窗口。 |
HOP(time_attr, interval, interval) |
定義一個跳躍時間窗口(在Table API中稱為滑動窗口)。 跳躍時間窗口具有固定的持續時間(第二個間隔參數),並按指定的跳躍間隔(第一個間隔參數)跳躍。 如果跳躍間隔小於窗口大小,則跳躍窗口重疊。 因此,可以將行分配給多個窗口。 例如,一個15分鍾大小和5分鍾跳躍間隔的跳窗將每行分配給3個15分鍾大小的不同窗口, 它們以5分鍾的間隔進行評估。 可以在事件時間(流+批處理)或處理時間(流)上定義跳躍窗口。 |
SESSION(time_attr, interval) |
定義會話時間窗口。 會話時間窗口沒有固定的持續時間,但其邊界由不活動的時間間隔定義,即,如果在定義的間隔時間段內未出現任何事件,則關閉會話窗口。 例如,間隔30分鍾的會話窗口在30分鍾不活動后觀察到一行時開始(否則該行將被添加到現有窗口),如果在30分鍾內未添加任何行,則關閉該窗口。 會話窗口可以在事件時間(流+批處理)或處理時間(流)上工作。 |
時間屬性
對於流表上的SQL查詢,分組窗口函數的time_attr參數必須引用一個有效的時間屬性,該屬性指定行的處理時間或事件時間。 請參閱時間屬性文檔以了解如何定義時間屬性。
對於批處理表上的SQL,分組窗口函數的time_attr參數必須是TIMESTAMP類型的屬性。
選擇分組窗口開始和結束時間戳
可以使用以下輔助功能選擇組窗口的開始和結束時間戳以及時間屬性:
| Auxiliary Function | Description |
|---|---|
TUMBLE_START(time_attr, interval)HOP_START(time_attr, interval, interval)SESSION_START(time_attr, interval) |
返回相應的滾動,跳動或會話窗口的包含下限的時間戳. |
TUMBLE_END(time_attr, interval)HOP_END(time_attr, interval, interval)SESSION_END(time_attr, interval) |
返回相應的滾動,跳躍或會話窗口的排他上限的時間戳. Note: 排他上限時間戳不能在隨后的基於時間的操作(例如帶時間窗扣的連接和分組窗口或窗口聚合中)中用作行時間屬性。. |
TUMBLE_ROWTIME(time_attr, interval)HOP_ROWTIME(time_attr, interval, interval)SESSION_ROWTIME(time_attr, interval) |
返回相應的滾動,跳躍或會話窗口的包含上限的時間戳. 結果屬性是一個行時間屬性,可以在隨后的基於時間的操作(例如帶時間窗口的連接和分組窗口或整個窗口聚合)中使用. |
TUMBLE_PROCTIME(time_attr, interval)HOP_PROCTIME(time_attr, interval, interval)SESSION_PROCTIME(time_attr, interval) |
返回一個proctime屬性,該屬性可以在隨后的基於時間的操作(例如帶時間窗口的連接和分組窗口或窗口聚合中)中使用. |
注意:必須使用與GROUP BY子句中的分組窗口函數完全相同的參數來調用輔助函數。
以下示例說明如何在流表上使用分組窗口指定SQL查詢。
val env = StreamExecutionEnvironment.getExecutionEnvironment val tableEnv = StreamTableEnvironment.create(env) // read a DataStream from an external source val ds: DataStream[(Long, String, Int)] = env.addSource(...) // register the DataStream under the name "Orders" tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount, 'proctime.proctime, 'rowtime.rowtime) // compute SUM(amount) per day (in event-time) val result1 = tableEnv.sqlQuery( """ |SELECT | user, | TUMBLE_START(rowtime, INTERVAL '1' DAY) as wStart, | SUM(amount) | FROM Orders | GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user """.stripMargin) // compute SUM(amount) per day (in processing-time) val result2 = tableEnv.sqlQuery( "SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(proctime, INTERVAL '1' DAY), user") // compute every hour the SUM(amount) of the last 24 hours in event-time val result3 = tableEnv.sqlQuery( "SELECT product, SUM(amount) FROM Orders GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY), product") // compute SUM(amount) per session with 12 hour inactivity gap (in event-time) val result4 = tableEnv.sqlQuery( """ |SELECT | user, | SESSION_START(rowtime, INTERVAL '12' HOUR) AS sStart, | SESSION_END(rowtime, INTERVAL '12' HOUR) AS sEnd, | SUM(amount) | FROM Orders | GROUP BY SESSION(rowtime(), INTERVAL '12' HOUR), user """.stripMargin)
Pattern Recognition
| Operation | Description |
|---|---|
| MATCH_RECOGNIZE Streaming |
根據MATCH_RECOGNIZE ISO標准在流表中搜索給定的模式。 這樣就可以在SQL查詢中表達復雜的事件處理(CEP)邏輯. 有關更詳細的描述,請參見用於檢測表中模式的專用頁面. |
DDL
DDL是通過TableEnvironment的sqlUpdate()方法指定的。 對於成功創建表,該方法不返回任何內容。 可以使用CREATE TABLE語句將表注冊到目錄中,然后可以在TableEnvironment的方法sqlQuery()中的SQL查詢中引用表。
注意:Flink的DDL支持尚未完成。 包含不受支持的SQL功能的查詢會導致TableException。 以下各節列出了批處理表和流表上SQL DDL的受支持功能。
指定DDL
以下示例顯示如何指定SQL DDL。
val env = StreamExecutionEnvironment.getExecutionEnvironment val tableEnv = StreamTableEnvironment.create(env) // SQL query with a registered table // register a table named "Orders" tableEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product VARCHAR, amount INT) WITH (...)"); // run a SQL query on the Table and retrieve the result as a new Table val result = tableEnv.sqlQuery( "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'"); // SQL update with a registered table // register a TableSink tableEnv.sqlUpdate("CREATE TABLE RubberOrders(product VARCHAR, amount INT) WITH ('connector.path'='/path/to/file' ...)"); // run a SQL update query on the Table and emit the result to the TableSink tableEnv.sqlUpdate( "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
Create Table
CREATE TABLE [catalog_name.][db_name.]table_name
[(col_name1 col_type1 [COMMENT col_comment1], ...)]
[COMMENT table_comment]
[PARTITIONED BY (col_name1, col_name2, ...)]
WITH (key1=val1, key2=val2, ...)
創建具有給定表屬性的表。 如果數據庫中已經存在具有相同名稱的表,則會引發異常。
PARTITIONED BY
按指定的列對創建的表進行分區。 如果將此表用作文件系統接收器,則會為每個分區創建一個目錄。
WITH OPTIONS
用於創建表源/接收器的表屬性。 這些屬性通常用於查找和創建基礎連接器。
表達式key1 = val1的鍵和值都應為字符串文字。 有關不同連接器的所有受支持表屬性,請參閱“連接到外部系統”中的詳細信息。
注意:表名可以采用三種格式:1. catalog_name.db_name.table_name 2. db_name.table_name 3. table_name。 對於catalog_name.db_name.table_name,該表將被注冊到 catalog 名為“ catalog_name”和數據庫名為“ db_name”的元存儲中; 對於db_name.table_name,該表將被注冊到執行表環境和名為“ db_name”的數據庫的當前 catalog 中; 對於table_name,該表將被注冊到執行表環境的當前 catalog 和數據庫中。
注意:用CREATE TABLE語句注冊的表既可以用作表源,也可以用作表接收器,在DML中引用它之前,我們無法確定是將其用作源還是接收器。
Drop Table
DROP TABLE [IF EXISTS] [catalog_name.][db_name.]table_name
刪除具有給定表名的表。 如果要刪除的表不存在,則會引發異常。
IF EXISTS
如果該表不存在,則什么也不會發生。
Data Types
請參閱有關數據類型的專用頁面。
通用類型和(嵌套的)復合類型(例如POJO,元組,行,Scala案例類)也可以是一行的字段。
可以使用值訪問功能訪問具有任意嵌套的復合類型的字段。
泛型類型被視為黑盒,可以通過用戶定義的函數傳遞或處理。
對於DDL,我們支持在“數據類型”頁面中定義的完整數據類型。
注意:sql查詢中不支持某些數據類型(強制轉換表達式或文字)。 例如。 STRING,BYTES,不帶時區的TIME(p),帶本地時區的TIME(p),不帶時區的TIMESTAMP(p),帶本地時區的TIMESTAMP(p),數組,多集,行。
保留關鍵字
盡管尚未實現所有SQL功能,但某些字符串組合已作為關鍵字保留,以備將來使用。 如果您想使用以下字符串之一作為字段名稱,請確保將其用反引號引起來(例如,“ value”,“ count”)。
A, ABS, ABSOLUTE, ACTION, ADA, ADD, ADMIN, AFTER, ALL, ALLOCATE, ALLOW, ALTER, ALWAYS, AND, ANY, ARE, ARRAY, AS, ASC, ASENSITIVE, ASSERTION, ASSIGNMENT, ASYMMETRIC, AT, ATOMIC, ATTRIBUTE, ATTRIBUTES, AUTHORIZATION, AVG, BEFORE, BEGIN, BERNOULLI, BETWEEN, BIGINT, BINARY, BIT, BLOB, BOOLEAN, BOTH, BREADTH, BY, BYTES, C, CALL, CALLED, CARDINALITY, CASCADE, CASCADED, CASE, CAST, CATALOG, CATALOG_NAME, CEIL, CEILING, CENTURY, CHAIN, CHAR, CHARACTER, CHARACTERISTICS, CHARACTERS, CHARACTER_LENGTH, CHARACTER_SET_CATALOG, CHARACTER_SET_NAME, CHARACTER_SET_SCHEMA, CHAR_LENGTH, CHECK, CLASS_ORIGIN, CLOB, CLOSE, COALESCE, COBOL, COLLATE, COLLATION, COLLATION_CATALOG, COLLATION_NAME, COLLATION_SCHEMA, COLLECT, COLUMN, COLUMN_NAME, COMMAND_FUNCTION, COMMAND_FUNCTION_CODE, COMMIT, COMMITTED, CONDITION, CONDITION_NUMBER, CONNECT, CONNECTION, CONNECTION_NAME, CONSTRAINT, CONSTRAINTS, CONSTRAINT_CATALOG, CONSTRAINT_NAME, CONSTRAINT_SCHEMA, CONSTRUCTOR, CONTAINS, CONTINUE, CONVERT, CORR, CORRESPONDING, COUNT, COVAR_POP, COVAR_SAMP, CREATE, CROSS, CUBE, CUME_DIST, CURRENT, CURRENT_CATALOG, CURRENT_DATE, CURRENT_DEFAULT_TRANSFORM_GROUP, CURRENT_PATH, CURRENT_ROLE, CURRENT_SCHEMA, CURRENT_TIME, CURRENT_TIMESTAMP, CURRENT_TRANSFORM_GROUP_FOR_TYPE, CURRENT_USER, CURSOR, CURSOR_NAME, CYCLE, DATA, DATABASE, DATE, DATETIME_INTERVAL_CODE, DATETIME_INTERVAL_PRECISION, DAY, DEALLOCATE, DEC, DECADE, DECIMAL, DECLARE, DEFAULT, DEFAULTS, DEFERRABLE, DEFERRED, DEFINED, DEFINER, DEGREE, DELETE, DENSE_RANK, DEPTH, DEREF, DERIVED, DESC, DESCRIBE, DESCRIPTION, DESCRIPTOR, DETERMINISTIC, DIAGNOSTICS, DISALLOW, DISCONNECT, DISPATCH, DISTINCT, DOMAIN, DOUBLE, DOW, DOY, DROP, DYNAMIC, DYNAMIC_FUNCTION, DYNAMIC_FUNCTION_CODE, EACH, ELEMENT, ELSE, END, END-EXEC, EPOCH, EQUALS, ESCAPE, EVERY, EXCEPT, EXCEPTION, EXCLUDE, EXCLUDING, EXEC, EXECUTE, EXISTS, EXP, EXPLAIN, EXTEND, EXTERNAL, EXTRACT, FALSE, FETCH, FILTER, FINAL, FIRST, FIRST_VALUE, FLOAT, FLOOR, FOLLOWING, FOR, FOREIGN, FORTRAN, FOUND, FRAC_SECOND, FREE, FROM, FULL, FUNCTION, FUSION, G, GENERAL, GENERATED, GET, GLOBAL, GO, GOTO, GRANT, GRANTED, GROUP, GROUPING, HAVING, HIERARCHY, HOLD, HOUR, IDENTITY, IMMEDIATE, IMPLEMENTATION, IMPORT, IN, INCLUDING, INCREMENT, INDICATOR, INITIALLY, INNER, INOUT, INPUT, INSENSITIVE, INSERT, INSTANCE, INSTANTIABLE, INT, INTEGER, INTERSECT, INTERSECTION, INTERVAL, INTO, INVOKER, IS, ISOLATION, JAVA, JOIN, K, KEY, KEY_MEMBER, KEY_TYPE, LABEL, LANGUAGE, LARGE, LAST, LAST_VALUE, LATERAL, LEADING, LEFT, LENGTH, LEVEL, LIBRARY, LIKE, LIMIT, LN, LOCAL, LOCALTIME, LOCALTIMESTAMP, LOCATOR, LOWER, M, MAP, MATCH, MATCHED, MAX, MAXVALUE, MEMBER, MERGE, MESSAGE_LENGTH, MESSAGE_OCTET_LENGTH, MESSAGE_TEXT, METHOD, MICROSECOND, MILLENNIUM, MIN, MINUTE, MINVALUE, MOD, MODIFIES, MODULE, MONTH, MORE, MULTISET, MUMPS, NAME, NAMES, NATIONAL, NATURAL, NCHAR, NCLOB, NESTING, NEW, NEXT, NO, NONE, NORMALIZE, NORMALIZED, NOT, NULL, NULLABLE, NULLIF, NULLS, NUMBER, NUMERIC, OBJECT, OCTETS, OCTET_LENGTH, OF, OFFSET, OLD, ON, ONLY, OPEN, OPTION, OPTIONS, OR, ORDER, ORDERING, ORDINALITY, OTHERS, OUT, OUTER, OUTPUT, OVER, OVERLAPS, OVERLAY, OVERRIDING, PAD, PARAMETER, PARAMETER_MODE, PARAMETER_NAME, PARAMETER_ORDINAL_POSITION, PARAMETER_SPECIFIC_CATALOG, PARAMETER_SPECIFIC_NAME, PARAMETER_SPECIFIC_SCHEMA, PARTIAL, PARTITION, PASCAL, PASSTHROUGH, PATH, PERCENTILE_CONT, PERCENTILE_DISC, PERCENT_RANK, PLACING, PLAN, PLI, POSITION, POWER, PRECEDING, PRECISION, PREPARE, PRESERVE, PRIMARY, PRIOR, PRIVILEGES, PROCEDURE, PUBLIC, QUARTER, RANGE, RANK, READ, READS, REAL, RECURSIVE, REF, REFERENCES, REFERENCING, REGR_AVGX, REGR_AVGY, REGR_COUNT, REGR_INTERCEPT, REGR_R2, REGR_SLOPE, REGR_SXX, REGR_SXY, REGR_SYY, RELATIVE, RELEASE, REPEATABLE, RESET, RESTART, RESTRICT, RESULT, RETURN, RETURNED_CARDINALITY, RETURNED_LENGTH, RETURNED_OCTET_LENGTH, RETURNED_SQLSTATE, RETURNS, REVOKE, RIGHT, ROLE, ROLLBACK, ROLLUP, ROUTINE, ROUTINE_CATALOG, ROUTINE_NAME, ROUTINE_SCHEMA, ROW, ROWS, ROW_COUNT, ROW_NUMBER, SAVEPOINT, SCALE, SCHEMA, SCHEMA_NAME, SCOPE, SCOPE_CATALOGS, SCOPE_NAME, SCOPE_SCHEMA, SCROLL, SEARCH, SECOND, SECTION, SECURITY, SELECT, SELF, SENSITIVE, SEQUENCE, SERIALIZABLE, SERVER, SERVER_NAME, SESSION, SESSION_USER, SET, SETS, SIMILAR, SIMPLE, SIZE, SMALLINT, SOME, SOURCE, SPACE, SPECIFIC, SPECIFICTYPE, SPECIFIC_NAME, SQL, SQLEXCEPTION, SQLSTATE, SQLWARNING, SQL_TSI_DAY, SQL_TSI_FRAC_SECOND, SQL_TSI_HOUR, SQL_TSI_MICROSECOND, SQL_TSI_MINUTE, SQL_TSI_MONTH, SQL_TSI_QUARTER, SQL_TSI_SECOND, SQL_TSI_WEEK, SQL_TSI_YEAR, SQRT, START, STATE, STATEMENT, STATIC, STDDEV_POP, STDDEV_SAMP, STREAM, STRING, STRUCTURE, STYLE, SUBCLASS_ORIGIN, SUBMULTISET, SUBSTITUTE, SUBSTRING, SUM, SYMMETRIC, SYSTEM, SYSTEM_USER, TABLE, TABLESAMPLE, TABLE_NAME, TEMPORARY, THEN, TIES, TIME, TIMESTAMP, TIMESTAMPADD, TIMESTAMPDIFF, TIMEZONE_HOUR, TIMEZONE_MINUTE, TINYINT, TO, TOP_LEVEL_COUNT, TRAILING, TRANSACTION, TRANSACTIONS_ACTIVE, TRANSACTIONS_COMMITTED, TRANSACTIONS_ROLLED_BACK, TRANSFORM, TRANSFORMS, TRANSLATE, TRANSLATION, TREAT, TRIGGER, TRIGGER_CATALOG, TRIGGER_NAME, TRIGGER_SCHEMA, TRIM, TRUE, TYPE, UESCAPE, UNBOUNDED, UNCOMMITTED, UNDER, UNION, UNIQUE, UNKNOWN, UNNAMED, UNNEST, UPDATE, UPPER, UPSERT, USAGE, USER, USER_DEFINED_TYPE_CATALOG, USER_DEFINED_TYPE_CODE, USER_DEFINED_TYPE_NAME, USER_DEFINED_TYPE_SCHEMA, USING, VALUE, VALUES, VARBINARY, VARCHAR, VARYING, VAR_POP, VAR_SAMP, VERSION, VIEW, WEEK, WHEN, WHENEVER, WHERE, WIDTH_BUCKET, WINDOW, WITH, WITHIN, WITHOUT, WORK, WRAPPER, WRITE, XML, YEAR, ZONE
歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文

