Flink基礎(二十五):FLINK-SQL語法 (一)DQL(一)查詢語句(一)基本查詢


來源:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sql/queries.html

0 簡介

SELECT 語句和 VALUES 語句需要使用 TableEnvironment 的 sqlQuery() 方法加以指定。這個方法會以 Table 的形式返回 SELECT (或 VALUE)的查詢結果。Table 可以被用於 隨后的SQL 與 Table API 查詢 、 轉換為 DataSet 或 DataStream 或 輸出到 TableSink 。SQL 與 Table API 的查詢可以進行無縫融合、整體優化並翻譯為單一的程序。

為了可以在 SQL 查詢中訪問到表,你需要先 在 TableEnvironment 中注冊表 。表可以通過 TableSource、 TableCREATE TABLE 語句、 DataStream 或 DataSet 注冊。 用戶也可以通過 向 TableEnvironment 中注冊 catalog 的方式指定數據源的位置。

為方便起見 Table.toString() 將會在其 TableEnvironment 中自動使用一個唯一的名字注冊表並返回表名。 因此, Table 對象可以如下文所示樣例,直接內聯到 SQL 語句中。

注意: 查詢若包括了不支持的 SQL 特性,將會拋出 TableException。批處理和流處理所支持的 SQL 特性將會在下述章節中列出。

1 指定查詢

以下示例顯示如何在已注冊和內聯表上指定 SQL 查詢。

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

//  從外部數據源讀取 DataStream 
val ds: DataStream[(Long, String, Integer)] = env.addSource(...)

// 使用 SQL 查詢內聯的(未注冊的)表
val table = ds.toTable(tableEnv, $"user", $"product", $"amount")
val result = tableEnv.sqlQuery(
  s"SELECT SUM(amount) FROM $table WHERE product LIKE '%Rubber%'")

// 使用名稱 "Orders" 注冊一個 DataStream 
tableEnv.createTemporaryView("Orders", ds, $"user", $"product", $"amount")
// 在表上執行 SQL 查詢並得到以新表返回的結果
val result2 = tableEnv.sqlQuery(
  "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")

// 創建並注冊一個 TableSink
val schema = new Schema()
    .field("product", DataTypes.STRING())
    .field("amount", DataTypes.INT())

tableEnv.connect(new FileSystem().path("/path/to/file"))
    .withFormat(...)
    .withSchema(schema)
    .createTemporaryTable("RubberOrders")

// 在表上執行插入操作,並把結果發出到 TableSink
tableEnv.executeSql(
  "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")

2 執行查詢

SELECT 語句或者 VALUES 語句可以通過 TableEnvironment.executeSql() 方法來執行,將選擇的結果收集到本地。該方法返回 TableResult 對象用於包裝查詢的結果。和 SELECT 語句很像,一個 Table 對象可以通過 Table.execute() 方法執行從而將 Table 的內容收集到本地客戶端。 TableResult.collect() 方法返回一個可以關閉的行迭代器。除非所有的數據都被收集到本地,否則一個查詢作業永遠不會結束。所以我們應該通過 CloseableIterator#close() 方法主動地關閉作業以防止資源泄露。 我們還可以通過 TableResult.print() 方法將查詢結果打印到本地控制台。TableResult 中的結果數據只能被訪問一次,因此一個 TableResult 實例中,collect() 方法和 print() 方法不能被同時使用。

對於流模式,TableResult.collect() 方法或者 TableResult.print 方法保證端到端精確一次的數據交付。這就要求開啟 checkpointing。默認情況下 checkpointing 是禁止的,我們可以通過 TableConfig 設置 checkpointing 相關屬性(請參考 checkpointing 配置)來開啟 checkpointing。 因此一條結果數據只有在其對應的 checkpointing 完成后才能在客戶端被訪問。

注意: 對於流模式,當前僅支持追加模式的查詢語句,並且應該開啟 checkpoint。因為一條結果只有在其對應的 checkpoint 完成之后才能被客戶端訪問到。

val env = StreamExecutionEnvironment.getExecutionEnvironment()
val tableEnv = StreamTableEnvironment.create(env, settings)
// enable checkpointing
tableEnv.getConfig.getConfiguration.set(
  ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE)
tableEnv.getConfig.getConfiguration.set(
  ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(10))

tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")

// execute SELECT statement
val tableResult1 = tableEnv.executeSql("SELECT * FROM Orders")
val it = tableResult1.collect()
try while (it.hasNext) {
  val row = it.next
  // handle row
}
finally it.close() // close the iterator to avoid resource leak

// execute Table
val tableResult2 = tableEnv.sqlQuery("SELECT * FROM Orders").execute()
tableResult2.print()

3 語法

Flink 通過支持標准 ANSI SQL的 Apache Calcite 解析 SQL。

以下 BNF-語法 描述了批處理和流處理查詢中所支持的 SQL 特性的超集。其中 操作符 章節展示了所支持的特性的樣例,並指明了哪些特性僅適用於批處理或流處理。

query:
  values
  | {
      select
      | selectWithoutFrom
      | query UNION [ ALL ] query
      | query EXCEPT query
      | query INTERSECT query
    }
    [ ORDER BY orderItem [, orderItem ]* ]
    [ LIMIT { count | ALL } ]
    [ OFFSET start { ROW | ROWS } ]
    [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY]

orderItem:
  expression [ ASC | DESC ]

select:
  SELECT [ ALL | DISTINCT ]
  { * | projectItem [, projectItem ]* }
  FROM tableExpression
  [ WHERE booleanExpression ]
  [ GROUP BY { groupItem [, groupItem ]* } ]
  [ HAVING booleanExpression ]
  [ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]

selectWithoutFrom:
  SELECT [ ALL | DISTINCT ]
  { * | projectItem [, projectItem ]* }

projectItem:
  expression [ [ AS ] columnAlias ]
  | tableAlias . *

tableExpression:
  tableReference [, tableReference ]*
  | tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]

joinCondition:
  ON booleanExpression
  | USING '(' column [, column ]* ')'

tableReference:
  tablePrimary
  [ matchRecognize ]
  [ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ]

tablePrimary:
  [ TABLE ] [ [ catalogName . ] schemaName . ] tableName [ dynamicTableOptions ]
  | LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')'
  | UNNEST '(' expression ')'

dynamicTableOptions:
  /*+ OPTIONS(key=val [, key=val]*) */

key:
  stringLiteral

val:
  stringLiteral

values:
  VALUES expression [, expression ]*

groupItem:
  expression
  | '(' ')'
  | '(' expression [, expression ]* ')'
  | CUBE '(' expression [, expression ]* ')'
  | ROLLUP '(' expression [, expression ]* ')'
  | GROUPING SETS '(' groupItem [, groupItem ]* ')'

windowRef:
    windowName
  | windowSpec

windowSpec:
    [ windowName ]
    '('
    [ ORDER BY orderItem [, orderItem ]* ]
    [ PARTITION BY expression [, expression ]* ]
    [
        RANGE numericOrIntervalExpression {PRECEDING}
      | ROWS numericExpression {PRECEDING}
    ]
    ')'

matchRecognize:
      MATCH_RECOGNIZE '('
      [ PARTITION BY expression [, expression ]* ]
      [ ORDER BY orderItem [, orderItem ]* ]
      [ MEASURES measureColumn [, measureColumn ]* ]
      [ ONE ROW PER MATCH ]
      [ AFTER MATCH
            ( SKIP TO NEXT ROW
            | SKIP PAST LAST ROW
            | SKIP TO FIRST variable
            | SKIP TO LAST variable
            | SKIP TO variable )
      ]
      PATTERN '(' pattern ')'
      [ WITHIN intervalLiteral ]
      DEFINE variable AS condition [, variable AS condition ]*
      ')'

measureColumn:
      expression AS alias

pattern:
      patternTerm [ '|' patternTerm ]*

patternTerm:
      patternFactor [ patternFactor ]*

patternFactor:
      variable [ patternQuantifier ]

patternQuantifier:
      '*'
  |   '*?'
  |   '+'
  |   '+?'
  |   '?'
  |   '??'
  |   '{' { [ minRepeat ], [ maxRepeat ] } '}' ['?']
  |   '{' repeat '}'

Flink SQL 對於標識符(表、屬性、函數名)有類似於 Java 的詞法約定:

  • 不管是否引用標識符,都保留標識符的大小寫。
  • 且標識符需區分大小寫。
  • 與 Java 不一樣的地方在於,通過反引號,可以允許標識符帶有非字母的字符(如:"SELECT a AS `my field` FROM t")。

字符串文本常量需要被單引號包起來(如 SELECT 'Hello World' )。兩個單引號表示轉移(如 SELECT 'It''s me.')。字符串文本常量支持 Unicode 字符,如需明確使用 Unicode 編碼,請使用以下語法:

  • 使用反斜杠(\)作為轉義字符(默認):SELECT U&'\263A'
  • 使用自定義的轉義字符: SELECT U&'#263A' UESCAPE '#'

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM