1. Task Division
In Flink, tasks are divided wherever a shuffle (also called a redistribute) occurs or the parallelism changes.
- 1. WordCount as an example

```java
package cn._51doit.flink.day03;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

public class WordCount {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lines = env.socketTextStream("feng05", 8888);
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                Arrays.stream(line.split(" ")).forEach(w -> out.collect(Tuple2.of(w, 1)));
            }
        });
        SingleOutputStreamOperator<Tuple2<String, Integer>> filtered = wordAndOne.filter(new FilterFunction<Tuple2<String, Integer>>() {
            @Override
            public boolean filter(Tuple2<String, Integer> value) throws Exception {
                return value.f1 != null;
            }
        });
        KeyedStream<Tuple2<String, Integer>, Tuple> keyed = filtered.keyBy(0);
        // the SingleOutputStreamOperator parallelism is 4
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyed.sum(1);
        result.print(); // the parallelism of the sink is also 2
        env.execute();
    }
}
```
Its dataflow graph is shown below.
socketTextStream is a single-parallelism source: no matter what parallelism you configure, it runs with parallelism 1, so at the flatMap operator the parallelism becomes the configured 4. The whole dataflow is divided into 3 tasks and 9 subtasks.
- 2. Change 1: add startNewChain() after flatMap, i.e. start a new operator chain
In principle the chain between the flatMap and filter operators should be broken here, but in my own test it was not, and I do not yet know why.
- 3. Change 2: add disableChaining() after flatMap, which breaks the operator chain both before and after that operator, so the operator is placed in a task of its own
We can see that the number of tasks grows from 3 to 4, and the number of subtasks correspondingly becomes 13.
Note: why use startNewChain and disableChaining here?
During computation there are compute-intensive operators (for example, operators involving sorting). They can be split out and scheduled onto particular machines, so that such an operator has that machine's CPU and memory to itself, improving efficiency.
Summary: criteria for dividing tasks
(1) the parallelism changes
(2) keyBy()/window()/apply() and similar operations trigger a redistribution (i.e. a shuffle)
(3) startNewChain() is called, starting a new operator chain
(4) disableChaining() is called, telling Flink not to chain the current operator with its neighbours (see the sketch below)
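A minimal sketch of where the two chaining calls from changes 1 and 2 attach, reusing the WordCount pipeline above; WordSplitter is a hypothetical stand-in for the anonymous FlatMapFunction shown there:

```java
// WordSplitter is a hypothetical name for the anonymous FlatMapFunction in the WordCount example above.

// Change 1: start a new operator chain at flatMap (the chain to the upstream operator is cut).
SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne =
        lines.flatMap(new WordSplitter()).startNewChain();

// Change 2 (applied instead of change 1): break the chain on both sides of flatMap,
// so the operator is placed in a task of its own.
SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneAlone =
        lines.flatMap(new WordSplitter()).disableChaining();
```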
2. Slot Sharing
2.1 Basic concepts
Each TaskManager (worker) is a JVM process and may execute one or more subtasks in separate threads. To control how many tasks a worker accepts, the worker uses task slots.
Each task slot represents a fixed-size subset of the TaskManager's resources. If a TaskManager has three slots, it divides its managed memory into three parts, one per slot. Slotting the resources means a subtask does not have to compete for managed memory with subtasks from other jobs; instead it has a reserved amount of memory. Note that there is no CPU isolation involved; slots currently only isolate a task's managed memory.
By default, Flink allows subtasks to share slots even if they belong to different tasks, as long as they come from the same job. As a result, one slot may hold an entire pipeline of the job. Slot sharing has two main advantages:
- A Flink cluster only needs as many slots as the highest parallelism used in the job. Without slot sharing you would have to count how many tasks the job contains in total to work out how many slots the cluster needs, which is cumbersome.
- Better resource utilization. Without slot sharing, lightweight subtasks (source/map()) would occupy as many resources as heavyweight subtasks (window).
As shown below:
The figure above shows the case without slot sharing: the two TaskManagers can only run at parallelism 2. With slot sharing the result is very different, as shown below.
As the figure clearly shows, with the same number of slots, slot sharing raises the usable parallelism from 2 to 6, which greatly improves efficiency.
2.2 A closer look
SlotSharingGroup is the class Flink uses to implement slot sharing; it lets subtasks share a slot whenever possible and guarantees that subtasks of the same group may share one slot. The default slot sharing group is named default. Operators also have a group name, default by default, and an operator can only enter the slot sharing group with the same name (so by default all subtasks of a job can share one slot). The name of a slot sharing group is determined by the first operator that enters the slot; for example, if the first operator to enter the slot is named feng, the slot sharing group is named feng.
Sometimes you do not want slot sharing and instead want an operator to have a slot of a TaskManager to itself (for example compute-intensive operators such as sorting or machine learning), i.e. to prevent unreasonable sharing. In that case you can explicitly set the operator's sharing group; for example someStream.filter(...).slotSharingGroup("group1"); forces the filter's slot sharing group to be group1.
Submit a WordCount job with parallelism 4.
The code is as follows:

```java
package cn._51doit.flink.day03;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

public class SharingGroupDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // create a DataStream from a socket
        // socketTextStream is a non-parallel source: whatever parallelism you configure, it always runs with one parallel instance
        // the DataStreamSource parallelism is 1
        DataStreamSource<String> lines = env.socketTextStream("node-1.51doit.cn", 8888);
        // a DataStream uses the configured parallelism by default
        // the DataStream parallelism is 4
        DataStream<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                Arrays.stream(line.split(" ")).forEach(w -> out.collect(Tuple2.of(w, 1)));
            }
        });
        // keyBy is a shuffle (redistribute) operator
        // the KeyedStream parallelism is 4
        KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);
        // the SingleOutputStreamOperator parallelism is 4
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyed.sum(1);
        result.print();
        env.execute();
    }
}
```
Now set the parallelism to 2; you can see that 2 slots remain idle.
Keep the parallelism at 2 but change the name of the task's slot sharing group so that the corresponding subtasks are placed separately.
Here the sum operator is tagged, i.e. sum.slotSharingGroup("doit"); sum and the operators after it all take the group name doit (see the sketch below). Why keyed also becomes doit is not yet clear to me.
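A minimal sketch of how that tag is attached in the demo above; the group name "doit" comes from the text, everything else follows the SharingGroupDemo code:

```java
// Put sum into its own slot sharing group named "doit"; downstream operators (here the print sink)
// inherit the group unless they set another one explicitly.
SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyed.sum(1).slotSharingGroup("doit");
result.print();
env.execute();
```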
3. Flink Fault Tolerance
3.1 State
To tolerate failures during computation, a Flink streaming program has to store its intermediate results; these intermediate data are called state. State can be of several types. By default it is kept in the JobManager's memory, but it can also be stored in the TaskManager's local file system or in a distributed file system such as HDFS.
3.2 StateBackend
The storage backend used to hold state is called the StateBackend. By default state is kept in the JobManager's memory, but it can also be saved to the local file system or to a distributed file system such as HDFS.
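A minimal sketch of configuring a file-system state backend, assuming the FsStateBackend API of the Flink version used in these examples; the HDFS path is illustrative:

```java
// Assumes: import org.apache.flink.runtime.state.filesystem.FsStateBackend;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Working state stays on the TaskManagers; checkpoints are written to HDFS (the path is illustrative).
env.setStateBackend(new FsStateBackend("hdfs://node-1.51doit.cn:9000/flink/checkpoints"));
```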
3.3 Checkpointing
For fault tolerance, Flink can periodically persist its intermediate data. This mechanism of periodically saving intermediate results is called checkpointing. Checkpointing runs periodically: the JobManager periodically sends an RPC message to the subtasks in the TaskManagers, each subtask saves its computed state to the StateBackend, and then reports back to the JobManager whether the checkpoint succeeded. If the program fails or is restarted, the subtasks in the TaskManagers can recover from the state of the last successful checkpoint, as shown in the figure below.
Note: the JobManager marks a checkpoint as successful only after it has received confirmation from all subtasks that their state was successfully saved to the StateBackend.
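A minimal sketch of the checkpoint settings used in the demos below, plus two optional tuning knobs; the interval and timeout values are illustrative:

```java
// Trigger a checkpoint every 5 seconds (the same call the demos below use).
env.enableCheckpointing(5000);
// Optional tuning (values are illustrative):
env.getCheckpointConfig().setCheckpointTimeout(60000);          // abandon a checkpoint that takes longer than 60 s
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);  // leave at least 1 s between checkpoints
```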
3.4 Restart strategies
For fault tolerance a Flink streaming program needs checkpointing enabled. Once checkpointing is enabled, the default restart strategy (if none is configured) is to restart indefinitely; other strategies can also be configured, such as a fixed number of restarts with a delay between attempts, as sketched below.
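A minimal sketch of the restart strategies mentioned above; the attempt counts and delays are illustrative (uses RestartStrategies and org.apache.flink.api.common.time.Time):

```java
// Fixed delay: restart at most 3 times, waiting 5 s between attempts (used in the demos below).
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));
// Failure rate: allow at most 3 failures within 5 minutes, waiting 10 s between restarts.
env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(5), Time.seconds(10)));
// No restart: fail the job on the first error.
env.setRestartStrategy(RestartStrategies.noRestart());
```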
3.5 CheckpointingMode
- exactly-once
Exactly-once semantics guarantee that each record is consumed and counted exactly once, but this requires a compatible source; for example, Kafka supports exactly-once.
- at-least-once
Each record is consumed at least once and may be processed repeatedly, but this mode is more efficient than exactly-once (see the sketch below).
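A minimal sketch of selecting the mode when enabling checkpointing (uses org.apache.flink.streaming.api.CheckpointingMode; the interval is illustrative and EXACTLY_ONCE is the default):

```java
// Exactly-once (the default):
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
// Or at-least-once, which has lower overhead:
env.enableCheckpointing(5000, CheckpointingMode.AT_LEAST_ONCE);
```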
4. State Examples
4.1 Basic concepts
(1) The concept of state:
State is the intermediate results and status information of a Flink computation; for fault tolerance it must be persisted to an external system.
(2) Types of state
A blog worth reading: https://www.lizenghai.com/archives/46460.html (the figure below comes from it)
- Keyed State: after keyBy() is called, each partition keeps its own independent state
- Operator State: no grouping; each subtask maintains its own state
(3) Using state
- First define a state descriptor
- Obtain the state through the runtime context
- After processing a record, update the state
Case 1: restart strategy
RestartStrages

```java
package cn._51doit.flink.day03;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

public class RestartStrages {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lines = env.socketTextStream("feng05", 8888);
        // enable checkpointing, checkpoint every 5 seconds
        env.enableCheckpointing(5000);
        // the default restart strategy is to restart indefinitely
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                if (word.equals("feng")) {
                    int i = 1 / 0; // simulate an error so the job restarts
                }
                return Tuple2.of(word, 1);
            }
        });
        // keyBy is a shuffle (redistribute) operator
        // the KeyedStream parallelism is 4
        KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);
        // the SingleOutputStreamOperator parallelism is 4
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyed.sum(1);
        result.print();
        env.execute();
    }
}
```
After the program fails it restarts, and after the restart the previous results can still be reused (the sum operator stores state internally).
Case 2: can we implement the sum operator ourselves so that it both accumulates word counts correctly and tolerates failures when the program throws an exception?
MyHashMapDemo

```java
package cn._51doit.flink.day03;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.HashMap;

public class MyHashMapDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
        // enable checkpointing
        env.enableCheckpointing(5000);
        // with checkpointing enabled, the default restart strategy is to restart indefinitely
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                if (word.equals("laoduan")) {
                    int i = 1 / 0; // simulate an error so the job restarts
                }
                return Tuple2.of(word, 1);
            }
        });
        KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyed.map(new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {

            private HashMap<String, Integer> state = new HashMap<>();

            @Override
            public Tuple2<String, Integer> map(Tuple2<String, Integer> input) throws Exception {
                String currentKey = input.f0;
                Integer currentCount = input.f1;
                Integer historyCount = state.get(currentKey);
                if (historyCount == null) {
                    historyCount = 0;
                }
                int sum = historyCount + currentCount; // accumulate
                // update the state (a hand-rolled counter)
                state.put(currentKey, sum);
                return Tuple2.of(currentKey, sum); // emit the result
            }
        });
        result.print();
        env.execute();
    }
}
```
This HashMap-based approach can accumulate word counts correctly, but it cannot provide fault tolerance.
Case 3: implement sum with keyed state, which meets both requirements

```java
// Fragment: assumes the same package and imports as the previous examples, plus
// org.apache.flink.api.common.functions.RichMapFunction,
// org.apache.flink.api.common.state.ValueState,
// org.apache.flink.api.common.state.ValueStateDescriptor and
// org.apache.flink.configuration.Configuration.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
// enable checkpointing
env.enableCheckpointing(5000);
// with checkpointing enabled, the default restart strategy is to restart indefinitely
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));
SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
    @Override
    public Tuple2<String, Integer> map(String word) throws Exception {
        if (word.equals("laoduan")) {
            int i = 1 / 0; // simulate an error so the job restarts
        }
        return Tuple2.of(word, 1);
    }
});
KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);
SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyed.map(new RichMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {

    private transient ValueState<Integer> countState;

    // runs once after the constructor and before map()
    @Override
    public void open(Configuration parameters) throws Exception {
        // initialize or restore the state
        // steps for using state:
        // 1. define a state descriptor: the state's name, the type of data it stores, etc.
        ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>(
                "wc-state",
                Integer.class
        );
        // 2. use the state descriptor to obtain the state from the corresponding state backend
        countState = getRuntimeContext().getState(stateDescriptor);
    }

    @Override
    public Tuple2<String, Integer> map(Tuple2<String, Integer> input) throws Exception {
        String currentKey = input.f0;
        Integer currentCount = input.f1;
        Integer historyCount = countState.value();
        if (historyCount == null) {
            historyCount = 0;
        }
        int sum = historyCount + currentCount;
        // update the state
        countState.update(sum);
        return Tuple2.of(currentKey, sum);
    }
});
result.print();
env.execute();
```
Case 4: Operator State
A custom source
MyAtLeastOnceSource

```java
package cn._51doit.flink.day03;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.io.RandomAccessFile;

public class MyAtLeastOnceSource extends RichParallelSourceFunction<String> implements CheckpointedFunction {

    private transient ListState<Long> listState;
    private boolean flag = true;
    private Long offset = 0L;

    // runs once after the constructor and before open(), used to initialize or restore operator state
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // define a state descriptor
        ListStateDescriptor<Long> stateDescriptor = new ListStateDescriptor<>(
                "offset-state",
                Long.class
        );
        // the ListState stores a single long value (the read offset)
        listState = context.getOperatorStateStore().getListState(stateDescriptor);
        // restore the offset from the ListState
        if (context.isRestored()) {
            for (Long first : listState.get()) {
                offset = first;
            }
        }
    }

    // snapshotState is called on every checkpoint
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // clear the data from the previous checkpoint
        listState.clear();
        // save the latest offset into the ListState
        listState.add(offset);
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        int taskIndex = getRuntimeContext().getIndexOfThisSubtask();
        RandomAccessFile raf = new RandomAccessFile("/Users/xing/Desktop/data/" + taskIndex + ".txt", "r");
        // start reading from the saved offset
        raf.seek(offset);
        // get the checkpoint lock
        final Object checkpointLock = ctx.getCheckpointLock();
        while (flag) {
            String line = raf.readLine();
            if (line != null) {
                // record the latest offset while holding the checkpoint lock
                synchronized (checkpointLock) {
                    line = new String(line.getBytes("ISO-8859-1"), "UTF-8");
                    offset = raf.getFilePointer();
                    ctx.collect(taskIndex + ".txt => " + line);
                }
            } else {
                Thread.sleep(1000);
            }
        }
    }

    @Override
    public void cancel() {
        flag = false;
    }
}
```
MyAtLeastOnceSourceDemo

```java
package cn._51doit.flink.day03;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MyAtLeastOnceSourceDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        env.enableCheckpointing(30000);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));
        // a custom source with multiple parallel instances
        DataStreamSource<String> lines1 = env.addSource(new MyAtLeastOnceSource());
        DataStreamSource<String> lines2 = env.socketTextStream("localhost", 8888);
        SingleOutputStreamOperator<String> error = lines2.map(new MapFunction<String, String>() {
            @Override
            public String map(String line) throws Exception {
                if (line.startsWith("error")) {
                    int i = 1 / 0; // simulate an error so the job restarts
                }
                return line;
            }
        });
        DataStream<String> union = lines1.union(error);
        union.print();
        env.execute();
    }
}
```
Data arriving between two checkpoints may be read again after a failure, so this source provides at-least-once semantics.