Flink基礎(二十八):FLINK-SQL語法(四)DDL(一)CREATE 語句


0 簡介

CREATE 語句用於向當前或指定的 Catalog 中注冊表、視圖或函數。注冊后的表、視圖和函數可以在 SQL 查詢中使用。

目前 Flink SQL 支持下列 CREATE 語句:

  • CREATE TABLE
  • CREATE DATABASE
  • CREATE VIEW
  • CREATE FUNCTION

1 執行 CREATE 語句

可以使用 TableEnvironment 中的 executeSql() 方法執行 CREATE 語句,也可以在 SQL CLI 中執行 CREATE 語句。 若 CREATE 操作執行成功,executeSql() 方法返回 ‘OK’,否則會拋出異常。

以下的例子展示了如何在 TableEnvironment 和 SQL CLI 中執行一個 CREATE 語句。

val settings = EnvironmentSettings.newInstance()...
val tableEnv = TableEnvironment.create(settings)

// 對已注冊的表進行 SQL 查詢
// 注冊名為 “Orders” 的表
tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
// 在表上執行 SQL 查詢,並把得到的結果作為一個新的表
val result = tableEnv.sqlQuery(
  "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");

// 對已注冊的表進行 INSERT 操作
// 注冊 TableSink
tableEnv.executeSql("CREATE TABLE RubberOrders(product STRING, amount INT) WITH ('connector.path'='/path/to/file' ...)");
// 在表上執行 INSERT 語句並向 TableSink 發出結果
tableEnv.executeSql(
  "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
Flink SQL> CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...);
[INFO] Table has been created.

Flink SQL> CREATE TABLE RubberOrders (product STRING, amount INT) WITH (...);
[INFO] Table has been created.

Flink SQL> INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%';
[INFO] Submitting SQL update statement to the cluster...

2 CREATE TABLE

CREATE TABLE [catalog_name.][db_name.]table_name
  (
    { <column_definition> | <computed_column_definition> }[ , ...n]
    [ <watermark_definition> ]
    [ <table_constraint> ][ , ...n]
  )
  [COMMENT table_comment]
  [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
  WITH (key1=val1, key2=val2, ...)
  [ LIKE source_table [( <like_options> )] ]

<column_definition>:
  column_name column_type [ <column_constraint> ] [COMMENT column_comment]

<column_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY NOT ENFORCED

<table_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED

<computed_column_definition>:
  column_name AS computed_column_expression [COMMENT column_comment]

<watermark_definition>:
  WATERMARK FOR rowtime_column_name AS watermark_strategy_expression

<source_table>:
  [catalog_name.][db_name.]table_name

<like_options>:
{
   { INCLUDING | EXCLUDING } { ALL | CONSTRAINTS | PARTITIONS }
 | { INCLUDING | EXCLUDING | OVERWRITING } { GENERATED | OPTIONS | WATERMARKS } 
}[, ...]

根據指定的表名創建一個表,如果同名表已經在 catalog 中存在了,則無法注冊。

COMPUTED COLUMN

計算列是一個使用 “column_name AS computed_column_expression” 語法生成的虛擬列。它由使用同一表中其他列的非查詢表達式生成,並且不會在表中進行物理存儲。例如,一個計算列可以使用 cost AS price * quantity 進行定義,這個表達式可以包含物理列、常量、函數或變量的任意組合,但這個表達式不能存在任何子查詢。

在 Flink 中計算列一般用於為 CREATE TABLE 語句定義 時間屬性。 處理時間屬性 可以簡單地通過使用了系統函數 PROCTIME() 的 proc AS PROCTIME() 語句進行定義。 另一方面,由於事件時間列可能需要從現有的字段中獲得,因此計算列可用於獲得事件時間列。例如,原始字段的類型不是 TIMESTAMP(3) 或嵌套在 JSON 字符串中。

注意:

  • 定義在一個數據源表( source table )上的計算列會在從數據源讀取數據后被計算,它們可以在 SELECT 查詢語句中使用。
  • 計算列不可以作為 INSERT 語句的目標,在 INSERT 語句中,SELECT 語句的 schema 需要與目標表不帶有計算列的 schema 一致。

WATERMARK

WATERMARK 定義了表的事件時間屬性,其形式為 WATERMARK FOR rowtime_column_name AS watermark_strategy_expression 。

rowtime_column_name 把一個現有的列定義為一個為表標記事件時間的屬性。該列的類型必須為 TIMESTAMP(3),且是 schema 中的頂層列,它也可以是一個計算列。

watermark_strategy_expression 定義了 watermark 的生成策略。它允許使用包括計算列在內的任意非查詢表達式來計算 watermark ;表達式的返回類型必須是 TIMESTAMP(3),表示了從 Epoch 以來的經過的時間。 返回的 watermark 只有當其不為空且其值大於之前發出的本地 watermark 時才會被發出(以保證 watermark 遞增)。每條記錄的 watermark 生成表達式計算都會由框架完成。 框架會定期發出所生成的最大的 watermark ,如果當前 watermark 仍然與前一個 watermark 相同、為空、或返回的 watermark 的值小於最后一個發出的 watermark ,則新的 watermark 不會被發出。 Watermark 根據 pipeline.auto-watermark-interval 中所配置的間隔發出。 若 watermark 的間隔是 0ms ,那么每條記錄都會產生一個 watermark,且 watermark 會在不為空並大於上一個發出的 watermark 時發出。

使用事件時間語義時,表必須包含事件時間屬性和 watermark 策略。

Flink 提供了幾種常用的 watermark 策略。

  • 嚴格遞增時間戳: WATERMARK FOR rowtime_column AS rowtime_column

    發出到目前為止已觀察到的最大時間戳的 watermark ,時間戳大於最大時間戳的行被認為沒有遲到。

  • 遞增時間戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND

    發出到目前為止已觀察到的最大時間戳減 1 的 watermark ,時間戳大於或等於最大時間戳的行被認為沒有遲到。

  • 有界亂序時間戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'string' timeUnit

    發出到目前為止已觀察到的最大時間戳減去指定延遲的 watermark ,例如, WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '5' SECOND 是一個 5 秒延遲的 watermark 策略。

CREATE TABLE Orders (
    user BIGINT,
    product STRING,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH ( . . . );

PRIMARY KEY

主鍵用作 Flink 優化的一種提示信息。主鍵限制表明一張表或視圖的某個(些)列是唯一的並且不包含 Null 值。 主鍵聲明的列都是非 nullable 的。因此主鍵可以被用作表行級別的唯一標識。

主鍵可以和列的定義一起聲明,也可以獨立聲明為表的限制屬性,不管是哪種方式,主鍵都不可以重復定義,否則 Flink 會報錯。

有效性檢查

SQL 標准主鍵限制可以有兩種模式:ENFORCED 或者 NOT ENFORCED。 它申明了是否輸入/出數據會做合法性檢查(是否唯一)。Flink 不存儲數據因此只支持 NOT ENFORCED 模式,即不做檢查,用戶需要自己保證唯一性。

Flink 假設聲明了主鍵的列都是不包含 Null 值的,Connector 在處理數據時需要自己保證語義正確。

Notes: 在 CREATE TABLE 語句中,創建主鍵會修改列的 nullable 屬性,主鍵聲明的列默認都是非 Nullable 的。

PARTITIONED BY

根據指定的列對已經創建的表進行分區。若表使用 filesystem sink ,則將會為每個分區創建一個目錄。

WITH OPTIONS

表屬性用於創建 table source/sink ,一般用於尋找和創建底層的連接器。

表達式 key1=val1 的鍵和值必須為字符串文本常量。請參考 連接外部系統 了解不同連接器所支持的屬性。

注意: 表名可以為以下三種格式 1. catalog_name.db_name.table_name 2. db_name.table_name 3. table_name。使用catalog_name.db_name.table_name 的表將會與名為 “catalog_name” 的 catalog 和名為 “db_name” 的數據庫一起注冊到 metastore 中。使用 db_name.table_name 的表將會被注冊到當前執行的 table environment 中的 catalog 且數據庫會被命名為 “db_name”;對於 table_name, 數據表將會被注冊到當前正在運行的catalog和數據庫中。

注意: 使用 CREATE TABLE 語句注冊的表均可用作 table source 和 table sink。 在被 DML 語句引用前,我們無法決定其實際用於 source 抑或是 sink。

LIKE

LIKE 子句來源於兩種 SQL 特性的變體/組合(Feature T171,“表定義中的 LIKE 語法” 和 Feature T173,“表定義中的 LIKE 語法擴展”)。LIKE 子句可以基於現有表的定義去創建新表,並且可以擴展或排除原始表中的某些部分。與 SQL 標准相反,LIKE 子句必須在 CREATE 語句中定義,並且是基於 CREATE 語句的更上層定義,這是因為 LIKE 子句可以用於定義表的多個部分,而不僅僅是 schema 部分。

你可以使用該子句,重用(或改寫)指定的連接器配置屬性或者可以向外部表添加 watermark 定義,例如可以向 Apache Hive 中定義的表添加 watermark 定義。

示例如下:

CREATE TABLE Orders (
    user BIGINT,
    product STRING,
    order_time TIMESTAMP(3)
) WITH ( 
    'connector' = 'kafka',
    'scan.startup.mode' = 'earliest-offset'
);

CREATE TABLE Orders_with_watermark (
    -- 添加 watermark 定義
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND 
) WITH (
    -- 改寫 startup-mode 屬性
    'scan.startup.mode' = 'latest-offset'
)
LIKE Orders;

結果表 Orders_with_watermark 等效於使用以下語句創建的表:

CREATE TABLE Orders_with_watermark (
    user BIGINT,
    product STRING,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND 
) WITH (
    'connector' = 'kafka',
    'scan.startup.mode' = 'latest-offset'
);

表屬性的合並邏輯可以用 like options 來控制。

可以控制合並的表屬性如下:

  • CONSTRAINTS - 主鍵和唯一鍵約束
  • GENERATED - 計算列
  • OPTIONS - 連接器信息、格式化方式等配置項
  • PARTITIONS - 表分區信息
  • WATERMARKS - watermark 定義

並且有三種不同的表屬性合並策略:

  • INCLUDING - 新表包含源表(source table)所有的表屬性,如果和源表的表屬性重復則會直接失敗,例如新表和源表存在相同 key 的屬性。
  • EXCLUDING - 新表不包含源表指定的任何表屬性。
  • OVERWRITING - 新表包含源表的表屬性,但如果出現重復項,則會用新表的表屬性覆蓋源表中的重復表屬性,例如,兩個表中都存在相同 key 的屬性,則會使用當前語句中定義的 key 的屬性值。

並且你可以使用 INCLUDING/EXCLUDING ALL 這種聲明方式來指定使用怎樣的合並策略,例如使用 EXCLUDING ALL INCLUDING WATERMARKS,那么代表只有源表的 WATERMARKS 屬性才會被包含進新表。

示例如下:

-- 存儲在文件系統的源表
CREATE TABLE Orders_in_file (
    user BIGINT,
    product STRING,
    order_time_string STRING,
    order_time AS to_timestamp(order_time)
    
)
PARTITIONED BY user 
WITH ( 
    'connector' = 'filesystem'
    'path' = '...'
);

-- 對應存儲在 kafka 的源表
CREATE TABLE Orders_in_kafka (
    -- 添加 watermark 定義
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND 
) WITH (
    'connector': 'kafka'
    ...
)
LIKE Orders_in_file (
    -- 排除需要生成 watermark 的計算列之外的所有內容。
    -- 去除不適用於 kafka 的所有分區和文件系統的相關屬性。
    EXCLUDING ALL
    INCLUDING GENERATED
);

如果未提供 like 配置項(like options),默認將使用 INCLUDING ALL OVERWRITING OPTIONS 的合並策略。

注意: 您無法選擇物理列的合並策略,當物理列進行合並時就如使用了 INCLUDING 策略。

注意: 源表 source_table 可以是一個組合 ID。您可以指定不同 catalog 或者 DB 的表作為源表: 例如,my_catalog.my_db.MyTable 指定了源表 MyTable 來源於名為 MyCatalog 的 catalog 和名為 my_db 的 DB ,my_db.MyTable 指定了源表 MyTable 來源於當前 catalog 和名為 my_db 的 DB。

3 CREATE CATALOG

CREATE CATALOG catalog_name
  WITH (key1=val1, key2=val2, ...)

Create a catalog with the given catalog properties. If a catalog with the same name already exists, an exception is thrown.

WITH OPTIONS

Catalog properties used to store extra information related to this catalog. The key and value of expression key1=val1 should both be string literal.

Check out more details at Catalogs.

4 CREATE DATABASE

CREATE DATABASE [IF NOT EXISTS] [catalog_name.]db_name
  [COMMENT database_comment]
  WITH (key1=val1, key2=val2, ...)

根據給定的表屬性創建數據庫。若數據庫中已存在同名表會拋出異常。

IF NOT EXISTS

若數據庫已經存在,則不會進行任何操作。

WITH OPTIONS

數據庫屬性一般用於存儲關於這個數據庫額外的信息。 表達式 key1=val1 中的鍵和值都需要是字符串文本常量。

5 CREATE VIEW

CREATE [TEMPORARY] VIEW [IF NOT EXISTS] [catalog_name.][db_name.]view_name
  [{columnName [, columnName ]* }] [COMMENT view_comment]
  AS query_expression

根據給定的 query 語句創建一個視圖。若數據庫中已經存在同名視圖會拋出異常.

TEMPORARY

創建一個有 catalog 和數據庫命名空間的臨時視圖,並覆蓋原有的視圖。

IF NOT EXISTS

若該視圖已經存在,則不會進行任何操作。

6 CREATE FUNCTION

CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION
  [IF NOT EXISTS] [[catalog_name.]db_name.]function_name
  AS identifier [LANGUAGE JAVA|SCALA|PYTHON]

創建一個有 catalog 和數據庫命名空間的 catalog function ,需要指定一個 identifier ,可指定 language tag 。 若 catalog 中,已經有同名的函數注冊了,則無法注冊。

如果 language tag 是 JAVA 或者 SCALA ,則 identifier 是 UDF 實現類的全限定名。關於 JAVA/SCALA UDF 的實現,請參考 自定義函數

如果 language tag 是 PYTHON ,則 identifier 是 UDF 對象的全限定名,例如 pyflink.table.tests.test_udf.add。關於 PYTHON UDF 的實現,請參考 Python UDFs

TEMPORARY

創建一個有 catalog 和數據庫命名空間的臨時 catalog function ,並覆蓋原有的 catalog function 。

TEMPORARY SYSTEM

創建一個沒有數據庫命名空間的臨時系統 catalog function ,並覆蓋系統內置的函數。

IF NOT EXISTS

若該函數已經存在,則不會進行任何操作。

LANGUAGE JAVA|SCALA|PYTHON

Language tag 用於指定 Flink runtime 如何執行這個函數。目前,只支持 JAVA, SCALA 和 PYTHON,且函數的默認語言為 JAVA。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM