Flink MapState過期時間設置


一、業務背景

實時統計每天考勤人數

使用MapState<Srting, Set>

key:日期字符串 -> yyyyMMdd

value:當天考勤員工ID,利用Set自動去重的特性統計當前考勤人數

狀態里只需要存儲當天的數據,之前的數據可以清理掉。設置狀態過期時間24小時,距離數據上一次修改超過24小時,該數據會被清理。

// 設置狀態過期配置
StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.hours(24))
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .build();
MapStateDescriptor<String, Set> mapStateDescriptor = new MapStateDescriptor<>("MapStateDescriptor", String.class, Set.class);
// 狀態過期配置與壯狀態綁定
mapStateDescriptor.enableTimeToLive(ttlConfig);
attendanceUserIdState = getRuntimeContext().getMapState(mapStateDescriptor);

二、狀態過期機制測試

每隔1秒發送一條數據,狀態有效期設置為3秒。打印狀態里的數據

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStreamSource<String> stringDataStreamSource = env.addSource(new SourceFunction<String>() {
        @Override
        public void run(SourceContext<String> sourceContext) throws Exception {
            while (true) {
                TimeUnit.SECONDS.sleep(1);
                sourceContext.collect(LocalDateTime.now().toString());
            }
        }

        @Override
        public void cancel() {

        }
    });

    SingleOutputStreamOperator<Tuple2<String, Integer>> keyStream = stringDataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {
        @Override
        public Tuple2 map(String s) {
            return Tuple2.of(s, 1);
        }
    });

    KeyedStream<Tuple2<String, Integer>, Tuple> tuple2TupleKeyedStream = keyStream.keyBy(1);

    SingleOutputStreamOperator<String> map = tuple2TupleKeyedStream.map(new RichMapFunction<Tuple2<String, Integer>, String>() {
        transient MapState<String, Object> state;

        @Override
        public void open(Configuration parameters) throws Exception {
            StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(3))
                    .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
                    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                    .build();

            MapStateDescriptor<String, Object> mapStateDescriptor = new MapStateDescriptor<>("mapStateDescriptor", String.class, Object.class);
            mapStateDescriptor.enableTimeToLive(ttlConfig);
            state = getRuntimeContext().getMapState(mapStateDescriptor);
        }

        @Override
        public String map(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
            state.put(stringIntegerTuple2.f0, new Object());
            Iterator<Map.Entry<String, Object>> iterator = state.iterator();
            while (iterator.hasNext()) {
                Map.Entry<String, Object> next = iterator.next();
                String key = next.getKey();
                System.out.println(key);
            }
            return "====================" + LocalDateTime.now();
        }
    });
    map.print();

    env.execute("StateDemo");
}

運行結果

2021-08-16T19:52:29.126
9> ====================2021-08-16T19:52:29.183
2021-08-16T19:52:29.126
2021-08-16T19:52:30.136
9> ====================2021-08-16T19:52:30.211
2021-08-16T19:52:29.126
2021-08-16T19:52:30.136
2021-08-16T19:52:31.141
9> ====================2021-08-16T19:52:31.242
2021-08-16T19:52:30.136
2021-08-16T19:52:31.141
2021-08-16T19:52:32.144
9> ====================2021-08-16T19:52:32.330
2021-08-16T19:52:31.141
2021-08-16T19:52:33.148
2021-08-16T19:52:32.144
9> ====================2021-08-16T19:52:33.355

三、知識點

報錯:

Queryable state is currently not supported with TTL

檢查UpdateType是否設置了Disabled。

狀態中的過期數據如何被清理

默認情況下,只有在明確讀出過期值時才會刪除過期值,例如通過調用ValueState.value().

注意:這意味着默認情況下,如果未讀取過期狀態的數據,則不會刪除它,可能會導致狀態不斷增長。

此外,還可以添加數據清理的策略,默認的和添加的策略都會生效。

1.cleanupFullSnapshot

在創建checkPoint或savepoint的完整快照時,不會包含狀態中過期的數據。

該配置還是不會影響本地狀態存儲的大小,但是整個作業的完整快照會減少。只有當用戶從快照重新加載到本地時,才會清除用戶本地的狀態。

StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(3))
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .cleanupFullSnapshot()
        .build();

2.cleanupIncrementally

只適用於對內存狀態后端(FsStateBackend和MemoryStateBackend)。

在所有的狀態上維護一個去全局的惰性迭代器。某些事件(例如狀態訪問)會觸發清理。每次觸發清理時,迭代器會向前遍歷刪除已過期數據。

StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(3))
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .cleanupIncrementally(10, false)
        .build();

參數1:每次觸發清理時要檢查的數據數量。

參數2:標志位,用於表示是否每條記錄處理之后還出發清除邏輯。

3.cleanupInRocksdbCompactFilter

僅適用於RocksDB狀態后端

RocksDB會定期運行一步的壓縮流程來合並數據,該過濾器適用生存時間檢查狀態條目的過期時間戳,並丟棄所有的過期值。

StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(3))
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .cleanupInRocksdbCompactFilter(100L)
        .build();


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM