Flink State: Operator State


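Key points:

Flink provides three basic data structures for operator state:  
    List state: represents the state as a list of entries.  
    Union list state: also represents the state as a list of entries. It differs from regular list state in how the state is restored after a failure or when the application is started from a savepoint: every task receives the complete list rather than a share of it.  
    Broadcast state: intended for the special case where an operator has multiple tasks and the state of every task is identical.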
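
The difference between list state and union list state only shows up on restore. The following is a minimal plain-Java sketch of the two redistribution schemes, not Flink's actual code: the class `RedistributionSketch` and its methods are hypothetical names, and the round-robin assignment in `evenSplit` is an assumption made for illustration (Flink only guarantees that the concatenated list is divided among the tasks, not which task gets which entry).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RedistributionSketch {

    // Concatenate the per-task lists taken from a checkpoint into one logical list.
    public static <T> List<T> concat(List<List<T>> oldTasks) {
        List<T> all = new ArrayList<>();
        for (List<T> taskState : oldTasks) {
            all.addAll(taskState);
        }
        return all;
    }

    // List state: the logical list is divided among the new tasks
    // (round-robin here, which is an assumption of this sketch).
    public static <T> List<List<T>> evenSplit(List<List<T>> oldTasks, int newParallelism) {
        List<T> all = concat(oldTasks);
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>());
        }
        for (int i = 0; i < all.size(); i++) {
            result.get(i % newParallelism).add(all.get(i));
        }
        return result;
    }

    // Union list state: every new task receives the complete logical list.
    public static <T> List<List<T>> union(List<List<T>> oldTasks, int newParallelism) {
        List<T> all = concat(oldTasks);
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>(all));
        }
        return result;
    }

    public static void main(String[] args) {
        // State held by three tasks before the restart
        List<List<String>> oldTasks = Arrays.asList(
                Arrays.asList("a", "b"),
                Arrays.asList("c"),
                Arrays.asList("d", "e"));

        // Restore with parallelism 2
        System.out.println(evenSplit(oldTasks, 2)); // [[a, c, e], [b, d]]
        System.out.println(union(oldTasks, 2));     // [[a, b, c, d, e], [a, b, c, d, e]]
    }
}
```

With union list state each task usually has to pick out the entries it is responsible for after the restore, since it sees the state of all tasks.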

 

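// Checkpoint configuration (Scala); `env` is assumed to be an existing StreamExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig

    env.enableCheckpointing(1 * 60 * 1000) // trigger a checkpoint every minute
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500) // at least 500 ms between checkpoints
    env.getCheckpointConfig.setCheckpointTimeout(10 * 60 * 1000) // abort a checkpoint after 10 minutes
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    // keep the externalized checkpoint when the job is cancelled
    env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    env.setStateBackend(new FsStateBackend("hdfs://localhost:8020/flink/DhCallRecordEs/es/checkpoints"))
    env.getConfig.setUseSnapshotCompression(true)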

 

1. Main class

package com.example.demo.flink;

import com.example.demo.flink.impl.CustomSink;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


/**
 * @program: demo
 * @description: operator state
 * @author: yang
 * @create: 2020-12-28 15:46
 */
public class TestOperatorStateMain {
    public static void main(String[] args) throws  Exception{
        // Get the execution environment (local, with the web UI)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        //StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the default parallelism
        env.setParallelism(16);
        // Create the data source
        DataStreamSource<Tuple2<String, Integer>> dataStreamSource =
                env.fromElements(
                        Tuple2.of("Spark", 3),
                        Tuple2.of("Flink", 5),
                        Tuple2.of("Hadoop", 7),
                        Tuple2.of("Spark", 4));

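        // With a threshold of 3, the sink prints the first three buffered elements:
        // [(Spark,3), (Flink,5), (Hadoop,7)]
        // The fourth element, (Spark,4), stays in the buffer and is not printed.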
        dataStreamSource.addSink(new CustomSink(3)).setParallelism(1);
        env.execute("TestStatefulApi");

    }

}

2. Processing implementation class

package com.example.demo.flink.impl;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.util.ArrayList;
import java.util.List;

/**
 * @program: demo
 * @description:
 * @author: yang
 * @create: 2020-12-29 11:36
 */
public class CustomSink implements SinkFunction<Tuple2<String,Integer>>, CheckpointedFunction {
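    private final int threshold;

    // Elements buffered in memory until the threshold is reached
    private final List<Tuple2<String,Integer>> bufferElements;

    // Operator list state that stores the buffer contents in each checkpoint
    private transient ListState<Tuple2<String,Integer>> checkpointState;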

    public CustomSink(int i) {
        this.threshold = i;
        this.bufferElements = new ArrayList<>();
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Take the snapshot: replace the checkpointed state with the current buffer contents
        checkpointState.clear();
        for(Tuple2<String,Integer> ele : bufferElements){
            checkpointState.add(ele);
        }
    }


    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Tuple2<String,Integer>> descriptor = new ListStateDescriptor<>("Operator", TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {
        }));
        // Register the operator list state, then load any checkpointed data into memory
        checkpointState = context.getOperatorStateStore().getListState(descriptor);
        if(context.isRestored()){
            for (Tuple2<String,Integer> ele: checkpointState.get()) {
                bufferElements.add(ele);
            }
        }
    }

    @Override
    public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
        bufferElements.add(value);
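        // Use >= rather than ==: after a restore the buffer may already hold threshold or more elements
        if(bufferElements.size() >= threshold){
            System.out.println("Custom format: " + bufferElements);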
            bufferElements.clear();
        }
    }
}
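
To make the interplay of invoke, snapshotState and initializeState easier to follow, here is a plain-Java sketch of the same buffer, snapshot and restore cycle without any Flink dependency. The class `BufferingSketch` and its methods are hypothetical stand-ins: `snapshot()` plays the role of snapshotState, `restore(...)` plays the role of initializeState after a failure, and String elements replace the tuples for brevity.

```java
import java.util.ArrayList;
import java.util.List;

public class BufferingSketch {
    private final int threshold;
    private final List<String> buffer = new ArrayList<>();
    private final List<String> emitted = new ArrayList<>();

    public BufferingSketch(int threshold) {
        this.threshold = threshold;
    }

    // Buffer the element; emit and clear the buffer once the threshold is reached.
    public void invoke(String value) {
        buffer.add(value);
        if (buffer.size() >= threshold) {
            emitted.add(buffer.toString());
            buffer.clear();
        }
    }

    // Stand-in for snapshotState: copy the current buffer into the "checkpoint".
    public List<String> snapshot() {
        return new ArrayList<>(buffer);
    }

    // Stand-in for initializeState on restore: rebuild the buffer from the "checkpoint".
    public static BufferingSketch restore(int threshold, List<String> checkpoint) {
        BufferingSketch sink = new BufferingSketch(threshold);
        sink.buffer.addAll(checkpoint);
        return sink;
    }

    public List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        BufferingSketch sink = new BufferingSketch(3);
        sink.invoke("Spark");
        sink.invoke("Flink");
        List<String> checkpoint = sink.snapshot(); // holds Spark and Flink

        // Simulated failure: the old instance is lost, a new one is restored from the checkpoint.
        BufferingSketch restored = BufferingSketch.restore(3, checkpoint);
        restored.invoke("Hadoop");
        System.out.println(restored.emitted()); // [[Spark, Flink, Hadoop]]
    }
}
```

Without the restore step the two elements buffered before the failure would be lost, which is the reason the sink keeps its buffer in operator state.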

 

