Flink状态保存CheckPoint


知识点:

一致性:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/guarantees.html

        env.setStateBackend( new FsStateBackend("hdfs://uat-datacenter1:8020/flink/checkpoint"));

        env.enableCheckpointing(5000);
     env.getCheckpointConfig.setCheckpointTimeout(60000L);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);

        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));

        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

1、主类

package com.example.demo.flink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.concurrent.TimeUnit;

/**
 * @program: demo
 * @description:
 * @author: yang
 * @create: 2020-12-29 14:14
 */
public class TestCheckpoint {

    public static void main(String[] args) throws Exception {
//        ParameterTool parameterTool = ParameterTool.fromArgs(args);
//        String hostname = parameterTool.get("hostname");
//        int port = parameterTool.getInt("port");

        String hostname = "uat-datacenter2";
        int port = 5000;

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setStateBackend( new FsStateBackend("hdfs://uat-datacenter1:8020/flink/checkpoint")); //状态后端设置

        env.enableCheckpointing(5000); //保存时间
        env.getCheckpointConfig.setCheckpointTimeout(60000L); //保存超时时间
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); //保存状态一致性

        env.getCheckpointConfig().setMaxConcurrentCheckpoints(2); //最大并行保存checkpoint个数

        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS))); //任务失败重启机制

        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        DataStreamSource<String> dataStreamSource = env.socketTextStream(hostname, port);

        SingleOutputStreamOperator<String> result = dataStreamSource.map(new MapFunction<String, String>() {

            @Override
            public String map(String s) throws Exception {
                return "hs_" + s;
            }
        }).uid("split-map");

        result.print().uid("print-operator");

        env.execute("test");
    }
}

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM