本文基於 flink 1.12.0
之前嘗試了一下 flink sql 的 去重和Top n 功能,只是簡單的看了下官網,然后用 sql 實現了功能,但是還有些疑問沒有解決。比如:不使用 mini-batch 模式,去重的結果很單一,降序就只輸出第一條數據(升序就一直輸出最后一條)
為了解決這些疑問,特意研究了下去重部分的源碼類結構圖如下:
去重基類
DeduplicateFunctionBase 定義了去重的狀態,由於是去重,所以只需要一個 ValueState 存儲一個 Row 的數據就可以了(不管是處理時間還是事件時間,數據上都有)
// state stores previous message under the key. 基於key 的去重狀態 protected ValueState<T> state; public DeduplicateFunctionBase( TypeInformation<T> typeInfo, TypeSerializer<OUT> serializer, long stateRetentionTime) { this.typeInfo = typeInfo; // 狀態保留時間,決定去重的數據的作用范圍 this.stateRetentionTime = stateRetentionTime; this.serializer = serializer; } @Override public void open(Configuration configure) throws Exception { super.open(configure); ValueStateDescriptor<T> stateDesc = new ValueStateDescriptor<>("deduplicate-state", typeInfo); // 設置去重狀態的 ttl(這個很重要) StateTtlConfig ttlConfig = createTtlConfig(stateRetentionTime); // 如果 ttl 是開啟的 if (ttlConfig.isEnabled()) { stateDesc.enableTimeToLive(ttlConfig); } // 創建去重狀態 state = getRuntimeContext().getState(stateDesc); }
處理時間的 First Row
ROW_NUMBER() OVER (PARTITION BY category_id ORDER BY process_time asc) AS rownum
即取基於處理時間的第一條數據
處理類為:ProcTimeDeduplicateKeepFirstRowFunction
處理時間的邏輯判斷基於處理時間特性,后一條一定比前一條大這個邏輯,直接判斷去重 state.value 是否為空,為空則表示是第一條數據,輸出;不為空則前面有數據,不是第一條,不輸出
public class ProcTimeDeduplicateKeepFirstRowFunction extends DeduplicateFunctionBase<Boolean, RowData, RowData, RowData> { private static final long serialVersionUID = 5865777137707602549L; // state stores a boolean flag to indicate whether key appears before. public ProcTimeDeduplicateKeepFirstRowFunction(long stateRetentionTime) { super(Types.BOOLEAN, null, stateRetentionTime); } @Override public void processElement(RowData input, Context ctx, Collector<RowData> out) throws Exception { // 調用處理時間的判斷方法: DeduplicateFunctionHelper.processFirstRowOnProcTime processFirstRowOnProcTime(input, state, out); } }
DeduplicateFunctionHelper.processFirstRowOnProcTime
static void processFirstRowOnProcTime( RowData currentRow, ValueState<Boolean> state, Collector<RowData> out) throws Exception { // 檢查當前行是 insert only 的,不然抱錯 checkInsertOnly(currentRow); // ignore record if it is not first row // 狀態不為為空,說明不是處理時間的第一條,不輸出,返回 if (state.value() != null) { return; } // 第一條添加狀態 state.update(true); // emit the first row which is INSERT message // 輸出第一條數據 out.collect(currentRow); }
處理時間的 Last Row
ROW_NUMBER() OVER (PARTITION BY category_id ORDER BY process_time desc) AS rownum
即取基於處理時間的最后一條數據
處理時間的邏輯基於處理時間特性,后一條一定比前一條大這個邏輯,直接判斷去重 state.value 是否為空,為空則表示是第一條數據,直接輸出,不為空則前面有數據,判斷是否更新上一條數據,並輸出當前數據;
處理類為:ProcTimeDeduplicateKeepFirstRowFunction, Last row 有點不同的是,如果接收的 cdc 源,是可以支持刪除前一條數據的(這里不討論)
public class ProcTimeDeduplicateKeepLastRowFunction extends DeduplicateFunctionBase<RowData, RowData, RowData, RowData> { private static final long serialVersionUID = -291348892087180350L; private final boolean generateUpdateBefore; private final boolean generateInsert; private final boolean inputIsInsertOnly; public ProcTimeDeduplicateKeepLastRowFunction( InternalTypeInfo<RowData> typeInfo, long stateRetentionTime, boolean generateUpdateBefore, boolean generateInsert, boolean inputInsertOnly) { super(typeInfo, null, stateRetentionTime); this.generateUpdateBefore = generateUpdateBefore; this.generateInsert = generateInsert; // StreamExecChangelogNormalize 處理的時候會設置為 false,StreamExecDeduplicate 設置為 true this.inputIsInsertOnly = inputInsertOnly; } @Override public void processElement(RowData input, Context ctx, Collector<RowData> out) throws Exception { // 判斷是否是 insert only 的 if (inputIsInsertOnly) { // 只 insert 的 DeduplicateFunctionHelper processLastRowOnProcTime(input, generateUpdateBefore, generateInsert, state, out); } else { // changlog 會發出刪除命令,刪除前一條數據 DeduplicateFunctionHelper processLastRowOnChangelog(input, generateUpdateBefore, state, out); } } }
DeduplicateFunctionHelper.processLastRowOnProcTime
/** * Processes element to deduplicate on keys with process time semantic, sends current element as last row, * retracts previous element if needed. * * @param currentRow latest row received by deduplicate function * @param generateUpdateBefore whether need to send UPDATE_BEFORE message for updates * @param state state of function, null if generateUpdateBefore is false * @param out underlying collector */ static void processLastRowOnProcTime( RowData currentRow, boolean generateUpdateBefore, boolean generateInsert, ValueState<RowData> state, Collector<RowData> out) throws Exception { // 檢測為只寫的 checkInsertOnly(currentRow); // 是否更新上一條數據,是否寫數據 if (generateUpdateBefore || generateInsert) { // use state to keep the previous row content if we need to generate UPDATE_BEFORE // or use to distinguish the first row, if we need to generate INSERT // 取去重狀態數據 RowData preRow = state.value(); state.update(currentRow); // 沒有上一條,直接輸出當前這條 if (preRow == null) { // the first row, send INSERT message 輸出第一條數據是 INSERT currentRow.setRowKind(RowKind.INSERT); out.collect(currentRow); } else { // 如果存在上一條數據,配置為更新上一條,會輸出上一條數據(方便下游可以更新就的數據) if (generateUpdateBefore) { preRow.setRowKind(RowKind.UPDATE_BEFORE); out.collect(preRow); } // 再輸出當前數據 currentRow.setRowKind(RowKind.UPDATE_AFTER); out.collect(currentRow); } } else { // 如果不更新上一條,不是 insert,就輸出一個 更新 // always send UPDATE_AFTER if INSERT is not needed currentRow.setRowKind(RowKind.UPDATE_AFTER); out.collect(currentRow); } }
事件時間的去重
事件時間的代碼和處理時間的代碼不同,將取第一條和最后一條合並在了一起,用了個 boolean 值的變量 “keepLastRow” 標識
事件時間去重類
public class RowTimeDeduplicateFunction extends DeduplicateFunctionBase<RowData, RowData, RowData, RowData> { private static final long serialVersionUID = 1L; private final boolean generateUpdateBefore; private final boolean generateInsert; private final int rowtimeIndex; private final boolean keepLastRow; public RowTimeDeduplicateFunction( InternalTypeInfo<RowData> typeInfo, long minRetentionTime, int rowtimeIndex, boolean generateUpdateBefore, boolean generateInsert, boolean keepLastRow) { super(typeInfo, null, minRetentionTime); // 是否更新前一條 this.generateUpdateBefore = generateUpdateBefore; // 是否是 INSERT this.generateInsert = generateInsert; // 事件時間列的 index this.rowtimeIndex = rowtimeIndex; // 保留第一條還是最后一條 this.keepLastRow = keepLastRow; } @Override public void processElement(RowData input, Context ctx, Collector<RowData> out) throws Exception { deduplicateOnRowTime( state, input, out, generateUpdateBefore, generateInsert, rowtimeIndex, keepLastRow); } /** * Processes element to deduplicate on keys with row time semantic, sends current element if it is last * or first row, retracts previous element if needed. * * @param state state of function * @param currentRow latest row received by deduplicate function * @param out underlying collector * @param generateUpdateBefore flag to generate UPDATE_BEFORE message or not * @param generateInsert flag to gennerate INSERT message or not * @param rowtimeIndex the index of rowtime field * @param keepLastRow flag to keep last row or keep first row */ public static void deduplicateOnRowTime( ValueState<RowData> state, RowData currentRow, Collector<RowData> out, boolean generateUpdateBefore, boolean generateInsert, int rowtimeIndex, boolean keepLastRow) throws Exception { checkInsertOnly(currentRow); RowData preRow = state.value(); if (isDuplicate(preRow, currentRow, rowtimeIndex, keepLastRow)) { // 不是重復的,判斷更新重復數據 updateDeduplicateResult( generateUpdateBefore, generateInsert, preRow, currentRow, out); // 將當前數據寫到狀態中 state.update(currentRow); } } }
事件時間判斷重復方法
static boolean isDuplicate(RowData preRow, RowData currentRow, int rowtimeIndex, boolean keepLastRow) { if (keepLastRow) { // 保留最后一條: 去重狀態為 null, 上一條數據時間 <= 當前數據的 時間 return preRow == null || getRowtime(preRow, rowtimeIndex) <= getRowtime(currentRow, rowtimeIndex); } else { // 保留第一條: 去重狀態為 null, 當前數據時間 < 上一條數據的 時間 return preRow == null || getRowtime(currentRow, rowtimeIndex) < getRowtime(preRow, rowtimeIndex); } } // 只反序列化 事件時間列 private static long getRowtime(RowData input, int rowtimeIndex) { return input.getLong(rowtimeIndex); }
DeduplicateFunctionHelper.updateDeduplicateResult
static void updateDeduplicateResult( boolean generateUpdateBefore, boolean generateInsert, RowData preRow, RowData currentRow, Collector<RowData> out) { // 更新前面的一條 或 是 INSERT if (generateUpdateBefore || generateInsert) { // 前一條數據為 null if (preRow == null) { // the first row, send INSERT message 直接輸出 INSERT currentRow.setRowKind(RowKind.INSERT); out.collect(currentRow); } else { // 如果要更新上一條數據 if (generateUpdateBefore) { final RowKind preRowKind = preRow.getRowKind(); // 上一條數據的狀態設為 UPDATE_BEFORE preRow.setRowKind(RowKind.UPDATE_BEFORE); out.collect(preRow); preRow.setRowKind(preRowKind); } // 輸出當前數據 狀態: UPDATE_AFTER currentRow.setRowKind(RowKind.UPDATE_AFTER); out.collect(currentRow); } } else { // 輸出當前數據 狀態: UPDATE_AFTER currentRow.setRowKind(RowKind.UPDATE_AFTER); out.collect(currentRow); } }
從代碼可以清楚的看到 去重的邏輯,需要注意的是去重狀態是有有 ttl 的,ttl 的默認時間是 36000 s,所以默認情況下,取第一條的情況下,在狀態還沒過期的情況下,只會在啟動的時候輸出一條數據(這時候會給人一種是基於全局去重的錯覺)。
調整狀態的時間可以設置參數: table.exec.state.ttl=60s 參見代碼: DeduplicateFunctionBase 成員變量 stateRetentionTime
歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文