來源:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sql/queries.html
0 簡介
SELECT 語句和 VALUES 語句需要使用 TableEnvironment 的 sqlQuery() 方法加以指定。這個方法會以 Table 的形式返回 SELECT (或 VALUE)的查詢結果。Table 可以被用於 隨后的SQL 與 Table API 查詢 、 轉換為 DataSet 或 DataStream 或 輸出到 TableSink 。SQL 與 Table API 的查詢可以進行無縫融合、整體優化並翻譯為單一的程序。
為了可以在 SQL 查詢中訪問到表,你需要先 在 TableEnvironment 中注冊表 。表可以通過 TableSource、 Table、CREATE TABLE 語句、 DataStream 或 DataSet 注冊。 用戶也可以通過 向 TableEnvironment 中注冊 catalog 的方式指定數據源的位置。
為方便起見 Table.toString() 將會在其 TableEnvironment 中自動使用一個唯一的名字注冊表並返回表名。 因此, Table 對象可以如下文所示樣例,直接內聯到 SQL 語句中。
注意: 查詢若包括了不支持的 SQL 特性,將會拋出 TableException。批處理和流處理所支持的 SQL 特性將會在下述章節中列出。
1 指定查詢
以下示例顯示如何在已注冊和內聯表上指定 SQL 查詢。
val env = StreamExecutionEnvironment.getExecutionEnvironment val tableEnv = StreamTableEnvironment.create(env) // 從外部數據源讀取 DataStream val ds: DataStream[(Long, String, Integer)] = env.addSource(...) // 使用 SQL 查詢內聯的(未注冊的)表 val table = ds.toTable(tableEnv, $"user", $"product", $"amount") val result = tableEnv.sqlQuery( s"SELECT SUM(amount) FROM $table WHERE product LIKE '%Rubber%'") // 使用名稱 "Orders" 注冊一個 DataStream tableEnv.createTemporaryView("Orders", ds, $"user", $"product", $"amount") // 在表上執行 SQL 查詢並得到以新表返回的結果 val result2 = tableEnv.sqlQuery( "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'") // 創建並注冊一個 TableSink val schema = new Schema() .field("product", DataTypes.STRING()) .field("amount", DataTypes.INT()) tableEnv.connect(new FileSystem().path("/path/to/file")) .withFormat(...) .withSchema(schema) .createTemporaryTable("RubberOrders") // 在表上執行插入操作,並把結果發出到 TableSink tableEnv.executeSql( "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
2 執行查詢
SELECT 語句或者 VALUES 語句可以通過 TableEnvironment.executeSql() 方法來執行,將選擇的結果收集到本地。該方法返回 TableResult 對象用於包裝查詢的結果。和 SELECT 語句很像,一個 Table 對象可以通過 Table.execute() 方法執行從而將 Table 的內容收集到本地客戶端。 TableResult.collect() 方法返回一個可以關閉的行迭代器。除非所有的數據都被收集到本地,否則一個查詢作業永遠不會結束。所以我們應該通過 CloseableIterator#close() 方法主動地關閉作業以防止資源泄露。 我們還可以通過 TableResult.print() 方法將查詢結果打印到本地控制台。TableResult 中的結果數據只能被訪問一次,因此一個 TableResult 實例中,collect() 方法和 print() 方法不能被同時使用。
對於流模式,TableResult.collect() 方法或者 TableResult.print 方法保證端到端精確一次的數據交付。這就要求開啟 checkpointing。默認情況下 checkpointing 是禁止的,我們可以通過 TableConfig 設置 checkpointing 相關屬性(請參考 checkpointing 配置)來開啟 checkpointing。 因此一條結果數據只有在其對應的 checkpointing 完成后才能在客戶端被訪問。
注意: 對於流模式,當前僅支持追加模式的查詢語句,並且應該開啟 checkpoint。因為一條結果只有在其對應的 checkpoint 完成之后才能被客戶端訪問到。
val env = StreamExecutionEnvironment.getExecutionEnvironment() val tableEnv = StreamTableEnvironment.create(env, settings) // enable checkpointing tableEnv.getConfig.getConfiguration.set( ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE) tableEnv.getConfig.getConfiguration.set( ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(10)) tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)") // execute SELECT statement val tableResult1 = tableEnv.executeSql("SELECT * FROM Orders") val it = tableResult1.collect() try while (it.hasNext) { val row = it.next // handle row } finally it.close() // close the iterator to avoid resource leak // execute Table val tableResult2 = tableEnv.sqlQuery("SELECT * FROM Orders").execute() tableResult2.print()
3 語法
Flink 通過支持標准 ANSI SQL的 Apache Calcite 解析 SQL。
以下 BNF-語法 描述了批處理和流處理查詢中所支持的 SQL 特性的超集。其中 操作符 章節展示了所支持的特性的樣例,並指明了哪些特性僅適用於批處理或流處理。
query: values | { select | selectWithoutFrom | query UNION [ ALL ] query | query EXCEPT query | query INTERSECT query } [ ORDER BY orderItem [, orderItem ]* ] [ LIMIT { count | ALL } ] [ OFFSET start { ROW | ROWS } ] [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY] orderItem: expression [ ASC | DESC ] select: SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } FROM tableExpression [ WHERE booleanExpression ] [ GROUP BY { groupItem [, groupItem ]* } ] [ HAVING booleanExpression ] [ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ] selectWithoutFrom: SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } projectItem: expression [ [ AS ] columnAlias ] | tableAlias . * tableExpression: tableReference [, tableReference ]* | tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ] joinCondition: ON booleanExpression | USING '(' column [, column ]* ')' tableReference: tablePrimary [ matchRecognize ] [ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ] tablePrimary: [ TABLE ] [ [ catalogName . ] schemaName . ] tableName [ dynamicTableOptions ] | LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')' | UNNEST '(' expression ')' dynamicTableOptions: /*+ OPTIONS(key=val [, key=val]*) */ key: stringLiteral val: stringLiteral values: VALUES expression [, expression ]* groupItem: expression | '(' ')' | '(' expression [, expression ]* ')' | CUBE '(' expression [, expression ]* ')' | ROLLUP '(' expression [, expression ]* ')' | GROUPING SETS '(' groupItem [, groupItem ]* ')' windowRef: windowName | windowSpec windowSpec: [ windowName ] '(' [ ORDER BY orderItem [, orderItem ]* ] [ PARTITION BY expression [, expression ]* ] [ RANGE numericOrIntervalExpression {PRECEDING} | ROWS numericExpression {PRECEDING} ] ')' matchRecognize: MATCH_RECOGNIZE '(' [ PARTITION BY expression [, expression ]* ] [ ORDER BY orderItem [, orderItem ]* ] [ MEASURES measureColumn [, measureColumn ]* ] [ ONE ROW PER MATCH ] [ AFTER MATCH ( SKIP TO NEXT ROW | SKIP PAST LAST ROW | SKIP TO FIRST variable | SKIP TO LAST variable | SKIP TO variable ) ] PATTERN '(' pattern ')' [ WITHIN intervalLiteral ] DEFINE variable AS condition [, variable AS condition ]* ')' measureColumn: expression AS alias pattern: patternTerm [ '|' patternTerm ]* patternTerm: patternFactor [ patternFactor ]* patternFactor: variable [ patternQuantifier ] patternQuantifier: '*' | '*?' | '+' | '+?' | '?' | '??' | '{' { [ minRepeat ], [ maxRepeat ] } '}' ['?'] | '{' repeat '}'
Flink SQL 對於標識符(表、屬性、函數名)有類似於 Java 的詞法約定:
- 不管是否引用標識符,都保留標識符的大小寫。
- 且標識符需區分大小寫。
- 與 Java 不一樣的地方在於,通過反引號,可以允許標識符帶有非字母的字符(如:
"SELECT a AS `my field` FROM t")。
字符串文本常量需要被單引號包起來(如 SELECT 'Hello World' )。兩個單引號表示轉移(如 SELECT 'It''s me.')。字符串文本常量支持 Unicode 字符,如需明確使用 Unicode 編碼,請使用以下語法:
- 使用反斜杠(
\)作為轉義字符(默認):SELECT U&'\263A' - 使用自定義的轉義字符:
SELECT U&'#263A' UESCAPE '#'
