本文翻譯自官網:SQL Client Beta https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sqlClient.html
Flink的Table&SQL API使使用SQL語言編寫的查詢成為可能,但是這些查詢需要嵌入用Java或Scala編寫的表程序中。 此外,在將這些程序提交給集群之前,需要將它們與構建工具打包在一起。 這或多或少地將Flink的使用限制為Java / Scala程序員。
SQL客戶端旨在提供一種簡單的方法來編寫,調試和提交表程序到Flink集群,而無需一行Java或Scala代碼。 SQL Client CLI允許從命令行上正在運行的分布式應用程序檢索和可視化實時結果。
注意:SQL Client 處於早期開發階段。 即該應用程序尚未投入生產,它對於原型制作和使用Flink SQL還是一個非常有用的工具。 將來,社區計划通過提供基於 REST 的 SQL Client Gateway 來擴展其功能。
入門
本節介紹如何從命令行設置和運行第一個Flink SQL程序。
SQL客戶端捆綁在常規的Flink發行版中,因此可以直接運行。它只需要一個運行中的Flink集群即可在其中執行表程序。有關設置Flink群集的更多信息,請參見“ 群集和部署”部分。如果只想試用SQL Client,也可以使用以下命令以一個工作程序啟動本地群集:
./bin/start-cluster.sh
啟動SQL客戶端CLI
SQL Client腳本也位於Flink的二進制目錄中。將來,用戶可以通過啟動嵌入式獨立進程或連接到遠程SQL Client Gateway來啟動SQL Client CLI的兩種可能性。目前僅embedded
支持該模式。您可以通過以下方式啟動CLI:
./bin/sql-client.sh embedded
默認情況下,SQL客戶端將從位於中的環境文件中讀取其配置./conf/sql-client-defaults.yaml
。有關環境文件的結構的更多信息,請參見配置部分。
運行SQL查詢
CLI啟動后,您可以使用HELP
命令列出所有可用的SQL語句。為了驗證您的設置和集群連接,您可以輸入第一個SQL查詢,輸入Enter
按鍵執行它:
SELECT 'Hello World';
該查詢不需要表源,並且只產生一行結果。 CLI將從群集中檢索結果並將其可視化。 您可以通過按Q鍵關閉結果視圖。
CLI支持兩種用於維護和可視化結果的模式。
表格模式將結果具體化到內存中,並以規則的分頁表格表示形式將其可視化。 可以通過在CLI中執行以下命令來啟用它:
SET execution.result-mode=table;
更改日志模式不會具體化結果,並且無法可視化由包含插入(+)和撤回(-)的連續查詢產生的結果流。
SET execution.result-mode=changelog;
您可以使用以下查詢來查看兩種結果模式的運行情況:
SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;
此查詢執行一個有限字數示例。
在變更日志模式下,可視化的變更日志類似於:
+ Bob, 1 + Alice, 1 + Greg, 1 - Bob, 1 + Bob, 2
在表格模式下,可視化結果表格將不斷更新,直到表格程序以以下內容結束:
Bob, 2 Alice, 1 Greg, 1
這兩種結果模式在SQL查詢的原型制作過程中都非常有用。 在這兩種模式下,結果都存儲在SQL Client的Java堆內存中。 為了保持CLI界面的響應性,更改日志模式僅顯示最新的1000個更改。 表格模式允許瀏覽更大的結果,這些結果僅受可用主存儲器和配置的最大行數(最大表結果行)限制。
注意: 在批處理環境中執行的查詢只能使用表結果模式來檢索。
定義查詢后,可以將其作為長期運行的獨立Flink作業提交給集群。 為此,需要使用INSERT INTO語句指定存儲結果的目標系統。 配置部分說明如何聲明用於讀取數據的表源,如何聲明用於寫入數據的表接收器以及如何配置其他表程序屬性。
配置
可以使用以下可選的CLI命令啟動SQL Client。 在隨后的段落中將詳細討論它們。
./bin/sql-client.sh embedded --help Mode "embedded" submits Flink jobs from the local machine. Syntax: embedded [OPTIONS] "embedded" mode options: -d,--defaults <environment file> The environment properties with which every new session is initialized. Properties might be overwritten by session properties. -e,--environment <environment file> The environment properties to be imported into the session. It might overwrite default environment properties. -h,--help Show the help message with descriptions of all options. -j,--jar <JAR file> A JAR file to be imported into the session. The file might contain user-defined classes needed for the execution of statements such as functions, table sources, or sinks. Can be used multiple times. -l,--library <JAR directory> A JAR file directory with which every new session is initialized. The files might contain user-defined classes needed for the execution of statements such as functions, table sources, or sinks. Can be used multiple times. -s,--session <session identifier> The identifier for a session. 'default' is the default identifier.
環境文件
SQL查詢需要在其中執行配置環境。所謂的 環境文件 定義了可用的目錄,表 source 和 sink,用戶定義的函數以及執行和部署所需的其他屬性。
每個環境文件都是常規的YAML文件。下面提供了此類文件的示例。
# Define tables here such as sources, sinks, views, or temporal tables. tables: - name: MyTableSource type: source-table update-mode: append connector: type: filesystem path: "/path/to/something.csv" format: type: csv fields: - name: MyField1 type: INT - name: MyField2 type: VARCHAR line-delimiter: "\n" comment-prefix: "#" schema: - name: MyField1 type: INT - name: MyField2 type: VARCHAR - name: MyCustomView type: view query: "SELECT MyField2 FROM MyTableSource" # Define user-defined functions here. functions: - name: myUDF from: class class: foo.bar.AggregateUDF constructor: - 7.6 - false # Define available catalogs catalogs: - name: catalog_1 type: hive property-version: 1 hive-conf-dir: ... - name: catalog_2 type: hive property-version: 1 default-database: mydb2 hive-conf-dir: ... hive-version: 1.2.1 # Properties that change the fundamental execution behavior of a table program. execution: planner: old # optional: either 'old' (default) or 'blink' type: streaming # required: execution mode either 'batch' or 'streaming' result-mode: table # required: either 'table' or 'changelog' max-table-result-rows: 1000000 # optional: maximum number of maintained rows in # 'table' mode (1000000 by default, smaller 1 means unlimited) time-characteristic: event-time # optional: 'processing-time' or 'event-time' (default) parallelism: 1 # optional: Flink's parallelism (1 by default) periodic-watermarks-interval: 200 # optional: interval for periodic watermarks (200 ms by default) max-parallelism: 16 # optional: Flink's maximum parallelism (128 by default) min-idle-state-retention: 0 # optional: table program's minimum idle state time max-idle-state-retention: 0 # optional: table program's maximum idle state time current-catalog: catalog_1 # optional: name of the current catalog of the session ('default_catalog' by default) current-database: mydb1 # optional: name of the current database of the current catalog # (default database of the current catalog by default) restart-strategy: # optional: restart strategy type: fallback # "fallback" to global restart strategy by default # Configuration options for adjusting and tuning table programs. # A full list of options and their default values can be found # on the dedicated "Configuration" page. configuration: table.optimizer.join-reorder-enabled: true table.exec.spill-compression.enabled: true table.exec.spill-compression.block-size: 128kb # Properties that describe the cluster to which table programs are submitted to. deployment: response-timeout: 5000
配置:
- 使用表源MyTableSource定義環境,該表源從CSV文件讀取
- 定義一個視圖MyCustomView,該視圖使用SQL查詢聲明一個虛擬表
- 定義一個用戶定義的函數myUDF,該函數可以使用類名和兩個構造函數參數進行實例化,
- 連接到兩個Hive catalog,並使用catalog_1作為當前 catalog,使用mydb1作為該 catalog 的當前數據庫
- 使用舊 planner 以流模式運行具有事件時間特征和並行度為1的語句
- 在表結果模式下運行探索性查詢
- 並通過配置選項圍繞聯接的重新排序和溢出進行一些 planner 調整。
根據使用情況,可以將配置拆分為多個文件。 因此,可以出於一般目的(使用--defaults使用默認環境文件)以及基於每個會話(使用--environment使用會話環境文件)來創建環境文件。 每個CLI會話均使用默認屬性初始化,后跟會話屬性。 例如,默認環境文件可以指定在每個會話中都可用於查詢的所有表源,而會話環境文件僅聲明特定的狀態保留時間和並行性。 啟動CLI應用程序時,可以傳遞默認環境文件和會話環境文件。 如果未指定默認環境文件,則SQL客戶端會在Flink的配置目錄中搜索./conf/sql-client-defaults.yaml。
注意:在CLI會話中設置的屬性(例如,使用SET命令)具有最高優先級:
CLI commands > session environment file > defaults environment file
重啟策略
重啟策略控制在發生故障時如何重新啟動Flink作業。 與Flink群集的全局重啟策略類似,可以在環境文件中聲明更細粒度的重啟配置。
支持以下策略:
execution: # falls back to the global strategy defined in flink-conf.yaml restart-strategy: type: fallback # job fails directly and no restart is attempted restart-strategy: type: none # attempts a given number of times to restart the job restart-strategy: type: fixed-delay attempts: 3 # retries before job is declared as failed (default: Integer.MAX_VALUE) delay: 10000 # delay in ms between retries (default: 10 s) # attempts as long as the maximum number of failures per time interval is not exceeded restart-strategy: type: failure-rate max-failures-per-interval: 1 # retries in interval until failing (default: 1) failure-rate-interval: 60000 # measuring interval in ms for failure rate delay: 10000 # delay in ms between retries (default: 10 s)
依賴關系
SQL客戶端不需要使用Maven或SBT設置Java項目。 相反,您可以將依賴項作為常規JAR文件傳遞,然后將其提交給集群。 您可以單獨指定每個JAR文件(使用--jar),也可以定義整個庫目錄(使用--library)。 對於外部系統(例如Apache Kafka)和相應數據格式(例如JSON)的連接器,Flink提供了現成的JAR捆綁包。 可以從Maven中央存儲庫為每個發行版下載這些JAR文件。
提供的SQL JAR的完整列表以及有關如何使用它們的文檔可以在與外部系統的連接頁面上找到。
以下示例顯示了一個環境文件,該文件定義了一個表源,該表源從Apache Kafka讀取JSON數據。
tables: - name: TaxiRides type: source-table update-mode: append connector: property-version: 1 type: kafka version: "0.11" topic: TaxiRides startup-mode: earliest-offset properties: - key: zookeeper.connect value: localhost:2181 - key: bootstrap.servers value: localhost:9092 - key: group.id value: testGroup format: property-version: 1 type: json schema: "ROW<rideId LONG, lon FLOAT, lat FLOAT, rideTime TIMESTAMP>" schema: - name: rideId type: LONG - name: lon type: FLOAT - name: lat type: FLOAT - name: rowTime type: TIMESTAMP rowtime: timestamps: type: "from-field" from: "rideTime" watermarks: type: "periodic-bounded" delay: "60000" - name: procTime type: TIMESTAMP proctime: true
TaxiRide表的結果模式包含JSON模式的大多數字段。 此外,它添加了行時間屬性rowTime和處理時間屬性procTime。
連接器和格式都允許定義屬性版本(當前為版本1),以便將來向后兼容。
用戶定義的函數
SQL客戶端允許用戶創建要在SQL查詢中使用的自定義用戶定義函數。 當前,這些函數僅限於以編程方式在Java / Scala類中定義。
為了提供用戶定義的函數,您需要首先實現並編譯擴展ScalarFunction,AggregateFunction或TableFunction的函數類(請參閱用戶定義的函數)。 然后可以將一個或多個函數打包到SQL客戶端的依賴項JAR中。
在調用之前,必須在環境文件中聲明所有函數。 對於函數列表中的每一項,必須指定
- 函數注冊的名稱
- 使用的函數源(目前僅限於類)
- 指示函數的完全限定的類名稱的類,以及用於實例化的可選構造函數參數列表。
functions: - name: ... # required: name of the function from: class # required: source of the function (can only be "class" for now) class: ... # required: fully qualified class name of the function constructor: # optimal: constructor parameters of the function class - ... # optimal: a literal parameter with implicit type - class: ... # optimal: full class name of the parameter constructor: # optimal: constructor parameters of the parameter's class - type: ... # optimal: type of the literal parameter value: ... # optimal: value of the literal parameter
確保指定參數的順序和類型嚴格匹配函數類的構造函數之一。
函數構造參數
根據用戶定義的函數,可能有必要在SQL語句中使用實現之前對其進行參數化。
如前面的示例所示,在聲明用戶定義的函數時,可以通過以下三種方式之一使用構造函數參數來配置類:
具有隱式類型的文字值: SQL Client將根據文字值本身自動派生類型。目前,僅支持 BOOLEAN
,INT
,DOUBLE
和VARCHAR
。如果自動派生無法按預期進行(例如,您需要VARCHAR false
),請改用顯式類型。
- true # -> BOOLEAN (case sensitive) - 42 # -> INT - 1234.222 # -> DOUBLE - foo # -> VARCHAR
具有顯式類型的文字值:使用類型屬性顯式聲明參數type
和value
屬性。
- type: DECIMAL
value: 11111111111111111
下表說明了受支持的Java參數類型和相應的SQL類型字符串。
Java type | SQL type |
---|---|
java.math.BigDecimal |
DECIMAL |
java.lang.Boolean |
BOOLEAN |
java.lang.Byte |
TINYINT |
java.lang.Double |
DOUBLE |
java.lang.Float |
REAL , FLOAT |
java.lang.Integer |
INTEGER , INT |
java.lang.Long |
BIGINT |
java.lang.Short |
SMALLINT |
java.lang.String |
VARCHAR |
目前尚不支持更多類型(例如TIMESTAMP或ARRAY),基本類型和null。
(嵌套的)類實例:除了文字值,還可以通過指定class
和constructor
屬性來為構造函數參數創建(嵌套的)類實例。可以遞歸執行此過程,直到所有構造函數參數都用文字值表示為止。
- class: foo.bar.paramClass constructor: - StarryName - class: java.lang.Integer constructor: - class: java.lang.String constructor: - type: VARCHAR value: 3
Catalogs
可以將 catalog 定義為一組YAML屬性,並在啟動SQL Client時自動將其注冊到環境中。
用戶可以在SQL CLI中指定當前 catalog ,以及要用作當前數據庫的 catalog 的數據庫。
catalogs: - name: catalog_1 type: hive property-version: 1 default-database: mydb2 hive-version: 1.2.1 hive-conf-dir: <path of Hive conf directory> - name: catalog_2 type: hive property-version: 1 hive-conf-dir: <path of Hive conf directory> execution: ... current-catalog: catalog_1 current-database: mydb1
有關catalog 的更多信息,請參見 catalog。
分離的SQL查詢
為了定義端到端的SQL管道,可以使用SQL的INSERT INTO語句向Flink集群提交長時間運行的分離查詢。 這些查詢將其結果生成到外部系統而不是SQL Client中。 這允許處理更高的並行度和更大數量的數據。 提交后,CLI本身對分離的查詢沒有任何控制權。
INSERT INTO MyTableSink SELECT * FROM MyTableSource
表接收器MyTableSink必須在環境文件中聲明。 有關支持的外部系統及其配置的更多信息,請參見連接頁面。 下面顯示了Apache Kafka table sink 的示例。
tables: - name: MyTableSink type: sink-table update-mode: append connector: property-version: 1 type: kafka version: "0.11" topic: OutputTopic properties: - key: zookeeper.connect value: localhost:2181 - key: bootstrap.servers value: localhost:9092 - key: group.id value: testGroup format: property-version: 1 type: json derive-schema: true schema: - name: rideId type: LONG - name: lon type: FLOAT - name: lat type: FLOAT - name: rideTime type: TIMESTAMP
SQL客戶端確保語句已成功提交到群集。 提交查詢后,CLI將顯示有關Flink作業的信息。
[INFO] Table update statement has been successfully submitted to the cluster: Cluster ID: StandaloneClusterId Job ID: 6f922fe5cba87406ff23ae4a7bb79044 Web interface: http://localhost:8081
注意:提交后,SQL客戶端不會跟蹤正在運行的Flink作業的狀態。 提交后可以關閉CLI進程,而不會影響分離的查詢。 Flink的重啟策略可確保容錯能力。 可以使用Flink的 web 界面,命令行或REST API取消查詢。
SQL視圖
視圖允許通過SQL查詢定義虛擬表。 視圖定義被立即解析和驗證。 但是,實際執行是在提交常規INSERT INTO或SELECT語句期間訪問視圖時發生的。
可以在環境文件中或在CLI會話中定義視圖。
以下示例顯示如何在一個文件中定義多個視圖。 按照在環境文件中定義的順序注冊視圖。 支持諸如視圖A依賴於視圖B依賴於視圖C的引用鏈。
tables: - name: MyTableSource # ... - name: MyRestrictedView type: view query: "SELECT MyField2 FROM MyTableSource" - name: MyComplexView type: view query: > SELECT MyField2 + 42, CAST(MyField1 AS VARCHAR) FROM MyTableSource WHERE MyField2 > 200
與表源和接收器相似,會話環境文件中定義的視圖具有最高優先級。
也可以使用CREATE VIEW語句在CLI會話中創建視圖:
CREATE VIEW MyNewView AS SELECT MyField2 FROM MyTableSource;
也可以使用DROP VIEW語句刪除在CLI會話中創建的視圖:
DROP VIEW MyNewView;
注意:CLI中視圖的定義僅限於上述語法。 將來的版本將支持為視圖定義表名或在表名中轉義空格。
時態表
時態表允許在變化的歷史記錄表上進行(參數化)視圖,該視圖返回表在特定時間點的內容。 這對於在特定時間戳將一個表與另一個表的內容連接起來特別有用。 在時態表聯接頁面中可以找到更多信息。
以下示例顯示如何定義時態表SourceTemporalTable:
tables: # Define the table source (or view) that contains updates to a temporal table - name: HistorySource type: source-table update-mode: append connector: # ... format: # ... schema: - name: integerField type: INT - name: stringField type: VARCHAR - name: rowtimeField type: TIMESTAMP rowtime: timestamps: type: from-field from: rowtimeField watermarks: type: from-source # Define a temporal table over the changing history table with time attribute and primary key - name: SourceTemporalTable type: temporal-table history-table: HistorySource primary-key: integerField time-attribute: rowtimeField # could also be a proctime field
如示例中所示,表源,視圖和時態表的定義可以相互混合。 按照在環境文件中定義的順序注冊它們。 例如,時態表可以引用一個視圖,該視圖可以依賴於另一個視圖或表源。
局限與未來
當前的SQL Client實現處於非常早期的開發階段,作為更大的Flink改進提案24(FLIP-24)的一部分,將來可能會更改。隨時加入有關您發現有用的錯誤和功能的討論並公開發表問題。
歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文