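First: ordered by ease of use, Flink offers three layers of API. From bottom to top they are ProcessFunction, the DataStream API, and SQL / Table API. These notes cover the top layer: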
Table API & SQL
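1. Declarative: users state only what to compute, not how to compute it.
2. High performance: queries can be optimized, which yields better execution performance.
3. Unified stream and batch processing: the same query logic can run in either streaming mode or batch mode.
4. Stable: the semantics follow the SQL standard and are unlikely to change.
5. Easy to understand: the semantics are clear; what you see is what you get.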
Table API: tab.groupBy("word").select("word, count(1) as count")
SQL: SELECT word, COUNT(*) AS cnt FROM MyTable GROUP BY word
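To make the "what, not how" point concrete, here is roughly the "how" that both queries above spare you, written by hand in plain Java over a finite list of words. It only illustrates the GROUP BY / COUNT semantics and is not Flink code; the class and method names are made up for this sketch.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

final class GroupByCountSketch {

    // Hand-written equivalent of: SELECT word, COUNT(*) AS cnt FROM MyTable GROUP BY word
    // A TreeMap keeps the words sorted so the printed result is deterministic.
    static Map<String, Long> countByWord(List<String> words) {
        return words.stream()
                .collect(Collectors.groupingBy(w -> w, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        System.out.println(countByWord(Arrays.asList("hello", "flink", "hello"))); // {flink=1, hello=2}
    }
}
```

The declarative query leaves choices such as the grouping data structure and the parallelism to the planner, which is where the optimization claim in item 2 comes from.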
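Table API features:
1. The Table API makes multi-statement data processing easier to write.
For example, suppose the rows with a > 10 should be stored in the external table xxx, and at the same time the rows with a < 10 should be inserted into the external table yyy. With the Table API this is straightforward (here table is the input Table):
table.filter("a > 10").insertInto("xxx");
table.filter("a < 10").insertInto("yyy");
2. The Table API makes it easier to extend standard SQL (if and only if that is actually needed).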
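A plain-Java sketch of what the two statements in item 1 express, assuming a is an integer column and using in-memory lists to stand in for the external tables xxx and yyy. The class and method names are hypothetical, and this is not how Flink executes such a job. It also makes one detail visible: with a > 10 and a < 10, rows where a equals 10 reach neither table.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class TwoSinkSketch {

    // Mimics the two independent statements:
    //   table.filter("a > 10").insertInto("xxx");
    //   table.filter("a < 10").insertInto("yyy");
    // Each statement reads the same input and applies its own predicate.
    static Map<String, List<Integer>> route(List<Integer> aValues) {
        Map<String, List<Integer>> sinks = new LinkedHashMap<>();
        sinks.put("xxx", new ArrayList<>());
        sinks.put("yyy", new ArrayList<>());
        for (int a : aValues) {
            if (a > 10) {
                sinks.get("xxx").add(a);
            }
        }
        for (int a : aValues) {
            if (a < 10) {
                sinks.get("yyy").add(a);
            }
        }
        return sinks;
    }

    public static void main(String[] args) {
        // The row with a == 10 matches neither predicate and is written nowhere.
        System.out.println(route(Arrays.asList(5, 10, 15))); // {xxx=[15], yyy=[5]}
    }
}
```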
Table API programming:
1. WordCount example:
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

public class JavaStreamWordCount {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // words.txt is expected on the classpath, one word per line
        String path = JavaStreamWordCount.class.getClassLoader().getResource("words.txt").getPath();

        // Register the file as a table source named "fileSource" with one STRING column "word"
        tEnv.connect(new FileSystem().path(path))
            .withFormat(new OldCsv().field("word", Types.STRING).lineDelimiter("\n"))
            .withSchema(new Schema().field("word", Types.STRING))
            .inAppendMode()
            .registerTableSource("fileSource");

        // Continuously count the occurrences of each word
        Table result = tEnv.scan("fileSource")
            .groupBy("word")
            .select("word, count(1) as count");

        // The counts change as new rows arrive, so convert the result to a retract stream
        tEnv.toRetractStream(result, Row.class).print();
        env.execute();
    }
}
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

public class JavaBatchWordCount {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

        // words.txt is expected on the classpath, one word per line
        String path = JavaBatchWordCount.class.getClassLoader().getResource("words.txt").getPath();

        // Same source as the streaming job; the update mode (inAppendMode) only applies to streaming tables
        tEnv.connect(new FileSystem().path(path))
            .withFormat(new OldCsv().field("word", Types.STRING).lineDelimiter("\n"))
            .withSchema(new Schema().field("word", Types.STRING))
            .registerTableSource("fileSource");

        // Exactly the same query as in the streaming job
        Table result = tEnv.scan("fileSource")
            .groupBy("word")
            .select("word, count(1) as count");

        // DataSet.print() triggers job execution itself, so no env.execute() call is needed
        tEnv.toDataSet(result, Row.class).print();
    }
}
Reference: https://github.com/hequn8128/TableApiDemo
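Why does the streaming WordCount end with toRetractStream while the batch version uses toDataSet? On an unbounded input the count for a word keeps changing, so each update has to withdraw the previously emitted row before emitting the new one; in batch mode the count is computed once over the whole file. The plain-Java simulation below sketches that behavior. Change is a hypothetical stand-in for the (Boolean, Row) pairs a retract stream carries (true = add, false = retract); this is not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class RetractWordCountSketch {

    // One change message: add == true inserts the row, add == false retracts an earlier row.
    static final class Change {
        final boolean add;
        final String word;
        final long count;

        Change(boolean add, String word, long count) {
            this.add = add;
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "(" + add + "," + word + "," + count + ")";
        }
    }

    // Continuously updated GROUP BY word / COUNT: every new record changes one key,
    // so the previously emitted result for that key is retracted first.
    static List<Change> process(List<String> words) {
        Map<String, Long> counts = new HashMap<>();
        List<Change> out = new ArrayList<>();
        for (String w : words) {
            Long old = counts.get(w);
            if (old != null) {
                out.add(new Change(false, w, old));
            }
            long updated = (old == null ? 0L : old) + 1;
            counts.put(w, updated);
            out.add(new Change(true, w, updated));
        }
        return out;
    }

    // Applying the changes in order leaves only the latest row per word.
    static Map<String, Long> materialize(List<Change> changes) {
        Map<String, Long> table = new HashMap<>();
        for (Change c : changes) {
            if (c.add) {
                table.put(c.word, c.count);
            } else {
                table.remove(c.word);
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Change> changes = process(Arrays.asList("hello", "flink", "hello"));
        System.out.println(changes); // [(true,hello,1), (true,flink,1), (false,hello,1), (true,hello,2)]
        System.out.println(materialize(changes)); // hello=2 and flink=1 (HashMap order is unspecified)
    }
}
```

The materialized result matches what the batch job prints for the same input, which is the stream/batch unification point from the list at the top.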
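2. Table API operations
(1) How to get a Table
Table myTable = tableEnvironment.scan("MyTable");
A Table is obtained by scanning it from the tableEnvironment. So how does MyTable get registered in the first place, i.e. how do you register a table? There are roughly three ways: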
(2) How to emit a Table
(3) How to query a Table
3. Column operations & functions
4. Row-based operations
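SQL programming:
1. How to run SQL queries on streams
Reference: https://github.com/ververica/sql-training
2. How to use the SQL CLI client
3. How to run window aggregates and non-window aggregates, and understand the difference between them
4. How to consume Kafka data with SQL
5. How to write results to Kafka and Elasticsearch with SQL
And more... further features will be supported in Flink 1.9, stay tuned.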
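For item 3 in the SQL list, the difference between the two kinds of aggregate can be sketched in plain Java. A non-window aggregate such as GROUP BY word updates its result on every incoming row, while a tumbling-window aggregate such as GROUP BY TUMBLE(rowtime, INTERVAL '1' MINUTE), word emits each window's result once, after the window ends. The sketch assumes events arrive in timestamp order with non-negative millisecond timestamps; Flink itself decides when a window is complete using watermarks, which this simplification skips. Event, nonWindow and tumbling are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class WindowVsNonWindowSketch {

    // Hypothetical input record: event time in milliseconds plus a word.
    static final class Event {
        final long ts;
        final String word;

        Event(long ts, String word) {
            this.ts = ts;
            this.word = word;
        }
    }

    // Non-window aggregate: every record produces an updated count for its key.
    static List<String> nonWindow(List<Event> events) {
        Map<String, Long> counts = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (Event e : events) {
            long n = counts.merge(e.word, 1L, Long::sum);
            out.add(e.word + "=" + n);
        }
        return out;
    }

    // Tumbling-window aggregate: a window's counts are emitted once, when the window
    // closes, and never updated afterwards. Assumes in-order events with ts >= 0.
    static List<String> tumbling(List<Event> events, long size) {
        List<String> out = new ArrayList<>();
        TreeMap<String, Long> open = new TreeMap<>(); // counts of the currently open window
        long openStart = -1; // no window open yet
        for (Event e : events) {
            long start = e.ts - (e.ts % size); // start of the window this event falls into
            if (start != openStart) {
                // An event from a later window closes the open one: emit its final counts.
                emit(out, openStart, open);
                openStart = start;
            }
            open.merge(e.word, 1L, Long::sum);
        }
        emit(out, openStart, open); // end of input closes the last window
        return out;
    }

    private static void emit(List<String> out, long start, TreeMap<String, Long> counts) {
        for (Map.Entry<String, Long> c : counts.entrySet()) {
            out.add("window@" + start + " " + c.getKey() + "=" + c.getValue());
        }
        counts.clear();
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event(0, "hello"), new Event(1_000, "flink"),
                new Event(2_000, "hello"), new Event(61_000, "hello"));
        System.out.println(nonWindow(events)); // [hello=1, flink=1, hello=2, hello=3]
        System.out.println(tumbling(events, 60_000)); // [window@0 flink=1, window@0 hello=2, window@60000 hello=1]
    }
}
```

Because a window result is final once emitted, the windowed query produces an append-only output, whereas the non-window query produces updates that need a retract (or upsert) stream downstream.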