本文為《Flink大數據項目實戰》學習筆記,想通過視頻系統學習Flink這個最火爆的大數據計算框架的同學,推薦學習課程:
Flink大數據項目實戰:http://t.cn/EJtKhaz
1. 繼續侃Flink編程基本套路
1.1 DataSet and DataStream
DataSet and DataStream表示Flink app中的分布式數據集。它們包含重復的、不可變數據集。DataSet有界數據集,用在Flink批處理。DataStream可以是無界,用在Flink流處理。它們可以從數據源創建,也可以通過各種轉換操作創建。
1.2共同的編程套路
DataSet and DataStream 這里以WordCount為例,共同的編程套路如下所示:
1.獲取執行環境(execution environment)
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
2.加載/創建初始數據集
// 讀取輸入數據
DataStream<String> text;
if (params.has("input")) {
// 讀取text文件
text = env.readTextFile(params.get("input"));
} else {
System.out.println("Executing WordCount example with default input data set.");
System.out.println("Use --input to specify file input.");
// 讀取默認測試數據集
text = env.fromElements(WordCountData.WORDS);
}
3.對數據集進行各種轉換操作(生成新的數據集)
DataStream<Tuple2<String, Integer>> counts =
// 切分每行單詞
text.flatMap(new Tokenizer())
//對每個單詞分組統計詞頻數
.keyBy(0).sum(1);
4.指定將計算的結果放到何處去
// 輸出統計結果
if (params.has("output")) {
//寫入文件地址
counts.writeAsText(params.get("output"));
} else {
System.out.println("Printing result to stdout. Use --output to specify output path.");
//數據打印控制台
counts.print();
}
5.觸發APP執行
// 執行flink 程序
env.execute("Streaming WordCount");
1.3惰性計算
Flink APP都是延遲執行的,只有當execute()被顯示調用時才會真正執行,本地執行還是在集群上執行取決於執行環境的類型。好處:用戶可以根據業務構建復雜的應用,Flink可以整體進優化並生成執行計划。
2. 指定鍵(Specifying Keys)
2.1誰需要指定鍵
哪些操作需要指定key呢?常見的操作如join, coGroup, keyBy, groupBy,Reduce, GroupReduce, Aggregate, Windows等。
Flink編程模型的key是虛擬的,不需要你創建鍵值對,可以在具體算子通過參數指定,如下代碼所示:
DataSet<...> input = // [...]
DataSet<...> reduced = input
.groupBy(/*define key here*/)
.reduceGroup(/*do something*/);
2.2為Tuple定義鍵
Tuple定義鍵的方式有很多種,接下來我們一起看幾個示例:
按照指定屬性分組
DataStream<Tuple3<Integer,String,Long>> input = // [...] KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0)
注意:此時表示使用Tuple3三元組的第一個成員作為keyBy
按照組合鍵進行分組
DataStream<Tuple3<Integer,String,Long>> input = // [...] KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0,1)
注意:此時表示使用Tuple3三元組的前兩個元素一起作為keyBy
特殊情況:嵌套Tuple
DataStream<Tuple3<Tuple2<Integer, Float>,String,Long>> input = // [...]
KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0)
注意:這里使用KeyBy(0)指定鍵,系統將會使用整個Tuple2作為鍵(整型和浮點型的)。如果想使用Tuple2內部字段作為鍵,你可以使用字段來表示鍵,這種方法會在后面闡述。
2.3使用字段表達式定義鍵
基於字符串的字段表達式可以用來引用嵌套字段(例如Tuple,POJO)
public class WC {
public String word;
public User user;
public int count;
}
public class User{
public int age;
public String zip;
}
示例:通過word字段進行分組
DataStream<WC> words = // [...]
DataStream<WC> wordCounts = words.keyBy("word").window(/*window specification*/);
語法:
1.直接使用字段名選擇POJO字段
例如 user 表示 一個POJO的user字段
2.Tuple通過offset來選擇
"_1"和"5"分別代表第一和第六個Scala Tuple字段
“f0” and “f5”分別代表第一和第六個Java Tuple字段
3.選擇POJO和Tuples的嵌套屬性
user.zip
在scala里你可以"_2.user.zip"或"user._4.1.zip”
在java里你可以“2.user.zip”或者" user.f0.1.zip ”
4.使用通配符表達式選擇所有屬性,java為“*”,scala為 "_"。不是POJO或者Tuple的類型也適用。
2.4字段表達式實例-Java
以下定義兩個Java類:
public static class WC {
public ComplexNestedClass complex;
private int count;
public int getCount() {
return count;
}
public void setCount(int c) {
this.count = c;
}
}
public static class ComplexNestedClass {
public Integer someNumber;
public float someFloat;
public Tuple3<Long, Long, String> word;
public IntWritable hadoopCitizen;
}
我們一起看看如下key字段如何理解:
1."count": wc 類的count字段
2."complex":遞歸的選取ComplexNestedClass的所有字段
3."complex.word.f2": ComplexNestedClass類中的tuple word的第三個字段;
4."complex.hadoopCitizen":選擇Hadoop IntWritable類型。
2.5字段表達式實例-Scala
以下定義兩個Scala類:
class WC(var complex: ComplexNestedClass, var count: Int) {
def this() { this(null, 0) }
}
class ComplexNestedClass(
var someNumber: Int,
someFloat: Float,
word: (Long, Long, String),
hadoopCitizen: IntWritable) {
def this() { this(0, 0, (0, 0, ""), new IntWritable(0)) }
}
我們一起看看如下key字段如何理解:
1."count": wc 類的count字段
2."complex":遞歸的選取ComplexNestedClass的所有字段
3."complex.word._3": ComplexNestedClass類中的tuple word的第三個字段;
4."complex.hadoopCitizen":選擇Hadoop IntWritable類型。
2.6 Key Selector Functions
還有一種定義鍵的方式叫做“鍵選擇器”函數。鍵選擇器函數需要一個元素作為入參,返回這個元素的鍵。這個鍵可以是任何類型的,也可從指定計算中生成。
class WC(var complex: ComplexNestedClass, var count: Int) {
def this() { this(null, 0) }
}
public class WC {public String word; public int count;}
DataStream<WC> words = // [...]
KeyedStream<WC> keyed = words
.keyBy(new KeySelector<WC, String>() {
public String getKey(WC wc) {
return wc.word;
}
});
3. 自定義轉換函數
3.1實現接口
大多數的轉換操作需要用戶自己定義函數,可以通過實現MapFunction接口,並重寫map函數來實現。
3.2匿名類
也可以直接使用匿名類,不需要定義類名稱,直接new接口重寫map方法即可。
3.3 Lambda表達式
使用Lambda表達式比自定義函數更方便,更直接。
3.4 Rich Functions
遇到特殊的需求,比如讀取數據庫中的數據,如果數據庫連接放在map函數里面迭代循環,實現譜圖mapFunction接口無法滿足要求。
我們需要繼承RichMapFunction,將獲取數據庫連接放在open方法中,具體轉換放在map方法中。
當然它也可以使用匿名類:
Rich Function擁有非常有用的四個方法:open,close,getRuntimeContext和setRuntimecontext
這些功能在參數化函數、創建和確定本地狀態、獲取廣播變量、獲取運行時信息(例如累加器和計數器)和迭代信息時非常有幫助。
4. 支持的數據類型
Flink對DataSet和DataStream中可使用的元素類型添加了一些約束。原因是系統可以通過分析這些類型來確定有效的執行策略和選擇不同的序列化方式。
有7中不同的數據類型:
1.Java Tuple 和 Scala Case類;
2.Java POJO;
3.基本類型;
4.通用類;
5.值;
6.Hadoop Writables;
7.特殊類型
4.1Java Tuple
Tuple是包含固定數量各種類型字段的復合類。Flink Java API提供了Tuple1-Tuple25。Tuple的字段可以是Flink的任意類型,甚至嵌套Tuple。
訪問Tuple屬性的方式有以下兩種:
1.屬性名(f0,f1…fn)
2.getField(int pos)
4.2Scala Case類
Scala的Case類(以及Scala的Tuple,實際是Case class的特殊類型)是包含了一定數量多種類型字段的組合類型。Tuple字段通過他們的1-offset名稱定位,例如 _1代表第一個字段。Case class 通過字段名稱獲得:
case class WordCount(word: String, count: Int)
val input = env.fromElements(
WordCount("hello", 1),
WordCount("world", 2)) // Case Class Data Set
input.keyBy("word")// key by field expression "word"
val input2 = env.fromElements(("hello", 1), ("world", 2)) // Tuple2 Data Set
input2.keyBy(0, 1) // key by field positions 0 and 1
4.3POJOs
Java和Scala的類在滿足下列條件時,將會被Flink視作特殊的POJO數據類型專門進行處理:
1.是公共類;
2.無參構造是公共的;
3.所有的屬性都是可獲得的(聲明為公共的,或提供get,set方法);
4.字段的類型必須是Flink支持的。Flink會用Avro來序列化任意的對象。
Flink會分析POJO類型的結構獲知POJO的字段。POJO類型要比一般類型好用。此外,Flink訪問POJO要比一般類型更高效。
public class WordWithCount {
public String word;
public int count;
public WordWithCount() {}
public WordWithCount(String word, int count) { this.word = word; this.count = count; }
}
DataStream<WordWithCount> wordCounts = env.fromElements(
new WordWithCount("hello", 1),
new WordWithCount("world", 2));
wordCounts.keyBy("word");
4.4基本類型
Flink支持Java和Scala所有的基本數據類型,比如 Integer,String,和Double。
4.5一般通用類
Flink支持大多數的Java,Scala類(API和自定義)。包含不能序列化字段的類在增加一些限制后也可支持。遵循Java Bean規范的類一般都可以使用。
所有不能視為POJO的類Flink都會當做一般類處理。這些數據類型被視作黑箱,其內容是不可見的。通用類使用Kryo進行序列/反序列化。
4.6值類型Values
通過實現org.apache.flinktypes.Value接口的read和write方法提供自定義代碼來進行序列化/反序列化,而不是使用通用的序列化框架。
Flink預定義的值類型與原生數據類型是一一對應的(例如:ByteValue, ShortValue, IntValue, LongValue, FloatValue, DoubleValue, StringValue, CharValue, BooleanValue)。這些值類型作為原生數據類型的可變變體,他們的值是可以改變的,允許程序重用對象從而緩解GC的壓力。
4.7 Hadoop的Writable類
它實現org.apache.hadoop.Writable接口的類型,該類型的序列化邏輯在write()和readFields()方法中實現。
4.8特殊類型
Flink比較特殊的類型有以下兩種:
1.Scala的 Either、Option和Try。
2.Java ApI有自己的Either實現。
4.9類型擦除和類型推理
注意:本小節內容僅針對Java
Java編譯器在編譯之后會丟棄很多泛型類型信息。這在Java中稱為類型擦除。這意味着在運行時,對象的實例不再知道其泛型類型。
例如,在JVM中,DataStream<String>和DataStream<Long>的實例看起來是相同的。
List<String> l1 = new ArrayList<String>();
List<Integer> l2 = new ArrayList<Integer>();
System.out.println(l1.getClass() == l2.getClass());
泛型:一種較為准確的說法就是為了參數化類型,或者說可以將類型當作參數傳遞給一個類或者是方法。
Flink 的Java API會試圖去重建(可以做類型推理)這些被丟棄的類型信息,並將它們明確地存儲在數據集以及操作中。你可以通過DataStream.getType()方法來獲取類型,這個方法將返回一個TypeInformation的實例,這個實例是Flink內部表示類型的方式。
5. 累加器和計數器
5.1累加器和計數器
計數器是最簡單的累加器。
內置累加器主要包含以下幾類:
1.IntCounter, LongCounter 和 DoubleCounter
2.Histogram(柱狀圖)
5.2如何使用累加器
第一步:在自定義的轉換操作里創建累加器對象:
private IntCounter numLines = new IntCounter();
第二步:注冊累加器對象,通常是在rich function的open()方法中。這里你還需要定義累加器的名字getRuntimeContext().addAccumulator(“num-lines”, this.numLines);
第三步:在operator函數的任何地方使用累加器,包括在open()和close()方法中
this.numLines.add(1);
第四步:結果存儲在JobExecutionResult里:
JobExecutionResult JobExecutionResult =env.execute("Flink Batch Java API Skeleton")
myJobExecutionResult.getAccumulatorResult("num-lines")
5.3自定義累加器
為了實現你自己的累加器,我們需要實現Accumulator接口,如果你想讓你自定義的累加器需要被Flink所收錄,請創建一個提交請求。可以選擇實現Accumulator或者SimpleAccumulator。
1.Accumulator<V, R>是最靈活的:它定義了需要進行累加的值的類型V以及最后結果的類型R,例如:對於一個histogram,v是數值類型的而R是一個histogram。
2.SimpleAccumulator則是在進行累計數據類型和返回的數據類型一致的情況下使用的,例如計數器。