[Flink Series, Part 8] Building a Real-Time Computing Platform: Dynamically Loading UDFs


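Preface

This article was first published at https://www.cnblogs.com/slankka/. Please credit the source when reposting.

This article explains how to dynamically load UDFs for Flink jobs.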

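ClassLoader

Loading a UDF is always the job of a ClassLoader. While a job is starting, the application class loader is implemented by the following class (a nested class of sun.misc.Launcher on Java 8):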

static class AppClassLoader extends URLClassLoader

As the name suggests, URLClassLoader loads classes and resources from a list of URLs.
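To see this on a running JVM, the sketch below checks whether the system class loader is a URLClassLoader and, if it is, returns the URLs it searches. On Java 8, which the stack trace later in this article runs on (sun.misc.Launcher$AppClassLoader), it is; from Java 9 on the application class loader is no longer a URLClassLoader, so the sketch returns an empty list there. The class name SystemClasspathInspector is hypothetical.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class SystemClasspathInspector {

    // Returns the URLs searched by the system class loader, or an empty list
    // when that loader is not a URLClassLoader (e.g. Java 9 and later).
    public static List<URL> systemClasspathUrls() {
        ClassLoader system = ClassLoader.getSystemClassLoader();
        if (system instanceof URLClassLoader) {
            return Arrays.asList(((URLClassLoader) system).getURLs());
        }
        return Collections.emptyList();
    }

    public static void main(String[] args) {
        ClassLoader system = ClassLoader.getSystemClassLoader();
        System.out.println("System class loader: " + system.getClass().getName());
        for (URL url : systemClasspathUrls()) {
            System.out.println("  " + url);
        }
    }
}
```

Running it under Java 8 lists the JVM's -cp entries as file: URLs, which is exactly the search path that Class.forName falls back to when called from a class on the system classpath.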

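What Actually Happens

It is known that passing the UDF JAR through Flink's -C startup option lets the job submission process run. The option is defined as follows: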

static final Option CLASSPATH_OPTION =
            new Option(
                    "C",
                    "classpath",
                    true,
                    "Adds a URL to each user code "
                            + "classloader  on all nodes in the cluster. The paths must specify a protocol (e.g. file://) and be "
                            + "accessible on all nodes (e.g. by means of a NFS share). You can use this option multiple "
                            + "times for specifying more than one URL. The protocol must be supported by the "
                            + "{@link java.net.URLClassLoader}.");
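The option text requires every -C entry to be a URL whose protocol URLClassLoader understands (for example file://). One way to check candidate entries before submitting is to parse them with java.net.URL, which rejects a bare path such as /opt/udf/plus-two.jar, and also rejects a protocol with no registered handler, with a MalformedURLException. This is a standalone sketch, not the Flink CLI's own parsing code; ClasspathArgs and the sample paths are hypothetical.

```java
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;

public class ClasspathArgs {

    // Parses -C style entries into URLs. An entry without a protocol
    // (e.g. "/opt/udf/plus-two.jar") makes the java.net.URL constructor throw.
    public static List<URL> parse(List<String> entries) {
        List<URL> urls = new ArrayList<>();
        for (String entry : entries) {
            try {
                urls.add(new URL(entry));
            } catch (MalformedURLException e) {
                throw new IllegalArgumentException(
                        "Bad classpath entry (missing or unsupported protocol?): " + entry, e);
            }
        }
        return urls;
    }

    public static void main(String[] args) {
        List<String> entries = new ArrayList<>();
        entries.add("file:///opt/udf/plus-two.jar");
        System.out.println(parse(entries));
    }
}
```

Remember that the option also requires each URL to be reachable from every node, so a file:// URL only works if the JAR exists at that path everywhere (for example on an NFS share).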

However, Flink versions before 1.13 fail with this error:

Caused by: org.apache.flink.table.api.ValidationException: SQL validation failed. Can't resolve udf class com.slankka.flink.udf.PlusTwoFunc
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:152)
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:111)
	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:189)
	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:564)
	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:248)
	at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:77)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:733)
	at com.slankka.rtc.flinkplatform.sql.SqlJob.lambda$start$0(SqlJob.java:123)
	at java.lang.Iterable.forEach(Iterable.java:75)
	at com.slankka.rtc.flinkplatform.sql.SqlJob.start(SqlJob.java:116)
	at com.slankka.rtc.flinkplatform.sql.SqlJobDriver.main(SqlJobDriver.java:14)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
	... 11 more
Caused by: java.lang.RuntimeException: Can't resolve udf class com.slankka.flink.udf.PlusTwoFunc
	at org.apache.flink.table.catalog.CatalogFunctionImpl.isGeneric(CatalogFunctionImpl.java:77)
	at org.apache.flink.table.catalog.hive.factories.HiveFunctionDefinitionFactory.createFunctionDefinition(HiveFunctionDefinitionFactory.java:63)
	at org.apache.flink.table.catalog.FunctionCatalog.resolvePreciseFunctionReference(FunctionCatalog.java:570)
	at org.apache.flink.table.catalog.FunctionCatalog.lambda$resolveAmbiguousFunctionReference$2(FunctionCatalog.java:614)
	at java.util.Optional.orElseGet(Optional.java:267)
	at org.apache.flink.table.catalog.FunctionCatalog.resolveAmbiguousFunctionReference(FunctionCatalog.java:614)
	at org.apache.flink.table.catalog.FunctionCatalog.lookupFunction(FunctionCatalog.java:361)
	at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:97)
	at org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:67)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1260)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1275)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1245)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1009)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:724)
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:147)
	... 26 more
Caused by: java.lang.ClassNotFoundException: com.slankka.flink.udf.PlusTwoFunc
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:264)
	at org.apache.flink.table.catalog.CatalogFunctionImpl.isGeneric(CatalogFunctionImpl.java:72)
	... 40 more

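The call chain is quite clear. The job is submitted by:

org.apache.flink.client.program.PackagedProgram#callMainMethod

At this point execution has already entered the job's main function, and the class loader in use is the user-code class loader held in this field of PackagedProgram:

private final URLClassLoader userCodeClassLoader;

It is created in the PackagedProgram constructor: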

this.extractedTempLibraries =
                this.jarFile == null
                        ? Collections.emptyList()
                        : extractContainedLibraries(this.jarFile);

this.userCodeClassLoader =
                ClientUtils.buildUserCodeClassLoader(
                        getJobJarAndDependencies(),
                        classpaths,
                        getClass().getClassLoader(),
                        configuration);


Next, look at how userCodeClassLoader is built. It does in fact already include the JARs passed with -C (the classpaths argument):

public static URLClassLoader buildUserCodeClassLoader(
            List<URL> jars, List<URL> classpaths, ClassLoader parent, Configuration configuration) {
        URL[] urls = new URL[jars.size() + classpaths.size()];
        for (int i = 0; i < jars.size(); i++) {
            urls[i] = jars.get(i);
        }
        for (int i = 0; i < classpaths.size(); i++) {
            urls[i + jars.size()] = classpaths.get(i);
        }
        final String[] alwaysParentFirstLoaderPatterns =
                CoreOptions.getParentFirstLoaderPatterns(configuration);
        final String classLoaderResolveOrder =
                configuration.getString(CoreOptions.CLASSLOADER_RESOLVE_ORDER);
        FlinkUserCodeClassLoaders.ResolveOrder resolveOrder =
                FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder);
        final boolean checkClassloaderLeak =
                configuration.getBoolean(CoreOptions.CHECK_LEAKED_CLASSLOADER);
        return FlinkUserCodeClassLoaders.create(
                resolveOrder,
                urls,
                parent,
                alwaysParentFirstLoaderPatterns,
                NOOP_EXCEPTION_HANDLER,
                checkClassloaderLeak);
    }
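Stripped of Flink's resolve-order and leak-check options, the URL handling above amounts to: put the job JAR and its dependencies first, append the -C classpath entries, and hand the combined array to a class loader whose parent is the loader that loaded the client. The sketch below reproduces only that part with a plain java.net.URLClassLoader, which delegates parent-first, unlike Flink's default child-first order. UserCodeLoaderSketch and the file paths are hypothetical.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.List;

public class UserCodeLoaderSketch {

    // Concatenates job JARs and -C classpath entries in the same order as
    // buildUserCodeClassLoader above: jars first, then classpaths.
    public static URL[] mergeUrls(List<URL> jars, List<URL> classpaths) {
        List<URL> all = new ArrayList<>(jars.size() + classpaths.size());
        all.addAll(jars);
        all.addAll(classpaths);
        return all.toArray(new URL[0]);
    }

    // Plain URLClassLoader: parent-first delegation, no Flink-specific options.
    public static URLClassLoader build(List<URL> jars, List<URL> classpaths, ClassLoader parent) {
        return new URLClassLoader(mergeUrls(jars, classpaths), parent);
    }

    public static void main(String[] args) throws Exception {
        List<URL> jars = new ArrayList<>();
        jars.add(new URL("file:///tmp/job/sql-job.jar"));
        List<URL> classpaths = new ArrayList<>();
        classpaths.add(new URL("file:///opt/udf/plus-two.jar"));
        try (URLClassLoader loader = build(jars, classpaths, UserCodeLoaderSketch.class.getClassLoader())) {
            for (URL url : loader.getURLs()) {
                System.out.println(url);
            }
        }
    }
}
```

Whatever the resolve order, delegation between loaders only goes from child to parent. A class defined by the parent, such as one from Flink's own JARs on the system classpath, therefore cannot see URLs that exist only in this child loader.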

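Even so, the error above still occurs. The stack trace shows why: CatalogFunctionImpl.isGeneric calls the one-argument Class.forName(className), which resolves names through the class loader that loaded CatalogFunctionImpl itself. Here that is sun.misc.Launcher$AppClassLoader, the parent of userCodeClassLoader, and a parent loader never consults its children, so the JARs added with -C are not searched.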

Temporary Workarounds

You can patch and recompile CatalogFunctionImpl.java yourself. This applies to all Flink versions before 1.13:

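	@Override
	public boolean isGeneric() {
		// Python UDFs are always treated as generic
		if (functionLanguage == FunctionLanguage.PYTHON) {
			return true;
		}
		try {
			// Resolve through the thread context class loader instead of Class.forName(className),
			// which only searches the loader that loaded CatalogFunctionImpl. While the job's
			// main method runs, the Flink client sets the context loader to the user-code class loader.
			ClassLoader cl = Thread.currentThread().getContextClassLoader();
			Class<?> c = Class.forName(className, true, cl);
			if (UserDefinedFunction.class.isAssignableFrom(c)) {
				return true;
			}
		} catch (ClassNotFoundException e) {
			throw new RuntimeException(String.format("Can't resolve udf class %s", className), e);
		}
		return false;
	}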
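The essential change in the patch above is resolving the class through an explicit loader instead of the one-argument Class.forName(className). The standalone sketch below isolates that pattern: it resolves a class name through the thread context class loader and checks it against a base type, and it adds a helper that temporarily installs another context loader and always restores the previous one. ContextClassResolver and its method names are hypothetical, and the base type is passed in as a parameter (a JDK interface in the example) in place of UserDefinedFunction so the code compiles without Flink.

```java
import java.util.List;
import java.util.concurrent.Callable;

public class ContextClassResolver {

    // Mirrors the patched isGeneric(): load the class by name through the thread
    // context class loader, then check it against the given base type.
    public static boolean isSubtypeOf(String className, Class<?> baseType) {
        ClassLoader cl = Thread.currentThread().getContextClassLoader();
        try {
            Class<?> c = Class.forName(className, true, cl);
            return baseType.isAssignableFrom(c);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException(String.format("Can't resolve udf class %s", className), e);
        }
    }

    // Runs an action with a different context class loader and restores the
    // previous loader afterwards, even if the action throws.
    public static <T> T withContextClassLoader(ClassLoader loader, Callable<T> action) throws Exception {
        Thread thread = Thread.currentThread();
        ClassLoader previous = thread.getContextClassLoader();
        thread.setContextClassLoader(loader);
        try {
            return action.call();
        } finally {
            thread.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(isSubtypeOf("java.util.ArrayList", List.class));
        System.out.println(withContextClassLoader(
                ClassLoader.getSystemClassLoader(),
                () -> isSubtypeOf("java.lang.String", List.class)));
    }
}
```

The try/finally in withContextClassLoader matters on long-lived threads: leaving a user-code loader installed as the context loader can leak it after the job ends.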

Another option, again only needed before Flink 1.13, is to load the UDF JARs yourself at the very beginning of the job's main function:

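import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.List;

// Dynamically append JARs to the system class loader, which is the loader that
// Class.forName(className) in CatalogFunctionImpl searches.
// Only works where the system class loader is a URLClassLoader (Java 8);
// on Java 9+ the cast below throws ClassCastException.
  public static void loadJar(List<URL> jarUrls) {
    // URLClassLoader#addURL(URL) is protected, so obtain it via reflection
    final Method addUrl;
    try {
      addUrl = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
    } catch (NoSuchMethodException | SecurityException e) {
      throw new IllegalStateException("Cannot access URLClassLoader#addURL", e);
    }
    boolean accessible = addUrl.isAccessible();
    try {
      // Make the protected method callable
      if (!accessible) {
        addUrl.setAccessible(true);
      }
      // Get the system (application) class loader
      URLClassLoader classLoader = (URLClassLoader) ClassLoader.getSystemClassLoader();
      // Append each JAR to the system class loader's search path
      for (URL jar : jarUrls) {
        addUrl.invoke(classLoader, jar);
      }
    } catch (ReflectiveOperationException | ClassCastException e) {
      // Fail fast instead of swallowing the error and hitting ClassNotFoundException later
      throw new IllegalStateException("Failed to add UDF JARs to the system class loader", e);
    } finally {
      addUrl.setAccessible(accessible);
    }
  }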
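loadJar expects java.net.URL values. If the UDF JARs are known as local file paths, one way to convert them is Path.toUri().toURL(), which yields file: URLs. The class name JarPaths, the helper toJarUrls, and the sample path are hypothetical, and the files are not checked for existence.

```java
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

public class JarPaths {

    // Converts local file-system paths into file: URLs, e.g. for loadJar(...).
    // The files are not checked for existence here.
    public static List<URL> toJarUrls(List<String> paths) {
        List<URL> urls = new ArrayList<>(paths.size());
        for (String p : paths) {
            try {
                urls.add(Paths.get(p).toAbsolutePath().toUri().toURL());
            } catch (MalformedURLException e) {
                throw new IllegalArgumentException("Cannot convert path to URL: " + p, e);
            }
        }
        return urls;
    }

    public static void main(String[] args) {
        List<String> paths = new ArrayList<>();
        paths.add("/opt/udf/plus-two.jar");
        System.out.println(toJarUrls(paths));
    }
}
```

Calling loadJar(JarPaths.toJarUrls(...)) as the first statement of main makes the JARs visible before any SQL referencing the UDF is parsed.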

Permanent Fix

[FLINK-20606] This issue has been fixed in Flink 1.13.

