本文為《Flink大數據項目實戰》學習筆記,想通過視頻系統學習Flink這個最火爆的大數據計算框架的同學,推薦學習課程:
Flink大數據項目實戰:http://t.cn/EJtKhaz
1.執行計划Graph
Flink 通過Stream API (Batch API同理)開發的應用,底層有四層執行計划,我們首先來看Flink的四層執行計划如下圖所示。
通過Stream API開發的Flink應用,底層首先轉換為StreamGraph,然后再轉換為JobGraph,接着轉換為ExecutionGraph,最后生成“物理執行圖”。
StreamGraph
1.根據用戶代碼生成最初的圖
2.它通過類表示程序的拓撲結構
3.它是在client端生成
JobGraph
1.優化streamgraph
2.將多個符合條件的Node chain在一起
3.在client端生成,然后交給JobManager
ExecutionGraph
JobManger根據JobGraph 並行化生成ExecutionGraph
物理執行圖
實際執行圖,不可見
1.1 StreamGraph
StreamGraph
通過Stream API提交的文件,首先會被翻譯成StreamGraph。StreamGraph的生成的邏輯是在StreamGraphGenerate類的generate方法。而這個generate的方法又會在StreamExecutionEnvironment.execute方法被調用。
1.env中存儲 List<StreamTransformation<?> ,里面存儲了各種算子操作。
2.StreamTransformation(是一個類)
a)它描述DataStream之間的轉化關系 。
b)它包含了StreamOperator/UDF 。
c)它包含了很多子類,比如OneInputTransformation/TwoInputTransform/ SourceTransformation/ SinkTransformation/ SplitTransformation等。
3.StreamNode/StreamEdge
StreamNode(算子)/StreamEdge(算子與算子之間的聯系)是通過StreamTransformation來構造。
1.2 StreamGraph轉JobGraph
1.3 JobGraph
從StreamGraph到JobGraph轉換過程中,內部角色也會進行轉換
1.StreamNode->JobVertex:StreamNode轉換為JobVertex
2.StreamEdge->JobEdge:StreamEdge轉換為JobEdge
3.將符合條件的StreamNode chain成一個JobVertex(頂點)
a)沒有禁用Chain
b)上下游算子並行度一致
c)下游算子的入度為1(也就是說下游節點沒有來自其他節點的輸入)
d)上下游算子在同一個slot group下游節點的 chain 策略為 ALWAYS(可以與上下游鏈接,map、flatmap、filter等默認是ALWAYS)
e)上游節點的 chain 策略為 ALWAYS 或 HEAD(只能與下游鏈接,不能與上游鏈接,Source默認是HEAD)
f)上下游算子之間沒有數據shuffle (數據分區方式是 forward)
4.根據group指定JobVertex所屬SlotSharingGroup
5.配置checkpoint策略
6.配置重啟策略
1.4 JobGraph -> ExecutionGraph
1.5 ExecutionGraph
從JobGraph轉換ExecutionGraph的過程中,內部會出現如下的轉換。
1.ExecutionJobVertex <- JobVertex:JobVertex轉換為ExecutionJobVertex 。
2.ExecutionVertex(比如map)可以並發多個任務。
3.ExecutionEdge <- JobEdge:JobEdge轉換為ExecutionEdge。
4.ExecutionGraph 是一個2維結構。
5.根據2維結構分發對應Vertex到指定slot 。
2. DataStreamContext
Flink通過StreamExecutionEnvironment.getExecutionEnvironment()方法獲取一個執行環境,Flink引用是在本地執行,還是以集群方式執行,系統會自動識別。如果是本地執行會調用createLocalEnvironment()方法,如果是集群執行會調用createExecutionEnvironment()。
3. 數據源(DataSource)
Flink數據源可以有兩種實現方式:
1.內置數據源
a)基於文件
b)基於Socket
c)基於Collection
2.自定義數據源
a)實現SourceFunction(非並行的)
b)實現ParallelSourceFunction
c)繼承RichParallelSourceFunction
public class SimpleSourceFunction implements ParallelSourceFunction<Long> {
private long num = 0L;
private volatile boolean isRunning = true;
@Override
public void run(SourceContext<Long> sourceContext) throws Exception {
while (isRunning) {
sourceContext.collect(num); num++;
Thread.sleep(10000);
}
}
@Override
public void cancel() {
isRunning = false;
}
}
4. Transformation
Transformation(Operators/操作符/算子):可以將一個或多個DataStream轉換為新的DataStream。
5. DataSink
Flink也包含兩類Sink:
1.常用的sink會在后續的connectors中介紹。
2.自定義Sink
自定義Sink可以實現SinkFunction 接口,也可以繼承RichSinkFunction。
6. 流式迭代運算(Iterations)
簡單理解迭代運算:
當前一次運算的輸出作為下一次運算的輸入(當前運算叫做迭代運算)。不斷反復進行某種運算,直到達到某個條件才跳出迭代(是不是想起了遞歸)
流式迭代運算:
1.它沒有最大迭代次數
2.它需要通過split/filter轉換操作指定流的哪些部分數據反饋給迭代算子,哪些部分數據被轉發到下游DataStream
3.基本套路
1)基於輸入流構建IterativeStream(迭代頭)
2)定義迭代邏輯(map fun等)
3)定義反饋流邏輯(從迭代過的流中過濾出符合條件的元素組成的部分流反饋給迭代頭進行重復計算的邏輯)
4)調用IterativeStream的closeWith方法可以關閉一個迭代(也可表述為定義了迭代尾)
5)定義“終止迭代”的邏輯(符合條件的元素將被分發給下游而不用於進行下一次迭代)
4.流式迭代運算實例
問題域:輸入一組數據,我們對他們分別進行減1運算,直到等於0為止.
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @Author: lifei
* @Date: 2018/12/16 下午6:43
*/
public class IterativeStreamJob {
public static void main(String[] args) throws Exception {
//輸入一組數據,我們對他們分別進行減1運算,直到等於0為止
final StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Long> input=env.generateSequence(0,100);//1,2,3,4,5
//基於輸入流構建IterativeStream(迭代頭)
IterativeStream<Long> itStream=input.iterate();
//定義迭代邏輯(map fun等)
DataStream<Long> minusOne=itStream.map(new MapFunction<Long, Long>() {
@Override
public Long map(Long value) throws Exception {
return value-1;
}
});
//定義反饋流邏輯(從迭代過的流中過濾出符合條件的元素組成的部分流反饋給迭代頭進行重復計算的邏輯)
DataStream<Long> greaterThanZero=minusOne.filter(new FilterFunction<Long>() {
@Override
public boolean filter(Long value) throws Exception {
return value>0;
}
});
//調用IterativeStream的closeWith方法可以關閉一個迭代(也可表述為定義了迭代尾)
itStream.closeWith(greaterThanZero);
//定義“終止迭代”的邏輯(符合條件的元素將被分發給下游而不用於進行下一次迭代)
DataStream<Long> lessThanZero=minusOne.filter(new FilterFunction<Long>() {
@Override
public boolean filter(Long value) throws Exception {
return value<=0;
}
});
lessThanZero.print();
env.execute("IterativeStreamJob");
}
}
7. Execution參數
Controlling Latency(控制延遲)
1.默認情況下,流中的元素並不會一個一個的在網絡中傳輸(這會導致不必要的網絡流量消耗),而是緩存起來,緩存的大小可以在Flink的配置文件、 ExecutionEnvironment、設置某個算子上進行配置(默認100ms)。
1)好處:提高吞吐
2)壞處:增加了延遲
2.如何把握平衡
1)為了最大吞吐量,可以設置setBufferTimeout(-1),這會移除timeout機制,緩存中的數據一滿就會被發送
2)為了最小的延遲,可以將超時設置為接近0的數(例如5或者10ms)
3)緩存的超時不要設置為0,因為設置為0會帶來一些性能的損耗
3.其他更多的Execution參數后面會有專題講解
8. 調試
對於具體開發項目,Flink提供了多種調試手段。Streaming程序發布之前最好先進行調試,看看是不是能按預期執行。為了降低分布式流處理程序調試的難度,Flink提供了一些列方法:
1.本地執行環境
2.Collection Data Sources
3.Iterator Data Sink
本地執行環境:
本地執行環境不需要刻意創建,可以斷點調試
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
DataStream<String> lines = env.addSource(/* some source */);
env.execute();
Collection Data Sources:
Flink提供了一些Java 集合支持的特殊數據源來使得測試更加容易,程序測試成功后,將source和sink替換成真正source和sink即可。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.fromElements(1, 2, 3, 4, 5);
env.fromCollection(Collection);
env.fromCollection(Iterator, Class);
env.generateSequence(0, 1000)
Iterator Data Sink:
Flink提供一個特殊的sink來收集DataStream的結果
DataStream<Tuple2<String, Integer>> myResult = ...
Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.collect(myResult)