
Apache Flink提供了兩種頂層的關系型API,分別為Table API和SQL,Flink通過Table API&SQL實現了批流統一。其中Table API是用於Scala和Java的語言集成查詢API,它允許以非常直觀的方式組合關系運算符(例如select,where和join)的查詢。Flink SQL基於Apache Calcite 實現了標准的SQL,用戶可以使用標准的SQL處理數據集。Table API和SQL與Flink的DataStream和DataSet API緊密集成在一起,用戶可以實現相互轉化,比如可以將DataStream或者DataSet注冊為table進行操作數據。值得注意的是,Table API and SQL目前尚未完全完善,還在積極的開發中,所以並不是所有的算子操作都可以通過其實現。
依賴
從Flink1.9開始,Flink為Table & SQL API提供了兩種planner,分別為Blink planner和old planner,其中old planner是在Flink1.9之前的版本使用。主要區別如下:
尖叫提示:對於生產環境,目前推薦使用old planner.
-
flink-table-common
: 通用模塊,包含 Flink Planner 和 Blink Planner 一些共用的代碼 -
flink-table-api-java
: java語言的Table & SQL API,僅針對table(處於早期的開發階段,不推薦使用) -
flink-table-api-scala
: scala語言的Table & SQL API,僅針對table(處於早期的開發階段,不推薦使用) -
flink-table-api-java-bridge
: java語言的Table & SQL API,支持DataStream/DataSet API(推薦使用) -
flink-table-api-scala-bridge
: scala語言的Table & SQL API,支持DataStream/DataSet API(推薦使用) -
flink-table-planner
:planner 和runtime. planner為Flink1,9之前的old planner(推薦使用) -
flink-table-planner-blink
: 新的Blink planner. -
flink-table-runtime-blink
: 新的Blink runtime. -
flink-table-uber
: 將上述的API模塊及old planner打成一個jar包,形如flink-table-*.jar,位與/lib目錄下 -
flink-table-uber-blink
:將上述的API模塊及Blink 模塊打成一個jar包,形如fflink-table-blink-*.jar,位與/lib目錄下
Blink planner & old planner
Blink planner和old planner有許多不同的特點,具體列舉如下:
-
Blink planner將批處理作業看做是流處理作業的特例。所以,不支持Table 與DataSet之間的轉換,批處理的作業也不會被轉成DataSet程序,而是被轉為DataStream程序。 -
Blink planner不支持 BatchTableSource
,使用的是有界的StreamTableSource。 -
Blink planner僅支持新的 Catalog
,不支持ExternalCatalog
(已過時)。 -
對於FilterableTableSource的實現,兩種Planner是不同的。old planner會謂詞下推到 PlannerExpression
(未來會被移除),而Blink planner 會謂詞下推到Expression
(表示一個產生計算結果的邏輯樹)。 -
僅僅Blink planner支持key-value形式的配置,即通過Configuration進行參數設置。 -
關於PlannerConfig的實現,兩種planner有所不同。 -
Blink planner 會將多個sink優化成一個DAG(僅支持TableEnvironment,StreamTableEnvironment不支持),old planner總是將每一個sink優化成一個新的DAG,每一個DAG都是相互獨立的。 -
old planner不支持catalog統計,Blink planner支持catalog統計。
Flink Table & SQL程序的pom依賴
根據使用的語言不同,可以選擇下面的依賴,包括scala版和java版,如下:
<!-- java版 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>1.10.0</version>
<scope>provided</scope>
</dependency>
<!-- scala版 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>1.10.0</version>
<scope>provided</scope>
</dependency>
除此之外,如果需要在本地的IDE中運行Table API & SQL的程序,則需要添加下面的pom依賴:
<!-- Flink 1.9之前的old planner -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>1.10.0</version>
<scope>provided</scope>
</dependency>
<!-- 新的Blink planner -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>1.10.0</version>
<scope>provided</scope>
</dependency>
另外,如果需要實現自定義的格式(比如和kafka交互)或者用戶自定義函數,需要添加如下依賴:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.10.0</version>
<scope>provided</scope>
</dependency>
Table API & SQL的編程模板
所有的Table API&SQL的程序(無論是批處理還是流處理)都有着相同的形式,下面將給出通用的編程結構形式:
// 創建一個TableEnvironment對象,指定planner、處理模式(batch、streaming)
TableEnvironment tableEnv = ...;
// 創建一個表
tableEnv.connect(...).createTemporaryTable("table1");
// 注冊一個外部的表
tableEnv.connect(...).createTemporaryTable("outputTable");
// 通過Table API的查詢創建一個Table 對象
Table tapiResult = tableEnv.from("table1").select(...);
// 通過SQL查詢的查詢創建一個Table 對象
Table sqlResult = tableEnv.sqlQuery("SELECT ... FROM table1 ... ");
// 將結果寫入TableSink
tapiResult.insertInto("outputTable");
// 執行
tableEnv.execute("java_job");
注意:Table API & SQL的查詢可以相互集成,另外還可以在DataStream或者DataSet中使用Table API & SQL的API,實現DataStreams、 DataSet與Table之間的相互轉換。
創建TableEnvironment
TableEnvironment是Table API & SQL程序的一個入口,主要包括如下的功能:
-
在內部的catalog中注冊Table -
注冊catalog -
加載可插拔模塊 -
執行SQL查詢 -
注冊用戶定義函數 -
DataStream
、DataSet
與Table之間的相互轉換 -
持有對 ExecutionEnvironment
、StreamExecutionEnvironment
的引用
一個Table必定屬於一個具體的TableEnvironment,不可以將不同TableEnvironment的表放在一起使用(比如join,union等操作)。
TableEnvironment是通過調用 BatchTableEnvironment.create()
或者StreamTableEnvironment.create()的靜態方法進行創建的。另外,默認兩個planner的jar包都存在與classpath下,所有需要明確指定使用的planner。
// **********************
// FLINK 流處理查詢
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;
EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);
//或者TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings);
// ******************
// FLINK 批處理查詢
// ******************
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);
// **********************
// BLINK 流處理查詢
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
// 或者 TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);
// ******************
// BLINK 批處理查詢
// ******************
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
在catalog中創建表
臨時表與永久表
表可以分為臨時表和永久表兩種,其中永久表需要一個catalog(比如Hive的Metastore)倆維護表的元數據信息,一旦永久表被創建,只要連接到該catalog就可以訪問該表,只有顯示刪除永久表,該表才可以被刪除。臨時表的生命周期是Flink Session,這些表不能夠被其他的Flink Session訪問,這些表不屬於任何的catalog或者數據庫,如果與臨時表相對應的數據庫被刪除了,該臨時表也不會被刪除。
創建表
虛表(Virtual Tables)
一個Table對象相當於SQL中的視圖(虛表),它封裝了一個邏輯執行計划,可以通過一個catalog創建,具體如下:
// 獲取一個TableEnvironment
TableEnvironment tableEnv = ...;
// table對象,查詢的結果集
Table projTable = tableEnv.from("X").select(...);
// 注冊一個表,名稱為 "projectedTable"
tableEnv.createTemporaryView("projectedTable", projTable);
外部數據源表(Connector Tables)
可以把外部的數據源注冊成表,比如可以讀取MySQL數據庫數據、Kafka數據等
tableEnvironment
.connect(...)
.withFormat(...)
.withSchema(...)
.inAppendMode()
.createTemporaryTable("MyTable")
擴展創建表的標識屬性
表的注冊總是包含三部分標識屬性:catalog、數據庫、表名。用戶可以在內部設置一個catalog和一個數據庫作為當前的catalog和數據庫,所以對於catalog和數據庫這兩個標識屬性是可選的,即如果不指定,默認使用的是“current catalog”和 “current database”。
TableEnvironment tEnv = ...;
tEnv.useCatalog("custom_catalog");//設置catalog
tEnv.useDatabase("custom_database");//設置數據庫
Table table = ...;
// 注冊一個名為exampleView的視圖,catalog名為custom_catalog
// 數據庫的名為custom_database
tableEnv.createTemporaryView("exampleView", table);
// 注冊一個名為exampleView的視圖,catalog的名為custom_catalog
// 數據庫的名為other_database
tableEnv.createTemporaryView("other_database.exampleView", table);
// 注冊一個名為'View'的視圖,catalog的名稱為custom_catalog
// 數據庫的名為custom_database,'View'是保留關鍵字,需要使用``(反引號)
tableEnv.createTemporaryView("`View`", table);
// 注冊一個名為example.View的視圖,catalog的名為custom_catalog,
// 數據庫名為custom_database
tableEnv.createTemporaryView("`example.View`", table);
// 注冊一個名為'exampleView'的視圖, catalog的名為'other_catalog'
// 數據庫名為other_database'
tableEnv.createTemporaryView("other_catalog.other_database.exampleView", table);
查詢表
Table API
Table API是一個集成Scala與Java語言的查詢API,與SQL相比,它的查詢不是一個標准的SQL語句,而是由一步一步的操作組成的。如下展示了一個使用Table API實現一個簡單的聚合查詢。
// 獲取TableEnvironment
TableEnvironment tableEnv = ...;
//注冊Orders表
// 查詢注冊的表
Table orders = tableEnv.from("Orders");
// 計算操作
Table revenue = orders
.filter("cCountry === 'FRANCE'")
.groupBy("cID, cName")
.select("cID, cName, revenue.sum AS revSum");
SQL
Flink SQL依賴於Apache Calcite,其實現了標准的SQL語法,如下案例:
// 獲取TableEnvironment
TableEnvironment tableEnv = ...;
//注冊Orders表
// 計算邏輯同上面的Table API
Table revenue = tableEnv.sqlQuery(
"SELECT cID, cName, SUM(revenue) AS revSum " +
"FROM Orders " +
"WHERE cCountry = 'FRANCE' " +
"GROUP BY cID, cName"
);
// 注冊"RevenueFrance"外部輸出表
// 計算結果插入"RevenueFrance"表
tableEnv.sqlUpdate(
"INSERT INTO RevenueFrance " +
"SELECT cID, cName, SUM(revenue) AS revSum " +
"FROM Orders " +
"WHERE cCountry = 'FRANCE' " +
"GROUP BY cID, cName"
);
輸出表
一個表通過將其寫入到TableSink,然后進行輸出。TableSink是一個通用的支持多種文件格式(CSV、Parquet, Avro)和多種外部存儲系統(JDBC, Apache HBase, Apache Cassandra, Elasticsearch)以及多種消息對列(Apache Kafka, RabbitMQ)的接口。
批處理的表只能被寫入到 BatchTableSink
,流處理的表需要指明AppendStreamTableSink、RetractStreamTableSink或者 UpsertStreamTableSink
// 獲取TableEnvironment
TableEnvironment tableEnv = ...;
// 創建輸出表
final Schema schema = new Schema()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.field("c", DataTypes.LONG());
tableEnv.connect(new FileSystem("/path/to/file"))
.withFormat(new Csv().fieldDelimiter('|').deriveSchema())
.withSchema(schema)
.createTemporaryTable("CsvSinkTable");
// 計算結果表
Table result = ...
// 輸出結果表到注冊的TableSink
result.insertInto("CsvSinkTable");
Table API & SQL底層的轉換與執行
上文提到了Flink提供了兩種planner,分別為old planner和Blink planner,對於不同的planner而言,Table API & SQL底層的執行與轉換是有所不同的。
Old planner
根據是流處理作業還是批處理作業,Table API &SQL會被轉換成DataStream或者DataSet程序。一個查詢在內部表示為一個邏輯查詢計划,會被轉換為兩個階段:
-
1.邏輯查詢計划優化 -
2.轉換成DataStream或者DataSet程序
上面的兩個階段只有下面的操作被執行時才會被執行:
-
當一個表被輸出到TableSink時,比如調用了Table.insertInto()方法 -
當執行更新查詢時,比如調用TableEnvironment.sqlUpdate()方法 -
當一個表被轉換為DataStream或者DataSet時
一旦執行上述兩個階段,Table API & SQL的操作會被看做是普通的DataStream或者DataSet程序,所以當StreamExecutionEnvironment.execute()
或者ExecutionEnvironment.execute()
被調用時,會執行轉換后的程序。
Blink planner
無論是批處理作業還是流處理作業,如果使用的是Blink planner,底層都會被轉換為DataStream程序。在一個查詢在內部表示為一個邏輯查詢計划,會被轉換成兩個階段:
-
1.邏輯查詢計划優化 -
2.轉換成DataStream程序
對於TableEnvironment
and StreamTableEnvironment
而言,一個查詢的轉換是不同的
首先對於TableEnvironment,當TableEnvironment.execute()方法執行時,Table API & SQL的查詢才會被轉換,因為TableEnvironment會將多個sink優化為一個DAG。
對於StreamTableEnvironment,轉換發生的時間與old planner相同。
與DataStream & DataSet API集成
對於Old planner與Blink planner而言,只要是流處理的操作,都可以與DataStream API集成,僅僅只有Old planner才可以與DataSet API集成,由於Blink planner的批處理作業會被轉換成DataStream程序,所以不能夠與DataSet API集成。值得注意的是,下面提到的table與DataSet之間的轉換僅適用於Old planner。
Table API & SQL的查詢很容易與DataStream或者DataSet程序集成,並可以將Table API & SQL的查詢嵌入DataStream或者DataSet程序中。DataStream或者DataSet可以轉換成表,反之,表也可以被轉換成DataStream或者DataSet。
從DataStream或者DataSet中注冊臨時表(視圖)
**尖叫提示:**只能將DataStream或者DataSet轉換為臨時表(視圖)
下面演示DataStream的轉換,對於DataSet的轉換類似。
// 獲取StreamTableEnvironment
StreamTableEnvironment tableEnv = ...;
DataStream<Tuple2<Long, String>> stream = ...
// 將DataStream注冊為一個名為myTable的視圖,其中字段分別為"f0", "f1"
tableEnv.createTemporaryView("myTable", stream);
// 將DataStream注冊為一個名為myTable2的視圖,其中字段分別為"myLong", "myString"
tableEnv.createTemporaryView("myTable2", stream, "myLong, myString");
將DataStream或者DataSet轉化為Table對象
可以直接將DataStream或者DataSet轉換為Table對象,之后可以使用Table API進行查詢操作。下面演示DataStream的轉換,對於DataSet的轉換類似。
// 獲取StreamTableEnvironment
StreamTableEnvironment tableEnv = ...;
DataStream<Tuple2<Long, String>> stream = ...
// 將DataStream轉換為Table對象,默認的字段為"f0", "f1"
Table table1 = tableEnv.fromDataStream(stream);
// 將DataStream轉換為Table對象,默認的字段為"myLong", "myString"
Table table2 = tableEnv.fromDataStream(stream, "myLong, myString");
將表轉換為DataStream或者DataSet
當將Table轉為DataStream或者DataSet時,需要指定DataStream或者DataSet的數據類型。通常最方便的數據類型是row類型,Flink提供了很多的數據類型供用戶選擇,具體包括Row、POJO、樣例類、Tuple和原子類型。
將表轉換為DataStream
一個流處理查詢的結果是動態變化的,所以將表轉為DataStream時需要指定一個更新模式,共有兩種模式:Append Mode和Retract Mode。
-
Append Mode
如果動態表僅只有Insert操作,即之前輸出的結果不會被更新,則使用該模式。如果更新或刪除操作使用追加模式會失敗報錯
-
Retract Mode
始終可以使用此模式。返回值是boolean類型。它用true或false來標記數據的插入和撤回,返回true代表數據插入,false代表數據的撤回。
// 獲取StreamTableEnvironment.
StreamTableEnvironment tableEnv = ...;
// 包含兩個字段的表(String name, Integer age)
Table table = ...
// 將表轉為DataStream,使用Append Mode追加模式,數據類型為Row
DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class);
// 將表轉為DataStream,使用Append Mode追加模式,數據類型為定義好的TypeInformation
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
Types.STRING(),
Types.INT());
DataStream<Tuple2<String, Integer>> dsTuple =
tableEnv.toAppendStream(table, tupleType);
// 將表轉為DataStream,使用的模式為Retract Mode撤回模式,類型為Row
// 對於轉換后的DataStream<Tuple2<Boolean, X>>,X表示流的數據類型,
// boolean值表示數據改變的類型,其中INSERT返回true,DELETE返回的是false
DataStream<Tuple2<Boolean, Row>> retractStream =
tableEnv.toRetractStream(table, Row.class);
將表轉換為DataSet
// 獲取BatchTableEnvironment
BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
// 包含兩個字段的表(String name, Integer age)
Table table = ...
// 將表轉為DataSet數據類型為Row
DataSet<Row> dsRow = tableEnv.toDataSet(table, Row.class);
// 將表轉為DataSet,通過TypeInformation定義Tuple2<String, Integer>數據類型
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
Types.STRING(),
Types.INT());
DataSet<Tuple2<String, Integer>> dsTuple =
tableEnv.toDataSet(table, tupleType);
表的Schema與數據類型之間的映射
表的Schema與數據類型之間的映射有兩種方式:分別是基於字段下標位置的映射和基於字段名稱的映射。
基於字段下標位置的映射
該方式是按照字段的順序進行一一映射,使用方式如下:
// 獲取StreamTableEnvironment
StreamTableEnvironment tableEnv = ...;
DataStream<Tuple2<Long, Integer>> stream = ...
// 將DataStream轉為表,默認的字段名為"f0"和"f1"
Table table = tableEnv.fromDataStream(stream);
// 將DataStream轉為表,選取tuple的第一個元素,指定一個名為"myLong"的字段名
Table table = tableEnv.fromDataStream(stream, "myLong");
// 將DataStream轉為表,為tuple的第一個元素指定名為"myLong",為第二個元素指定myInt的字段名
Table table = tableEnv.fromDataStream(stream, "myLong, myInt");
基於字段名稱的映射
基於字段名稱的映射方式支持任意的數據類型包括POJO類型,可以很靈活地定義表Schema映射,所有的字段被映射成一個具體的字段名稱,同時也可以使用"as"為字段起一個別名。其中Tuple元素的第一個元素為f0,第二個元素為f1,以此類推。
// 獲取StreamTableEnvironment
StreamTableEnvironment tableEnv = ...;
DataStream<Tuple2<Long, Integer>> stream = ...
// 將DataStream轉為表,默認的字段名為"f0"和"f1"
Table table = tableEnv.fromDataStream(stream);
// 將DataStream轉為表,選擇tuple的第二個元素,指定一個名為"f1"的字段名
Table table = tableEnv.fromDataStream(stream, "f1");
// 將DataStream轉為表,交換字段的順序
Table table = tableEnv.fromDataStream(stream, "f1, f0");
// 將DataStream轉為表,交換字段的順序,並為f1起別名為"myInt",為f0起別名為"myLong
Table table = tableEnv.fromDataStream(stream, "f1 as myInt, f0 as myLong");
原子類型
Flink將Integer
, Double
, String
或者普通的類型稱之為原子類型,一個數據類型為原子類型的DataStream或者DataSet可以被轉成單個字段屬性的表,這個字段的類型與DataStream或者DataSet的數據類型一致,這個字段的名稱可以進行指定。
//獲取StreamTableEnvironment
StreamTableEnvironment tableEnv = ...;
// 數據類型為原子類型Long
DataStream<Long> stream = ...
// 將DataStream轉為表,默認的字段名為"f0"
Table table = tableEnv.fromDataStream(stream);
// 將DataStream轉為表,指定字段名為myLong"
Table table = tableEnv.fromDataStream(stream, "myLong");
Tuple類型
Tuple類型的DataStream或者DataSet都可以轉為表,可以重新設定表的字段名(即根據tuple元素的位置進行一一映射,轉為表之后,每個元素都有一個別名),如果不為字段指定名稱,則使用默認的名稱(java語言默認的是f0,f1,scala默認的是_1),用戶也可以重新排列字段的順序,並為每個字段起一個別名。
// 獲取StreamTableEnvironment
StreamTableEnvironment tableEnv = ...;
//Tuple2<Long, String>類型的DataStream
DataStream<Tuple2<Long, String>> stream = ...
// 將DataStream轉為表,默認的字段名為 "f0", "f1"
Table table = tableEnv.fromDataStream(stream);
// 將DataStream轉為表,指定字段名為 "myLong", "myString"(按照Tuple元素的順序位置)
Table table = tableEnv.fromDataStream(stream, "myLong, myString");
// 將DataStream轉為表,指定字段名為 "f0", "f1",並且交換順序
Table table = tableEnv.fromDataStream(stream, "f1, f0");
// 將DataStream轉為表,只選擇Tuple的第二個元素,指定字段名為"f1"
Table table = tableEnv.fromDataStream(stream, "f1");
// 將DataStream轉為表,為Tuple的第二個元素指定別名為myString,為第一個元素指定字段名為myLong
Table table = tableEnv.fromDataStream(stream, "f1 as 'myString', f0 as 'myLong'");
POJO類型
當將POJO類型的DataStream或者DataSet轉為表時,如果不指定表名,則默認使用的是POJO字段本身的名稱,原始字段名稱的映射需要指定原始字段的名稱,可以為其起一個別名,也可以調換字段的順序,也可以只選擇部分的字段。
// 獲取StreamTableEnvironment
StreamTableEnvironment tableEnv = ...;
//數據類型為Person的POJO類型,字段包括"name"和"age"
DataStream<Person> stream = ...
// 將DataStream轉為表,默認的字段名稱為"age", "name"
Table table = tableEnv.fromDataStream(stream);
// 將DataStream轉為表,為"age"字段指定別名myAge, 為"name"字段指定別名myName
Table table = tableEnv.fromDataStream(stream, "age as myAge, name as myName");
// 將DataStream轉為表,只選擇一個name字段
Table table = tableEnv.fromDataStream(stream, "name");
// 將DataStream轉為表,只選擇一個name字段,並起一個別名myName
Table table = tableEnv.fromDataStream(stream, "name as myName");
Row類型
Row類型的DataStream或者DataSet轉為表的過程中,可以根據字段的位置或者字段名稱進行映射,同時也可以為字段起一個別名,或者只選擇部分字段。
// 獲取StreamTableEnvironment
StreamTableEnvironment tableEnv = ...;
// Row類型的DataStream,通過RowTypeInfo指定兩個字段"name"和"age"
DataStream<Row> stream = ...
// 將DataStream轉為表,默認的字段名為原始字段名"name"和"age"
Table table = tableEnv.fromDataStream(stream);
// 將DataStream轉為表,根據位置映射,為第一個字段指定myName別名,為第二個字段指定myAge別名
Table table = tableEnv.fromDataStream(stream, "myName, myAge");
// 將DataStream轉為表,根據字段名映射,為name字段起別名myName,為age字段起別名myAge
Table table = tableEnv.fromDataStream(stream, "name as myName, age as myAge");
// 將DataStream轉為表,根據字段名映射,只選擇name字段
Table table = tableEnv.fromDataStream(stream, "name");
// 將DataStream轉為表,根據字段名映射,只選擇name字段,並起一個別名"myName"
Table table = tableEnv.fromDataStream(stream, "name as myName");
查詢優化
Old planner
Apache Flink利用Apache Calcite來優化和轉換查詢。當前執行的優化包括投影和過濾器下推,去相關子查詢以及其他類型的查詢重寫。Old Planner目前不支持優化JOIN的順序,而是按照查詢中定義的順序執行它們。
通過提供一個CalciteConfig
對象,可以調整在不同階段應用的優化規則集。這可通過調用CalciteConfig.createBuilder()
方法來進行創建,並通過調用tableEnv.getConfig.setPlannerConfig(calciteConfig)
方法將該對象傳遞給TableEnvironment。
Blink planner
Apache Flink利用並擴展了Apache Calcite來執行復雜的查詢優化。這包括一系列基於規則和基於成本的優化(cost_based),例如:
-
基於Apache Calcite的去相關子查詢 -
投影裁剪 -
分區裁剪 -
過濾器謂詞下推 -
子計划重復數據刪除以避免重復計算 -
特殊的子查詢重寫,包括兩個部分: -
將IN和EXISTS轉換為左半聯接( left semi-join) -
將NOT IN和NOT EXISTS轉換為left anti-join -
調整join的順序,需要啟用 table.optimizer.join-reorder-enabled
注意: IN / EXISTS / NOT IN / NOT EXISTS當前僅在子查詢重寫的結合條件下受支持。
查詢優化器不僅基於計划,而且還可以基於數據源的統計信息以及每個操作的細粒度開銷(例如io,cpu,網絡和內存),從而做出更加明智且合理的優化決策。
高級用戶可以通過CalciteConfig
對象提供自定義優化規則,通過調用tableEnv.getConfig.setPlannerConfig(calciteConfig),將參數傳遞給TableEnvironment。
查看執行計划
SQL語言支持通過explain來查看某條SQL的執行計划,Flink Table API也可以通過調用explain()方法來查看具體的執行計划。該方法返回一個字符串用來描述三個部分計划,分別為:
-
關系查詢的抽象語法樹,即未優化的邏輯查詢計划, -
優化的邏輯查詢計划 -
實際執行計划
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
DataStream<Tuple2<Integer, String>> stream1 = env.fromElements(new Tuple2<>(1, "hello"));
DataStream<Tuple2<Integer, String>> stream2 = env.fromElements(new Tuple2<>(1, "hello"));
Table table1 = tEnv.fromDataStream(stream1, "count, word");
Table table2 = tEnv.fromDataStream(stream2, "count, word");
Table table = table1
.where("LIKE(word, 'F%')")
.unionAll(table2);
// 查看執行計划
String explanation = tEnv.explain(table);
System.out.println(explanation);
執行計划的結果為:
== 抽象語法樹 ==
LogicalUnion(all=[true])
LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
FlinkLogicalDataStreamScan(id=[1], fields=[count, word])
FlinkLogicalDataStreamScan(id=[2], fields=[count, word])
== 優化的邏輯執行計划 ==
DataStreamUnion(all=[true], union all=[count, word])
DataStreamCalc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
DataStreamScan(id=[1], fields=[count, word])
DataStreamScan(id=[2], fields=[count, word])
== 物理執行計划 ==
Stage 1 : Data Source
content : collect elements with CollectionInputFormat
Stage 2 : Data Source
content : collect elements with CollectionInputFormat
Stage 3 : Operator
content : from: (count, word)
ship_strategy : REBALANCE
Stage 4 : Operator
content : where: (LIKE(word, _UTF-16LE'F%')), select: (count, word)
ship_strategy : FORWARD
Stage 5 : Operator
content : from: (count, word)
ship_strategy : REBALANCE
小結
本文主要介紹了Flink TableAPI &SQL,首先介紹了Flink Table API &SQL的基本概念 ,然后介紹了構建Flink Table API & SQL程序所需要的依賴,接着介紹了Flink的兩種planner,還介紹了如何注冊表以及DataStream、DataSet與表的相互轉換,最后介紹了Flink的兩種planner對應的查詢優化並給出了一個查看執行計划的案例。
Flink DataStream API編程指南
Flink DataStream API 中的多面手——Process Function詳解
本文分享自微信公眾號 - 大數據技術與數倉(gh_95306769522d)。
如有侵權,請聯系 support@oschina.cn 刪除。
本文參與“OSC源創計划”,歡迎正在閱讀的你也加入,一起分享。