flink03 ----- 1. Task Division  2. Slot Sharing  3. Flink Fault Tolerance


1. Task Division

  In Flink, tasks are divided wherever a shuffle (also called a redistribute) occurs, or wherever the parallelism changes.

  • 1.  Taking WordCount as an example
package cn._51doit.flink.day03;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

public class WordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lines = env.socketTextStream("feng05", 8888);
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                Arrays.stream(line.split(" ")).forEach(w -> out.collect(Tuple2.of(w, 1)));
            }
        });
        SingleOutputStreamOperator<Tuple2<String, Integer>> filtered = wordAndOne.filter(new FilterFunction<Tuple2<String, Integer>>() {
            @Override
            public boolean filter(Tuple2<String, Integer> value) throws Exception {
                return value.f1 != null;
            }
        });
        KeyedStream<Tuple2<String, Integer>, Tuple> keyed = filtered.keyBy(0);
        //the SingleOutputStreamOperator's parallelism is 4
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyed.sum(1);
        result.print(); //the sink's parallelism is also 4
        env.execute();
    }
}

Its dataflow graph is shown below (figure omitted).

 socketTextStream is a single-parallelism source: no matter what parallelism you configure, it always runs with parallelism 1. At the flatMap operator the parallelism then becomes the configured 4. The whole dataflow is divided into 3 tasks and 9 subtasks.

  • 2.  Variation 1: add startNewChain() after flatMap, i.e. start a new operator chain there

     One might expect the chain between the flatMap and filter operators to break here, but in my own test it did not. The reason is that startNewChain() only severs the link to the upstream operator, and that link was already severed by the parallelism change after the source; flatMap becomes the head of a new chain, so filter can still chain onto it.

  • 3. Variation 2: add disableChaining() after flatMap, which breaks the operator chain both before and after that operator, placing it in a task of its own

 

 You can see that the number of tasks goes from 3 to 4, and the number of subtasks correspondingly becomes 13.

 

Note: why use startNewChain or disableChaining at all?

  Some operators are compute-intensive (for example, operators that sort). Such an operator can be split out and scheduled onto particular machines, where it gets exclusive use of that machine's CPU and memory and therefore runs more efficiently. A minimal sketch follows.
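A quick sketch of where the two calls attach, based on the WordCount pipeline above (flatMapFn is just a placeholder name for the anonymous FlatMapFunction from the example):

//start a new chain at flatMap: the link to the upstream operator is broken,
//but downstream operators such as filter can still chain onto flatMap
SingleOutputStreamOperator<Tuple2<String, Integer>> chained =
        lines.flatMap(flatMapFn).startNewChain();

//isolate flatMap completely: no chaining before or after it,
//so it becomes a task of its own
SingleOutputStreamOperator<Tuple2<String, Integer>> isolated =
        lines.flatMap(flatMapFn).disableChaining();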

 

Summary: the criteria for task division

(1) the parallelism changes

(2) keyBy()/window()/apply() and similar operations cause a rebalance/redistribution (i.e. a shuffle)

(3) startNewChain() is called, starting a new operator chain

(4) disableChaining() is called, telling the current operator not to take part in operator chaining

 

2. Slot Sharing

2.1 Basic concepts

   Every TaskManager (worker) is a JVM process and may execute one or more subtasks in separate threads. To control how many tasks a worker accepts, the worker uses task slots.

  Each task slot represents a fixed-size subset of the TaskManager's resources. If a TaskManager has three slots, it divides its managed memory into three parts, one per slot. Slotting the resources means a subtask does not have to compete for managed memory with subtasks of other jobs; instead it has a guaranteed reserve of memory. Note that no CPU isolation is involved here: slots currently isolate only a task's managed memory.

  By default, Flink allows subtasks to share slots, even subtasks of different tasks, as long as they belong to the same job. One slot may therefore hold an entire pipeline of the job. Slot sharing has two advantages:

  • A Flink cluster needs only as many slots as the highest parallelism used in the job. Without slot sharing, you would have to count how many tasks the job contains in total to work out how many slots the cluster needs, which is tedious.
  • Better resource utilization. Without slot sharing, lightweight subtasks (source/map()) would occupy just as many resources as heavyweight subtasks (window).

For example (figures omitted):

 The first figure shows the case without slot sharing: the two TaskManagers can only run the job with parallelism 2. With slot sharing the result is very different.

 With the same number of slots, slot sharing raises the achievable parallelism from 2 to 6, which greatly improves efficiency.

 

2.2 A closer look

  SlotSharingGroup is the class Flink uses to implement slot sharing; it lets subtasks share a single slot whenever possible, guaranteeing that subtasks of the same group may share one slot. The default slot sharing group is named default. Operators also carry a group name, likewise default by default, and an operator can only enter a slot sharing group whose name matches its own (which is why, by default, all subtasks of one job may share a slot). A slot sharing group takes its name from the first operator that enters the slot; for example, if the first operator entering the slot is named feng, the slot sharing group is named feng.

 

Sometimes you do not want slot sharing and would rather give an operator a slot of its own in some TaskManager (for example compute-intensive operators such as sorting or machine learning), i.e. you want to prevent unreasonable sharing. In that case you can explicitly force the operator's sharing group: someStream.filter(...).slotSharingGroup("group1"); forces filter into the slot sharing group group1.

Submit a WordCount program with parallelism 4.

The code is as follows:

package cn._51doit.flink.day03;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

public class SharingGroupDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //create a DataStream from a socket
        //socketTextStream is a non-parallel source: whatever parallelism you set, it always runs as one
        //the DataStreamSource's parallelism is 1
        DataStreamSource<String> lines = env.socketTextStream("node-1.51doit.cn", 8888);

        //a DataStream defaults to the parallelism you configured
        //the DataStream's parallelism is 4
        DataStream<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                Arrays.stream(line.split(" ")).forEach(w -> out.collect(Tuple2.of(w, 1)));
            }
        });

        //keyBy is a shuffle (redistribute) operation
        //the KeyedStream's parallelism is 4
        KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);
        //the SingleOutputStreamOperator's parallelism is 4
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyed.sum(1);

        result.print();

        env.execute();

    }
}

 

 

 Now set the parallelism to 2: you will find that 2 of the slots sit empty (screenshot omitted).

 Keep the parallelism at 2, but change the name of the task's slot sharing group so that this task's subtasks are separated out.

 Here a tag is put on the sum operator, i.e. sum.slotSharingGroup("doit"): the group name of sum and of every operator after it becomes doit. As for why keyed shows doit as well: keyBy is only a repartitioning of the data rather than a standalone operator, so what is displayed for keyed is really the sharing group of the downstream sum (see the sketch below).
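A minimal sketch of the tagging; only the group name "doit" and the surrounding WordCount variables are taken from the example above:

//force sum into the sharing group "doit"; downstream operators inherit
//the group from their input, so they leave the "default" group too
SingleOutputStreamOperator<Tuple2<String, Integer>> result =
        keyed.sum(1).slotSharingGroup("doit");
result.print(); //print inherits "doit" from result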

 

 

 

3. Flink Fault Tolerance

3.1 State

  So that a Flink streaming job can tolerate failures during computation, its intermediate results must be stored; this intermediate data is called state. State comes in several types. By default it is kept in the JobManager's memory, but it can also be saved to the TaskManager's local file system or to a distributed file system such as HDFS.

3.2 StateBackend

  The storage backend used to save state is called the StateBackend. By default state snapshots are kept in the JobManager's memory, but they can also be saved to the local file system or to a distributed file system such as HDFS.
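As an illustration, a minimal sketch of configuring a backend with the same (pre-1.13) API the examples use; the HDFS URI is a placeholder, not a real path:

import org.apache.flink.runtime.state.filesystem.FsStateBackend;

//keep working state on the TaskManager heap and write checkpoint snapshots to HDFS;
//the URI below is hypothetical -- substitute your own cluster's address
env.setStateBackend(new FsStateBackend("hdfs://namenode:9000/flink/checkpoints"));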

3.3 Checkpointing

  For fault tolerance, Flink can periodically save the intermediate data; this mechanism of periodically triggering state snapshots is called checkpointing. Checkpointing runs periodically: at regular intervals the JobManager sends an RPC message to the subtasks in the TaskManagers, every subtask saves its computed state to the StateBackend, and each reports back to the JobManager whether its checkpoint succeeded. If the program fails or is restarted, the subtasks in the TaskManagers can recover from the state of the last successful checkpoint (figure omitted).

 Note: the JobManager marks a checkpoint as successful only after it has received confirmation from all subtasks that their state was saved to the StateBackend.

3.4 Restart strategies

  For fault tolerance a Flink job must enable checkpointing. Once checkpointing is enabled, the default restart strategy (if none is set) is to restart indefinitely; other strategies can be configured, such as a fixed number of restarts with a delay between attempts, as sketched below.
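A short sketch of the common options (these RestartStrategies factory methods are the actual Flink API; the numbers are arbitrary examples):

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;

//restart at most 3 times, waiting 5 seconds between attempts
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(5)));
//or: tolerate at most 3 failures within any 5-minute window, 10 s between restarts
env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(5), Time.seconds(10)));
//or: never restart
env.setRestartStrategy(RestartStrategies.noRestart());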

 

3.5 CheckpointingMode

  • exactly-once

  Exactly-once semantics: every record is reflected in the results exactly once, but this requires a cooperating data source; Kafka, for example, supports exactly-once.

  • at-least-once

  Every record is processed at least once and may be processed repeatedly, but this mode is more efficient than exactly-once.
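The mode is passed when enabling checkpointing; a minimal sketch using the real CheckpointingMode enum:

import org.apache.flink.streaming.api.CheckpointingMode;

//checkpoint every 5 seconds with exactly-once guarantees (the default mode)
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
//or trade exactness for lower overhead:
//env.enableCheckpointing(5000, CheckpointingMode.AT_LEAST_ONCE);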

 

4. State examples

4.1 Basic concepts

(1) The concept of state:

  State is the intermediate results and status information of a Flink computation; for fault tolerance it must be persisted to an external system.

(2) Types of state

A blog worth reading: https://www.lizenghai.com/archives/46460.html (the figures below come from it)

  • Keyed state: after keyBy() is called, state kept independently per key within each partition
  • Operator state: no keying; each subtask maintains a state of its own

 (figures omitted)

(3) Using state (Case 3 below demonstrates each step):

  • first define a state descriptor
  • obtain the state through the runtime context
  • update the state after processing each record

Case 1: restart strategies

RestartStrages

package cn._51doit.flink.day03;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

public class RestartStrages {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lines = env.socketTextStream("feng05", 8888);
        // enable checkpointing: take a checkpoint every 5 seconds
        env.enableCheckpointing(5000);
        // the default restart strategy is unlimited restarts; here: at most 3 restarts, 5 s apart
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                if (word.equals("feng")) {
                    int i = 1 / 0;
                }
                return Tuple2.of(word, 1);
            }
        });
        //keyBy is a shuffle (redistribute) operation
        //the KeyedStream's parallelism is 4
        KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);
        //the SingleOutputStreamOperator's parallelism is 4
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyed.sum(1);
        result.print();
        env.execute();
    }
}

After the program fails it restarts, and after the restart the earlier partial results are still used (the sum operator keeps its state internally).

 

Case 2: can we implement the sum operator ourselves so that it both accumulates word counts correctly and tolerates failures?

MyHashMapDemo

package cn._51doit.flink.day03;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.HashMap;

public class MyHashMapDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
        //enable checkpointing; once it is on, the default restart strategy is unlimited restarts
        env.enableCheckpointing(5000);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                if (word.equals("laoduan")) {
                    int i = 1 / 0; //simulate an error so the job restarts
                }
                return Tuple2.of(word, 1);
            }
        });
        KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyed.map(new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            private HashMap<String, Integer> state = new HashMap<>();
            @Override
            public Tuple2<String, Integer> map(Tuple2<String, Integer> input) throws Exception {
                String currentKey = input.f0;
                Integer currentCount = input.f1;
                Integer historyCount = state.get(currentKey);
                if (historyCount == null) {
                    historyCount = 0;
                }
                int sum = historyCount + currentCount; //accumulate
                //update the state data (a hand-rolled counter)
                state.put(currentKey, sum);
                return Tuple2.of(currentKey, sum); //emit the result
            }
        });
        result.print();
        env.execute();
    }
}

A HashMap defined this way accumulates the word counts correctly but provides no fault tolerance: the map lives only on the subtask's heap and is never checkpointed, so its contents are lost whenever the job restarts.

Case 3: implementing sum with keyed state, which does meet the requirement

package cn._51doit.flink.day03;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

//class and imports added for completeness; the original post showed only the method body
public class KeyedStateWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
        //enable checkpointing; once it is on, the default restart strategy is unlimited restarts
        env.enableCheckpointing(5000);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                if (word.equals("laoduan")) {
                    int i = 1 / 0; //simulate an error so the job restarts
                }
                return Tuple2.of(word, 1);
            }
        });

        KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);

        SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyed.map(new RichMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {

            private transient ValueState<Integer> countState;

            //runs once after the constructor and before the first call to map
            @Override
            public void open(Configuration parameters) throws Exception {
                //initialize the state, or restore it after a failure
                //steps for using state:
                //1. define a state descriptor: the state's name, the type of the stored data, etc.
                ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>(
                        "wc-state",
                        Integer.class
                );
                //2. use the descriptor to obtain the state from the state backend
                countState = getRuntimeContext().getState(stateDescriptor);
            }

            @Override
            public Tuple2<String, Integer> map(Tuple2<String, Integer> input) throws Exception {
                String currentKey = input.f0;
                Integer currentCount = input.f1;
                Integer historyCount = countState.value();
                if (historyCount == null) {
                    historyCount = 0;
                }
                int sum = historyCount + currentCount;
                //update the state
                countState.update(sum);
                return Tuple2.of(currentKey, sum);
            }
        });

        result.print();

        env.execute();
    }
}

 Case 4: operator state

 

 A custom source:

MyAtLeastOnceSource

package cn._51doit.flink.day03;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.io.RandomAccessFile;

public class MyAtLeastOnceSource extends RichParallelSourceFunction<String> implements CheckpointedFunction {

    private transient ListState<Long> listState;

    private boolean flag = true;
    private Long offset = 0L;

    //runs once after the constructor and before open(); initializes the operator state or restores it after a failure
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        //define a state descriptor
        ListStateDescriptor<Long> stateDescriptor = new ListStateDescriptor<>(
                "offset-state",
                Long.class
        );
        //the ListState stores a single Long value: the read offset
        listState = context.getOperatorStateStore().getListState(stateDescriptor);

        //restore the offset from the ListState after a recovery
        if(context.isRestored()) {
            for (Long first : listState.get())
                offset = first;
        }
    }

    //snapshotState is called each time a checkpoint is taken
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        //clear the value saved by the previous checkpoint
        listState.clear();
        //save the latest offset into the ListState
        listState.add(offset);
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        int taskIndex = getRuntimeContext().getIndexOfThisSubtask();
        RandomAccessFile raf = new RandomAccessFile("/Users/xing/Desktop/data/" + taskIndex + ".txt", "r");
        //resume reading from the saved offset
        raf.seek(offset);
        //obtain the checkpoint lock
        final Object checkpointLock = ctx.getCheckpointLock();
        while (flag) {
            String line = raf.readLine();
            if(line != null) {
                //update the offset and emit the record atomically with respect to checkpoints
                synchronized (checkpointLock) {
                    line = new String(line.getBytes("ISO-8859-1"), "UTF-8");
                    offset = raf.getFilePointer();
                    ctx.collect(taskIndex + ".txt => "  + line);
                }
            } else {
                Thread.sleep(1000);
            }
        }

    }

    @Override
    public void cancel() {
        flag = false;
    }
}

MyAtLeastOnceSourceDemo

package cn._51doit.flink.day03;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MyAtLeastOnceSourceDemo {

    public static void main(String[] args) throws Exception{

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        env.enableCheckpointing(30000);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));

        //a custom parallel source
        DataStreamSource<String> lines1 = env.addSource(new MyAtLeastOnceSource());

        DataStreamSource<String> lines2 = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<String> error = lines2.map(new MapFunction<String, String>() {
            @Override
            public String map(String line) throws Exception {
                if (line.startsWith("error")) {
                    int i = 1 / 0;
                }
                return line;
            }
        });

        DataStream<String> union = lines1.union(error);

        union.print();

        env.execute();

    }
}

Records produced between two checkpoints may be read again after a failure, so this source provides at-least-once semantics.
