【翻譯】Flink Table Api & SQL —— 概念與通用API


本文翻譯自官網:https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/common.html

 Flink Table Api & SQL 翻譯目錄

Table API和SQL集成在共同API中。該API的中心概念是Table,用作查詢的輸入和輸出。本文檔介紹了使用Table API和SQL查詢的程序的通用結構,如何注冊 Table,如何查詢Table以及如何發出 Table(數據)

兩個 planner 之間的主要區別

  1. Blink將批處理作業視為流的特殊情況。因此,還不支持Table和DataSet之間的轉換,並且批處理作業不會轉換成DateSet ,而是像流作業一樣轉換為DataStream程序。
  2. Blink planner 不支持BatchTableSource而是使用bounded  StreamTableSource 代替。
  3. Blink planner 僅支持新的Catalog,不支持 ExternalCatalog 它是不推薦使用的。
  4. 為 old planner 和 blink planner  實現的 FilterableTableSource 是不相容的。old planner 會將 PlannerExpressions  下推到 FilterableTableSource,而 blink planner 將 下推 Expressions (不懂什么意思:The old planner will push down PlannerExpressions into FilterableTableSource, while the Blink planner will push down Expressions.) 
  5. 基於字符串的鍵值配置選項(有關詳細信息,請參閱有關配置的文檔)僅用於Blink planner。
  6. 兩個 planner 的實現(CalciteConfigPlannerConfig不同。
  7. Blink planner 會將多個接收器優化為一個DAG(僅在TableEnvironment上支持,而不在StreamTableEnvironment上支持)。old planner 將始終將每個接收器優化為一個新的DAG,其中所有DAG彼此獨立。
  8. old planner 現在不支持catalog 統計信息,而Blink planner 則支持。

表API和SQL程序的結構

用於批處理和流式傳輸的所有Table API和SQL程序都遵循相同的模式。以下代碼示例顯示了Table API和SQL程序的通用結構。

// create a TableEnvironment for specific planner batch or streaming
val tableEnv = ... // see "Create a TableEnvironment" section

// register a Table
tableEnv.registerTable("table1", ...)           // or
tableEnv.registerTableSource("table2", ...)     // or
tableEnv.registerExternalCatalog("extCat", ...)
// register an output Table
tableEnv.registerTableSink("outputTable", ...);

// create a Table from a Table API query
val tapiResult = tableEnv.scan("table1").select(...)
// create a Table from a SQL query
val sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table2 ...")

// emit a Table API result Table to a TableSink, same for SQL result
tapiResult.insertInto("outputTable")

// execute
tableEnv.execute("scala_job")

注意:表API和SQL查詢可以輕松地與DataStream或DataSet程序集成並嵌入其中。請參閱與DataStream和DataSet API集成,以了解如何將DataStream和DataSet轉換為Tables,反之亦然。

 

創建一個TableEnvironment

TableEnvironment是Table API和SQL集成的中心概念。它負責:

  • Table在內部catalog中注冊
  • 注冊外部catalog
  • 執行SQL查詢
  • 注冊用戶定義的(標量,表或聚合)函數
  • DataStreamDataSet轉換為Table
  • 持有對ExecutionEnvironment或StreamExecutionEnvironment的引用

Table始終綁定到特定的TableEnvironment不可能在同一查詢中組合不同TableEnvironments的表,例如,join 或union 它們。

TableEnvironment是通過調用StreamExecutionEnvironmentExecutionEnvironment的靜態方法 BatchTableEnvironment.create()StreamTableEnvironment.create()與可選的TableConfig創建的TableConfig可用於配置TableEnvironment或定制查詢優化和翻譯過程(參見查詢優化)。

請務必選擇特定的planner BatchTableEnvironmentStreamTableEnvironment 與你的編程語言相匹配。

如果兩個planner jar都在類路徑上(默認行為),則應明確設置要在當前程序中使用的planner 。

// **********************
// FLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala.StreamTableEnvironment

val fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings)
// or val fsTableEnv = TableEnvironment.create(fsSettings)

// ******************
// FLINK BATCH QUERY
// ******************
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.table.api.scala.BatchTableEnvironment

val fbEnv = ExecutionEnvironment.getExecutionEnvironment
val fbTableEnv = BatchTableEnvironment.create(fbEnv)

// **********************
// BLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala.StreamTableEnvironment

val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
// or val bsTableEnv = TableEnvironment.create(bsSettings)

// ******************
// BLINK BATCH QUERY
// ******************
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

val bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
val bbTableEnv = TableEnvironment.create(bbSettings)

注意:如果/lib目錄中只有一個計划器jar ,則可以使用useAnyPlanneruse_any_planner對於python)創建specific EnvironmentSettings

 回到頂部

在catalog中注冊表

TableEnvironment 維護按名稱注冊的表的catalog。表有兩種類型,輸入表輸出表可以在Table API和SQL查詢中引用輸入表並提供輸入數據。輸出表可用於將表API或SQL查詢的結果發送到外部系統。

輸入表可以從各種source 進行注冊:

  • 現有Table對象,通常是Table API或SQL查詢的結果。
  • TableSource,用於訪問外部數據,例如文件,數據庫或消息系統。
  • DataStreamDataSet從數據流(僅適用於流作業)或數據集(僅適用於old planner 轉換批處理作業)程序。DataStream和DataSet API集成部分中討論了注冊DataStreamDataSet。

可以使用TableSink來注冊輸出表

注冊表格

在TableEnvironment中注冊Table如下: 

// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section

// table is the result of a simple projection query 
val projTable: Table = tableEnv.scan("X").select(...)

// register the Table projTable as table "projectedTable"
tableEnv.registerTable("projectedTable", projTable)

注意:注冊Table的處理方式與關系數據庫系統中的VIEW相似,即,定義的查詢Table未經過優化,但是當另一個查詢引用已注冊的查詢時將內聯Table如果多個查詢引用同一個已注冊的查詢Table,則將為每個引用查詢內聯該查詢並執行多次,即Table不會共享已注冊的結果 

注冊TableSource

TableSource提供對存儲 在存儲系統(例如數據庫(MySQL,HBase等)具有特定編碼的文件(CSV,Apache [Parquet,Avro,ORC]等)或消息系統(Apache Kafka,RabbitMQ等)中的外部數據的訪問。 

Flink旨在為常見的數據格式和存儲系統提供TableSources。請查看“ 表源和接收器”頁面,以獲取受支持TableSource的列表以及如何構建自定義的TableSource

在TableEnvironment中注冊TableSource 如下: 

// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section

// create a TableSource
val csvSource: TableSource = new CsvTableSource("/path/to/file", ...)

// register the TableSource as table "CsvTable"
tableEnv.registerTableSource("CsvTable", csvSource)

注:TableEnvironment用於blink planner 只接受StreamTableSourceLookupableTableSource和InputFormatTableSource,StreamTableSource用於blink planner必須是有界的。 

注冊TableSink

TableSink可以使用注冊的表將Table API或SQL查詢的結果發送到外部存儲系統,例如數據庫,鍵值存儲(系統),消息隊列或文件系統(采用不同的編碼,例如CSV,Apache [Parquet ,Avro,ORC],…)。

Flink旨在為常見的數據格式和存儲系統提供TableSink。請參閱有關“ 表源和接收器”頁面的文檔,以獲取有關可用接收的詳細信息以及如何實現自定義的TableSink

在TableEnvironment中注冊TableSink 如下: 

// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section

// create a TableSink
val csvSink: TableSink = new CsvTableSink("/path/to/file", ...)

// define the field names and types
val fieldNames: Array[String] = Array("a", "b", "c")
val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG)

// register the TableSink as table "CsvSinkTable"
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink)

注冊外部catalog

外部catalog可以提供有關外部數據庫和表的信息,例如它們的名稱,結構,統計信息,以及有關如何訪問存儲在外部數據庫、表或文件中的數據的信息。

可以通過實現ExternalCatalog接口來創建外部catalog,並在TableEnvironment中對其進行注冊,如下所示:

// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section

// create an external catalog
val catalog: ExternalCatalog = new InMemoryExternalCatalog

// register the ExternalCatalog catalog
tableEnv.registerExternalCatalog("InMemCatalog", catalog)

在TableEnvironment中注冊后,可以通過指定表的完整路徑(例如catalog.database.table)從Table API或SQL查詢中訪問在ExternalCatalog中定義的所有表。

目前,Flink提供了一個InMemoryExternalCatalog用於演示和測試目的的工具。但是,該ExternalCatalog接口還可用於 將HCatalog或Metastore之類的catalog連接到Table API。

注意:blink planner不支持外部catalog。

查詢表

表API

Table API是用於Scala和Java的語言集成查詢API。與SQL相反,查詢未指定為字符串,而是以宿主語言逐步構成。

該API基於Table類,Table類代表一個表(流或批的),並提供應用關系操作的方法。 這些方法返回一個新的Table對象,該對象表示對輸入Table應用關系操作的結果。 某些關系操作由多個方法調用組成,例如table.groupBy(...)select(),其中groupBy(...)指定表的分組,並select(...)分組的投影表。

Table API文檔描述了流和批處理表支持的所有Table API操作。

以下示例顯示了一個簡單的Table API聚合查詢:

// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section

// register Orders table

// scan registered Orders table
val orders = tableEnv.scan("Orders")
// compute revenue for all customers from France
val revenue = orders
  .filter('cCountry === "FRANCE")
  .groupBy('cID, 'cName)
  .select('cID, 'cName, 'revenue.sum AS 'revSum)

// emit or convert Table
// execute query

注意: Scala Table API使用Scala符號,這些符號以單個記號('開頭來引用的Table屬性Table API使用Scala隱式轉換。為了使用Scala隱式轉換確保導入了 org.apache.flink.api.scala._ 和 org.apache.flink.table.api.scala._ 。

SQL

Flink的SQL集成基於實現SQL標准的Apache CalciteSQL查詢被指定為常規字符串。

SQL文件描述flink SQL支持的流和批的表。

以下示例說明如何指定查詢並以返回用表 表示的結果

 

// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section

// register Orders table

// compute revenue for all customers from France
val revenue = tableEnv.sqlQuery("""
  |SELECT cID, cName, SUM(revenue) AS revSum
  |FROM Orders
  |WHERE cCountry = 'FRANCE'
  |GROUP BY cID, cName
  """.stripMargin)

// emit or convert Table
// execute query

下面的示例演示如何指定將更新查詢的結果插入到已注冊表中。

// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section

// register "Orders" table
// register "RevenueFrance" output table

// compute revenue for all customers from France and emit to "RevenueFrance"
tableEnv.sqlUpdate("""
  |INSERT INTO RevenueFrance
  |SELECT cID, cName, SUM(revenue) AS revSum
  |FROM Orders
  |WHERE cCountry = 'FRANCE'
  |GROUP BY cID, cName
  """.stripMargin)

// execute query

混合 Table API和SQL

Table API和SQL查詢可以輕松混合,因為它們都返回Table對象:

  • 可以在Table API 的查詢可以定義在SQL查詢返回的Table對象
  • 通過在TableEnvironment中注冊結果表並在SQL查詢的FROM子句中引用它,可以對Table API查詢的結果定義SQL查詢。(好繞:A SQL query can be defined on the result of a Table API query by registering the resulting Table in the TableEnvironment and referencing it in the FROM clause of the SQL query 

發出Table

通過將表寫入TableSink來發出表。 TableSink是通用接口,用於支持各種文件格式(例如CSV,Apache Parquet,Apache Avro),存儲系統(例如JDBC,Apache HBase,Apache Cassandra,Elasticsearch)或消息系統(例如Apache Kafka, RabbitMQ)。

批處理表只能寫入BatchTableSink,而流表則需要AppendStreamTableSink、RetractStreamTableSink或UpsertStreamTableSink。

請參閱有關表源和接收器的文檔,以獲取有關可用接收器的詳細信息以及有關如何實現自定義TableSink的說明。

Table.insertInto(String tableName)方法將表發射到已注冊的TableSink。 該方法通過名稱從catalog中查找TableSink,並驗證Table的結構與TableSink的結構是否相同。

以下示例顯示如何發出表:

// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section

// create a TableSink
val sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim = "|")

// register the TableSink with a specific schema
val fieldNames: Array[String] = Array("a", "b", "c")
val fieldTypes: Array[TypeInformation] = Array(Types.INT, Types.STRING, Types.LONG)
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink)

// compute a result Table using Table API operators and/or SQL queries
val result: Table = ...

// emit the result Table to the registered TableSink
result.insertInto("CsvSinkTable")

// execute the program

翻譯並執行查詢

對於兩個planner 來說,翻譯和執行查詢的行為是不同的。

Old planner

根據Table API和SQL查詢的輸入是流輸入還是批處理輸入,它們將轉換為DataStreamDataSet程序。查詢被內部表示為一個邏輯查詢計划,並在兩個階段被轉換:

  1. 優化邏輯計划
  2. 轉換為 DataStream 或 DataSet 程序

在以下情況下,將轉換 Table API或SQL查詢:

  • 將 Table發射到TableSink,即當 Table.insertInto()被調用。
  • 指定SQL更新查詢,即當 TableEnvironment.sqlUpdate()調用。
  • Table轉換為DataStream 或 DataSet(請參閱與DataStream和DataSet API集成)。

轉換后,將像常規DataStream或DataSet程序一樣處理Table API或SQL查詢,並在調用StreamExecutionEnvironment.execute()ExecutionEnvironment.execute()時執行

Blink planner

無論Table API和SQL查詢的輸入是流傳輸還是批處理,都將轉換為DataStream程序。查詢被內部表示為一個邏輯查詢計划,並在兩個階段被轉換:

  1. 優化邏輯計划,
  2. 轉換為DataStream程序。

翻譯查詢的行為TableEnvironmentStreamTableEnvironment是不同的

對於TableEnvironment,Table API或SQL查詢在TableEnvironment.execute()調用時被轉換,因為TableEnvironment將優化多個接收器為一個DAG。

而對於StreamTableEnvironment在以下情況下會轉換Table API或SQL查詢:

  • Table發射到TableSink,即當Table.insertInto()被調用。
  • 指定SQL更新查詢,即當TableEnvironment.sqlUpdate()被調用。
  • Table轉換為DataStream

轉換后,將像常規的DataStream程序一樣處理Table API或SQL查詢,並在調用TableEnvironment.execute()StreamExecutionEnvironment.execute()時執行

DataStream和DataSet API集成

以下示例顯示如何發出表:流上的兩個planner 都可以與DataStream API集成。 只有old planner 才能與DataSet API集成,批量blink planner不能同時與兩者結合。 注意:下面討論的DataSet API僅與批量使用的old planner 有關。

Table API和SQL查詢可以輕松地與DataStream和DataSet程序集成並嵌入其中。 例如,可以查詢外部表(例如從RDBMS),進行一些預處理,例如過濾,投影,聚合或與元數據聯接,然后使用DataStream或 DataSet API(以及在這些API之上構建的任何庫,例如CEP或Gelly)。 相反,也可以將Table API或SQL查詢應用於DataStream或DataSet程序的結果。

可以通過將DataStream或DataSet轉換為Table來實現這種交互,反之亦然。 在本節中,我們描述如何完成這些轉換。

Scala的隱式轉換

 Scala Table API具有對DataSet,DataStream和Table類的隱式轉換。 通過為Scala DataStream API導入org.apache.flink.table.api.scala._以及org.apache.flink.api.scala._,可以啟用這些轉換。

將DataStream或DataSet注冊為表

 可以在TableEnvironment中將DataStream或DataSet注冊為表。 結果表的模式取決於已注冊的DataStream或DataSet的數據類型。 請檢查有關將數據類型映射到表模式的部分,以獲取詳細信息。

// get TableEnvironment 
// registration of a DataSet is equivalent
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section

val stream: DataStream[(Long, String)] = ...

// register the DataStream as Table "myTable" with fields "f0", "f1"
tableEnv.registerDataStream("myTable", stream)

// register the DataStream as table "myTable2" with fields "myLong", "myString"
tableEnv.registerDataStream("myTable2", stream, 'myLong, 'myString)

注意:DataStream表的名稱不能與^ _DataStreamTable_ [0-9] +模式匹配,並且DataSet表的名稱不能與^ _DataSetTable_ [0-9] +模式匹配。 這些模式僅供內部使用。

將DataStream或DataSet轉換為表

 除了在TableEnvironment中注冊DataStream或DataSet之外,還可以將其直接轉換為Table。 如果要在Table API查詢中使用Table,這將很方便。

// get TableEnvironment
// registration of a DataSet is equivalent
val tableEnv = ... // see "Create a TableEnvironment" section

val stream: DataStream[(Long, String)] = ...

// convert the DataStream into a Table with default fields '_1, '_2
val table1: Table = tableEnv.fromDataStream(stream)

// convert the DataStream into a Table with fields 'myLong, 'myString
val table2: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString)

將表轉換為DataStream或DataSet

 可以將表轉換為DataStream或DataSet。 這樣,可以在Table API或SQL查詢的結果上運行自定義DataStream或DataSet程序。

將表轉換為DataStream或DataSet時,需要指定生成的DataStream或DataSet的數據類型,即,將表的行轉換為的數據類型。 最方便的轉換類型通常是Row。 以下列表概述了不同選項的功能:

  • Row:字段按位置,任意數量的字段,支持null值,無類型安全訪問的方式映射
  • POJO:字段按名稱映射(POJO字段必須命名為Table字段),任意數量的字段,支持null值,類型安全訪問。
  • case class:字段按位置映射,不支持null值,類型安全訪問。
  • tuple:按位置映射字段,限制為22(Scala)或25(Java)字段,不支持null值,類型安全訪問。
  • 原子類型Table必須具有單個字段,不支持null值,類型安全訪問。 

將Table轉換為DataStream 

流式查詢結果產生的表將動態更新,即隨着新記錄到達查詢的輸入流不斷變化。 因此,將這種動態查詢轉換成的DataStream需要對表的更新進行編碼。

有兩種模式可以將Table轉換為DataStream:

  1. 追加模式:僅當動態表僅通過INSERT更改進行修改時才可以使用此模式,即,它僅是追加操作,並且以前發出的結果不更新
  2. 撤回模式:始終可以使用此模式。 它使用布爾標志對INSERT和DELETE更改進行編碼
// get TableEnvironment. 
// registration of a DataSet is equivalent
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section

// Table with two fields (String name, Integer age)
val table: Table = ...

// convert the Table into an append DataStream of Row
val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table)

// convert the Table into an append DataStream of Tuple2[String, Int]
val dsTuple: DataStream[(String, Int)] dsTuple = 
  tableEnv.toAppendStream[(String, Int)](table)

// convert the Table into a retract DataStream of Row.
//   A retract stream of type X is a DataStream[(Boolean, X)]. 
//   The boolean field indicates the type of the change. 
//   True is INSERT, false is DELETE.
val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)

注意:有關動態表及其屬性的詳細討論,請參見“ 動態表”文檔。 

將Table轉換為DataSet

 Table轉換為DataSet如下:

val tableEnv = BatchTableEnvironment.create(env)

// Table with two fields (String name, Integer age)
val table: Table = ...

// convert the Table into a DataSet of Row
val dsRow: DataSet[Row] = tableEnv.toDataSet[Row](table)

// convert the Table into a DataSet of Tuple2[String, Int]
val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)

數據類型到表結構的映射 

Flink的DataStream和DataSet API支持非常多種類型。元組(內置Scala和Flink Java元組),POJO,Scala case class和 Flink 的 Row 類型等復合類型,允許嵌套的數據結構具有多個字段,這些字段可在表達式中訪問。其他類型被視為原子類型。在下文中,我們描述Table API如何將這些類型轉換為內部行表示形式,並顯示將DataStream轉換為的Table示例 

數據類型到表模式的映射可以使用兩種方式:基於字段位置基於字段名稱 

基於位置的映射 

基於位置的映射可用於在保持字段順序的同時為字段提供更有意義的名稱。此映射可用於具有定義的字段順序的復合數據類型以及原子類型。元組,行和案例類等復合數據類型具有這樣的字段順序。但是,必須根據字段名稱映射POJO的字段(請參閱下一節)。字段可以投影出來,但不能用別名 as 重命名

定義基於位置的映射時,輸入數據類型中一定不能存在指定的名稱,否則API會假定應該基於字段名稱進行映射。 如果未指定任何字段名稱,則使用復合類型的默認字段名稱和字段順序,或者原子類型使用f0。 

// get a TableEnvironment
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section

val stream: DataStream[(Long, Int)] = ...

// convert DataStream into Table with default field names "_1" and "_2"
val table: Table = tableEnv.fromDataStream(stream)

// convert DataStream into Table with field "myLong" only
val table: Table = tableEnv.fromDataStream(stream, 'myLong)

// convert DataStream into Table with field names "myLong" and "myInt"
val table: Table = tableEnv.fromDataStream(stream, 'myLong, 'myInt)

基於名稱的映射 

基於名稱的映射可用於任何數據類型,包括POJO。這是定義表模式映射的最靈活的方法。映射中的所有字段均按名稱引用,並且可以使用別名 as 重命名字段可以重新排序和投影。

如果未指定任何字段名稱,則使用復合類型的默認字段名稱和字段順序,或者原子類型使用f0。

// get a TableEnvironment
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section

val stream: DataStream[(Long, Int)] = ...

// convert DataStream into Table with default field names "_1" and "_2"
val table: Table = tableEnv.fromDataStream(stream)

// convert DataStream into Table with field "_2" only
val table: Table = tableEnv.fromDataStream(stream, '_2)

// convert DataStream into Table with swapped fields
val table: Table = tableEnv.fromDataStream(stream, '_2, '_1)

// convert DataStream into Table with swapped fields and field names "myInt" and "myLong"
val table: Table = tableEnv.fromDataStream(stream, '_2 as 'myInt, '_1 as 'myLong)

原子類型

 Flink將基本類型(整數,雙精度型,字符串)或泛型(無法分析和分解的類型)視為原子類型。 原子類型的DataStream或DataSet轉換為具有單個屬性的表。 從原子類型推斷出屬性的類型,並且可以指定屬性的名稱。

// get a TableEnvironment
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section

val stream: DataStream[Long] = ...

// convert DataStream into Table with default field name "f0"
val table: Table = tableEnv.fromDataStream(stream)

// convert DataStream into Table with field name "myLong"
val table: Table = tableEnv.fromDataStream(stream, 'myLong)

元組(Scala和Java)和case類(僅Scala)

Flink支持Scala的內置元組,並為Java提供了自己的元組類。 兩種元組的DataStreams和DataSet都可以轉換為表。 可以通過提供所有字段的名稱來重命名字段(根據位置進行映射)。 如果未指定任何字段名稱,則使用默認字段名稱。 如果引用了原始字段名稱(Flink元組為f0,f1,...,Scala元組為_1,_2,...),則API會假定映射是基於名稱的,而不是基於位置的。 基於名稱的映射允許使用別名(as)對字段和投影進行重新排序。

// get a TableEnvironment
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section

val stream: DataStream[(Long, String)] = ...

// convert DataStream into Table with renamed default field names '_1, '_2
val table: Table = tableEnv.fromDataStream(stream)

// convert DataStream into Table with field names "myLong", "myString" (position-based)
val table: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString)

// convert DataStream into Table with reordered fields "_2", "_1" (name-based)
val table: Table = tableEnv.fromDataStream(stream, '_2, '_1)

// convert DataStream into Table with projected field "_2" (name-based)
val table: Table = tableEnv.fromDataStream(stream, '_2)

// convert DataStream into Table with reordered and aliased fields "myString", "myLong" (name-based)
val table: Table = tableEnv.fromDataStream(stream, '_2 as 'myString, '_1 as 'myLong)

// define case class
case class Person(name: String, age: Int)
val streamCC: DataStream[Person] = ...

// convert DataStream into Table with default field names 'name, 'age
val table = tableEnv.fromDataStream(streamCC)

// convert DataStream into Table with field names 'myName, 'myAge (position-based)
val table = tableEnv.fromDataStream(streamCC, 'myName, 'myAge)

// convert DataStream into Table with reordered and aliased fields "myAge", "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'age as 'myAge, 'name as 'myName)

POJO (Java and Scala)

Flink支持POJO作為復合類型。確定POJO的規則在此文檔

 在不指定字段名稱的情況下將POJO DataStream或DataSet轉換為Table時,將使用原始POJO字段的名稱。 名稱映射需要原始名稱,並且不能按位置進行。 可以使用別名(使用as關鍵字)對字段進行重命名,重新排序和投影。

// get a TableEnvironment
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section

// Person is a POJO with field names "name" and "age"
val stream: DataStream[Person] = ...

// convert DataStream into Table with default field names "age", "name" (fields are ordered by name!)
val table: Table = tableEnv.fromDataStream(stream)

// convert DataStream into Table with renamed fields "myAge", "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'age as 'myAge, 'name as 'myName)

// convert DataStream into Table with projected field "name" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'name)

// convert DataStream into Table with projected and renamed field "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'name as 'myName)

Row

Row數據類型支持字段和字段與任意數量的null值。字段名稱可以通過指定RowTypeInfo或轉化時Row DataStreamDataSetTable行類型支持按位置和名稱映射字段。可以通過提供所有字段的名稱(基於位置的映射)來重命名字段,也可以為投影/排序/重命名(基於名稱的映射)單獨選擇字段。

// get a TableEnvironment
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section

// DataStream of Row with two fields "name" and "age" specified in `RowTypeInfo`
val stream: DataStream[Row] = ...

// convert DataStream into Table with default field names "name", "age"
val table: Table = tableEnv.fromDataStream(stream)

// convert DataStream into Table with renamed field names "myName", "myAge" (position-based)
val table: Table = tableEnv.fromDataStream(stream, 'myName, 'myAge)

// convert DataStream into Table with renamed fields "myName", "myAge" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'name as 'myName, 'age as 'myAge)

// convert DataStream into Table with projected field "name" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'name)

// convert DataStream into Table with projected and renamed field "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'name as 'myName)

查詢優化

Old planner

Apache Flink利用Apache Calcite來優化和翻譯查詢。 當前執行的優化包括投影和過濾器下推,子查詢去相關以及其他類型的查詢重寫。 Old Planner尚未優化聯接的順序,而是按照查詢中定義的順序執行它們(FROM子句中的表順序和/或WHERE子句中的連接謂詞順序)。

Blink planner 

Apache Flink利用並擴展了Apache Calcite來執行復雜的查詢優化。這包括一系列基於規則和成本的優化,例如:

  • 基於Apache Calcite的子查詢解相關
  • 計划修剪
  • 分區修剪
  • 過濾器下推
  • 子計划重復數據刪除避免重復計算
  • 特殊的子查詢重寫,包括兩個部分:
    • 將IN和EXISTS轉換為左半聯接
    • 將NOT IN和NOT EXISTS轉換為左反聯接
  • 可選join 重新排序
    • 通過啟用 table.optimizer.join-reorder-enabled

注意: IN / EXISTS / NOT IN / NOT EXISTS當前僅在子查詢重寫的結合條件下受支持。

優化器不僅基於計划,而且還基於可從數據源獲得的豐富統計信息以及每個operator(例如io,cpu,網絡和內存)的細粒度成本來做出明智的決策。

高級用戶可以通過CalciteConfig對象提供自定義優化,該對象可以通過調用提供給表環境TableEnvironment#getConfig#setPlannerConfig

解釋表 

Table API提供了一種機制來解釋計算表的邏輯和優化查詢計划。 這是通過TableEnvironment.explain(table)方法或TableEnvironment.explain()方法完成的。 explain(表)返回給定表的計划。 describe()返回多接收器計划的結果,主要用於Blink planner。 它返回一個描述三個計划的字符串:

  1. 關系查詢的抽象語法樹,即未優化的邏輯查詢計划
  2. 優化的邏輯查詢計划
  3. 實際執行計划

以下代碼顯示了一個示例以及使用explain(table)給定Table的相應輸出:

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)

val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
val table = table1
  .where('word.like("F%"))
  .unionAll(table2)

val explanation: String = tEnv.explain(table)
println(explanation)
== Abstract Syntax Tree ==
LogicalUnion(all=[true])
  LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
    FlinkLogicalDataStreamScan(id=[1], fields=[count, word])
  FlinkLogicalDataStreamScan(id=[2], fields=[count, word])

== Optimized Logical Plan ==
DataStreamUnion(all=[true], union all=[count, word])
  DataStreamCalc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
    DataStreamScan(id=[1], fields=[count, word])
  DataStreamScan(id=[2], fields=[count, word])

== Physical Execution Plan ==
Stage 1 : Data Source
    content : collect elements with CollectionInputFormat

Stage 2 : Data Source
    content : collect elements with CollectionInputFormat

    Stage 3 : Operator
        content : from: (count, word)
        ship_strategy : REBALANCE

        Stage 4 : Operator
            content : where: (LIKE(word, _UTF-16LE'F%')), select: (count, word)
            ship_strategy : FORWARD

            Stage 5 : Operator
                content : from: (count, word)
                ship_strategy : REBALANCE

以下代碼顯示了一個示例以及使用explain()的多sink計划的相應輸出:

val settings = EnvironmentSettings.newInstance.useBlinkPlanner.inStreamingMode.build
val tEnv = TableEnvironment.create(settings)

val fieldNames = Array("count", "word")
val fieldTypes = Array[TypeInformation[_]](Types.INT, Types.STRING)
tEnv.registerTableSource("MySource1", new CsvTableSource("/source/path1", fieldNames, fieldTypes))
tEnv.registerTableSource("MySource2", new CsvTableSource("/source/path2",fieldNames, fieldTypes))
tEnv.registerTableSink("MySink1", new CsvTableSink("/sink/path1").configure(fieldNames, fieldTypes))
tEnv.registerTableSink("MySink2", new CsvTableSink("/sink/path2").configure(fieldNames, fieldTypes))

val table1 = tEnv.scan("MySource1").where("LIKE(word, 'F%')")
table1.insertInto("MySink1")

val table2 = table1.unionAll(tEnv.scan("MySource2"))
table2.insertInto("MySink2")

val explanation = tEnv.explain(false)
println(explanation)

多sink計划的結果是

== Abstract Syntax Tree ==
LogicalSink(name=[MySink1], fields=[count, word])
+- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
   +- LogicalTableScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]])

LogicalSink(name=[MySink2], fields=[count, word])
+- LogicalUnion(all=[true])
   :- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
   :  +- LogicalTableScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]])
   +- LogicalTableScan(table=[[default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]]])

== Optimized Logical Plan ==
Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')], reuse_id=[1])
+- TableSourceScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]], fields=[count, word])

Sink(name=[MySink1], fields=[count, word])
+- Reused(reference_id=[1])

Sink(name=[MySink2], fields=[count, word])
+- Union(all=[true], union=[count, word])
   :- Reused(reference_id=[1])
   +- TableSourceScan(table=[[default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]]], fields=[count, word])

== Physical Execution Plan ==
Stage 1 : Data Source
    content : collect elements with CollectionInputFormat

    Stage 2 : Operator
        content : CsvTableSource(read fields: count, word)
        ship_strategy : REBALANCE

        Stage 3 : Operator
            content : SourceConversion(table:Buffer(default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]), fields:(count, word))
            ship_strategy : FORWARD

            Stage 4 : Operator
                content : Calc(where: (word LIKE _UTF-16LE'F%'), select: (count, word))
                ship_strategy : FORWARD

                Stage 5 : Operator
                    content : SinkConversionToRow
                    ship_strategy : FORWARD

                    Stage 6 : Operator
                        content : Map
                        ship_strategy : FORWARD

Stage 8 : Data Source
    content : collect elements with CollectionInputFormat

    Stage 9 : Operator
        content : CsvTableSource(read fields: count, word)
        ship_strategy : REBALANCE

        Stage 10 : Operator
            content : SourceConversion(table:Buffer(default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]), fields:(count, word))
            ship_strategy : FORWARD

            Stage 12 : Operator
                content : SinkConversionToRow
                ship_strategy : FORWARD

                Stage 13 : Operator
                    content : Map
                    ship_strategy : FORWARD

                    Stage 7 : Data Sink
                        content : Sink: CsvTableSink(count, word)
                        ship_strategy : FORWARD

                        Stage 14 : Data Sink
                            content : Sink: CsvTableSink(count, word)
                            ship_strategy : FORWARD

 

歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM