Flink中案例學習--State與CheckPoint理解


1、State概念理解

在Flink中,按照基本類型,對State做了以下兩類的划分:Keyed State, Operator State。

Keyed State:和Key有關的狀態類型,它只能被基於KeyedStream之上的操作,方法所使用。我們可以從邏輯上理解這種狀態是一個並行度操作實例和一種Key的對應, <parallel-operator-instance, key>。
Operator State:(或者non-keyed state),它是和Key無關的一種狀態類型。相應地我們從邏輯上去理解這個概念,它相當於一個並行度實例,對應一份狀態數據。因為這里沒有涉及Key的概念,所以在並行度(擴/縮容)發生變化的時候,這里會有狀態數據的重分布的處理。

概念理解如下圖:

1、如果一個job沒有設置checkpoint,那么state默認是是保存在java的堆內存中,這樣會導致task失敗后,state存在丟失現象;

2、checkpoint在一個job中負責一份全局的狀態快照,里邊包含了所有的task和operator狀態;

3、task指的是flink中執行的基本單位,operator指的是算子操作;

4、state可以被記錄,也可以在失敗的時候被恢復;

5、state存在兩種,一種是 key state, 一種是 operator state;

 

1.1 Keyed State 應用示例:

 

關鍵點總結:

1、上述State對象,僅僅是用來與狀態進行交互,包括狀態的更新,狀態刪除,狀態清空等。

2、真正的狀態值可能存在內存、磁盤、或者其他分布式存儲系統中。

 

代碼示例:

public class StateManager extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    /**
     * 操作 state 的句柄
     * @param longLongTuple2
     * @param collector
     * @throws Exception
     */

    private transient ValueState<Tuple2<Long, Long>> sum;


    @Override
    public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {

        //獲取state值
        Tuple2<Long, Long> currentSum = sum.value();

        currentSum.f0 = currentSum.f0 + 1;
        currentSum.f1 = currentSum.f1 + value.f1;

        //操作state更新
        sum.update(currentSum);

        //輸出flatMap的算子結果
        if(currentSum.f0 >= 2)
        {
            out.collect(new Tuple2<Long, Long>(value.f0, currentSum.f1/currentSum.f0));
        }

    }


    @Override
    public void open(Configuration parameters) throws Exception {

        ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<Tuple2<Long, Long>>(
                "average",                                                      //狀態的名稱
                TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}),      //狀態的類型
                Tuple2.of(0L, 0L)                                               //狀態的初始默認值
        );

        sum = getRuntimeContext().getState(descriptor);

    }
}

 

1.2 Operator State 應用示例:

 

 

2、checkpoint的應用示例

基於狀態的容錯:

1、依靠checkpoint機制;

2、保證exactly-once;

3、只能保證flink系統內的exactly-once;

4、對source和sink需要依賴外部的組建一同保證;

state的存入:

state恢復:

checkpoint概念:

 

 checkpoint的配置:

1、默認是disable,需要手動開啟;

2、checkpoint開啟后,默認的 checkpointMode 是Exactly-once;

3、checkpointMode有兩種,一種是 Exactly-once, 另一種是 At-least-once;

4、Exactly-once大多數程序是適合的, At-least-once可能用在某些延遲超低的應用程序(始終延遲幾ms)

代碼配置如下:

        //獲取flink的運行環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 每隔1000 ms進行啟動一個檢查點【設置checkpoint的周期】
        env.enableCheckpointing(1000);
        // 高級選項:
        // 設置模式為exactly-once (這是默認值)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // 確保檢查點之間有至少500 ms的間隔【checkpoint最小間隔】
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // 檢查點必須在一分鍾內完成,或者被丟棄【checkpoint的超時時間】
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // 同一時間只允許進行一個檢查點
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // 表示一旦Flink處理程序被cancel后,會保留Checkpoint數據,以便根據實際需要恢復到指定的Checkpoint【詳細解釋見備注】
        //ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:表示一旦Flink處理程序被cancel后,會保留Checkpoint數據,以便根據實際需要恢復到指定的Checkpoint
        //ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 表示一旦Flink處理程序被cancel后,會刪除Checkpoint數據,只有job執行失敗的時候才會保存checkpoint


        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

 

3、State Backend的應用示例

 三種保存方式介紹:

代碼示例:

//設置statebackend
 //env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:9000/flink/checkpoints",true));


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM