Flink使用 DataSet 和 DataStream 代表數據集。DateSet 用於批處理,代表數據是有限的;而 DataStream 用於流數據,代表數據是無界的。數據集中的數據是不可以變的,也就是說不能對其中的元素增加或刪除。我們通過數據源創建 DataSet 或者 DataStream ,通過 map,filter 等轉換(transform)操作對數據集進行操作產生新的數據集。
編寫 Flink 程序一般經過一下幾個步驟:
- 獲得 execution 環境
- 創建輸入數據
- 在數據集上進行轉換操作(下文統一稱為:transform)
- 輸出結果數據
- 觸發程序執行
下面我們將介紹編寫 Flink 程序所涉及的基本 API。
輸入和輸出
首先,需要獲得 execution 環境,Flink 提供了一下以下三種方式:
getExecutionEnvironment() createLocalEnvironment() createRemoteEnvironment(String host, int port, String... jarFiles)
以第一個為例創建 execution 環境的代碼如下
批處理:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<String> text = env.readTextFile("file:///D:\\words.txt"); text.print();
流處理:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<String> text = env.readTextFile("file:///D:\\words.txt"); text.print();
env.execute();
words.txt 文件內容:
a
b
c
d
e
a
b
上面代碼創建了 execution 環境,同時利用 env 創建了輸入源。在數據集上調用 print 方法可以將數據輸出到控制台,當然也可以調用 writeAsText 等方法將數據輸出到其他介質。上面流處理最后一行代碼調用了 execute 方法,在流處理中需要顯式調用該方法觸發程序的執行。
上述代碼有兩種方式運行,一種是直接在 IDE 中執行,就像運行一個普通的 Java 程序,Flink 將啟動一個本地的環境執行程序。另一種方式是將程序打包,提交到 Flink 集群運行。上面例子基本包含了一個 Flink 程序的基本骨架,但是並沒有對數據集進行更多的 transform 操作,下面我們簡單介紹基本 transform 操作。
map操作
這里的 map 操作類似 MapReduce 中的 map,對數據進行解析,處理。示例如下
批處理:
DataSet<Tuple2<String, Integer>> words = text.map(new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(String s) throws Exception { return new Tuple2<>(s, 1); } }); words.print();
流處理:
DataStream<Tuple2<String, Integer>> words = text.map(new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(String s) throws Exception { return new Tuple2<>(s, 1); } });
words.print()
這里批處理和流處理除了數據集的類型不同,其余寫法都一樣。就是將每個單詞映射成了一個 (單詞, 1) 二元組。與 map 類似的 transform 還有 filter,過濾不需要的記錄,讀者可以自行嘗試。
指定 key
大數據處理經常需要按照某個維度進行處理,也就是需要指定 key。在 DataSet 中使用 groupBy 指定 key,而在 DataStream 中使用 keyBy 指定 key。這里我們以 keyBy 為例進行介紹。
Flink 的數據模型並不是基於 key-value 的,key 是虛擬的,可以看做是定義在數據上的函數。
在 Tuple 中定義 key
KeyedStream<Tuple2<String, Integer>, Tuple> keyed = words.keyBy(0); //0 代表 Tuple2 (二元組)中第一個元素
KeyedStream<Tuple2<String, Integer>, Tuple> keyed = words.keyBy(0,1); //0,1 代表二元組中第一個和第二個元素作為 key\
對於嵌套的 tuple
DataStream<Tuple3<Tuple2<Integer, Float>,String,Long>> ds;
ds.keyBy(0) 將會把 Tuple2<Integer, Float> 整體作為 key。
用字段表達式指定 key
public class WC { public String word; public int count; } DataStream<WC> words = // [...] DataStream<WC> wordCounts = words.keyBy("word");
這里指定 WC 對象的 word 字段作為 key。字段表達式語法如下:
- Java對象使用字段名作為key,例子如上
- 對於 Tuple 類型使用字段名(f0, f1,...)或者偏移(從0開始)指定 key,例如 f0 和 5 分別代表 Tuple 第一個字段和第六個字段
- Java 對象和 Tuple 嵌套的字段作為 key,例如:f1.user.zip 表示 Tuple 第二個字段中的 user 對象中的 zip 字段作為 key
- 通配符 * 代表選擇所有類型作為 key
字段表達式的舉例
public static class WC { public ComplexNestedClass complex; //nested POJO private int count; // getter / setter for private field (count) public int getCount() { return count; } public void setCount(int c) { this.count = c; } } public static class ComplexNestedClass { public Integer someNumber; public float someFloat; public Tuple3<Long, Long, String> word; public IntWritable hadoopCitizen; }
- "count": WC類的 count 字段
- "complex": complex 的所有字段(遞歸地)
- "complex.word.f2": ComplexNestedClass 類中 word 三元組的第三個字段
- "complex.hadoopCitizen": complex類中的 hadoopCitizen 字段
使用 Key Selector 指定 key
通過 key 選擇器函數來制定 key,key 選擇器的輸入為每個元素,輸出為指定的 key,例子如下
words.keyBy(new KeySelector<Tuple2<String, Integer>, Object>() { @Override public Object getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception { return stringIntegerTuple2.f0; } });
可以看到實現的效果與 keyBy(0) 是一樣的。
以上便是 Flink 指定 key 的方法。
總結
這篇文章主要介紹了 Flink 程序的基本骨架。獲得環境、創建輸入源、對數據集做 transform 以及輸出。由於數據處理經常會按照不同維度(不同的 key)進行統計,因此,本篇內容重點介紹了 Flink 中如何指定 key。后續將會繼續介紹 Flink API 的使用。
歡迎關注公眾號「渡碼」