本文翻譯自官網:https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/common.html
Table API和SQL集成在共同API中。該API的中心概念是Table
,用作查詢的輸入和輸出。本文檔介紹了使用Table API和SQL查詢的程序的通用結構,如何注冊 Table
,如何查詢Table
以及如何發出 Table(數據)
。
- 兩個 planner 之間的主要區別
- 表API和SQL程序的結構
- 創建一個TableEnvironment
- 在 Catalog 中注冊表
- 注冊擴展 Catalog
- 查詢表
- 發出表(數據)
- 翻譯並執行查詢
- 與DataStream和DataSet API集成
- 查詢優化
兩個 planner 之間的主要區別
- Blink將批處理作業視為流的特殊情況。因此,還不支持Table和DataSet之間的轉換,並且批處理作業不會轉換成DateSet ,而是像流作業一樣轉換為
DataStream
程序。 - Blink planner 不支持
BatchTableSource
,而是使用bounded StreamTableSource 代替。 - Blink planner 僅支持新的
Catalog
,不支持ExternalCatalog 它是
不推薦使用的。 - 為 old planner 和 blink planner 實現的 FilterableTableSource 是不相容的。old planner 會將
PlannerExpression
s 下推到FilterableTableSource
,而 blink planner 將 下推Expression
s (不懂什么意思:The old planner will push downPlannerExpression
s intoFilterableTableSource
, while the Blink planner will push downExpression
s.) 。 - 基於字符串的鍵值配置選項(有關詳細信息,請參閱有關配置的文檔)僅用於Blink planner。
- 兩個 planner 的實現(
CalciteConfig
)PlannerConfig
不同。 - Blink planner 會將多個接收器優化為一個DAG(僅在TableEnvironment上支持,而不在StreamTableEnvironment上支持)。old planner 將始終將每個接收器優化為一個新的DAG,其中所有DAG彼此獨立。
- old planner 現在不支持catalog 統計信息,而Blink planner 則支持。
表API和SQL程序的結構
用於批處理和流式傳輸的所有Table API和SQL程序都遵循相同的模式。以下代碼示例顯示了Table API和SQL程序的通用結構。
// create a TableEnvironment for specific planner batch or streaming val tableEnv = ... // see "Create a TableEnvironment" section // register a Table tableEnv.registerTable("table1", ...) // or tableEnv.registerTableSource("table2", ...) // or tableEnv.registerExternalCatalog("extCat", ...) // register an output Table tableEnv.registerTableSink("outputTable", ...); // create a Table from a Table API query val tapiResult = tableEnv.scan("table1").select(...) // create a Table from a SQL query val sqlResult = tableEnv.sqlQuery("SELECT ... FROM table2 ...") // emit a Table API result Table to a TableSink, same for SQL result tapiResult.insertInto("outputTable") // execute tableEnv.execute("scala_job")
注意:表API和SQL查詢可以輕松地與DataStream或DataSet程序集成並嵌入其中。請參閱與DataStream和DataSet API集成,以了解如何將DataStream和DataSet轉換為Tables,反之亦然。
創建一個TableEnvironment
TableEnvironment
是Table API和SQL集成的中心概念。它負責:
Table
在內部catalog中注冊- 注冊外部catalog
- 執行SQL查詢
- 注冊用戶定義的(標量,表或聚合)函數
- 將
DataStream
或DataSet
轉換為Table
- 持有對
ExecutionEnvironment
或StreamExecutionEnvironment的引用
Table
始終綁定到特定的TableEnvironment
。不可能在同一查詢中組合不同TableEnvironments的表,例如,join 或union 它們。
TableEnvironment
是通過調用StreamExecutionEnvironment
或ExecutionEnvironment的
靜態方法 BatchTableEnvironment.create()
或StreamTableEnvironment.create()與可選的
。該TableConfig
創建的TableConfig
可用於配置TableEnvironment
或定制查詢優化和翻譯過程(參見查詢優化)。
請務必選擇特定的planner BatchTableEnvironment
/ StreamTableEnvironment 與
你的編程語言相匹配。
如果兩個planner jar都在類路徑上(默認行為),則應明確設置要在當前程序中使用的planner 。
// ********************** // FLINK STREAMING QUERY // ********************** import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.EnvironmentSettings import org.apache.flink.table.api.scala.StreamTableEnvironment val fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build() val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment val fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings) // or val fsTableEnv = TableEnvironment.create(fsSettings) // ****************** // FLINK BATCH QUERY // ****************** import org.apache.flink.api.scala.ExecutionEnvironment import org.apache.flink.table.api.scala.BatchTableEnvironment val fbEnv = ExecutionEnvironment.getExecutionEnvironment val fbTableEnv = BatchTableEnvironment.create(fbEnv) // ********************** // BLINK STREAMING QUERY // ********************** import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.EnvironmentSettings import org.apache.flink.table.api.scala.StreamTableEnvironment val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings) // or val bsTableEnv = TableEnvironment.create(bsSettings) // ****************** // BLINK BATCH QUERY // ****************** import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment} val bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build() val bbTableEnv = TableEnvironment.create(bbSettings)
注意:如果/lib
目錄中只有一個計划器jar ,則可以使用useAnyPlanner
(use_any_planner
對於python)創建specific EnvironmentSettings
。
在catalog中注冊表
TableEnvironment
維護按名稱注冊的表的catalog。表有兩種類型,輸入表和輸出表。可以在Table API和SQL查詢中引用輸入表並提供輸入數據。輸出表可用於將表API或SQL查詢的結果發送到外部系統。
輸入表可以從各種source 進行注冊:
- 現有
Table
對象,通常是Table API或SQL查詢的結果。 TableSource
,用於訪問外部數據,例如文件,數據庫或消息系統。DataStream
或DataSet
從數據流(僅適用於流作業)或數據集(僅適用於old planner 轉換批處理作業)程序。DataStream和DataSet API的集成部分中討論了注冊DataStream
或DataSet。
可以使用TableSink來注冊輸出表。
注冊表格
在TableEnvironment中注冊Table如下:
// get a TableEnvironment val tableEnv = ... // see "Create a TableEnvironment" section // table is the result of a simple projection query val projTable: Table = tableEnv.scan("X").select(...) // register the Table projTable as table "projectedTable" tableEnv.registerTable("projectedTable", projTable)
注意:注冊Table
的處理方式與關系數據庫系統中的VIEW相似,即,定義的查詢Table
未經過優化,但是當另一個查詢引用已注冊的查詢時將內聯Table
。如果多個查詢引用同一個已注冊的查詢Table
,則將為每個引用查詢內聯該查詢並執行多次,即Table
將不會共享已注冊的結果。
注冊TableSource
TableSource
提供對存儲 在存儲系統(例如數據庫(MySQL,HBase等)具有特定編碼的文件(CSV,Apache [Parquet,Avro,ORC]等)或消息系統(Apache Kafka,RabbitMQ等)中的外部數據的訪問。
Flink旨在為常見的數據格式和存儲系統提供TableSources。請查看“ 表源和接收器”頁面,以獲取受支持的TableSource的列表以及如何構建自定義的TableSource
。
在TableEnvironment中注冊TableSource 如下:
// get a TableEnvironment val tableEnv = ... // see "Create a TableEnvironment" section // create a TableSource val csvSource: TableSource = new CsvTableSource("/path/to/file", ...) // register the TableSource as table "CsvTable" tableEnv.registerTableSource("CsvTable", csvSource)
注:TableEnvironment
用於blink planner 只接受StreamTableSource
,LookupableTableSource和
InputFormatTableSource,
StreamTableSource
用於blink planner必須是有界的。
注冊TableSink
TableSink
可以使用注冊的表將Table API或SQL查詢的結果發送到外部存儲系統,例如數據庫,鍵值存儲(系統),消息隊列或文件系統(采用不同的編碼,例如CSV,Apache [Parquet ,Avro,ORC],…)。
Flink旨在為常見的數據格式和存儲系統提供TableSink。請參閱有關“ 表源和接收器”頁面的文檔,以獲取有關可用接收器的詳細信息以及如何實現自定義的TableSink
。
在TableEnvironment中注冊TableSink 如下:
// get a TableEnvironment val tableEnv = ... // see "Create a TableEnvironment" section // create a TableSink val csvSink: TableSink = new CsvTableSink("/path/to/file", ...) // define the field names and types val fieldNames: Array[String] = Array("a", "b", "c") val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG) // register the TableSink as table "CsvSinkTable" tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink)
注冊外部catalog
外部catalog可以提供有關外部數據庫和表的信息,例如它們的名稱,結構,統計信息,以及有關如何訪問存儲在外部數據庫、表或文件中的數據的信息。
可以通過實現ExternalCatalog接口來創建外部catalog,並在TableEnvironment中對其進行注冊,如下所示:
// get a TableEnvironment val tableEnv = ... // see "Create a TableEnvironment" section // create an external catalog val catalog: ExternalCatalog = new InMemoryExternalCatalog // register the ExternalCatalog catalog tableEnv.registerExternalCatalog("InMemCatalog", catalog)
在TableEnvironment中注冊后,可以通過指定表的完整路徑(例如catalog.database.table)從Table API或SQL查詢中訪問在ExternalCatalog中定義的所有表。
目前,Flink提供了一個InMemoryExternalCatalog
用於演示和測試目的的工具。但是,該ExternalCatalog
接口還可用於 將HCatalog或Metastore之類的catalog連接到Table API。
注意:blink planner不支持外部catalog。
查詢表
表API
Table API是用於Scala和Java的語言集成查詢API。與SQL相反,查詢未指定為字符串,而是以宿主語言逐步構成。
該API基於Table類,Table類代表一個表(流或批的),並提供應用關系操作的方法。 這些方法返回一個新的Table對象,該對象表示對輸入Table應用關系操作的結果。 某些關系操作由多個方法調用組成,例如table.groupBy(...)select(),其中groupBy(...)指定表的分組,並select(...)分組的投影表。
Table API文檔描述了流和批處理表支持的所有Table API操作。
以下示例顯示了一個簡單的Table API聚合查詢:
// get a TableEnvironment val tableEnv = ... // see "Create a TableEnvironment" section // register Orders table // scan registered Orders table val orders = tableEnv.scan("Orders") // compute revenue for all customers from France val revenue = orders .filter('cCountry === "FRANCE") .groupBy('cID, 'cName) .select('cID, 'cName, 'revenue.sum AS 'revSum) // emit or convert Table // execute query
注意: Scala Table API使用Scala符號,這些符號以單個記號('
)開頭來引用的Table屬性。Table API使用Scala隱式轉換。為了使用Scala隱式轉換確保導入了 org.apache.flink.api.scala._ 和
org.apache.flink.table.api.scala._ 。
SQL
Flink的SQL集成基於實現SQL標准的Apache Calcite。SQL查詢被指定為常規字符串。
SQL文件描述flink SQL支持的流和批的表。
以下示例說明如何指定查詢並以返回用表 表示的結果。
// get a TableEnvironment val tableEnv = ... // see "Create a TableEnvironment" section // register Orders table // compute revenue for all customers from France val revenue = tableEnv.sqlQuery(""" |SELECT cID, cName, SUM(revenue) AS revSum |FROM Orders |WHERE cCountry = 'FRANCE' |GROUP BY cID, cName """.stripMargin) // emit or convert Table // execute query
下面的示例演示如何指定將更新查詢的結果插入到已注冊表中。
// get a TableEnvironment val tableEnv = ... // see "Create a TableEnvironment" section // register "Orders" table // register "RevenueFrance" output table // compute revenue for all customers from France and emit to "RevenueFrance" tableEnv.sqlUpdate(""" |INSERT INTO RevenueFrance |SELECT cID, cName, SUM(revenue) AS revSum |FROM Orders |WHERE cCountry = 'FRANCE' |GROUP BY cID, cName """.stripMargin) // execute query
混合 Table API和SQL
Table API和SQL查詢可以輕松混合,因為它們都返回Table
對象:
- 可以在
Table API 的查詢可以定義在
SQL查詢返回的Table對象上。 - 通過在TableEnvironment中注冊結果表並在SQL查詢的FROM子句中引用它,可以對Table API查詢的結果定義SQL查詢。(好繞:A SQL query can be defined on the result of a Table API query by registering the resulting Table in the
TableEnvironment
and referencing it in theFROM
clause of the SQL query )
發出Table
通過將表寫入TableSink來發出表。 TableSink是通用接口,用於支持各種文件格式(例如CSV,Apache Parquet,Apache Avro),存儲系統(例如JDBC,Apache HBase,Apache Cassandra,Elasticsearch)或消息系統(例如Apache Kafka, RabbitMQ)。
批處理表只能寫入BatchTableSink,而流表則需要AppendStreamTableSink、RetractStreamTableSink或UpsertStreamTableSink。
請參閱有關表源和接收器的文檔,以獲取有關可用接收器的詳細信息以及有關如何實現自定義TableSink的說明。
Table.insertInto(String tableName)方法將表發射到已注冊的TableSink。 該方法通過名稱從catalog中查找TableSink,並驗證Table的結構與TableSink的結構是否相同。
以下示例顯示如何發出表:
// get a TableEnvironment val tableEnv = ... // see "Create a TableEnvironment" section // create a TableSink val sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim = "|") // register the TableSink with a specific schema val fieldNames: Array[String] = Array("a", "b", "c") val fieldTypes: Array[TypeInformation] = Array(Types.INT, Types.STRING, Types.LONG) tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink) // compute a result Table using Table API operators and/or SQL queries val result: Table = ... // emit the result Table to the registered TableSink result.insertInto("CsvSinkTable") // execute the program
翻譯並執行查詢
對於兩個planner 來說,翻譯和執行查詢的行為是不同的。
Old planner
根據Table API和SQL查詢的輸入是流輸入還是批處理輸入,它們將轉換為DataStream或DataSet程序。查詢被內部表示為一個邏輯查詢計划,並在兩個階段被轉換:
- 優化邏輯計划
- 轉換為 DataStream 或 DataSet 程序
在以下情況下,將轉換 Table API或SQL查詢:
- 將
Table
發射到TableSink
,即當Table.insertInto()
被調用。 - 指定SQL更新查詢,即當
TableEnvironment.sqlUpdate()
調用。 - 將
Table
轉換為DataStream 或
DataSet
(請參閱與DataStream和DataSet API集成)。
轉換后,將像常規DataStream或DataSet程序一樣處理Table API或SQL查詢,並在調用StreamExecutionEnvironment.execute()
或ExecutionEnvironment.execute()時執行
。
Blink planner
無論Table API和SQL查詢的輸入是流傳輸還是批處理,都將轉換為DataStream程序。查詢被內部表示為一個邏輯查詢計划,並在兩個階段被轉換:
- 優化邏輯計划,
- 轉換為DataStream程序。
翻譯查詢的行為TableEnvironment
和StreamTableEnvironment是不同的
。
對於TableEnvironment
,Table API或SQL查詢在TableEnvironment.execute()
調用時被轉換,因為TableEnvironment
將優化多個接收器為一個DAG。
而對於StreamTableEnvironment
,在以下情況下會轉換Table API或SQL查詢:
- 將
Table
發射到TableSink
,即當Table.insertInto()
被調用。 - 指定SQL更新查詢,即當
TableEnvironment.sqlUpdate()被
調用。 - 將
Table
轉換為DataStream
。
轉換后,將像常規的DataStream程序一樣處理Table API或SQL查詢,並在調用TableEnvironment.execute()
或StreamExecutionEnvironment.execute()時執行
。
DataStream和DataSet API集成
以下示例顯示如何發出表:流上的兩個planner 都可以與DataStream API集成。 只有old planner 才能與DataSet API集成,批量blink planner不能同時與兩者結合。 注意:下面討論的DataSet API僅與批量使用的old planner 有關。
Table API和SQL查詢可以輕松地與DataStream和DataSet程序集成並嵌入其中。 例如,可以查詢外部表(例如從RDBMS),進行一些預處理,例如過濾,投影,聚合或與元數據聯接,然后使用DataStream或 DataSet API(以及在這些API之上構建的任何庫,例如CEP或Gelly)。 相反,也可以將Table API或SQL查詢應用於DataStream或DataSet程序的結果。
可以通過將DataStream或DataSet轉換為Table來實現這種交互,反之亦然。 在本節中,我們描述如何完成這些轉換。
Scala的隱式轉換
Scala Table API具有對DataSet,DataStream和Table類的隱式轉換。 通過為Scala DataStream API導入org.apache.flink.table.api.scala._以及org.apache.flink.api.scala._,可以啟用這些轉換。
將DataStream或DataSet注冊為表
可以在TableEnvironment中將DataStream或DataSet注冊為表。 結果表的模式取決於已注冊的DataStream或DataSet的數據類型。 請檢查有關將數據類型映射到表模式的部分,以獲取詳細信息。
// get TableEnvironment // registration of a DataSet is equivalent val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section val stream: DataStream[(Long, String)] = ... // register the DataStream as Table "myTable" with fields "f0", "f1" tableEnv.registerDataStream("myTable", stream) // register the DataStream as table "myTable2" with fields "myLong", "myString" tableEnv.registerDataStream("myTable2", stream, 'myLong, 'myString)
注意:DataStream表的名稱不能與^ _DataStreamTable_ [0-9] +模式匹配,並且DataSet表的名稱不能與^ _DataSetTable_ [0-9] +模式匹配。 這些模式僅供內部使用。
將DataStream或DataSet轉換為表
除了在TableEnvironment中注冊DataStream或DataSet之外,還可以將其直接轉換為Table。 如果要在Table API查詢中使用Table,這將很方便。
// get TableEnvironment // registration of a DataSet is equivalent val tableEnv = ... // see "Create a TableEnvironment" section val stream: DataStream[(Long, String)] = ... // convert the DataStream into a Table with default fields '_1, '_2 val table1: Table = tableEnv.fromDataStream(stream) // convert the DataStream into a Table with fields 'myLong, 'myString val table2: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString)
將表轉換為DataStream或DataSet
可以將表轉換為DataStream或DataSet。 這樣,可以在Table API或SQL查詢的結果上運行自定義DataStream或DataSet程序。
將表轉換為DataStream或DataSet時,需要指定生成的DataStream或DataSet的數據類型,即,將表的行轉換為的數據類型。 最方便的轉換類型通常是Row。 以下列表概述了不同選項的功能:
- Row:字段按位置,任意數量的字段,支持null值,無類型安全訪問的方式映射。
- POJO:字段按名稱映射(POJO字段必須命名為
Table
字段),任意數量的字段,支持null
值,類型安全訪問。 - case class:字段按位置映射,不支持
null
值,類型安全訪問。 - tuple:按位置映射字段,限制為22(Scala)或25(Java)字段,不支持
null
值,類型安全訪問。 - 原子類型:
Table
必須具有單個字段,不支持null
值,類型安全訪問。
將Table轉換為DataStream
流式查詢結果產生的表將動態更新,即隨着新記錄到達查詢的輸入流不斷變化。 因此,將這種動態查詢轉換成的DataStream需要對表的更新進行編碼。
有兩種模式可以將Table轉換為DataStream:
- 追加模式:僅當動態表僅通過INSERT更改進行修改時才可以使用此模式,即,它僅是追加操作,並且以前發出的結果不更新。
- 撤回模式:始終可以使用此模式。 它使用布爾標志對INSERT和DELETE更改進行編碼。
// get TableEnvironment. // registration of a DataSet is equivalent val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section // Table with two fields (String name, Integer age) val table: Table = ... // convert the Table into an append DataStream of Row val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table) // convert the Table into an append DataStream of Tuple2[String, Int] val dsTuple: DataStream[(String, Int)] dsTuple = tableEnv.toAppendStream[(String, Int)](table) // convert the Table into a retract DataStream of Row. // A retract stream of type X is a DataStream[(Boolean, X)]. // The boolean field indicates the type of the change. // True is INSERT, false is DELETE. val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)
注意:有關動態表及其屬性的詳細討論,請參見“ 動態表”文檔。
將Table轉換為DataSet
Table
轉換為DataSet
如下:
val tableEnv = BatchTableEnvironment.create(env) // Table with two fields (String name, Integer age) val table: Table = ... // convert the Table into a DataSet of Row val dsRow: DataSet[Row] = tableEnv.toDataSet[Row](table) // convert the Table into a DataSet of Tuple2[String, Int] val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)
數據類型到表結構的映射
Flink的DataStream和DataSet API支持非常多種類型。元組(內置Scala和Flink Java元組),POJO,Scala case class和 Flink 的 Row 類型等復合類型,允許嵌套的數據結構具有多個字段,這些字段可在表達式中訪問。其他類型被視為原子類型。在下文中,我們描述Table API如何將這些類型轉換為內部行表示形式,並顯示將DataStream轉換為的Table示例。
數據類型到表模式的映射可以使用兩種方式:基於字段位置或基於字段名稱。
基於位置的映射
基於位置的映射可用於在保持字段順序的同時為字段提供更有意義的名稱。此映射可用於具有定義的字段順序的復合數據類型以及原子類型。元組,行和案例類等復合數據類型具有這樣的字段順序。但是,必須根據字段名稱映射POJO的字段(請參閱下一節)。字段可以投影出來,但不能用別名 as 重命名。
定義基於位置的映射時,輸入數據類型中一定不能存在指定的名稱,否則API會假定應該基於字段名稱進行映射。 如果未指定任何字段名稱,則使用復合類型的默認字段名稱和字段順序,或者原子類型使用f0。
// get a TableEnvironment val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section val stream: DataStream[(Long, Int)] = ... // convert DataStream into Table with default field names "_1" and "_2" val table: Table = tableEnv.fromDataStream(stream) // convert DataStream into Table with field "myLong" only val table: Table = tableEnv.fromDataStream(stream, 'myLong) // convert DataStream into Table with field names "myLong" and "myInt" val table: Table = tableEnv.fromDataStream(stream, 'myLong, 'myInt)
基於名稱的映射
基於名稱的映射可用於任何數據類型,包括POJO。這是定義表模式映射的最靈活的方法。映射中的所有字段均按名稱引用,並且可以使用別名 as 重命名。字段可以重新排序和投影。
如果未指定任何字段名稱,則使用復合類型的默認字段名稱和字段順序,或者原子類型使用f0。
// get a TableEnvironment val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section val stream: DataStream[(Long, Int)] = ... // convert DataStream into Table with default field names "_1" and "_2" val table: Table = tableEnv.fromDataStream(stream) // convert DataStream into Table with field "_2" only val table: Table = tableEnv.fromDataStream(stream, '_2) // convert DataStream into Table with swapped fields val table: Table = tableEnv.fromDataStream(stream, '_2, '_1) // convert DataStream into Table with swapped fields and field names "myInt" and "myLong" val table: Table = tableEnv.fromDataStream(stream, '_2 as 'myInt, '_1 as 'myLong)
原子類型
Flink將基本類型(整數,雙精度型,字符串)或泛型(無法分析和分解的類型)視為原子類型。 原子類型的DataStream或DataSet轉換為具有單個屬性的表。 從原子類型推斷出屬性的類型,並且可以指定屬性的名稱。
// get a TableEnvironment val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section val stream: DataStream[Long] = ... // convert DataStream into Table with default field name "f0" val table: Table = tableEnv.fromDataStream(stream) // convert DataStream into Table with field name "myLong" val table: Table = tableEnv.fromDataStream(stream, 'myLong)
元組(Scala和Java)和case類(僅Scala)
Flink支持Scala的內置元組,並為Java提供了自己的元組類。 兩種元組的DataStreams和DataSet都可以轉換為表。 可以通過提供所有字段的名稱來重命名字段(根據位置進行映射)。 如果未指定任何字段名稱,則使用默認字段名稱。 如果引用了原始字段名稱(Flink元組為f0,f1,...,Scala元組為_1,_2,...),則API會假定映射是基於名稱的,而不是基於位置的。 基於名稱的映射允許使用別名(as)對字段和投影進行重新排序。
// get a TableEnvironment val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section val stream: DataStream[(Long, String)] = ... // convert DataStream into Table with renamed default field names '_1, '_2 val table: Table = tableEnv.fromDataStream(stream) // convert DataStream into Table with field names "myLong", "myString" (position-based) val table: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString) // convert DataStream into Table with reordered fields "_2", "_1" (name-based) val table: Table = tableEnv.fromDataStream(stream, '_2, '_1) // convert DataStream into Table with projected field "_2" (name-based) val table: Table = tableEnv.fromDataStream(stream, '_2) // convert DataStream into Table with reordered and aliased fields "myString", "myLong" (name-based) val table: Table = tableEnv.fromDataStream(stream, '_2 as 'myString, '_1 as 'myLong) // define case class case class Person(name: String, age: Int) val streamCC: DataStream[Person] = ... // convert DataStream into Table with default field names 'name, 'age val table = tableEnv.fromDataStream(streamCC) // convert DataStream into Table with field names 'myName, 'myAge (position-based) val table = tableEnv.fromDataStream(streamCC, 'myName, 'myAge) // convert DataStream into Table with reordered and aliased fields "myAge", "myName" (name-based) val table: Table = tableEnv.fromDataStream(stream, 'age as 'myAge, 'name as 'myName)
POJO (Java and Scala)
Flink支持POJO作為復合類型。確定POJO的規則在此文檔。
在不指定字段名稱的情況下將POJO DataStream或DataSet轉換為Table時,將使用原始POJO字段的名稱。 名稱映射需要原始名稱,並且不能按位置進行。 可以使用別名(使用as關鍵字)對字段進行重命名,重新排序和投影。
// get a TableEnvironment val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section // Person is a POJO with field names "name" and "age" val stream: DataStream[Person] = ... // convert DataStream into Table with default field names "age", "name" (fields are ordered by name!) val table: Table = tableEnv.fromDataStream(stream) // convert DataStream into Table with renamed fields "myAge", "myName" (name-based) val table: Table = tableEnv.fromDataStream(stream, 'age as 'myAge, 'name as 'myName) // convert DataStream into Table with projected field "name" (name-based) val table: Table = tableEnv.fromDataStream(stream, 'name) // convert DataStream into Table with projected and renamed field "myName" (name-based) val table: Table = tableEnv.fromDataStream(stream, 'name as 'myName)
Row
該Row
數據類型支持字段和字段與任意數量的null
值。字段名稱可以通過指定RowTypeInfo
或轉化時Row
DataStream
或DataSet
成Table
。行類型支持按位置和名稱映射字段。可以通過提供所有字段的名稱(基於位置的映射)來重命名字段,也可以為投影/排序/重命名(基於名稱的映射)單獨選擇字段。
// get a TableEnvironment val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section // DataStream of Row with two fields "name" and "age" specified in `RowTypeInfo` val stream: DataStream[Row] = ... // convert DataStream into Table with default field names "name", "age" val table: Table = tableEnv.fromDataStream(stream) // convert DataStream into Table with renamed field names "myName", "myAge" (position-based) val table: Table = tableEnv.fromDataStream(stream, 'myName, 'myAge) // convert DataStream into Table with renamed fields "myName", "myAge" (name-based) val table: Table = tableEnv.fromDataStream(stream, 'name as 'myName, 'age as 'myAge) // convert DataStream into Table with projected field "name" (name-based) val table: Table = tableEnv.fromDataStream(stream, 'name) // convert DataStream into Table with projected and renamed field "myName" (name-based) val table: Table = tableEnv.fromDataStream(stream, 'name as 'myName)
查詢優化
Old planner
Apache Flink利用Apache Calcite來優化和翻譯查詢。 當前執行的優化包括投影和過濾器下推,子查詢去相關以及其他類型的查詢重寫。 Old Planner尚未優化聯接的順序,而是按照查詢中定義的順序執行它們(FROM子句中的表順序和/或WHERE子句中的連接謂詞順序)。
Blink planner
Apache Flink利用並擴展了Apache Calcite來執行復雜的查詢優化。這包括一系列基於規則和成本的優化,例如:
- 基於Apache Calcite的子查詢解相關
- 計划修剪
- 分區修剪
- 過濾器下推
- 子計划重復數據刪除避免重復計算
- 特殊的子查詢重寫,包括兩個部分:
- 將IN和EXISTS轉換為左半聯接
- 將NOT IN和NOT EXISTS轉換為左反聯接
- 可選join 重新排序
- 通過啟用
table.optimizer.join-reorder-enabled
- 通過啟用
注意: IN / EXISTS / NOT IN / NOT EXISTS當前僅在子查詢重寫的結合條件下受支持。
優化器不僅基於計划,而且還基於可從數據源獲得的豐富統計信息以及每個operator(例如io,cpu,網絡和內存)的細粒度成本來做出明智的決策。
高級用戶可以通過CalciteConfig
對象提供自定義優化,該對象可以通過調用提供給表環境TableEnvironment#getConfig#setPlannerConfig
。
解釋表
Table API提供了一種機制來解釋計算表的邏輯和優化查詢計划。 這是通過TableEnvironment.explain(table)方法或TableEnvironment.explain()方法完成的。 explain(表)返回給定表的計划。 describe()返回多接收器計划的結果,主要用於Blink planner。 它返回一個描述三個計划的字符串:
- 關系查詢的抽象語法樹,即未優化的邏輯查詢計划
- 優化的邏輯查詢計划
- 實際執行計划
以下代碼顯示了一個示例以及使用explain(table)給定Table的相應輸出:
val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = StreamTableEnvironment.create(env) val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word) val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word) val table = table1 .where('word.like("F%")) .unionAll(table2) val explanation: String = tEnv.explain(table) println(explanation)
== Abstract Syntax Tree == LogicalUnion(all=[true]) LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')]) FlinkLogicalDataStreamScan(id=[1], fields=[count, word]) FlinkLogicalDataStreamScan(id=[2], fields=[count, word]) == Optimized Logical Plan == DataStreamUnion(all=[true], union all=[count, word]) DataStreamCalc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')]) DataStreamScan(id=[1], fields=[count, word]) DataStreamScan(id=[2], fields=[count, word]) == Physical Execution Plan == Stage 1 : Data Source content : collect elements with CollectionInputFormat Stage 2 : Data Source content : collect elements with CollectionInputFormat Stage 3 : Operator content : from: (count, word) ship_strategy : REBALANCE Stage 4 : Operator content : where: (LIKE(word, _UTF-16LE'F%')), select: (count, word) ship_strategy : FORWARD Stage 5 : Operator content : from: (count, word) ship_strategy : REBALANCE
以下代碼顯示了一個示例以及使用explain()的多sink計划的相應輸出:
val settings = EnvironmentSettings.newInstance.useBlinkPlanner.inStreamingMode.build val tEnv = TableEnvironment.create(settings) val fieldNames = Array("count", "word") val fieldTypes = Array[TypeInformation[_]](Types.INT, Types.STRING) tEnv.registerTableSource("MySource1", new CsvTableSource("/source/path1", fieldNames, fieldTypes)) tEnv.registerTableSource("MySource2", new CsvTableSource("/source/path2",fieldNames, fieldTypes)) tEnv.registerTableSink("MySink1", new CsvTableSink("/sink/path1").configure(fieldNames, fieldTypes)) tEnv.registerTableSink("MySink2", new CsvTableSink("/sink/path2").configure(fieldNames, fieldTypes)) val table1 = tEnv.scan("MySource1").where("LIKE(word, 'F%')") table1.insertInto("MySink1") val table2 = table1.unionAll(tEnv.scan("MySource2")) table2.insertInto("MySink2") val explanation = tEnv.explain(false) println(explanation)
多sink計划的結果是
== Abstract Syntax Tree == LogicalSink(name=[MySink1], fields=[count, word]) +- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')]) +- LogicalTableScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]]) LogicalSink(name=[MySink2], fields=[count, word]) +- LogicalUnion(all=[true]) :- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')]) : +- LogicalTableScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]]) +- LogicalTableScan(table=[[default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]]]) == Optimized Logical Plan == Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')], reuse_id=[1]) +- TableSourceScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]], fields=[count, word]) Sink(name=[MySink1], fields=[count, word]) +- Reused(reference_id=[1]) Sink(name=[MySink2], fields=[count, word]) +- Union(all=[true], union=[count, word]) :- Reused(reference_id=[1]) +- TableSourceScan(table=[[default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]]], fields=[count, word]) == Physical Execution Plan == Stage 1 : Data Source content : collect elements with CollectionInputFormat Stage 2 : Operator content : CsvTableSource(read fields: count, word) ship_strategy : REBALANCE Stage 3 : Operator content : SourceConversion(table:Buffer(default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]), fields:(count, word)) ship_strategy : FORWARD Stage 4 : Operator content : Calc(where: (word LIKE _UTF-16LE'F%'), select: (count, word)) ship_strategy : FORWARD Stage 5 : Operator content : SinkConversionToRow ship_strategy : FORWARD Stage 6 : Operator content : Map ship_strategy : FORWARD Stage 8 : Data Source content : collect elements with CollectionInputFormat Stage 9 : Operator content : CsvTableSource(read fields: count, word) ship_strategy : REBALANCE Stage 10 : Operator content : SourceConversion(table:Buffer(default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]), fields:(count, word)) ship_strategy : FORWARD Stage 12 : Operator content : SinkConversionToRow ship_strategy : FORWARD Stage 13 : Operator content : Map ship_strategy : FORWARD Stage 7 : Data Sink content : Sink: CsvTableSink(count, word) ship_strategy : FORWARD Stage 14 : Data Sink content : Sink: CsvTableSink(count, word) ship_strategy : FORWARD
歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文