【源碼】Flink sql 流式去重源碼解析


本文基於 flink 1.12.0

之前嘗試了一下 flink sql 的 去重和Top n 功能,只是簡單的看了下官網,然后用 sql 實現了功能,但是還有些疑問沒有解決。比如:不使用 mini-batch 模式,去重的結果很單一,降序就只輸出第一條數據(升序就一直輸出最后一條)

為了解決這些疑問,特意研究了下去重部分的源碼類結構圖如下:

 

 

去重基類

DeduplicateFunctionBase 定義了去重的狀態,由於是去重,所以只需要一個 ValueState 存儲一個 Row 的數據就可以了(不管是處理時間還是事件時間,數據上都有)

// state stores previous message under the key. 基於key 的去重狀態
protected ValueState<T> state;

public DeduplicateFunctionBase(
        TypeInformation<T> typeInfo,
        TypeSerializer<OUT> serializer,
        long stateRetentionTime) {
    this.typeInfo = typeInfo;
    // 狀態保留時間,決定去重的數據的作用范圍
    this.stateRetentionTime = stateRetentionTime; 
    this.serializer = serializer;
}

@Override
public void open(Configuration configure) throws Exception {
    super.open(configure);
    ValueStateDescriptor<T> stateDesc = new ValueStateDescriptor<>("deduplicate-state", typeInfo);
    // 設置去重狀態的 ttl(這個很重要)
    StateTtlConfig ttlConfig = createTtlConfig(stateRetentionTime);
    // 如果 ttl 是開啟的
    if (ttlConfig.isEnabled()) {
        stateDesc.enableTimeToLive(ttlConfig);
    }
    // 創建去重狀態
    state = getRuntimeContext().getState(stateDesc);
}

處理時間的 First Row

ROW_NUMBER() OVER (PARTITION BY category_id ORDER BY process_time asc) AS rownum

即取基於處理時間的第一條數據

處理類為:ProcTimeDeduplicateKeepFirstRowFunction

處理時間的邏輯判斷基於處理時間特性,后一條一定比前一條大這個邏輯,直接判斷去重 state.value 是否為空,為空則表示是第一條數據,輸出;不為空則前面有數據,不是第一條,不輸出

public class ProcTimeDeduplicateKeepFirstRowFunction
        extends DeduplicateFunctionBase<Boolean, RowData, RowData, RowData> {

    private static final long serialVersionUID = 5865777137707602549L;

    // state stores a boolean flag to indicate whether key appears before.
    public ProcTimeDeduplicateKeepFirstRowFunction(long stateRetentionTime) {
        super(Types.BOOLEAN, null, stateRetentionTime);
    }

    @Override
    public void processElement(RowData input, Context ctx, Collector<RowData> out) throws Exception {
        // 調用處理時間的判斷方法: DeduplicateFunctionHelper.processFirstRowOnProcTime
        processFirstRowOnProcTime(input, state, out);
    }
}

DeduplicateFunctionHelper.processFirstRowOnProcTime

static void processFirstRowOnProcTime(
            RowData currentRow,
            ValueState<Boolean> state,
            Collector<RowData> out) throws Exception {

        // 檢查當前行是 insert only 的,不然抱錯
        checkInsertOnly(currentRow);
        // ignore record if it is not first row
        // 狀態不為為空,說明不是處理時間的第一條,不輸出,返回
        if (state.value() != null) {
            return;
        }
        // 第一條添加狀態
        state.update(true);
        // emit the first row which is INSERT message
        // 輸出第一條數據
        out.collect(currentRow);
    }

處理時間的 Last Row

ROW_NUMBER() OVER (PARTITION BY category_id ORDER BY process_time desc) AS rownum

即取基於處理時間的最后一條數據

處理時間的邏輯基於處理時間特性,后一條一定比前一條大這個邏輯,直接判斷去重 state.value 是否為空,為空則表示是第一條數據,直接輸出,不為空則前面有數據,判斷是否更新上一條數據,並輸出當前數據;


處理類為:ProcTimeDeduplicateKeepFirstRowFunction, Last row 有點不同的是,如果接收的 cdc 源,是可以支持刪除前一條數據的(這里不討論)

public class ProcTimeDeduplicateKeepLastRowFunction
        extends DeduplicateFunctionBase<RowData, RowData, RowData, RowData> {

    private static final long serialVersionUID = -291348892087180350L;
    private final boolean generateUpdateBefore;
    private final boolean generateInsert;
    private final boolean inputIsInsertOnly;

    public ProcTimeDeduplicateKeepLastRowFunction(
            InternalTypeInfo<RowData> typeInfo,
            long stateRetentionTime,
            boolean generateUpdateBefore,
            boolean generateInsert,
            boolean inputInsertOnly) {
        super(typeInfo, null, stateRetentionTime);
        this.generateUpdateBefore = generateUpdateBefore;
        this.generateInsert = generateInsert;
        // StreamExecChangelogNormalize 處理的時候會設置為 false,StreamExecDeduplicate 設置為 true
        this.inputIsInsertOnly = inputInsertOnly;
    }

    @Override
    public void processElement(RowData input, Context ctx, Collector<RowData> out) throws Exception {
        // 判斷是否是 insert only 的
        if (inputIsInsertOnly) {
            // 只 insert 的 DeduplicateFunctionHelper
            processLastRowOnProcTime(input, generateUpdateBefore, generateInsert, state, out);
        } else {
            // changlog 會發出刪除命令,刪除前一條數據  DeduplicateFunctionHelper
            processLastRowOnChangelog(input, generateUpdateBefore, state, out);
        }
    }
}

DeduplicateFunctionHelper.processLastRowOnProcTime

/**
 * Processes element to deduplicate on keys with process time semantic, sends current element as last row,
 * retracts previous element if needed.
 *
 * @param currentRow latest row received by deduplicate function
 * @param generateUpdateBefore whether need to send UPDATE_BEFORE message for updates
 * @param state state of function, null if generateUpdateBefore is false
 * @param out underlying collector
 */
static void processLastRowOnProcTime(
        RowData currentRow,
        boolean generateUpdateBefore,
        boolean generateInsert,
        ValueState<RowData> state,
        Collector<RowData> out) throws Exception {

    // 檢測為只寫的
    checkInsertOnly(currentRow);
    // 是否更新上一條數據,是否寫數據
    if (generateUpdateBefore || generateInsert) {
        // use state to keep the previous row content if we need to generate UPDATE_BEFORE
        // or use to distinguish the first row, if we need to generate INSERT
        // 取去重狀態數據
        RowData preRow = state.value();
        state.update(currentRow);
        // 沒有上一條,直接輸出當前這條
        if (preRow == null) {
            // the first row, send INSERT message 輸出第一條數據是 INSERT
            currentRow.setRowKind(RowKind.INSERT);
            out.collect(currentRow);
        } else {
            // 如果存在上一條數據,配置為更新上一條,會輸出上一條數據(方便下游可以更新就的數據)
            if (generateUpdateBefore) {
                preRow.setRowKind(RowKind.UPDATE_BEFORE);
                out.collect(preRow);
            }
            // 再輸出當前數據
            currentRow.setRowKind(RowKind.UPDATE_AFTER);
            out.collect(currentRow);
        }
    } else {
        // 如果不更新上一條,不是 insert,就輸出一個 更新
        // always send UPDATE_AFTER if INSERT is not needed
        currentRow.setRowKind(RowKind.UPDATE_AFTER);
        out.collect(currentRow);
    }
}

事件時間的去重

事件時間的代碼和處理時間的代碼不同,將取第一條和最后一條合並在了一起,用了個 boolean 值的變量 “keepLastRow” 標識

事件時間去重類

public class RowTimeDeduplicateFunction
        extends DeduplicateFunctionBase<RowData, RowData, RowData, RowData> {

    private static final long serialVersionUID = 1L;

    private final boolean generateUpdateBefore;
    private final boolean generateInsert;
    private final int rowtimeIndex;
    private final boolean keepLastRow;

    public RowTimeDeduplicateFunction(
            InternalTypeInfo<RowData> typeInfo,
            long minRetentionTime,
            int rowtimeIndex,
            boolean generateUpdateBefore,
            boolean generateInsert,
            boolean keepLastRow) {
        super(typeInfo, null, minRetentionTime);
        // 是否更新前一條
        this.generateUpdateBefore = generateUpdateBefore;
        // 是否是 INSERT
        this.generateInsert = generateInsert;
        // 事件時間列的 index
        this.rowtimeIndex = rowtimeIndex;
        // 保留第一條還是最后一條
        this.keepLastRow = keepLastRow;
    }

    @Override
    public void processElement(RowData input, Context ctx, Collector<RowData> out) throws Exception {
        deduplicateOnRowTime(
                state,
                input,
                out,
                generateUpdateBefore,
                generateInsert,
                rowtimeIndex,
                keepLastRow);
    }

    /**
     * Processes element to deduplicate on keys with row time semantic, sends current element if it is last
     * or first row, retracts previous element if needed.
     *
     * @param state                 state of function
     * @param currentRow            latest row received by deduplicate function
     * @param out                   underlying collector
     * @param generateUpdateBefore  flag to generate UPDATE_BEFORE message or not
     * @param generateInsert        flag to gennerate INSERT message or not
     * @param rowtimeIndex          the index of rowtime field
     * @param keepLastRow            flag to keep last row or keep first row
     */
    public static void deduplicateOnRowTime(
            ValueState<RowData> state,
            RowData currentRow,
            Collector<RowData> out,
            boolean generateUpdateBefore,
            boolean generateInsert,
            int rowtimeIndex,
            boolean keepLastRow) throws Exception {
        checkInsertOnly(currentRow);
        RowData preRow = state.value();

        if (isDuplicate(preRow, currentRow, rowtimeIndex, keepLastRow)) {
            // 不是重復的,判斷更新重復數據
            updateDeduplicateResult(
                    generateUpdateBefore,
                    generateInsert,
                    preRow,
                    currentRow,
                    out);
            // 將當前數據寫到狀態中
            state.update(currentRow);
        }
    }
}

事件時間判斷重復方法

static boolean isDuplicate(RowData preRow, RowData currentRow, int rowtimeIndex, boolean keepLastRow) {
    if (keepLastRow) {
        // 保留最后一條: 去重狀態為 null, 上一條數據時間 <= 當前數據的 時間 
        return preRow == null || getRowtime(preRow, rowtimeIndex) <= getRowtime(currentRow, rowtimeIndex);
    } else {
        // 保留第一條: 去重狀態為 null, 當前數據時間 < 上一條數據的 時間 
        return preRow == null || getRowtime(currentRow, rowtimeIndex) < getRowtime(preRow, rowtimeIndex);
    }
}
// 只反序列化 事件時間列
private static long getRowtime(RowData input, int rowtimeIndex) {
        return input.getLong(rowtimeIndex);
    }

DeduplicateFunctionHelper.updateDeduplicateResult

static void updateDeduplicateResult(
            boolean generateUpdateBefore,
            boolean generateInsert,
            RowData preRow,
            RowData currentRow,
            Collector<RowData> out) {

        // 更新前面的一條 或 是 INSERT
        if (generateUpdateBefore || generateInsert) {
            // 前一條數據為 null
            if (preRow == null) {
                // the first row, send INSERT message 直接輸出 INSERT
                currentRow.setRowKind(RowKind.INSERT);
                out.collect(currentRow);
            } else {
                // 如果要更新上一條數據
                if (generateUpdateBefore) {
                    final RowKind preRowKind = preRow.getRowKind();
                    // 上一條數據的狀態設為 UPDATE_BEFORE
                    preRow.setRowKind(RowKind.UPDATE_BEFORE);
                    out.collect(preRow);
                    preRow.setRowKind(preRowKind);
                }
                // 輸出當前數據 狀態: UPDATE_AFTER
                currentRow.setRowKind(RowKind.UPDATE_AFTER);
                out.collect(currentRow);
            }
        } else {
            // 輸出當前數據 狀態: UPDATE_AFTER
            currentRow.setRowKind(RowKind.UPDATE_AFTER);
            out.collect(currentRow);
        }
    }

從代碼可以清楚的看到 去重的邏輯,需要注意的是去重狀態是有有 ttl 的,ttl 的默認時間是 36000 s,所以默認情況下,取第一條的情況下,在狀態還沒過期的情況下,只會在啟動的時候輸出一條數據(這時候會給人一種是基於全局去重的錯覺)。
調整狀態的時間可以設置參數: table.exec.state.ttl=60s 參見代碼: DeduplicateFunctionBase 成員變量 stateRetentionTime

歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM