Flink Table API & SQL編程指南(1)








點擊藍字關注我們

Apache Flink提供了兩種頂層的關系型API,分別為Table API和SQL,Flink通過Table API&SQL實現了批流統一。其中Table API是用於Scala和Java的語言集成查詢API,它允許以非常直觀的方式組合關系運算符(例如select,where和join)的查詢。Flink SQL基於Apache Calcite 實現了標准的SQL,用戶可以使用標准的SQL處理數據集。Table API和SQL與Flink的DataStream和DataSet API緊密集成在一起,用戶可以實現相互轉化,比如可以將DataStream或者DataSet注冊為table進行操作數據。值得注意的是,Table API and SQL目前尚未完全完善,還在積極的開發中,所以並不是所有的算子操作都可以通過其實現。

依賴

從Flink1.9開始,Flink為Table & SQL API提供了兩種planner,分別為Blink planner和old planner,其中old planner是在Flink1.9之前的版本使用。主要區別如下:

尖叫提示:對於生產環境,目前推薦使用old planner.

  • flink-table-common: 通用模塊,包含 Flink Planner 和 Blink Planner 一些共用的代碼
  • flink-table-api-java: java語言的Table & SQL API,僅針對table(處於早期的開發階段,不推薦使用)
  • flink-table-api-scala: scala語言的Table & SQL API,僅針對table(處於早期的開發階段,不推薦使用)
  • flink-table-api-java-bridge: java語言的Table & SQL API,支持DataStream/DataSet API(推薦使用)
  • flink-table-api-scala-bridge: scala語言的Table & SQL API,支持DataStream/DataSet API(推薦使用)
  • flink-table-planner:planner 和runtime. planner為Flink1,9之前的old planner(推薦使用)
  • flink-table-planner-blink: 新的Blink planner.
  • flink-table-runtime-blink: 新的Blink runtime.
  • flink-table-uber: 將上述的API模塊及old planner打成一個jar包,形如flink-table-*.jar,位與/lib目錄下
  • flink-table-uber-blink:將上述的API模塊及Blink 模塊打成一個jar包,形如fflink-table-blink-*.jar,位與/lib目錄下

Blink planner & old planner

Blink planner和old planner有許多不同的特點,具體列舉如下:

  • Blink planner將批處理作業看做是流處理作業的特例。所以,不支持Table 與DataSet之間的轉換,批處理的作業也不會被轉成DataSet程序,而是被轉為DataStream程序。
  • Blink planner不支持 BatchTableSource,使用的是有界的StreamTableSource。
  • Blink planner僅支持新的 Catalog,不支持 ExternalCatalog (已過時)。
  • 對於FilterableTableSource的實現,兩種Planner是不同的。old planner會謂詞下推到 PlannerExpression(未來會被移除),而Blink planner 會謂詞下推到 Expression(表示一個產生計算結果的邏輯樹)。
  • 僅僅Blink planner支持key-value形式的配置,即通過Configuration進行參數設置。
  • 關於PlannerConfig的實現,兩種planner有所不同。
  • Blink planner 會將多個sink優化成一個DAG(僅支持TableEnvironment,StreamTableEnvironment不支持),old planner總是將每一個sink優化成一個新的DAG,每一個DAG都是相互獨立的。
  • old planner不支持catalog統計,Blink planner支持catalog統計。

Flink Table & SQL程序的pom依賴

根據使用的語言不同,可以選擇下面的依賴,包括scala版和java版,如下:

<!-- java版 -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java-bridge_2.11</artifactId>
  <version>1.10.0</version>
  <scope>provided</scope>
</dependency>
<!-- scala版 -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
  <version>1.10.0</version>
  <scope>provided</scope>
</dependency>

除此之外,如果需要在本地的IDE中運行Table API & SQL的程序,則需要添加下面的pom依賴:

<!-- Flink 1.9之前的old planner -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner_2.11</artifactId>
  <version>1.10.0</version>
  <scope>provided</scope>
</dependency>
<!-- 新的Blink planner -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner-blink_2.11</artifactId>
  <version>1.10.0</version>
  <scope>provided</scope>
</dependency>

另外,如果需要實現自定義的格式(比如和kafka交互)或者用戶自定義函數,需要添加如下依賴:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-common</artifactId>
  <version>1.10.0</version>
  <scope>provided</scope>
</dependency>

Table API & SQL的編程模板

所有的Table API&SQL的程序(無論是批處理還是流處理)都有着相同的形式,下面將給出通用的編程結構形式:

// 創建一個TableEnvironment對象,指定planner、處理模式(batch、streaming)
TableEnvironment tableEnv = ...; 
// 創建一個表
tableEnv.connect(...).createTemporaryTable("table1");
// 注冊一個外部的表
tableEnv.connect(...).createTemporaryTable("outputTable");
// 通過Table API的查詢創建一個Table 對象
Table tapiResult = tableEnv.from("table1").select(...);
// 通過SQL查詢的查詢創建一個Table 對象
Table sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table1 ... ");
// 將結果寫入TableSink
tapiResult.insertInto("outputTable");
// 執行
tableEnv.execute("java_job");

注意:Table API & SQL的查詢可以相互集成,另外還可以在DataStream或者DataSet中使用Table API & SQL的API,實現DataStreams、 DataSet與Table之間的相互轉換。

創建TableEnvironment

TableEnvironment是Table API & SQL程序的一個入口,主要包括如下的功能:

  • 在內部的catalog中注冊Table
  • 注冊catalog
  • 加載可插拔模塊
  • 執行SQL查詢
  • 注冊用戶定義函數
  • DataStreamDataSet與Table之間的相互轉換
  • 持有對 ExecutionEnvironmentStreamExecutionEnvironment的引用

一個Table必定屬於一個具體的TableEnvironment,不可以將不同TableEnvironment的表放在一起使用(比如join,union等操作)。

TableEnvironment是通過調用 BatchTableEnvironment.create() 或者StreamTableEnvironment.create()的靜態方法進行創建的。另外,默認兩個planner的jar包都存在與classpath下,所有需要明確指定使用的planner。

// **********************
// FLINK 流處理查詢
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);
//或者TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings);

// ******************
// FLINK 批處理查詢
// ******************
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;

ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);

// **********************
// BLINK 流處理查詢
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
// 或者 TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);

// ******************
// BLINK 批處理查詢
// ******************
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);

在catalog中創建表

臨時表與永久表

表可以分為臨時表和永久表兩種,其中永久表需要一個catalog(比如Hive的Metastore)倆維護表的元數據信息,一旦永久表被創建,只要連接到該catalog就可以訪問該表,只有顯示刪除永久表,該表才可以被刪除。臨時表的生命周期是Flink Session,這些表不能夠被其他的Flink Session訪問,這些表不屬於任何的catalog或者數據庫,如果與臨時表相對應的數據庫被刪除了,該臨時表也不會被刪除。

創建表

虛表(Virtual Tables)

一個Table對象相當於SQL中的視圖(虛表),它封裝了一個邏輯執行計划,可以通過一個catalog創建,具體如下:

// 獲取一個TableEnvironment
TableEnvironment tableEnv = ...; 
// table對象,查詢的結果集
Table projTable = tableEnv.from("X").select(...);
// 注冊一個表,名稱為 "projectedTable"
tableEnv.createTemporaryView("projectedTable", projTable);

外部數據源表(Connector Tables)

可以把外部的數據源注冊成表,比如可以讀取MySQL數據庫數據、Kafka數據等

tableEnvironment
  .connect(...)
  .withFormat(...)
  .withSchema(...)
  .inAppendMode()
  .createTemporaryTable("MyTable")

擴展創建表的標識屬性

表的注冊總是包含三部分標識屬性:catalog、數據庫、表名。用戶可以在內部設置一個catalog和一個數據庫作為當前的catalog和數據庫,所以對於catalog和數據庫這兩個標識屬性是可選的,即如果不指定,默認使用的是“current catalog”和 “current database”。

TableEnvironment tEnv = ...;
tEnv.useCatalog("custom_catalog");//設置catalog
tEnv.useDatabase("custom_database");//設置數據庫
Table table = ...;
// 注冊一個名為exampleView的視圖,catalog名為custom_catalog
// 數據庫的名為custom_database
tableEnv.createTemporaryView("exampleView", table);

// 注冊一個名為exampleView的視圖,catalog的名為custom_catalog
// 數據庫的名為other_database
tableEnv.createTemporaryView("other_database.exampleView", table);
 
// 注冊一個名為'View'的視圖,catalog的名稱為custom_catalog
// 數據庫的名為custom_database,'View'是保留關鍵字,需要使用``(反引號)
tableEnv.createTemporaryView("`View`", table);

// 注冊一個名為example.View的視圖,catalog的名為custom_catalog,
// 數據庫名為custom_database
tableEnv.createTemporaryView("`example.View`", table);

// 注冊一個名為'exampleView'的視圖, catalog的名為'other_catalog'
// 數據庫名為other_database' 
tableEnv.createTemporaryView("other_catalog.other_database.exampleView", table);

查詢表

Table API

Table API是一個集成Scala與Java語言的查詢API,與SQL相比,它的查詢不是一個標准的SQL語句,而是由一步一步的操作組成的。如下展示了一個使用Table API實現一個簡單的聚合查詢。

// 獲取TableEnvironment
TableEnvironment tableEnv = ...;
//注冊Orders表

// 查詢注冊的表
Table orders = tableEnv.from("Orders");
// 計算操作
Table revenue = orders
  .filter("cCountry === 'FRANCE'")
  .groupBy("cID, cName")
  .select("cID, cName, revenue.sum AS revSum");

SQL

Flink SQL依賴於Apache Calcite,其實現了標准的SQL語法,如下案例:

// 獲取TableEnvironment
TableEnvironment tableEnv = ...;

//注冊Orders表

// 計算邏輯同上面的Table API
Table revenue = tableEnv.sqlQuery(
    "SELECT cID, cName, SUM(revenue) AS revSum " +
    "FROM Orders " +
    "WHERE cCountry = 'FRANCE' " +
    "GROUP BY cID, cName"
  );

// 注冊"RevenueFrance"外部輸出表
// 計算結果插入"RevenueFrance"表
tableEnv.sqlUpdate(
    "INSERT INTO RevenueFrance " +
    "SELECT cID, cName, SUM(revenue) AS revSum " +
    "FROM Orders " +
    "WHERE cCountry = 'FRANCE' " +
    "GROUP BY cID, cName"
  );

輸出表

一個表通過將其寫入到TableSink,然后進行輸出。TableSink是一個通用的支持多種文件格式(CSV、Parquet, Avro)和多種外部存儲系統(JDBC, Apache HBase, Apache Cassandra, Elasticsearch)以及多種消息對列(Apache Kafka, RabbitMQ)的接口。

批處理的表只能被寫入到 BatchTableSink,流處理的表需要指明AppendStreamTableSink、RetractStreamTableSink或者 UpsertStreamTableSink

// 獲取TableEnvironment
TableEnvironment tableEnv = ...;

// 創建輸出表
final Schema schema = new Schema()
    .field("a", DataTypes.INT())
    .field("b", DataTypes.STRING())
    .field("c", DataTypes.LONG());

tableEnv.connect(new FileSystem("/path/to/file"))
    .withFormat(new Csv().fieldDelimiter('|').deriveSchema())
    .withSchema(schema)
    .createTemporaryTable("CsvSinkTable");

// 計算結果表
Table result = ...
// 輸出結果表到注冊的TableSink
result.insertInto("CsvSinkTable");

Table API & SQL底層的轉換與執行

上文提到了Flink提供了兩種planner,分別為old planner和Blink planner,對於不同的planner而言,Table API & SQL底層的執行與轉換是有所不同的。

Old planner

根據是流處理作業還是批處理作業,Table API &SQL會被轉換成DataStream或者DataSet程序。一個查詢在內部表示為一個邏輯查詢計划,會被轉換為兩個階段:

  • 1.邏輯查詢計划優化
  • 2.轉換成DataStream或者DataSet程序

上面的兩個階段只有下面的操作被執行時才會被執行:

  • 當一個表被輸出到TableSink時,比如調用了Table.insertInto()方法
  • 當執行更新查詢時,比如調用TableEnvironment.sqlUpdate()方法
  • 當一個表被轉換為DataStream或者DataSet時

一旦執行上述兩個階段,Table API & SQL的操作會被看做是普通的DataStream或者DataSet程序,所以當StreamExecutionEnvironment.execute()或者ExecutionEnvironment.execute() 被調用時,會執行轉換后的程序。

Blink planner

無論是批處理作業還是流處理作業,如果使用的是Blink planner,底層都會被轉換為DataStream程序。在一個查詢在內部表示為一個邏輯查詢計划,會被轉換成兩個階段:

  • 1.邏輯查詢計划優化
  • 2.轉換成DataStream程序

對於TableEnvironment and StreamTableEnvironment而言,一個查詢的轉換是不同的

首先對於TableEnvironment,當TableEnvironment.execute()方法執行時,Table API & SQL的查詢才會被轉換,因為TableEnvironment會將多個sink優化為一個DAG。

對於StreamTableEnvironment,轉換發生的時間與old planner相同。

與DataStream & DataSet API集成

對於Old planner與Blink planner而言,只要是流處理的操作,都可以與DataStream API集成,僅僅只有Old planner才可以與DataSet API集成,由於Blink planner的批處理作業會被轉換成DataStream程序,所以不能夠與DataSet API集成。值得注意的是,下面提到的table與DataSet之間的轉換僅適用於Old planner。

Table API & SQL的查詢很容易與DataStream或者DataSet程序集成,並可以將Table API & SQL的查詢嵌入DataStream或者DataSet程序中。DataStream或者DataSet可以轉換成表,反之,表也可以被轉換成DataStream或者DataSet。

從DataStream或者DataSet中注冊臨時表(視圖)

**尖叫提示:**只能將DataStream或者DataSet轉換為臨時表(視圖)

下面演示DataStream的轉換,對於DataSet的轉換類似。

// 獲取StreamTableEnvironment
StreamTableEnvironment tableEnv = ...; 
DataStream<Tuple2<Long, String>> stream = ...
// 將DataStream注冊為一個名為myTable的視圖,其中字段分別為"f0", "f1"
tableEnv.createTemporaryView("myTable", stream);
// 將DataStream注冊為一個名為myTable2的視圖,其中字段分別為"myLong", "myString"
tableEnv.createTemporaryView("myTable2", stream, "myLong, myString");

將DataStream或者DataSet轉化為Table對象

可以直接將DataStream或者DataSet轉換為Table對象,之后可以使用Table API進行查詢操作。下面演示DataStream的轉換,對於DataSet的轉換類似。

// 獲取StreamTableEnvironment
StreamTableEnvironment tableEnv = ...; 
DataStream<Tuple2<Long, String>> stream = ...
// 將DataStream轉換為Table對象,默認的字段為"f0", "f1"
Table table1 = tableEnv.fromDataStream(stream);
// 將DataStream轉換為Table對象,默認的字段為"myLong", "myString"
Table table2 = tableEnv.fromDataStream(stream, "myLong, myString");

將表轉換為DataStream或者DataSet

當將Table轉為DataStream或者DataSet時,需要指定DataStream或者DataSet的數據類型。通常最方便的數據類型是row類型,Flink提供了很多的數據類型供用戶選擇,具體包括Row、POJO、樣例類、Tuple和原子類型。

將表轉換為DataStream

一個流處理查詢的結果是動態變化的,所以將表轉為DataStream時需要指定一個更新模式,共有兩種模式:Append ModeRetract Mode

  • Append Mode

如果動態表僅只有Insert操作,即之前輸出的結果不會被更新,則使用該模式。如果更新或刪除操作使用追加模式會失敗報錯

  • Retract Mode

始終可以使用此模式。返回值是boolean類型。它用true或false來標記數據的插入和撤回,返回true代表數據插入,false代表數據的撤回。

// 獲取StreamTableEnvironment. 
StreamTableEnvironment tableEnv = ...; 
// 包含兩個字段的表(String name, Integer age)
Table table = ...
// 將表轉為DataStream,使用Append Mode追加模式,數據類型為Row
DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class);
// 將表轉為DataStream,使用Append Mode追加模式,數據類型為定義好的TypeInformation
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
  Types.STRING(),
  Types.INT());
DataStream<Tuple2<String, Integer>> dsTuple = 
  tableEnv.toAppendStream(table, tupleType);
// 將表轉為DataStream,使用的模式為Retract Mode撤回模式,類型為Row
// 對於轉換后的DataStream<Tuple2<Boolean, X>>,X表示流的數據類型,
// boolean值表示數據改變的類型,其中INSERT返回true,DELETE返回的是false
DataStream<Tuple2<Boolean, Row>> retractStream = 
  tableEnv.toRetractStream(table, Row.class);

將表轉換為DataSet

// 獲取BatchTableEnvironment
BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
// 包含兩個字段的表(String name, Integer age)
Table table = ...
// 將表轉為DataSet數據類型為Row
DataSet<Row> dsRow = tableEnv.toDataSet(table, Row.class);
// 將表轉為DataSet,通過TypeInformation定義Tuple2<String, Integer>數據類型
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
  Types.STRING(),
  Types.INT());
DataSet<Tuple2<String, Integer>> dsTuple = 
  tableEnv.toDataSet(table, tupleType);

表的Schema與數據類型之間的映射

表的Schema與數據類型之間的映射有兩種方式:分別是基於字段下標位置的映射和基於字段名稱的映射。

基於字段下標位置的映射

該方式是按照字段的順序進行一一映射,使用方式如下:

// 獲取StreamTableEnvironment
StreamTableEnvironment tableEnv = ...; 
DataStream<Tuple2<Long, Integer>> stream = ...
// 將DataStream轉為表,默認的字段名為"f0"和"f1"
Table table = tableEnv.fromDataStream(stream);
// 將DataStream轉為表,選取tuple的第一個元素,指定一個名為"myLong"的字段名
Table table = tableEnv.fromDataStream(stream, "myLong");
// 將DataStream轉為表,為tuple的第一個元素指定名為"myLong",為第二個元素指定myInt的字段名
Table table = tableEnv.fromDataStream(stream, "myLong, myInt");

基於字段名稱的映射

基於字段名稱的映射方式支持任意的數據類型包括POJO類型,可以很靈活地定義表Schema映射,所有的字段被映射成一個具體的字段名稱,同時也可以使用"as"為字段起一個別名。其中Tuple元素的第一個元素為f0,第二個元素為f1,以此類推。

// 獲取StreamTableEnvironment
StreamTableEnvironment tableEnv = ...; 
DataStream<Tuple2<Long, Integer>> stream = ...
// 將DataStream轉為表,默認的字段名為"f0"和"f1"
Table table = tableEnv.fromDataStream(stream);
// 將DataStream轉為表,選擇tuple的第二個元素,指定一個名為"f1"的字段名
Table table = tableEnv.fromDataStream(stream, "f1");
// 將DataStream轉為表,交換字段的順序
Table table = tableEnv.fromDataStream(stream, "f1, f0");
// 將DataStream轉為表,交換字段的順序,並為f1起別名為"myInt",為f0起別名為"myLong
Table table = tableEnv.fromDataStream(stream, "f1 as myInt, f0 as myLong");

原子類型

Flink將Integer, Double, String或者普通的類型稱之為原子類型,一個數據類型為原子類型的DataStream或者DataSet可以被轉成單個字段屬性的表,這個字段的類型與DataStream或者DataSet的數據類型一致,這個字段的名稱可以進行指定。

//獲取StreamTableEnvironment
StreamTableEnvironment tableEnv = ...; 
// 數據類型為原子類型Long
DataStream<Long> stream = ...
// 將DataStream轉為表,默認的字段名為"f0"
Table table = tableEnv.fromDataStream(stream);
// 將DataStream轉為表,指定字段名為myLong"
Table table = tableEnv.fromDataStream(stream, "myLong");

Tuple類型

Tuple類型的DataStream或者DataSet都可以轉為表,可以重新設定表的字段名(即根據tuple元素的位置進行一一映射,轉為表之后,每個元素都有一個別名),如果不為字段指定名稱,則使用默認的名稱(java語言默認的是f0,f1,scala默認的是_1),用戶也可以重新排列字段的順序,並為每個字段起一個別名。

// 獲取StreamTableEnvironment
StreamTableEnvironment tableEnv = ...; 
//Tuple2<Long, String>類型的DataStream
DataStream<Tuple2<Long, String>> stream = ...
// 將DataStream轉為表,默認的字段名為 "f0", "f1"
Table table = tableEnv.fromDataStream(stream);
// 將DataStream轉為表,指定字段名為 "myLong", "myString"(按照Tuple元素的順序位置)
Table table = tableEnv.fromDataStream(stream, "myLong, myString");
// 將DataStream轉為表,指定字段名為 "f0", "f1",並且交換順序
Table table = tableEnv.fromDataStream(stream, "f1, f0");
// 將DataStream轉為表,只選擇Tuple的第二個元素,指定字段名為"f1"
Table table = tableEnv.fromDataStream(stream, "f1");
// 將DataStream轉為表,為Tuple的第二個元素指定別名為myString,為第一個元素指定字段名為myLong
Table table = tableEnv.fromDataStream(stream, "f1 as 'myString', f0 as 'myLong'");

POJO類型

當將POJO類型的DataStream或者DataSet轉為表時,如果不指定表名,則默認使用的是POJO字段本身的名稱,原始字段名稱的映射需要指定原始字段的名稱,可以為其起一個別名,也可以調換字段的順序,也可以只選擇部分的字段。

// 獲取StreamTableEnvironment
StreamTableEnvironment tableEnv = ...; 
//數據類型為Person的POJO類型,字段包括"name"和"age"
DataStream<Person> stream = ...
// 將DataStream轉為表,默認的字段名稱為"age", "name"
Table table = tableEnv.fromDataStream(stream);
//  將DataStream轉為表,為"age"字段指定別名myAge, 為"name"字段指定別名myName
Table table = tableEnv.fromDataStream(stream, "age as myAge, name as myName");
//  將DataStream轉為表,只選擇一個name字段
Table table = tableEnv.fromDataStream(stream, "name");
//  將DataStream轉為表,只選擇一個name字段,並起一個別名myName
Table table = tableEnv.fromDataStream(stream, "name as myName");

Row類型

Row類型的DataStream或者DataSet轉為表的過程中,可以根據字段的位置或者字段名稱進行映射,同時也可以為字段起一個別名,或者只選擇部分字段。

// 獲取StreamTableEnvironment
StreamTableEnvironment tableEnv = ...; 
// Row類型的DataStream,通過RowTypeInfo指定兩個字段"name"和"age"
DataStream<Row> stream = ...
// 將DataStream轉為表,默認的字段名為原始字段名"name"和"age"
Table table = tableEnv.fromDataStream(stream);
// 將DataStream轉為表,根據位置映射,為第一個字段指定myName別名,為第二個字段指定myAge別名
Table table = tableEnv.fromDataStream(stream, "myName, myAge");
// 將DataStream轉為表,根據字段名映射,為name字段起別名myName,為age字段起別名myAge
Table table = tableEnv.fromDataStream(stream, "name as myName, age as myAge");
// 將DataStream轉為表,根據字段名映射,只選擇name字段
Table table = tableEnv.fromDataStream(stream, "name");
// 將DataStream轉為表,根據字段名映射,只選擇name字段,並起一個別名"myName"
Table table = tableEnv.fromDataStream(stream, "name as myName");

查詢優化

Old planner

Apache Flink利用Apache Calcite來優化和轉換查詢。當前執行的優化包括投影和過濾器下推,去相關子查詢以及其他類型的查詢重寫。Old Planner目前不支持優化JOIN的順序,而是按照查詢中定義的順序執行它們。

通過提供一個CalciteConfig對象,可以調整在不同階段應用的優化規則集。這可通過調用CalciteConfig.createBuilder()方法來進行創建,並通過調用tableEnv.getConfig.setPlannerConfig(calciteConfig)方法將該對象傳遞給TableEnvironment。

Blink planner

Apache Flink利用並擴展了Apache Calcite來執行復雜的查詢優化。這包括一系列基於規則和基於成本的優化(cost_based),例如:

  • 基於Apache Calcite的去相關子查詢
  • 投影裁剪
  • 分區裁剪
  • 過濾器謂詞下推
  • 子計划重復數據刪除以避免重復計算
  • 特殊的子查詢重寫,包括兩個部分:
    • 將IN和EXISTS轉換為左半聯接( left semi-join)
    • 將NOT IN和NOT EXISTS轉換為left anti-join
  • 調整join的順序,需要啟用 table.optimizer.join-reorder-enabled

注意: IN / EXISTS / NOT IN / NOT EXISTS當前僅在子查詢重寫的結合條件下受支持。

查詢優化器不僅基於計划,而且還可以基於數據源的統計信息以及每個操作的細粒度開銷(例如io,cpu,網絡和內存),從而做出更加明智且合理的優化決策。

高級用戶可以通過CalciteConfig對象提供自定義優化規則,通過調用tableEnv.getConfig.setPlannerConfig(calciteConfig),將參數傳遞給TableEnvironment。

查看執行計划

SQL語言支持通過explain來查看某條SQL的執行計划,Flink Table API也可以通過調用explain()方法來查看具體的執行計划。該方法返回一個字符串用來描述三個部分計划,分別為:

  1. 關系查詢的抽象語法樹,即未優化的邏輯查詢計划,
  2. 優化的邏輯查詢計划
  3. 實際執行計划
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
DataStream<Tuple2<Integer, String>> stream1 = env.fromElements(new Tuple2<>(1"hello"));
DataStream<Tuple2<Integer, String>> stream2 = env.fromElements(new Tuple2<>(1"hello"));
Table table1 = tEnv.fromDataStream(stream1, "count, word");
Table table2 = tEnv.fromDataStream(stream2, "count, word");
Table table = table1
  .where("LIKE(word, 'F%')")
  .unionAll(table2);
// 查看執行計划
String explanation = tEnv.explain(table);
System.out.println(explanation);

執行計划的結果為:

== 抽象語法樹 ==
LogicalUnion(all=[true])
  LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
    FlinkLogicalDataStreamScan(id=[1], fields=[count, word])
  FlinkLogicalDataStreamScan(id=[2], fields=[count, word])

== 優化的邏輯執行計划 ==
DataStreamUnion(all=[true], union all=[count, word])
  DataStreamCalc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
    DataStreamScan(id=[1], fields=[count, word])
  DataStreamScan(id=[2], fields=[count, word])

== 物理執行計划 ==
Stage 1 : Data Source
 content : collect elements with CollectionInputFormat

Stage 2 : Data Source
 content : collect elements with CollectionInputFormat

 Stage 3 : Operator
  content : from: (count, word)
  ship_strategy : REBALANCE

  Stage 4 : Operator
   content : where: (LIKE(word, _UTF-16LE'F%')), select: (count, word)
   ship_strategy : FORWARD

   Stage 5 : Operator
    content : from: (count, word)
    ship_strategy : REBALANCE

小結

本文主要介紹了Flink TableAPI &SQL,首先介紹了Flink Table API &SQL的基本概念 ,然后介紹了構建Flink Table API & SQL程序所需要的依賴,接着介紹了Flink的兩種planner,還介紹了如何注冊表以及DataStream、DataSet與表的相互轉換,最后介紹了Flink的兩種planner對應的查詢優化並給出了一個查看執行計划的案例。

往期 精彩回顧




Flink DataSet API編程指南
Flink DataStream API編程指南
Flink DataStream API 中的多面手——Process Function詳解
透過窗口觀無限數據流——Flink的Window全面解析


我就知道你在看!

本文分享自微信公眾號 - 大數據技術與數倉(gh_95306769522d)。
如有侵權,請聯系 support@oschina.cn 刪除。
本文參與“OSC源創計划”,歡迎正在閱讀的你也加入,一起分享。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM