1、State概念理解
在Flink中,按照基本類型,對State做了以下兩類的划分:Keyed State, Operator State。
Keyed State:和Key有關的狀態類型,它只能被基於KeyedStream之上的操作,方法所使用。我們可以從邏輯上理解這種狀態是一個並行度操作實例和一種Key的對應, <parallel-operator-instance, key>。
Operator State:(或者non-keyed state),它是和Key無關的一種狀態類型。相應地我們從邏輯上去理解這個概念,它相當於一個並行度實例,對應一份狀態數據。因為這里沒有涉及Key的概念,所以在並行度(擴/縮容)發生變化的時候,這里會有狀態數據的重分布的處理。
概念理解如下圖:
1、如果一個job沒有設置checkpoint,那么state默認是是保存在java的堆內存中,這樣會導致task失敗后,state存在丟失現象;
2、checkpoint在一個job中負責一份全局的狀態快照,里邊包含了所有的task和operator狀態;
3、task指的是flink中執行的基本單位,operator指的是算子操作;
4、state可以被記錄,也可以在失敗的時候被恢復;
5、state存在兩種,一種是 key state, 一種是 operator state;
1.1 Keyed State 應用示例:
關鍵點總結:
1、上述State對象,僅僅是用來與狀態進行交互,包括狀態的更新,狀態刪除,狀態清空等。
2、真正的狀態值可能存在內存、磁盤、或者其他分布式存儲系統中。
代碼示例:
public class StateManager extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> { /** * 操作 state 的句柄 * @param longLongTuple2 * @param collector * @throws Exception */ private transient ValueState<Tuple2<Long, Long>> sum; @Override public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception { //獲取state值 Tuple2<Long, Long> currentSum = sum.value(); currentSum.f0 = currentSum.f0 + 1; currentSum.f1 = currentSum.f1 + value.f1; //操作state更新 sum.update(currentSum); //輸出flatMap的算子結果 if(currentSum.f0 >= 2) { out.collect(new Tuple2<Long, Long>(value.f0, currentSum.f1/currentSum.f0)); } } @Override public void open(Configuration parameters) throws Exception { ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<Tuple2<Long, Long>>( "average", //狀態的名稱 TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), //狀態的類型 Tuple2.of(0L, 0L) //狀態的初始默認值 ); sum = getRuntimeContext().getState(descriptor); } }
1.2 Operator State 應用示例:
2、checkpoint的應用示例
基於狀態的容錯:
1、依靠checkpoint機制;
2、保證exactly-once;
3、只能保證flink系統內的exactly-once;
4、對source和sink需要依賴外部的組建一同保證;
state的存入:
state恢復:
checkpoint概念:
checkpoint的配置:
1、默認是disable,需要手動開啟;
2、checkpoint開啟后,默認的 checkpointMode 是Exactly-once;
3、checkpointMode有兩種,一種是 Exactly-once, 另一種是 At-least-once;
4、Exactly-once大多數程序是適合的, At-least-once可能用在某些延遲超低的應用程序(始終延遲幾ms)
代碼配置如下:
//獲取flink的運行環境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 每隔1000 ms進行啟動一個檢查點【設置checkpoint的周期】 env.enableCheckpointing(1000); // 高級選項: // 設置模式為exactly-once (這是默認值) env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 確保檢查點之間有至少500 ms的間隔【checkpoint最小間隔】 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); // 檢查點必須在一分鍾內完成,或者被丟棄【checkpoint的超時時間】 env.getCheckpointConfig().setCheckpointTimeout(60000); // 同一時間只允許進行一個檢查點 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 表示一旦Flink處理程序被cancel后,會保留Checkpoint數據,以便根據實際需要恢復到指定的Checkpoint【詳細解釋見備注】 //ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:表示一旦Flink處理程序被cancel后,會保留Checkpoint數據,以便根據實際需要恢復到指定的Checkpoint //ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 表示一旦Flink處理程序被cancel后,會刪除Checkpoint數據,只有job執行失敗的時候才會保存checkpoint env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
3、State Backend的應用示例
三種保存方式介紹:
代碼示例:
//設置statebackend //env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:9000/flink/checkpoints",true));