一、業務背景
實時統計每天考勤人數
使用MapState<Srting, Set>
key:日期字符串 -> yyyyMMdd
value:當天考勤員工ID,利用Set自動去重的特性統計當前考勤人數
狀態里只需要存儲當天的數據,之前的數據可以清理掉。設置狀態過期時間24小時,距離數據上一次修改超過24小時,該數據會被清理。
// 設置狀態過期配置
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.hours(24))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
MapStateDescriptor<String, Set> mapStateDescriptor = new MapStateDescriptor<>("MapStateDescriptor", String.class, Set.class);
// 狀態過期配置與壯狀態綁定
mapStateDescriptor.enableTimeToLive(ttlConfig);
attendanceUserIdState = getRuntimeContext().getMapState(mapStateDescriptor);
二、狀態過期機制測試
每隔1秒發送一條數據,狀態有效期設置為3秒。打印狀態里的數據
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> stringDataStreamSource = env.addSource(new SourceFunction<String>() {
@Override
public void run(SourceContext<String> sourceContext) throws Exception {
while (true) {
TimeUnit.SECONDS.sleep(1);
sourceContext.collect(LocalDateTime.now().toString());
}
}
@Override
public void cancel() {
}
});
SingleOutputStreamOperator<Tuple2<String, Integer>> keyStream = stringDataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2 map(String s) {
return Tuple2.of(s, 1);
}
});
KeyedStream<Tuple2<String, Integer>, Tuple> tuple2TupleKeyedStream = keyStream.keyBy(1);
SingleOutputStreamOperator<String> map = tuple2TupleKeyedStream.map(new RichMapFunction<Tuple2<String, Integer>, String>() {
transient MapState<String, Object> state;
@Override
public void open(Configuration parameters) throws Exception {
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(3))
.setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
MapStateDescriptor<String, Object> mapStateDescriptor = new MapStateDescriptor<>("mapStateDescriptor", String.class, Object.class);
mapStateDescriptor.enableTimeToLive(ttlConfig);
state = getRuntimeContext().getMapState(mapStateDescriptor);
}
@Override
public String map(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
state.put(stringIntegerTuple2.f0, new Object());
Iterator<Map.Entry<String, Object>> iterator = state.iterator();
while (iterator.hasNext()) {
Map.Entry<String, Object> next = iterator.next();
String key = next.getKey();
System.out.println(key);
}
return "====================" + LocalDateTime.now();
}
});
map.print();
env.execute("StateDemo");
}
運行結果
2021-08-16T19:52:29.126
9> ====================2021-08-16T19:52:29.183
2021-08-16T19:52:29.126
2021-08-16T19:52:30.136
9> ====================2021-08-16T19:52:30.211
2021-08-16T19:52:29.126
2021-08-16T19:52:30.136
2021-08-16T19:52:31.141
9> ====================2021-08-16T19:52:31.242
2021-08-16T19:52:30.136
2021-08-16T19:52:31.141
2021-08-16T19:52:32.144
9> ====================2021-08-16T19:52:32.330
2021-08-16T19:52:31.141
2021-08-16T19:52:33.148
2021-08-16T19:52:32.144
9> ====================2021-08-16T19:52:33.355
三、知識點
報錯:
Queryable state is currently not supported with TTL
檢查UpdateType是否設置了Disabled。
狀態中的過期數據如何被清理
默認情況下,只有在明確讀出過期值時才會刪除過期值,例如通過調用ValueState.value().
注意:這意味着默認情況下,如果未讀取過期狀態的數據,則不會刪除它,可能會導致狀態不斷增長。
此外,還可以添加數據清理的策略,默認的和添加的策略都會生效。
1.cleanupFullSnapshot
在創建checkPoint或savepoint的完整快照時,不會包含狀態中過期的數據。
該配置還是不會影響本地狀態存儲的大小,但是整個作業的完整快照會減少。只有當用戶從快照重新加載到本地時,才會清除用戶本地的狀態。
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(3))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.cleanupFullSnapshot()
.build();
2.cleanupIncrementally
只適用於對內存狀態后端(FsStateBackend和MemoryStateBackend)。
在所有的狀態上維護一個去全局的惰性迭代器。某些事件(例如狀態訪問)會觸發清理。每次觸發清理時,迭代器會向前遍歷刪除已過期數據。
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(3))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.cleanupIncrementally(10, false)
.build();
參數1:每次觸發清理時要檢查的數據數量。
參數2:標志位,用於表示是否每條記錄處理之后還出發清除邏輯。
3.cleanupInRocksdbCompactFilter
僅適用於RocksDB狀態后端
RocksDB會定期運行一步的壓縮流程來合並數據,該過濾器適用生存時間檢查狀態條目的過期時間戳,並丟棄所有的過期值。
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(3))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.cleanupInRocksdbCompactFilter(100L)
.build();
