Flink系列之狀態及檢查點


  Flink不同於其他實時計算的框架之處是它可以提供針對不同的狀態進行編程和計算。本篇文章的主要思路如下,大家可以選擇性閱讀。

  1. Flink的狀態分類及不同點。

  2. Flink針對不同的狀態進行編程。

  3. 檢查點機制和配置。

  4. 狀態的存儲。

  

  •  Flilnk的狀態分類及不同點

    Flink有兩種不同的狀態分類,一種是Keyed State(鍵狀態),一種是Operator State(算子狀態)。

    • Keyed State

      主要是針對KeyedStream中使用,當使用keyBy方法的時候進行計算。 我們都知道在計算的過程中就是將Flink按照<並行operator, key> 進行計算,每個key又歸屬於單個Operator,所以我們可以簡單的理解為<operator, key>。也就是說首先按Operator分配到不同的實例,然后在不同的實例中,相同的Key分配到相同的組中,然后這些狀態就可以在相同的組中進行獲取和計算。

    • Operator State

      主要針對不同的算子的狀態計算。按照不同的算子如Map, FlatMap,Reduce等算子去分配不同的實例群。像Kafka Connector的例子就很好的應用了這個功能,根據不同的topic去讀取不同的狀態,比如計算獲取到topic的paritition分區和 offset偏移量。 每個算子實例會維護着這個topic的partition及offset的Map狀態,這個例子就是很好的使用了Opertator的state。如果Operator並行度發生改變了的話,那么狀態也會相應的分配好對應的狀態。

 

  • 可管理的及原生狀態

  這兩種狀態又分為 Managed State (可管理狀態)和 Raw State (原生狀態)

    • Managed State : 可管理狀態就是自己去定義和編寫狀態處理的邏輯,全部由自己和Flink進行控制。
    • Raw State : 原生狀態也就是在Operator算子觸發 checkPoint 檢查點的時候,Flink會在其數據結構中寫入一部分字節碼Byte,Flink只能看到其中有一些碼,但是無法去進行控制。

  所有的流數據功能都可以使用Managed State,這個也是Flink編程所推薦的。因為要使用Raw State的話比較底層也比較復雜,要實現算子方法時才使用。

 

  • Flink針對不同的狀態進行編程

  我們只針對可管理的狀態進行操作,不同的管理 Keyed State 和Operator State 狀態原始方法定義可參考官網介紹

    • Keyed State

     我們針對Keyed managed state進行編程。來個場景,假如Flink計算某個功能的時間,如果某個功能Key時間超過某個閾值了則進行計數,如果數據超過了設置的次數,那么直接輸出到控制台。直接參考如下代碼。

  代碼大致的思路是:

  繼承RichFlatMapFunction, 定義一個ListState<Long>用於記錄當前的狀態。

  定義閾值和錯誤次數值,觸發后直接輸出控制台下。

  open方法實例化ListState。在里邊設置了一下狀態的TTL,即狀態的生命周期。

  flatMap方法按key分配后的value進行判斷和記錄。

  最后main方法進行數據准備和輸出。

package myflink.state;

import org.apache.commons.compress.utils.Lists;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

public class ThresholdWarning extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, List<Long>>> {

    //通過ListState來存儲非正常數據的狀態
    private transient ListState<Long> abnormalData;
    //需要監控的閾值
    private Long threshold;
    //觸發報警的次數
    private Integer numberOfTimes;

    public ThresholdWarning(Long threshold, Integer numberOfTimes) {
        this.threshold = threshold;
        this.numberOfTimes = numberOfTimes;
    }

    @Override
    public void open(Configuration parameters) throws Exception {

        ListStateDescriptor listStateDescriptor = new ListStateDescriptor<Long>("abnormal-state",
                TypeInformation.of(Long.class));

        //狀態存活生命周期設置TTL Time To Live
        StateTtlConfig  ttlConfig = StateTtlConfig
                //設置有效期為10秒
                .newBuilder(Time.seconds(10L))
                //設置有效的更新規則,當創建和寫入的時候需要重新更新為10S
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                //設置狀態的可見性,設置狀態如果沒有刪除,那么就是可見的,另外一個值:ReturnExpiredIfNotCleanedUp ,
                // 如果沒有清理的話,狀態會一直可見的
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();

        //設置TTL配置
        listStateDescriptor.enableTimeToLive(ttlConfig);

        //通過狀態名稱(句柄)獲取狀態實例,如果不存在則會自動建
        abnormalData = getRuntimeContext().getListState(new ListStateDescriptor<Long>("abnormal-state",
                TypeInformation.of(Long.class)));
    }



    @Override
    public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, List<Long>>> out) throws Exception {
        Long inputValue = value.f1;
        //如果輸入的值超過閾值,則記錄該次不正常的數據信息
        if(inputValue >= threshold) {
            abnormalData.add(inputValue);
        }

        ArrayList<Long> list = Lists.newArrayList(abnormalData.get().iterator());

        //如果不正常的數據超過了指定的數量,則輸出報警信息
        if(list.size() >= numberOfTimes) {
            out.collect(Tuple2.of(value.f0 + " 超過指定閾值數量", list));
            //報警信息輸出后,清空狀態
            abnormalData.clear();
        }
    }

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //設置並行度為1,用於觀察輸出
//        env.setParallelism(1);

        DataStreamSource<Tuple2<String, Long>> tuple2DataStreamSource = env.fromElements(
                Tuple2.of("a", 50L), Tuple2.of("a", 80L), Tuple2.of("a", 400L),
                Tuple2.of("a", 100L), Tuple2.of("a", 200L), Tuple2.of("a", 200L),
                Tuple2.of("b", 100L), Tuple2.of("b", 200L), Tuple2.of("b", 200L),
                Tuple2.of("b", 500L), Tuple2.of("b", 600L), Tuple2.of("b", 700L));

        tuple2DataStreamSource
                .keyBy(0)
                //超過100的閾值3次后輸出報警信息
                .flatMap(new ThresholdWarning(100L, 3))
                .printToErr();

        env.execute("Managed Keyed State");
    }
}

  輸出的結果如下,大於等於100的出現3次即進行輸出。和我們想象的都一樣。

1> (b 超過指定閾值數量,[100, 200, 200])
1> (b 超過指定閾值數量,[500, 600, 700])
3> (a 超過指定閾值數量,[400, 100, 200])
    •  operator state

  我們還在原來基礎的例子上調整一下,不按key,按Operator類型,只要超過時間的次數達到了就要輸出。在其中,把Operator的hashCode進行輸出一下,用於區分是否為相同的Operator。首先我們將並行度設置為1,然后一會兒再把並行度調整成2。

  代碼的大致思路為:

  繼承RichFlatMapFunction,實現CheckpointedFunction接口,即在觸發檢查點的時候進行操作。

  initializeState方法的時候將opertor的狀態和檢查點狀態進行初始化。

  snapshotState方法即存儲狀態時將當時的鏡像進行存儲。可以存儲到外部設備。

  flatMap的時候進行閾值判斷和數據收集。

  main方法進行檢查點設置,數據准備,執行和輸出。

package myflink.state;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

public class ThresholdOperatorWarning extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String,
        List<Tuple2<String, Long>>>> implements CheckpointedFunction {


    //非正確數據狀態
    private List<Tuple2<String, Long>> bufferedData;

    //檢查點狀態
    private transient ListState<Tuple2<String, Long>> checkPointedState;

    //需要監控的閾值
    private Long threshold;
    //次數
    private Integer numberOfTimes;

    ThresholdOperatorWarning(Long threshold, Integer numberOfTimes) {
        this.threshold = threshold;
        this.numberOfTimes = numberOfTimes;
        this.bufferedData = new ArrayList<>();
    }

    @Override
    public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, List<Tuple2<String, Long>>>> out)
            throws Exception {
        Long inputValue = value.f1;

        //超過閾值則進行記錄
        if(inputValue >= threshold) {
            bufferedData.add(value);
        }

        //超過指定次數則進行匯總和匯總輸出
        if(bufferedData.size() >= numberOfTimes) {
            //輸出狀態實例的hashCode
            out.collect(Tuple2.of(checkPointedState.hashCode() + "閾值報警! " , bufferedData ));
            //清理緩存
            bufferedData.clear();
        }

    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        //當數據進行快照時,將數據存儲到checkPointedState
        checkPointedState.clear();
        for (Tuple2<String, Long> element : bufferedData) {
            checkPointedState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        //這里獲取的是operatorStateStore
        checkPointedState = context.getOperatorStateStore().getListState(new ListStateDescriptor<Tuple2<String, Long>>(
                "abnormalData", TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {})
        ));

        //如果發生重啟,則需要從快照中將狀態進行恢復
        if(context.isRestored()) {
            for (Tuple2<String, Long> element : checkPointedState.get()) {
                bufferedData.add(element);
            }
        }
    }


    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();



        //開啟檢查點
        env.enableCheckpointing(1000L);
        // 其他可選配置如下:

        // 設置語義,默認是EXACTLY_ONCE
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // 設置檢查點之間最小停頓時間
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // 設置執行Checkpoint操作時的超時時間
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // 設置最大並發執行的檢查點的數量
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // 將檢查點持久化到外部存儲
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);



        //設置並行度為1
        env.setParallelism(1);
        //數據源

        DataStreamSource<Tuple2<String, Long>>  tuple2DataStreamSource = env.fromElements(
                Tuple2.of("a", 50L), Tuple2.of("a", 80L), Tuple2.of("a", 400L),
                Tuple2.of("a", 100L), Tuple2.of("a", 200L), Tuple2.of("a", 200L),
                Tuple2.of("b", 100L), Tuple2.of("b", 200L), Tuple2.of("b", 200L),
                Tuple2.of("b", 500L), Tuple2.of("b", 600L), Tuple2.of("b", 700L)
        );

        tuple2DataStreamSource.flatMap(new ThresholdOperatorWarning(100L, 3))
                .printToErr();

        env.execute("managed Operator State");

    }

}

  當前並行度為1,結果如下,數據沒有按key統計,而是按照里邊的值進行統計,符合我們的要求。因為是同一個Operator,所以hashcode是一樣的。

(1629838640閾值報警! ,[(a,400), (a,100), (a,200)])
(1629838640閾值報警! ,[(a,200), (b,100), (b,200)])
(1629838640閾值報警! ,[(b,200), (b,500), (b,600)])

  接下來將並行度設置為2,結果如下。我們看一下main里邊的數據,符合大於等於100的數據一共有10個,那么兩個不同的operator分配的時候這10數據的時候,一個operator分5個,那么滿足超過3個的時候才收集並輸出。因為5個里邊只有一組3個滿足,2個不滿足所以不會輸出,所以符合我們的預期。

1> (475161679閾值報警! ,[(a,100), (a,200), (b,200)])
2> (1633355453閾值報警! ,[(a,400), (a,200), (b,100)])
  • 檢查點的機制和配置
    • 檢查點的機制

      上邊我們程序里邊設置了檢查點,檢查點是當數據進行處理的時候將數據的狀態進行記錄,當程序出現問題的時候方便恢復。

     可以像這樣的情況:  數據源——>  123456789|12345678| 12341234|——>sink。|即檢查點,是一個checkpoint barrier,當算子運行計算的時候會把當前的狀態進行記錄,比如讀取Kafka的數據,假如讀取到offset=6868,然后將這個值進行了記錄, 當這時有機器出現了問題,程序需要進行恢復並執行,那么需要重新讀取這條數據再計算。引用一張圖片可以有更清楚的認識。

  

 

 

    •  檢查點的配置

      默認情況下,檢查點是關閉着的,我們需要明確開啟。其他的一些配置可參考如下內容:

     //開啟檢查點
        env.enableCheckpointing(1000L);
        // 其他可選配置如下:

        // 設置語義,默認是EXACTLY_ONCE
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // 設置檢查點之間最小停頓時間
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // 設置執行Checkpoint操作時的超時時間
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // 設置最大並發執行的檢查點的數量
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // 將檢查點持久化到外部存儲
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    • 檢查點手工保存

       flink支持手工將檢查點的狀態存儲到外部,也可以指定存儲到HDFS文件。存儲到外邊是為了程序出現了問題時進行恢復,比如OOM問題。程序升級和重啟時也需要重新從檢查點進行恢復。

# 觸發指定id的作業的Savepoint,並將結果存儲到指定目錄下
bin/flink savepoint :jobId [:targetDirectory]
  •  狀態的存儲

    Keyed State和Operator State會存儲在內存中,因為數據是持續不斷的輸入的,當數據量非常大的時候,內存會出現不足的情況,那么我們也是需要將當前的狀態進行保存的。官方稱為狀態后端。

    • Flink的狀態保存支持3種方式

      MemoryStateBackend,這種方式是將數據存儲在JVM中,這種方式是用於開發。

      FsStateBackend, 即以文件的形式存儲到磁盤中,可以是HDFS或本地文件。當JobManger把任務發送給Taskmanger進行計算,此時數據會在JVM中,當觸發了checkpoint后才會將數據存儲到文件中。

      RocksDBStateBackend,這種形式是介於前邊兩種的情況,這個是將狀態數據到KV數據庫中,當觸發狀態的時候會將數據再持久化到文件中。這樣即提高了速度,空間也變得更大了。

 

    • 狀態存儲配置

      默認情況是MemoryStateBackend,即內存中。

      剩下兩種的配置如下,這種方式只對當前Job有效。RocksDB配置的話需要額外引用一下包。

// 配置 FsStateBackend
env.setStateBackend(new FsStateBackend("hdfs://namenode:60060/flink/checkpoints"));
// 配置 RocksDBStateBackend
env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:60060/flink/checkpoints"))

      通過修改flink-yaml.conf可以對該集群所有作業生效。

state.backend: filesystem
state.checkpoints.dir: hdfs://namenode:60060/flink/checkpoints

 

     

MemoryStateBackend


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM