The official documentation recommends two ways to set up a project. The first:
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.12.0
Creating a Flink UDF requires the flink-table jar, because ScalarFunction lives in the table dependency.
After generating the project with the quickstart archetype, add this dependency to pom.xml:
<dependency>
    <groupId>org.apache.flink</groupId>
    <!--<artifactId>flink-table_${scala.binary.version}</artifactId>-->
    <artifactId>flink-table_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
Create the Java classes:
package department.jr;

import org.apache.flink.table.functions.ScalarFunction;

public class HashCode extends ScalarFunction {

    /**
     * The eval method of a UDF must be public.
     * @param s input string
     * @return the hash code of s
     */
    public int eval(String s) {
        return s.hashCode();
    }

//    public static void main(String[] args) {
//        HashCode hashCode = new HashCode();
//        System.out.println(hashCode.eval("abc"));
//    }
}
package department.jr;

import org.apache.flink.table.functions.ScalarFunction;

/**
 * Multiplies a value by a factor.
 */
public class Multiply extends ScalarFunction {

    public Integer factor;

    public Multiply() {
    }

    public Multiply(int factor) {
        this.factor = factor;
    }

    public int eval(Integer s) {
        return s * factor;
    }
}
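Both classes declare eval as public, as the comment in HashCode requires. As a rough, simplified illustration of why visibility matters (this is not Flink's actual lookup code), plain Java reflection's Class.getMethods() only returns public methods, so a framework discovering eval that way would never see a private one. The classes PublicEval and PrivateEval below are hypothetical stand-ins with no Flink dependency.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

public class EvalLookup {

    // Hypothetical UDF-like class with a public eval method.
    public static class PublicEval {
        public int eval(String s) {
            return s.hashCode();
        }
    }

    // Hypothetical UDF-like class whose eval method is private.
    public static class PrivateEval {
        private int eval(String s) {
            return s.hashCode();
        }
    }

    // Simplified lookup: getMethods() returns only public methods
    // (including inherited ones), so a private eval is never found.
    public static List<Method> findEvalMethods(Class<?> clazz) {
        List<Method> result = new ArrayList<>();
        for (Method m : clazz.getMethods()) {
            if (m.getName().equals("eval") && !Modifier.isStatic(m.getModifiers())) {
                result.add(m);
            }
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(findEvalMethods(PublicEval.class).size());  // 1
        System.out.println(findEvalMethods(PrivateEval.class).size()); // 0

        // Invoke the discovered method reflectively, as a framework would.
        Method eval = findEvalMethods(PublicEval.class).get(0);
        System.out.println(eval.invoke(new PublicEval(), "abc")); // 96354
    }
}
```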
Running it locally fails with this error:
java.lang.NoClassDefFoundError: org/apache/flink/api/common/functions/InvalidTypesException
	at java.lang.Class.getDeclaredMethods0(Native Method)
	at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
	at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
	at java.lang.Class.getMethod0(Class.java:3018)
	at java.lang.Class.getMethod(Class.java:1784)
	at sun.launcher.LauncherHelper.validateMainClass(LauncherHelper.java:544)
	at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:526)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.api.common.functions.InvalidTypesException
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	... 7 more
Error: A JNI error has occurred, please check your installation and try again
Exception in thread "main" 
Process finished with exit code 1
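Cause: the core Flink dependencies in the generated pom.xml are all declared with <scope>provided</scope>, so they are not on the classpath when running from the IDE. For local debugging, comment out the <scope>provided</scope> line so the default scope (compile) applies.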
It works!
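To confirm this kind of diagnosis before changing the pom, one option is to check from the same run configuration whether a Flink class is visible at runtime. A minimal sketch: the helper class ClasspathCheck is hypothetical, and the class name it probes is taken from the stack trace above. Because it only refers to Flink by string, it runs even when Flink is missing from the classpath.

```java
public class ClasspathCheck {

    // Returns true if the named class can be loaded by this class loader.
    // initialize=false avoids running static initializers just for the check.
    public static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String flinkClass = "org.apache.flink.api.common.functions.InvalidTypesException";
        if (isOnClasspath(flinkClass)) {
            System.out.println("Flink core classes are on the runtime classpath");
        } else {
            System.out.println("Missing at runtime: " + flinkClass
                    + " (check for <scope>provided</scope> in pom.xml)");
        }
    }
}
```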
Comment out the main method and package the UDF classes into a jar:
jr-flinkudf-0.1.jar
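Before uploading, it can help to verify that the jar actually contains both UDF classes. A minimal sketch using only java.util.jar; the class JarInspector is hypothetical, and the default path target/jr-flinkudf-0.1.jar is an assumption based on Maven's default output directory.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

public class JarInspector {

    // Lists all .class entries in the jar, e.g. "department/jr/HashCode.class".
    public static List<String> listClasses(String jarPath) throws IOException {
        List<String> classes = new ArrayList<>();
        try (JarFile jar = new JarFile(jarPath)) {
            Enumeration<JarEntry> entries = jar.entries();
            while (entries.hasMoreElements()) {
                String name = entries.nextElement().getName();
                if (name.endsWith(".class")) {
                    classes.add(name);
                }
            }
        }
        return classes;
    }

    public static void main(String[] args) throws IOException {
        // Assumed default location; pass the real path as the first argument.
        String jarPath = args.length > 0 ? args[0] : "target/jr-flinkudf-0.1.jar";
        List<String> classes = listClasses(jarPath);
        String[] expected = {"department/jr/HashCode.class", "department/jr/Multiply.class"};
        for (String name : expected) {
            System.out.println(name + (classes.contains(name) ? " found" : " MISSING"));
        }
    }
}
```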
Upload jr-flinkudf-0.1.jar to Flink's lib directory.
Edit sql-client-defaults.yaml in Flink's conf directory to register the UDFs.
Start the Flink SQL client:
./sql-client.sh embedded
It fails with this error:
Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
	at org.apache.flink.table.client.SqlClient.main(SqlClient.java:208)
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.
	at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:878)
	at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:226)
	at org.apache.flink.table.client.SqlClient.start(SqlClient.java:108)
	at org.apache.flink.table.client.SqlClient.main(SqlClient.java:196)
Caused by: org.apache.flink.table.api.ValidationException: Cannot find a public constructor with parameter types 'java.lang.Integer' for 'department.jr.Multiply'.
	at org.apache.flink.table.functions.FunctionService.generateInstance(FunctionService.java:205)
	at org.apache.flink.table.functions.FunctionService.createFunction(FunctionService.java:115)
	at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$registerFunctions$10(ExecutionContext.java:704)
	at java.util.HashMap.forEach(HashMap.java:1288)
	at org.apache.flink.table.client.gateway.local.ExecutionContext.registerFunctions(ExecutionContext.java:703)
	at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:523)
	at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:185)
	at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:138)
	at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:867)
	... 3 more
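Cause: the Multiply constructor takes a primitive int, but the SQL client looks for a public constructor whose parameter type is java.lang.Integer (as the error message states), so the parameter must be declared as Integer. See the official documentation: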
ref: https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/sqlClient.html
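The error message suggests the SQL client resolves the constructor by the boxed type of the configured value. Plain Java reflection shows why an int constructor does not match: getConstructor matches parameter types exactly, and int.class is not Integer.class. A minimal sketch with hypothetical stand-in classes (no Flink dependency); it illustrates the reflection behavior only, not FunctionService's actual implementation.

```java
import java.lang.reflect.Constructor;

public class ConstructorLookup {

    // Stand-in for the original Multiply: constructor takes a primitive int.
    public static class MultiplyPrimitive {
        public final Integer factor;
        public MultiplyPrimitive(int factor) { this.factor = factor; }
    }

    // Stand-in for the fixed Multiply: constructor takes a boxed Integer.
    public static class MultiplyBoxed {
        public final Integer factor;
        public MultiplyBoxed(Integer factor) { this.factor = factor; }
    }

    // Looks for a public constructor whose single parameter is exactly java.lang.Integer.
    public static boolean hasIntegerConstructor(Class<?> clazz) {
        try {
            clazz.getConstructor(Integer.class);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(hasIntegerConstructor(MultiplyPrimitive.class)); // false
        System.out.println(hasIntegerConstructor(MultiplyBoxed.class));     // true

        // The boxed constructor can then be invoked with the configured value.
        Constructor<MultiplyBoxed> c = MultiplyBoxed.class.getConstructor(Integer.class);
        MultiplyBoxed m = c.newInstance(Integer.valueOf(3));
        System.out.println(m.factor); // 3
    }
}
```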
Change the Multiply constructor's parameter type to Integer, then rebuild and re-upload the jar.
The SQL client now starts successfully, and both custom UDFs work as expected!