Flink itself provides several levels of abstraction for developing streaming or batch programs; the figure below illustrates the four levels of abstraction Flink supports.
Please note that the Table API and SQL are not yet feature complete and are being actively developed. Not all operations are supported by every combination of [Table API, SQL] and [stream, batch] input.
The Flink Table & SQL features are still being completed and are under continuous, active iteration.
Flink’s SQL support is based on Apache Calcite which implements the SQL standard
Flink delegates SQL parsing and optimization to Apache Calcite.
Queries specified in either interface have the same semantics and specify the same result regardless whether the input is a batch input (DataSet) or a stream input (DataStream).
At the programming-model level Flink offers two separate APIs, DataStream and DataSet, so it did not truly unify batch and stream processing: users and developers still had to maintain two sets of code. It is the addition of Flink Table & SQL that, to a certain extent, gives Flink a genuinely unified batch/stream programming model.
Starting from Flink 1.9, Flink provides two different planner implementations for evaluating Table & SQL API programs: the Blink planner and the old planner that was available before Flink 1.9. Planners are responsible for translating relational operators into an executable, optimized Flink job. Both of the planners come with different optimization rules and runtime classes. They may also differ in the set of supported features. Attention For production use cases, we recommend the old planner that was present before Flink 1.9 for now.
Starting from Flink 1.9, Flink provides two different planner implementations for evaluating Table & SQL API programs: the Blink planner and the old planner that was available before Flink 1.9. The planner is responsible for translating relational operations into an executable, optimized job (conceptually similar to Spark's Catalyst).
Note: for production use cases, the old planner available before Flink 1.9 is currently recommended. The corresponding Maven dependencies are as follows:
<!-- Either... (for the old planner that was available before Flink 1.9) -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner_2.11</artifactId>
  <version>1.10.0</version>
  <scope>provided</scope>
</dependency>
<!-- or.. (for the new Blink planner) -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner-blink_2.11</artifactId>
  <version>1.10.0</version>
  <scope>provided</scope>
</dependency>
Whether it is a batch SQL query or a streaming SQL query, the query is first parsed by the corresponding planner's parser into a SqlNode tree (the SQL syntax tree), from which a logical plan is generated; after optimization, the logical plan becomes an executable physical plan, which is handed to the DataSet API (batch queries) or the DataStream API (streaming queries) for execution. The overall flow is quite similar to Spark and Hive.
Programming Model of the Table API and SQL
For both batch and stream processing, Table API and SQL programs follow the same pattern.
// create a TableEnvironment for specific planner batch or streaming
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

// register an input Table
tableEnv.connect(...).createTemporaryTable("table1");
// register an output Table
tableEnv.connect(...).createTemporaryTable("outputTable");

// create a Table object from a Table API query
Table tapiResult = tableEnv.from("table1").select(...);
// create a Table object from a SQL query
Table sqlResult = tableEnv.sqlQuery("SELECT ... FROM table1 ... ");

// emit a Table API result Table to a TableSink, same for SQL result
tapiResult.insertInto("outputTable");

// execute
tableEnv.execute("java_job");
Doesn't this feel a lot like Spark Dataset programming and Spark SQL programming?
Creating a TableEnvironment
The TableEnvironment is mainly responsible for:
- Registering a Table in the internal catalog
- Registering catalogs
- Loading pluggable modules
- Executing SQL queries
- Registering a user-defined (scalar, table, or aggregation) function
- Converting a DataStream or DataSet into a Table
- Holding a reference to an ExecutionEnvironment or StreamExecutionEnvironment
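To make these responsibilities concrete, here is a minimal sketch (assuming Flink 1.10, the Java API, and the old planner; the stream, view name, and field names are made up for illustration) that converts a DataStream into a Table, registers it as a view, and runs a SQL query against it:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// a toy input stream (field names are made up for illustration)
DataStream<Tuple2<String, Integer>> stream =
    env.fromElements(Tuple2.of("alice", 1), Tuple2.of("bob", 2));

// convert the DataStream into a Table and register it in the internal catalog
Table users = tableEnv.fromDataStream(stream, "name, score");
tableEnv.createTemporaryView("Users", users);

// execute a SQL query against the registered view
Table result = tableEnv.sqlQuery("SELECT name, score FROM Users WHERE score > 1");

// convert the result back into a DataStream and print it
tableEnv.toAppendStream(result, Row.class).print();
env.execute("table-environment-sketch");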
Examples are given below for Flink streaming queries, Flink batch queries, Blink streaming queries, and Blink batch queries.
The difference between the Flink and Blink variants lies in the choice of planner, and in practice we should also explicitly specify which planner to use.
If both planner jars are on the classpath (the default behavior), you should explicitly set which planner to use in the current program.
// **********************
// FLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);
// or TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings);

// ******************
// FLINK BATCH QUERY
// ******************
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;

ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);

// **********************
// BLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
// or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);

// ******************
// BLINK BATCH QUERY
// ******************
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
Create Tables in the Catalog
A table can be a virtual table (a VIEW) or a regular table (a TABLE). A VIEW can be created from an existing Table object, typically the result of a Table API or SQL query. A TABLE usually corresponds to external data, such as a file, a database table, or a message queue.
Temporary vs. Permanent Tables
A temporary table is tied to the lifecycle of a single Flink session and is always held in memory.
A permanent table requires a catalog (such as the Hive metastore) to maintain its metadata; the table exists until it is explicitly dropped.
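A hedged sketch of the difference (assuming Flink 1.10 with the flink-connector-hive dependency on the classpath; the catalog name, database, Hive conf directory, and Hive version below are placeholders): a permanent table lives in an external catalog such as a HiveCatalog, while a temporary view only exists for the current session.

import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

// permanent tables: register a catalog backed by the Hive metastore
// (catalog name, database, conf dir, and Hive version are placeholders)
HiveCatalog hiveCatalog = new HiveCatalog("myhive", "default", "/path/to/hive-conf", "2.3.4");
tableEnv.registerCatalog("myhive", hiveCatalog);
tableEnv.useCatalog("myhive");
// tables created from now on are persisted in the metastore until explicitly dropped

// temporary tables: held in memory and gone when the session ends
// tableEnv.createTemporaryView("tmpView", someTable);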
Creating a Virtual Table (View)
// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

// table is the result of a simple projection query
Table projTable = tableEnv.from("X").select(...);

// register the Table projTable as table "projectedTable"
tableEnv.createTemporaryView("projectedTable", projTable);
Creating a Table via a Connector
For example, an external relational database or Kafka:
tableEnvironment
.connect(...)
.withFormat(...)
.withSchema(...)
.inAppendMode()
.createTemporaryTable("MyTable")
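For instance, a hedged sketch of the same pattern with the descriptors filled in (assuming Flink 1.10 with the flink-connector-kafka and flink-json dependencies on the classpath; the topic, broker address, and field names are placeholders):

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;

tableEnvironment
    .connect(
        new Kafka()
            .version("universal")
            .topic("clicks")                                   // placeholder topic
            .property("bootstrap.servers", "localhost:9092"))  // placeholder brokers
    .withFormat(new Json())
    .withSchema(
        new Schema()
            .field("user", DataTypes.STRING())
            .field("url", DataTypes.STRING()))
    .inAppendMode()
    .createTemporaryTable("MyKafkaTable");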
Querying a Table (Query a Table)
Querying a table with the Table API
// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

// register Orders table

// scan registered Orders table
Table orders = tableEnv.from("Orders");
// compute revenue for all customers from France
Table revenue = orders
  .filter("cCountry === 'FRANCE'")
  .groupBy("cID, cName")
  .select("cID, cName, revenue.sum AS revSum");

// emit or convert Table
// execute query
Querying a table with SQL
// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

// register Orders table

// compute revenue for all customers from France
Table revenue = tableEnv.sqlQuery(
    "SELECT cID, cName, SUM(revenue) AS revSum " +
    "FROM Orders " +
    "WHERE cCountry = 'FRANCE' " +
    "GROUP BY cID, cName"
  );

// emit or convert Table
// execute query
Updating a table with SQL
// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

// register "Orders" table
// register "RevenueFrance" output table

// compute revenue for all customers from France and emit to "RevenueFrance"
tableEnv.sqlUpdate(
    "INSERT INTO RevenueFrance " +
    "SELECT cID, cName, SUM(revenue) AS revSum " +
    "FROM Orders " +
    "WHERE cCountry = 'FRANCE' " +
    "GROUP BY cID, cName"
  );

// execute query
In addition, the Table API and SQL can be freely mixed within the same program.
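A minimal sketch of mixing the two (reusing the Orders table from the examples above; the view name is arbitrary): the result of a Table API query is registered as a view and then queried with SQL.

// filter with the Table API ...
Table frenchOrders = tableEnv.from("Orders").filter("cCountry === 'FRANCE'");
tableEnv.createTemporaryView("FrenchOrders", frenchOrders);

// ... then aggregate with SQL on top of the Table API result
Table revSum = tableEnv.sqlQuery(
    "SELECT cID, cName, SUM(revenue) AS revSum FROM FrenchOrders GROUP BY cID, cName");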
Emitting a Table (Emit a Table / TableSink)
A TableSink supports a variety of file formats, such as CSV, Apache Parquet, and Apache Avro; storage systems such as JDBC, Apache HBase, Apache Cassandra, and Elasticsearch; and message systems such as Apache Kafka and RabbitMQ.
// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

// create an output Table
final Schema schema = new Schema()
  .field("a", DataTypes.INT())
  .field("b", DataTypes.STRING())
  .field("c", DataTypes.LONG());

tableEnv.connect(new FileSystem().path("/path/to/file"))
  .withFormat(new Csv().fieldDelimiter('|').deriveSchema())
  .withSchema(schema)
  .createTemporaryTable("CsvSinkTable");

// compute a result Table using Table API operators and/or SQL queries
Table result = ...

// emit the result Table to the registered TableSink
result.insertInto("CsvSinkTable");

// execute the program
Explaining a Table (viewing the execution plan)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

DataStream<Tuple2<Integer, String>> stream1 = env.fromElements(new Tuple2<>(1, "hello"));
DataStream<Tuple2<Integer, String>> stream2 = env.fromElements(new Tuple2<>(1, "hello"));

Table table1 = tEnv.fromDataStream(stream1, "count, word");
Table table2 = tEnv.fromDataStream(stream2, "count, word");

Table table = table1
  .where("LIKE(word, 'F%')")
  .unionAll(table2);

String explanation = tEnv.explain(table);
System.out.println(explanation);
Result:
== Abstract Syntax Tree ==
LogicalUnion(all=[true])
  LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
    FlinkLogicalDataStreamScan(id=[1], fields=[count, word])
  FlinkLogicalDataStreamScan(id=[2], fields=[count, word])

== Optimized Logical Plan ==
DataStreamUnion(all=[true], union all=[count, word])
  DataStreamCalc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
    DataStreamScan(id=[1], fields=[count, word])
  DataStreamScan(id=[2], fields=[count, word])

== Physical Execution Plan ==
Stage 1 : Data Source
  content : collect elements with CollectionInputFormat

Stage 2 : Data Source
  content : collect elements with CollectionInputFormat

  Stage 3 : Operator
    content : from: (count, word)
    ship_strategy : REBALANCE

  Stage 4 : Operator
    content : where: (LIKE(word, _UTF-16LE'F%')), select: (count, word)
    ship_strategy : FORWARD

  Stage 5 : Operator
    content : from: (count, word)
    ship_strategy : REBALANCE
Dynamic Tables (Dynamic Table)
Compared with a SQL query over a traditional (static) table, Flink Table & SQL operates on stream data that is changing at every moment, which gives rise to the concept of a dynamic table.
Querying a dynamic table looks the same as querying a static table; however, a query over a dynamic table is a continuous query and does not terminate.
The figure below shows the relationship between streams, dynamic tables, and continuous queries:
- A stream is converted into a dynamic table.
- A continuous query is evaluated on the dynamic table, producing a new dynamic table.
- The resulting dynamic table is converted back into a stream.
For example, consider a real-time stream of user click events with the following schema:
[
  user:  VARCHAR,   // the name of the user
  cTime: TIMESTAMP, // the time when the URL was accessed
  url:   VARCHAR    // the URL that was accessed by the user
]
To query it with SQL, we first convert the click event stream into a table; every record in the stream is treated as an insert into that table, as sketched below.
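A hedged sketch of that conversion (assuming a DataStream of click events named clickStream already exists; the variable and field names follow the schema above):

// interpret the stream as an append-only table: every stream record becomes an insert
Table clicks = tableEnv.fromDataStream(clickStream, "user, cTime, url");
tableEnv.createTemporaryView("clicks", clicks);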
At any point in time, the result of a continuous query is semantically equivalent to running the same query in batch mode on a snapshot of the input table. For example, we can count the number of clicks per user on the clicks table:
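Expressed in SQL (a sketch over the clicks view registered above), the per-user click count is the following continuous query; as new click events arrive, the counts in the result table are continuously updated:

Table userClickCounts = tableEnv.sqlQuery(
    "SELECT user, COUNT(url) AS cnt FROM clicks GROUP BY user");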
As you can see, Flink SQL, like traditional SQL, supports queries, joins, aggregations, and so on, and additionally supports windows, sorting, and other scenarios.
SQL Operations
For details, see the official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#operations
Built-in Functions Currently Supported by Flink SQL
Flink also ships with a large number of built-in functions that can be used directly, for example:
- Comparison functions
- Logical functions
- Arithmetic functions
- String functions
- Temporal functions
See the official documentation for specifics; they work much like the built-in functions you already use elsewhere.
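As a small illustration (a sketch over the clicks table used earlier), the query below uses a string function, a temporal function, and a comparison:

Table sample = tableEnv.sqlQuery(
    "SELECT UPPER(url) AS upperUrl, " +
    "       CHAR_LENGTH(url) AS urlLen, " +
    "       EXTRACT(HOUR FROM cTime) AS clickHour " +
    "FROM clicks " +
    "WHERE url <> ''");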
User-Defined Functions (UDFs)
Custom Scalar Functions
Extend the org.apache.flink.table.functions.ScalarFunction class and implement one or more evaluation methods. The method must be public and named eval, and its return type determines the result type of the function. eval also supports variable arguments, e.g. eval(String... strs).
Example:
public class HashCode extends ScalarFunction {
  private int factor = 12;

  public HashCode(int factor) {
      this.factor = factor;
  }

  public int eval(String s) {
      return s.hashCode() * factor;
  }
}

BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);

// register the function
tableEnv.registerFunction("hashCode", new HashCode(10));

// use the function in Java Table API
myTable.select("string, string.hashCode(), hashCode(string)");

// use the function in SQL API
tableEnv.sqlQuery("SELECT string, hashCode(string) FROM MyTable");
For complex result types, such as your own custom classes, you can override ScalarFunction#getResultType() to provide the type information of the result type yourself.
public static class TimestampModifier extends ScalarFunction {
  public long eval(long t) {
    return t % 1000;
  }

  public TypeInformation<?> getResultType(Class<?>[] signature) {
    return Types.SQL_TIMESTAMP;
  }
}
Flink has three kinds of user-defined functions: ScalarFunction, TableFunction, and AggregateFunction. They can be classified by their input and output dimensions: as shown in the figure below, a ScalarFunction maps one input row to one output row; a TableFunction maps one input row to multiple output rows; an AggregateFunction maps multiple input rows to one output row. To make the semantics complete, the Table API added TableAggregateFunction, which can consume and emit multiple rows. With TableAggregateFunction, the Table API becomes much more expressive and can, to some extent, be used to implement custom operators. For example, a TableAggregateFunction can be used to implement Top-N.
TableAggregateFunction is also easy to use; its method signature and usage are shown in the figure below:
In terms of usage, we simply call table.flatAggregate() and pass in a TableAggregateFunction instance. To implement a custom function, extend TableAggregateFunction: first define an Accumulator to hold the state, then implement the accumulate and emitValue methods. accumulate processes the input rows, and emitValue emits results based on the state held in the accumulator.
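As a sketch of that pattern (adapted to a Top-2 example; the class, accumulator, and field names are my own, assuming the Flink 1.10 Java API), the function below keeps the two largest values per group and emits up to two (value, rank) rows:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.util.Collector;

public class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2.Top2Accum> {

    // accumulator holding the two largest values seen so far
    public static class Top2Accum {
        public Integer first = Integer.MIN_VALUE;
        public Integer second = Integer.MIN_VALUE;
    }

    @Override
    public Top2Accum createAccumulator() {
        return new Top2Accum();
    }

    // called once per input row
    public void accumulate(Top2Accum acc, Integer v) {
        if (v > acc.first) {
            acc.second = acc.first;
            acc.first = v;
        } else if (v > acc.second) {
            acc.second = v;
        }
    }

    // emits (value, rank) rows based on the accumulator state
    public void emitValue(Top2Accum acc, Collector<Tuple2<Integer, Integer>> out) {
        if (acc.first != Integer.MIN_VALUE) {
            out.collect(Tuple2.of(acc.first, 1));
        }
        if (acc.second != Integer.MIN_VALUE) {
            out.collect(Tuple2.of(acc.second, 2));
        }
    }
}

Usage then follows the pattern described above: register the function and call it through flatAggregate, e.g. tableEnv.registerFunction("top2", new Top2()); followed by tab.groupBy("key").flatAggregate("top2(a) as (v, rank)").select("key, v, rank"); (the table and column names here are hypothetical).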