Flink quickstart project: writing custom UDF functions


The official documentation recommends two ways to create the project. The first is the Maven archetype:

mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java  -DarchetypeVersion=1.12.0

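Writing a Flink UDF requires the flink-table jar, because ScalarFunction lives in the table dependency.

After generating the project with the quickstart, add the dependency to pom.xml: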

        <dependency>
            <groupId>org.apache.flink</groupId>
            <!--<artifactId>flink-table_${scala.binary.version}</artifactId>-->
            <artifactId>flink-table_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

Create the Java classes:

 

package department.jr;

import org.apache.flink.table.functions.ScalarFunction;

public class HashCode extends ScalarFunction {

    /**
     * The eval method of a UDF must be public.
     * @param s the input string
     * @return the hash code of the input string
     */
    public int eval(String s){
        return s.hashCode();
    }
    
//    public static void main(String[] args) {
//        HashCode hashCode = new HashCode();
//        System.out.println(hashCode.eval("abc"));
//    }
}
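As a quick sanity check of the logic before packaging, the sketch below reproduces eval in a plain class so that it runs without Flink on the classpath. The class name HashCodeCheck and the null-safe variant evalNullSafe are illustrative assumptions, not part of the original UDF; the real class would still extend ScalarFunction. The variant is included because a SQL NULL argument would presumably reach eval as a Java null, which the original one-liner turns into a NullPointerException.

```java
// Plain-Java stand-in for the HashCode UDF (no Flink dependency), for illustration only.
class HashCodeCheck {

    // Same logic as HashCode.eval in the article.
    static int eval(String s) {
        return s.hashCode();
    }

    // Hypothetical null-safe variant: returns null instead of throwing on null input.
    static Integer evalNullSafe(String s) {
        return s == null ? null : s.hashCode();
    }

    public static void main(String[] args) {
        System.out.println(eval("abc"));         // 96354
        System.out.println(evalNullSafe(null));  // null
    }
}
```

Whether to return NULL or a fixed value for NULL input is a design choice; returning NULL keeps the function consistent with most built-in SQL functions.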

 

 

package department.jr;

import org.apache.flink.table.functions.ScalarFunction;

/**
 * Multiplies a value by a factor.
 */
public class Multiply extends ScalarFunction {
    public Integer factor;

    public Multiply() {
    }
    public Multiply(int factor) {
        this.factor = factor;
    }
    public int eval(Integer s){
        return s * factor;
    }
}

 

 

 

 

Running it from the IDE fails with an error:

java.lang.NoClassDefFoundError: org/apache/flink/api/common/functions/InvalidTypesException
    at java.lang.Class.getDeclaredMethods0(Native Method)
    at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
    at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
    at java.lang.Class.getMethod0(Class.java:3018)
    at java.lang.Class.getMethod(Class.java:1784)
    at sun.launcher.LauncherHelper.validateMainClass(LauncherHelper.java:544)
    at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:526)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.api.common.functions.InvalidTypesException
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 7 more
Error: A JNI error has occurred, please check your installation and try again
Exception in thread "main" 
Process finished with exit code 1

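Cause: the core Flink dependencies in pom.xml all have <scope>provided</scope>, so they are missing from the classpath when the class is run locally. For debugging, comment out the <scope>provided</scope> lines so that the default scope (compile) applies.

Success!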
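One way to confirm this diagnosis is to probe for the missing class directly. The sketch below is a hypothetical helper (ClasspathProbe is not part of the project) that uses Class.forName to report whether a class is visible at runtime. It has to live in a class that does not itself reference Flink types, since the error above is raised while the launcher is still validating the main class.

```java
// Hypothetical diagnostic helper; not part of the original project.
class ClasspathProbe {

    // Returns true if the named class can be found by this class's loader (without initializing it).
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Class name taken from the stack trace above.
        String flinkClass = "org.apache.flink.api.common.functions.InvalidTypesException";
        System.out.println(flinkClass + " present: " + isOnClasspath(flinkClass));
        System.out.println("java.lang.String present: " + isOnClasspath("java.lang.String"));
    }
}
```

With the provided scope still active in an IDE run, the first line would be expected to report false; after switching to the default scope it should report true.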

Comment out the main method and package the UDF classes:

jr-flinkudf-0.1.jar

Upload jr-flinkudf-0.1.jar to Flink's lib directory.

Edit the sql-client-defaults.yaml file in Flink's conf directory to register the functions.
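The original configuration is not shown, so the fragment below is only a sketch of what the functions section might look like, following the format described in the Flink 1.12 SQL Client documentation. The function names hashCode and multiply and the constructor value 2 are assumptions; only the class names come from the code above.

```yaml
# Sketch only: function names and the constructor argument are assumed, not taken from the article.
functions:
  - name: hashCode
    from: class
    class: department.jr.HashCode
  - name: multiply
    from: class
    class: department.jr.Multiply
    constructor:
      - 2          # a bare integer literal is given the implicit type INT
```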

 

Start the Flink SQL client:

./sql-client.sh embedded

It fails with an error:

Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
        at org.apache.flink.table.client.SqlClient.main(SqlClient.java:208)
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.
        at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:878)
        at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:226)
        at org.apache.flink.table.client.SqlClient.start(SqlClient.java:108)
        at org.apache.flink.table.client.SqlClient.main(SqlClient.java:196)
Caused by: org.apache.flink.table.api.ValidationException: Cannot find a public constructor with parameter types 'java.lang.Integer' for 'department.jr.Multiply'.
        at org.apache.flink.table.functions.FunctionService.generateInstance(FunctionService.java:205)
        at org.apache.flink.table.functions.FunctionService.createFunction(FunctionService.java:115)
        at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$registerFunctions$10(ExecutionContext.java:704)
        at java.util.HashMap.forEach(HashMap.java:1288)
        at org.apache.flink.table.client.gateway.local.ExecutionContext.registerFunctions(ExecutionContext.java:703)
        at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:523)
        at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:185)
        at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:138)
        at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:867)
        ... 3 more

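Cause: the Multiply constructor takes an int parameter, but the SQL client looks for a public constructor whose parameter type is java.lang.Integer. See the official documentation: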

ref: https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/sqlClient.html

 

Change the Multiply constructor parameter to Integer, then repackage and upload the jar.

The SQL client now starts successfully, and both custom UDFs work.
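The constructor mismatch can be reproduced without Flink. The sketch below assumes that the SQL client resolves the constructor through an exact-type reflective lookup, which is what the error message suggests but is not confirmed here. MultiplyPrimitive, MultiplyBoxed and ConstructorLookupDemo are stand-in names invented for this illustration; they mirror the two constructor signatures but do not extend ScalarFunction.

```java
import java.lang.reflect.Constructor;

// Stand-in for the original class: constructor takes a primitive int.
class MultiplyPrimitive {
    Integer factor;
    public MultiplyPrimitive(int factor) { this.factor = factor; }
}

// Stand-in for the fixed class: constructor takes a boxed Integer.
class MultiplyBoxed {
    Integer factor;
    public MultiplyBoxed(Integer factor) { this.factor = factor; }
    public int eval(Integer s) { return s * factor; }
}

class ConstructorLookupDemo {

    // True if the class has a public constructor whose single parameter type is exactly paramType.
    static boolean hasPublicConstructor(Class<?> type, Class<?> paramType) {
        try {
            type.getConstructor(paramType);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        // Reflection does not treat int and Integer as interchangeable.
        System.out.println(hasPublicConstructor(MultiplyPrimitive.class, Integer.class)); // false
        System.out.println(hasPublicConstructor(MultiplyBoxed.class, Integer.class));     // true

        // Instantiate the fixed variant the way a reflective loader could.
        Constructor<MultiplyBoxed> ctor = MultiplyBoxed.class.getConstructor(Integer.class);
        MultiplyBoxed twice = ctor.newInstance(2);
        System.out.println(twice.eval(21)); // 42
    }
}
```

Ordinary Java calls such as new Multiply(2) compile against either signature because the compiler boxes and unboxes automatically, which is why the problem only shows up once the class is loaded reflectively.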

 

