知識點:
Flink 為算子狀態(operator state)提供三種基本數據結構:
列表狀態(List state): 將狀態表示為一組數據的列表。
聯合列表狀態(Union list state): 也將狀態表示為數據的列表。它與常規列表狀態的區別在於,在發生故障時,或者從保存點(savepoint)啟動應用程序時如何恢復:常規列表狀態會把列表中的元素均勻地重新分配給各個並行任務,而聯合列表狀態會把完整的列表發給每一個並行任務。
廣播狀態(Broadcast state): 如果一個算子有多項任務,而它的每項任務狀態又都相同,那麼這種特殊情況最適合應用廣播狀態。
import org.apache.flink.api.scala._

// Checkpoint settings for an existing execution environment `env`.
// The numeric arguments are passed through unchanged from the original snippet.
env.enableCheckpointing(1 * 60 * 1000)

// All checkpoint-level options go through the same config object.
val checkpointConfig = env.getCheckpointConfig
checkpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
checkpointConfig.setMinPauseBetweenCheckpoints(500)
checkpointConfig.setCheckpointTimeout(10 * 60 * 1000)
checkpointConfig.setMaxConcurrentCheckpoints(1)
// Keep externalized checkpoints when the job is cancelled.
checkpointConfig.enableExternalizedCheckpoints(
  CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

// Store checkpoint data on the file system at the given HDFS path.
env.setStateBackend(
  new FsStateBackend("hdfs://localhost:8020/flink/DhCallRecordEs/es/checkpoints"))

// Turn on snapshot compression in the execution config.
env.getConfig.setUseSnapshotCompression(true)
1、主類
package com.example.demo.flink; import com.example.demo.flink.impl.CustomSink; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; /** * @program: demo * @description: valuestate * @author: yang * @create: 2020-12-28 15:46 */ public class TestOperatorStateMain { public static void main(String[] args) throws Exception{ //獲取執行環境 StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); //StreamExecutionEnvironment.getExecutionEnvironment(); //設置並行度 env.setParallelism(16); //獲取數據源 DataStreamSource<Tuple2<String, Integer>> dataStreamSource = env.fromElements( Tuple2.of("Spark", 3), Tuple2.of("Flink", 5), Tuple2.of("Hadoop", 7), Tuple2.of("Spark", 4)); // 輸出: //(1,5.0) //(2,4.0) dataStreamSource.addSink(new CustomSink(3)).setParallelism(1); env.execute("TestStatefulApi"); } }
2、處理實現類
package com.example.demo.flink.impl; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import java.util.ArrayList; import java.util.List; /** * @program: demo * @description: * @author: yang * @create: 2020-12-29 11:36 */ public class CustomSink implements SinkFunction<Tuple2<String,Integer>>, CheckpointedFunction { private int threshold; private List<Tuple2<String,Integer>> bufferElements; private ListState<Tuple2<String,Integer>> checkpointState; public CustomSink(int i) { this.threshold = i; this.bufferElements = new ArrayList<>(); } @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { //設置快照 checkpointState.clear(); for(Tuple2<String,Integer> ele : bufferElements){ checkpointState.add(ele); } } @Override public void initializeState(FunctionInitializationContext context) throws Exception { ListStateDescriptor descriptor = new ListStateDescriptor<Tuple2<String,Integer>>("Operator", TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() { })); //將checkpoint中的數據加載進內存 checkpointState = context.getOperatorStateStore().getListState(descriptor); if(context.isRestored()){ for (Tuple2<String,Integer> ele: checkpointState.get()) { bufferElements.add(ele); } } } @Override public void invoke(Tuple2<String, Integer> value, Context context) throws Exception { bufferElements.add(value); if(bufferElements.size() == threshold){ System.out.println("自定義格式:" + bufferElements); bufferElements.clear(); } } }