Flink提供了像表一樣處理的API和像執行SQL語句一樣把結果集進行執行。這樣很方便的讓大家進行數據處理了。比如執行一些查詢,在無界數據和批處理的任務上,然后將這些按一定的格式進行輸出,很方便的讓大家像執行SQL一樣簡單。
今天主要寫的東西分為如下幾個方面,然后遵循着下邊幾個方面進行展開:
1. Flink的不同API的層級梗概。
2. FlinkSQL的編程的步驟。
3. Flink編程的例子。
一、 Flink有着不同級別的API,不同級別的API方便不同用戶進行處理。普通用戶使用Datastream以及Dataset進行程序編寫,我們可以在其更高的基礎上使用Table API以及SQL,這也是Flink的強大之處,可以像使用處理表一樣處理數據。如果想研究的更高可以看更底層的東西。
SQL | High-level Language |
Table API | Declarative DSL |
Datastream / Dataset API | Core API |
Stateful Stream Processing | Low-level building block (streams, state, [event] time) |
二、 Flink的Table API 和 SQL編程步驟如下:
1) 創建一個TableEnvironment表環境用於后續使用。TableEnvironment是 SQL 和 Table API的核心概念,它用於設置執行所需要的數據屬性,和ExecutionEnvironment類似,它主要負責:
a) 注冊表數據源,從內部或者外部來源。
b) 執行相應的SQL語句。
c) 注冊自定義集數。
d 將結果集進行掃描和寫入到目標數據源。
e) 相同的environment可以執行相應的join unin操作。
2)接下來,咱們看一下如何注冊數據源,注意不同的Flink版本有不同的實現,但是核心的內容是不變的:
a) 可以直接從數據集里進行注冊。比如 tableEnvironment.registerDataSet()。
b) 在一個已經存在的Table中直接執行scan或者select,那么會生成一個新的Table,也就是數據可以從已有的Table中再次獲取,Table t = tableEnv.scan("x").select("a, b,c")。
c) 可以是TableSource, 也就是從不同的文件、數據庫、消息系統進行讀取。 比如csv文件,TableSource csvSource = new CsvTableSource("path/to/file")。
3)讀取完數據后進行處理,處理完之后要存儲起來,那么需要Sink(存儲)到文件或者數據庫、消息系統等。
a) 比如Sink到CSV文件。 TableSink csvSink = new TableCSVSink("path/to/sink", ..)。
b) Sink為指定字段句和類型到CSV文件中。
指定表字段: String[] fieldNames = {"fild1", "filed2", "field3"};
指定字段類型: TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG};
指定表名和csv文件:tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink);
三、接下來,看一下真實的例子。
1)從給定的單詞和單詞的個數中統計一下,每個單詞出現的數據,使用SQL語句進行實現查詢統計。完整的樣例如下(注意,不同的FLink版本實現上有稍微的差異):
package myflink.sql; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.java.BatchTableEnvironment; public class WordCountSQL { public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tEnv = BatchTableEnvironment.getTableEnvironment(env); DataSet<WC> input = env.fromElements( WC.of("hello", 1), WC.of("hqs", 1), WC.of("world", 1), WC.of("hello", 1) ); //注冊數據集 tEnv.registerDataSet("WordCount", input, "word, frequency"); //執行SQL,並結果集做為一個新表 Table table = tEnv.sqlQuery("SELECT word, SUM(frequency) as frequency FROM WordCount GROUP BY word"); DataSet<WC> result = tEnv.toDataSet(table, WC.class); result.print(); } public static class WC { public String word; //hello public long frequency; //創建構造方法,讓flink進行實例化 public WC() {} public static WC of(String word, long frequency) { WC wc = new WC(); wc.word = word; wc.frequency = frequency; return wc; } @Override public String toString() { return "WC " + word + " " + frequency; } } }
輸出的結果為,和我們想的結果是一樣的。
WC world 1 WC hello 2 WC hqs 1
2)接下來的例子會復雜一些,從一個txt文件中讀取數據,txt文件中包含id, 人字, 書名,價格信息。然后將數據注冊成一個表,然后將這個表的結果進行統計,按人名統計出來這個人買書所花費的錢,將結果sink到一個文件中。上代碼。
package myflink.sql; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.Types; import org.apache.flink.table.api.java.BatchTableEnvironment; import org.apache.flink.table.sinks.CsvTableSink; import org.apache.flink.table.sinks.TableSink; public class SQLFromFile { public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env); env.setParallelism(1); //讀取文件 DataSource<String> input = env.readTextFile("test.txt"); //將讀取到的文件進行輸出 input.print(); //轉換為DataSet DataSet<Orders> inputDataSet = input.map(new MapFunction<String, Orders>() { @Override public Orders map(String s) throws Exception { String[] splits = s.split(" "); return Orders.of(Integer.valueOf(splits[0]), String.valueOf(splits[1]), String.valueOf(splits[2]), Double.valueOf(splits[3])); } }); //轉換為table Table order = tableEnv.fromDataSet(inputDataSet); //注冊Orders表名 tableEnv.registerTable("Orders", order); Table nameResult = tableEnv.scan("Orders").select("name"); //輸出一下表 nameResult.printSchema(); //執行一下查詢 Table sqlQueryResult = tableEnv.sqlQuery("select name, sum(price) as total from Orders group by name order by total desc"); //查詢結果轉換為DataSet DataSet<Result> result = tableEnv.toDataSet(sqlQueryResult, Result.class); result.print(); //以tuple的方式進行輸出 result.map(new MapFunction<Result, Tuple2<String, Double>>() { @Override public Tuple2<String, Double> map(Result result) throws Exception { String name = result.name; Double total = result.total; return Tuple2.of(name, total); } }).print(); TableSink sink = new CsvTableSink("SQLText.txt", " | "); //設置字段名 String[] filedNames = {"name", "total"}; //設置字段類型 TypeInformation[] filedTypes = {Types.STRING(), Types.DOUBLE()}; tableEnv.registerTableSink("SQLTEXT", filedNames, filedTypes, sink); sqlQueryResult.insertInto("SQLTEXT"); env.execute(); } public static class Orders { public Integer id; public String name; public String book; public Double price; public Orders() { super(); } public static Orders of(Integer id, String name, String book, Double price) { Orders orders = new Orders(); orders.id = id; orders.name = name; orders.book = book; orders.price = price; return orders; } } public static class Result { public String name; public Double total; public Result() { super(); } public static Result of(String name, Double total) { Result result = new Result(); result.name = name; result.total = total; return result; } } }
想參考完整的代碼,可以訪問 https://github.com/stonehqs/flink-demo 。
有問題,歡迎拍磚。