Flink udf的小問題:無參數的udf函數會被優化成常量表達式


這兩天有一個使用flink sql的實時流項目,需要在每個結果中標記當前時間。因為flink沒有獲取當前時間的函數,於是我自己定義了一個udf:

import org.apache.flink.table.functions.ScalarFunction;

public class CurrentUnixTimeFunction extends ScalarFunction {
    public Long eval() {
        return System.currentTimeMillis() / 1000;
    }
}

然后注冊成currentUnixTime()進行使用:

tableEnv.registerFunction("currentUnixTime", new CurrentUnixTimeFunction());

本來以為就是這么簡單的事情,結果看實時流數據的時候發現,這個時間一直都是任務啟動時間,有點摸不着頭腦。於是開啟了研究,以下為內容轉自:https://www.cnblogs.com/Springmoon-venn/p/12954824.html

在使用 Flink 1.10 的 SQL 的時候,遇到個小問題: 一個返回當前時間的函數返回的結果是啟動的時間,並且保持不變。

比如下面這個UDF,獲取當前時間的 時分秒(HH:mm:ss 格式)

 1 import org.apache.flink.api.common.typeinfo.TypeInformation;
 2 import org.apache.flink.api.common.typeinfo.Types;
 3 import org.apache.flink.table.functions.ScalarFunction;
 4 
 5 import java.text.SimpleDateFormat;
 6 
 7 /**
 8  * Sysdate 返回當前時間的 HH:mm:ss 格式的字符串
 9  */
10 public class Systime extends ScalarFunction {
11 
12     private static SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
13 
14     public Systime() {
15     }
16 
17     public String eval() throws Exception {
18         // 當前時間的毫秒值,轉為 HH:mm:ss 
19         return sdf.format(System.currentTimeMillis());
20     }
21 
22 
23     public TypeInformation<?> getResultType(Class<?>[] signature) {
24         return Types.STRING;
25     }
26 }

最開始用的時候,直接寫了這么個UDF,還自我感覺很溜。。。

在測試的時候,也沒有留意到返回的值都是相同的,在線上跑了幾天才發現。。。

如sql :

INSERT INTO user_log_sink
SELECT user_id, systime(), item_id
FROM user_log a;

返回值如下:

全是一樣的

在代碼里面添加 日志信息,只有在啟動的時候輸出了一次日志信息(在客戶端),在 taskmanager.log 里沒有對應日志,說明確實沒有執行

對比其他的UDF 除了沒有參數,其他好像沒有什么不一樣,都是繼承的 ScalarFunction 、也都在客戶端注冊。。

在其他UDF 中也添加日志信息,測試發現:     

都是在注冊的時候,調用了對應的構造方法,但是無參的 UDF,在任務初始化階段,還調用了一次,而需要參數的UDF 並沒有;數據進入的時候,無參的UDF 並沒有調用 eval 方法,有參的UDF 就正常調用了。

直接debug 無參的UDF,從日志中發現除了在注冊的時候 執行了 構造方法,還調用了有一次  eval 方法,所以在 eval 中添加斷點:

 於是就找到這個類:

 1 package org.apache.flink.table.planner.codegen
 2 /**
 3   * Evaluates constant expressions with code generator.   計算常量表達式
 4   *
 5   * @param allowChangeNullability If the reduced expr's nullability can be changed, e.g. a null
 6   *                               literal is definitely nullable and the other literals are
 7   *                               not null.
 8   */
 9 class ExpressionReducer(
10     config: TableConfig,
11     allowChangeNullability: Boolean = false)
12   extends RexExecutor

從注釋就可以看出,Flink 在計算常量表達式,而我的 systime() 就被認為是常量表達式了,在客戶端執行一次,得到結果,之后的函數,直接使用對應常量,而不再進函數計算返回了。

 隨后的獲取到的SQL輸出結果:

 看到這里,問題就很清楚了,一些常量的表達式,Flink 在客戶端初始化的時候,直接執行一次,緩存了結果,之后就直接返回這個結果,而不是去執行表達式

 解決也很簡單,直接給 UDF 添加一個參數(注:必須是SQL的字段,如果是常量也會被Flink 優化)

public String eval(String input) throws Exception {
        // 當前時間的毫秒值,轉為 HH:mm:ss
        return sdf.format(System.currentTimeMillis());
    }

使用的SQL:

INSERT INTO user_log_sink
SELECT user_id, systime(), systime_param(user_id)
FROM user_log a;

systime_param 並不會再初始化的時候執行

返回結果如下:

 搞定


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM