說明:本文為《Flink大數據項目實戰》學習筆記,想通過視頻系統學習Flink這個最火爆的大數據計算框架的同學,推薦學習課程:
Flink大數據項目實戰:http://t.cn/EJtKhaz
2.4字段表達式實例-Java
以下定義兩個Java類:
public static class WC {
public ComplexNestedClass complex;
private int count;
public int getCount() {
return count;
}
public void setCount(int c) {
this.count = c;
}
}
public static class ComplexNestedClass {
public Integer someNumber;
public float someFloat;
public Tuple3<Long, Long, String> word;
public IntWritable hadoopCitizen;
}
我們一起看看如下key字段如何理解:
1."count": wc 類的count字段
2."complex":遞歸的選取ComplexNestedClass的所有字段
3."complex.word.f2": ComplexNestedClass類中的tuple word的第三個字段;
4."complex.hadoopCitizen":選擇Hadoop IntWritable類型。
2.5字段表達式實例-Scala
以下定義兩個Scala類:
3.1實現接口
大多數的轉換操作需要用戶自己定義函數,可以通過實現MapFunction接口,並重寫map函數來實現。
3.2匿名類
也可以直接使用匿名類,不需要定義類名稱,直接new接口重寫map方法即可。
3.3 Lambda表達式
使用Lambda表達式比自定義函數更方便,更直接。
我們需要繼承RichMapFunction,將獲取數據庫連接放在open方法中,具體轉換放在map方法中。
當然它也可以使用匿名類:
Rich Function擁有非常有用的四個方法:open,close,getRuntimeContext和setRuntimecontext
這些功能在參數化函數、創建和確定本地狀態、獲取廣播變量、獲取運行時信息(例如累加器和計數器)和迭代信息時非常有幫助。
4. 支持的數據類型
Flink對DataSet和DataStream中可使用的元素類型添加了一些約束。原因是系統可以通過分析這些類型來確定有效的執行策略和選擇不同的序列化方式。
有7中不同的數據類型:
1.Java Tuple 和 Scala Case類;
2.Java POJO;
3.基本類型;
4.通用類;
5.值;
6.Hadoop Writables;
7.特殊類型
4.1Java Tuple
Tuple是包含固定數量各種類型字段的復合類。Flink Java API提供了Tuple1-Tuple25。Tuple的字段可以是Flink的任意類型,甚至嵌套Tuple。
訪問Tuple屬性的方式有以下兩種:
1.屬性名(f0,f1…fn)
2.getField(int pos)
4.2Scala Case類
Scala的Case類(以及Scala的Tuple,實際是Case class的特殊類型)是包含了一定數量多種類型字段的組合類型。Tuple字段通過他們的1-offset名稱定位,例如 _1代表第一個字段。Case class 通過字段名稱獲得:
case class WordCount(word: String, count: Int)
val input = env.fromElements(
WordCount("hello", 1),
WordCount("world", 2)) // Case Class Data Set
input.keyBy("word")// key by field expression "word"
val input2 = env.fromElements(("hello", 1), ("world", 2)) // Tuple2 Data Set
input2.keyBy(0, 1) // key by field positions 0 and 1
4.3POJOs
Java和Scala的類在滿足下列條件時,將會被Flink視作特殊的POJO數據類型專門進行處理:
1.是公共類;
2.無參構造是公共的;
3.所有的屬性都是可獲得的(聲明為公共的,或提供get,set方法);
4.字段的類型必須是Flink支持的。Flink會用Avro來序列化任意的對象。
Flink會分析POJO類型的結構獲知POJO的字段。POJO類型要比一般類型好用。此外,Flink訪問POJO要比一般類型更高效。
public class WordWithCount {
public String word;
public int count;
public WordWithCount() {}
public WordWithCount(String word, int count) { this.word = word; this.count = count; }
}
DataStream<WordWithCount> wordCounts = env.fromElements(
new WordWithCount("hello", 1),
new WordWithCount("world", 2));
wordCounts.keyBy("word");
4.4基本類型
Flink支持Java和Scala所有的基本數據類型,比如 Integer,String,和Double。
4.5一般通用類
Flink支持大多數的Java,Scala類(API和自定義)。包含不能序列化字段的類在增加一些限制后也可支持。遵循Java Bean規范的類一般都可以使用。
所有不能視為POJO的類Flink都會當做一般類處理。這些數據類型被視作黑箱,其內容是不可見的。通用類使用Kryo進行序列/反序列化。
4.6值類型Values
通過實現org.apache.flinktypes.Value接口的read和write方法提供自定義代碼來進行序列化/反序列化,而不是使用通用的序列化框架。
Flink預定義的值類型與原生數據類型是一一對應的(例如:ByteValue, ShortValue, IntValue, LongValue, FloatValue, DoubleValue, StringValue, CharValue, BooleanValue)。這些值類型作為原生數據類型的可變變體,他們的值是可以改變的,允許程序重用對象從而緩解GC的壓力。
4.7 Hadoop的Writable類
它實現org.apache.hadoop.Writable接口的類型,該類型的序列化邏輯在write()和readFields()方法中實現。
4.8特殊類型
Flink比較特殊的類型有以下兩種:
1.Scala的 Either、Option和Try。
2.Java ApI有自己的Either實現。
4.9類型擦除和類型推理
注意:本小節內容僅針對Java
Java編譯器在編譯之后會丟棄很多泛型類型信息。這在Java中稱為類型擦除。這意味着在運行時,對象的實例不再知道其泛型類型。
例如,在JVM中,DataStream<String>和DataStream<Long>的實例看起來是相同的。
List<String> l1 = new ArrayList<String>();
List<Integer> l2 = new ArrayList<Integer>();
System.out.println(l1.getClass() == l2.getClass());
泛型:一種較為准確的說法就是為了參數化類型,或者說可以將類型當作參數傳遞給一個類或者是方法。
Flink 的Java API會試圖去重建(可以做類型推理)這些被丟棄的類型信息,並將它們明確地存儲在數據集以及操作中。你可以通過DataStream.getType()方法來獲取類型,這個方法將返回一個TypeInformation的實例,這個實例是Flink內部表示類型的方式。
5. 累加器和計數器
5.1累加器和計數器
計數器是最簡單的累加器。
內置累加器主要包含以下幾類:
1.IntCounter, LongCounter 和 DoubleCounter
2.Histogram(柱狀圖)
5.2如何使用累加器
第一步:在自定義的轉換操作里創建累加器對象:
private IntCounter numLines = new IntCounter();
第二步:注冊累加器對象,通常是在rich function的open()方法中。這里你還需要定義累加器的名字getRuntimeContext().addAccumulator(“num-lines”, this.numLines);
第三步:在operator函數的任何地方使用累加器,包括在open()和close()方法中
this.numLines.add(1);
第四步:結果存儲在JobExecutionResult里:
JobExecutionResult JobExecutionResult =env.execute("Flink Batch Java API Skeleton")
5.3自定義累加器
為了實現你自己的累加器,我們需要實現Accumulator接口,如果你想讓你自定義的累加器需要被Flink所收錄,請創建一個提交請求。可以選擇實現Accumulator或者SimpleAccumulator。
1.Accumulator<V, R>是最靈活的:它定義了需要進行累加的值的類型V以及最后結果的類型R,例如:對於一個histogram,v是數值類型的而R是一個histogram。
2.SimpleAccumulator則是在進行累計數據類型和返回的數據類型一致的情況下使用的,例如計數器。
(7)DataSream API
1.執行計划Graph
Flink 通過Stream API (Batch API同理)開發的應用,底層有四層執行計划,我們首先來看Flink的四層執行計划如下圖所示。
通過Stream API開發的Flink應用,底層首先轉換為StreamGraph,然后再轉換為JobGraph,接着轉換為ExecutionGraph,最后生成“物理執行圖”。
StreamGraph
1.根據用戶代碼生成最初的圖
2.它通過類表示程序的拓撲結構
3.它是在client端生成
JobGraph
1.優化streamgraph
2.將多個符合條件的Node chain在一起
3.在client端生成,然后交給JobManager
ExecutionGraph
JobManger根據JobGraph 並行化生成ExecutionGraph
物理執行圖
實際執行圖,不可見
1.1 StreamGraph
StreamGraph
通過Stream API提交的文件,首先會被翻譯成StreamGraph。StreamGraph的生成的邏輯是在StreamGraphGenerate類的generate方法。而這個generate的方法又會在StreamExecutionEnvironment.execute方法被調用。
1.env中存儲 List<StreamTransformation<?> ,里面存儲了各種算子操作。
2.StreamTransformation(是一個類)
a)它描述DataStream之間的轉化關系 。
b)它包含了StreamOperator/UDF 。
c)它包含了很多子類,比如OneInputTransformation/TwoInputTransform/ SourceTransformation/ SinkTransformation/ SplitTransformation等。
3.StreamNode/StreamEdge
StreamNode(算子)/StreamEdge(算子與算子之間的聯系)是通過StreamTransformation來構造。
1.2 StreamGraph轉JobGraph
1.3 JobGraph
從StreamGraph到JobGraph轉換過程中,內部角色也會進行轉換
1.StreamNode->JobVertex:StreamNode轉換為JobVertex
2.StreamEdge->JobEdge:StreamEdge轉換為JobEdge
3.將符合條件的StreamNode chain成一個JobVertex(頂點)
a)沒有禁用Chain
b)上下游算子並行度一致
c)下游算子的入度為1(也就是說下游節點沒有來自其他節點的輸入)
d)上下游算子在同一個slot group下游節點的 chain 策略為 ALWAYS(可以與上下游鏈接,
map、flatmap、filter等默認是ALWAYS)
e)上游節點的 chain 策略為 ALWAYS 或 HEAD(只能與下游鏈接,不能與上游鏈接,Source默認是HEAD)
f)上下游算子之間沒有數據shuffle (數據分區方式是 forward)
4.根據group指定JobVertex所屬SlotSharingGroup
5.配置checkpoint策略
6.配置重啟策略
1.4 JobGraph -> ExecutionGraph
1.5 ExecutionGraph
從JobGraph轉換ExecutionGraph的過程中,內部會出現如下的轉換。
1.ExecutionJobVertex <- JobVertex:JobVertex轉換為ExecutionJobVertex 。
2.ExecutionVertex(比如map)可以並發多個任務。
3.ExecutionEdge <- JobEdge:JobEdge轉換為ExecutionEdge。
4.ExecutionGraph 是一個2維結構。
5.根據2維結構分發對應Vertex到指定slot 。