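UDFs are a very common SQL feature. In Spark 1.6 and earlier, however, you could only create temporary UDFs. Persistent UDFs were not supported unless you modified the Spark source code. Starting with Spark 2.0, Spark SQL finally supports persistent UDFs. This article is based on Spark 2.0.2, the latest release at the time of writing. It explains how to use UDFs in Spark SQL and how they are implemented under the hood.
1. Temporary UDFs
How to create and use one: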
create temporary function tmp_trans_array as 'com.test.spark.udf.TransArray' using jar 'spark-test-udf-1.0.0.jar';
select tmp_trans_array(1, '\\|', id, position) as (id0, position0) from test_udf limit 10;
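How it works: the run method of org.apache.spark.sql.execution.command.CreateFunctionCommand checks whether the function being created is temporary. If it is, a temporary function is created. The code below (SessionCatalog.createTempFunction) shows that a temporary function is registered directly in functionRegistry, which is implemented by SimpleFunctionRegistry. In other words, it exists only in memory.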
def createTempFunction(
    name: String,
    info: ExpressionInfo,
    funcDefinition: FunctionBuilder,
    ignoreIfExists: Boolean): Unit = {
  if (functionRegistry.lookupFunctionBuilder(name).isDefined && !ignoreIfExists) {
    throw new TempFunctionAlreadyExistsException(name)
  }
  functionRegistry.registerFunction(name, info, funcDefinition)
}
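Here is a minimal Spark-free sketch of the same check-then-register logic. It assumes a plain in-memory HashMap in place of functionRegistry. The class TempFunctionCatalog, its classNameOf/invoke helpers, the Seq[Any] => Any builder type and the use of IllegalStateException (instead of Spark's TempFunctionAlreadyExistsException) are hypothetical illustrations, not Spark APIs.

```scala
import scala.collection.mutable

// Hypothetical, Spark-free model of SessionCatalog.createTempFunction.
// A builder is modelled as Seq[Any] => Any instead of Spark's FunctionBuilder.
class TempFunctionCatalog {
  // In-memory only: name -> (implementing class name, builder). Nothing is persisted.
  private val registry = mutable.HashMap.empty[String, (String, Seq[Any] => Any)]

  def createTempFunction(
      name: String,
      className: String,
      builder: Seq[Any] => Any,
      ignoreIfExists: Boolean): Unit = synchronized {
    // Same condition as the Spark code: fail only if the name is taken AND
    // the caller did not ask to ignore an existing definition.
    if (registry.contains(name) && !ignoreIfExists) {
      // Stand-in for Spark's TempFunctionAlreadyExistsException.
      throw new IllegalStateException(s"Temporary function '$name' already exists")
    }
    // Reached on first registration, or when ignoreIfExists = true,
    // in which case the old entry is overwritten.
    registry.put(name, (className, builder))
  }

  def classNameOf(name: String): Option[String] = synchronized {
    registry.get(name).map(_._1)
  }

  def invoke(name: String, args: Seq[Any]): Any = synchronized {
    registry(name)._2(args)
  }
}

val catalog = new TempFunctionCatalog
catalog.createTempFunction("tmp_trans_array", "com.test.spark.udf.TransArray",
  args => args.mkString("|"), ignoreIfExists = false)
println(catalog.invoke("tmp_trans_array", Seq(1, "a"))) // prints 1|a
```

Note what happens when ignoreIfExists = true. The existing definition is not skipped. It is overwritten, because the code above still calls registerFunction after the check.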
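The code below performs the actual registration. Every UDF is stored in a StringKeyHashMap. Because the map is created with caseSensitive = false, function names are matched case-insensitively.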
protected val functionBuilders =
  StringKeyHashMap[(ExpressionInfo, FunctionBuilder)](caseSensitive = false)

override def registerFunction(
    name: String,
    info: ExpressionInfo,
    builder: FunctionBuilder): Unit = synchronized {
  functionBuilders.put(name, (info, builder))
}
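The effect of caseSensitive = false can be sketched with a small wrapper that normalizes every key before storing or looking it up. This assumes, as Spark's StringKeyHashMap does in case-insensitive mode, that keys are lower-cased. The class CaseInsensitiveRegistry is hypothetical, and choosing Locale.ROOT for lower-casing is this sketch's own decision.

```scala
import scala.collection.mutable

// Hypothetical case-insensitive registry: every key is lower-cased on put and get,
// so "tmp_trans_array" and "TMP_TRANS_ARRAY" refer to the same entry.
class CaseInsensitiveRegistry[V] {
  private val base = mutable.HashMap.empty[String, V]
  private def normalize(key: String): String = key.toLowerCase(java.util.Locale.ROOT)

  // synchronized mirrors the synchronized registerFunction shown above.
  def put(name: String, value: V): Unit = synchronized { base.put(normalize(name), value) }
  def get(name: String): Option[V] = synchronized { base.get(normalize(name)) }
  def contains(name: String): Boolean = get(name).isDefined
}

val functions = new CaseInsensitiveRegistry[String]
functions.put("tmp_trans_array", "com.test.spark.udf.TransArray")
println(functions.get("TMP_TRANS_ARRAY")) // prints Some(com.test.spark.udf.TransArray)
```

In practice, a SQL statement can call tmp_trans_array in any letter case and still resolve to the same registered function.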
2. Persistent UDFs
Usage is shown below. The jar is best placed on HDFS so that it can also be loaded on other machines.
create function trans_array as 'com.test.spark.udf.TransArray' using jar 'hdfs://namenodeIP:9000/libs/spark-test-udf-1.0.0.jar';
select trans_array(1, '\\|', id, position) as (id0, position0) from test_spark limit 10;
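How it works
(1) When a permanent function is created, CreateFunctionCommand (org.apache.spark.sql.execution.command.CreateFunctionCommand) calls SessionCatalog.createFunction, which ultimately invokes HiveExternalCatalog.createFunction. This means that creating a permanent function writes a corresponding function entry into the Hive metastore database. Querying the metastore returns the following records, which confirm that the function has been stored there.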
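mysql> select * from FUNCS;
+---------+-------------------------------+-------------+-------+-------------+-----------+------------+------------+
| FUNC_ID | CLASS_NAME                    | CREATE_TIME | DB_ID | FUNC_NAME   | FUNC_TYPE | OWNER_NAME | OWNER_TYPE |
+---------+-------------------------------+-------------+-------+-------------+-----------+------------+------------+
|      96 | com.test.spark.udf.TransArray |  1481459766 |     1 | trans_array |         1 | NULL       | USER       |
+---------+-------------------------------+-------------+-------+-------------+-----------+------------+------------+

mysql> select * from FUNC_RU;
+---------+---------------+------------------------------------------------------+-------------+
| FUNC_ID | RESOURCE_TYPE | RESOURCE_URI                                         | INTEGER_IDX |
+---------+---------------+------------------------------------------------------+-------------+
|      96 |             1 | hdfs://namenodeIP:9000/libs/spark-test-udf-1.0.0.jar |           0 |
+---------+---------------+------------------------------------------------------+-------------+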
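The two tables are linked by FUNC_ID. FUNCS holds the implementing class, and FUNC_RU lists the resources (here, the jar) that must be loaded before the class can be used. The sketch below models that relationship in memory, using a subset of the columns shown above. It is not how HiveExternalCatalog actually reads the metastore (that goes through the Hive client). The case classes, the resolve function, and the assumption that INTEGER_IDX gives the order of a function's resources are all hypothetical.

```scala
// Hypothetical in-memory model of the two metastore tables (subset of columns).
final case class FuncRow(funcId: Long, className: String, dbId: Long, funcName: String)
final case class FuncResourceRow(funcId: Long, resourceType: Int, resourceUri: String, integerIdx: Int)

// A permanent function as seen after joining FUNCS and FUNC_RU on FUNC_ID.
final case class PermanentFunction(name: String, className: String, resourceUris: Seq[String])

def resolve(funcs: Seq[FuncRow], resources: Seq[FuncResourceRow],
            dbId: Long, name: String): Option[PermanentFunction] =
  funcs.find(f => f.dbId == dbId && f.funcName == name).map { f =>
    // Assumption: INTEGER_IDX orders the resources belonging to one function.
    val uris = resources.filter(_.funcId == f.funcId).sortBy(_.integerIdx).map(_.resourceUri)
    PermanentFunction(f.funcName, f.className, uris)
  }

// Values copied from the query output above.
val funcs = Seq(FuncRow(96L, "com.test.spark.udf.TransArray", 1L, "trans_array"))
val funcRu = Seq(FuncResourceRow(96L, 1, "hdfs://namenodeIP:9000/libs/spark-test-udf-1.0.0.jar", 0))

println(resolve(funcs, funcRu, dbId = 1L, name = "trans_array"))
```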
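(2) When a permanent function is used, Spark resolves the UDF in the SQL statement by calling SessionCatalog's lookupFunction method. This method first checks whether the function is already in memory. If it is not, the method loads the UDF: the jar at RESOURCE_URI is added to the class loader's path, and the UDF is then registered in the in-memory functionRegistry. The main code in SessionCatalog is as follows: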
def lookupFunction(
    name: FunctionIdentifier,
    children: Seq[Expression]): Expression = synchronized {
  // Note: the implementation of this function is a little bit convoluted.
  // We probably shouldn't use a single FunctionRegistry to register all three kinds of functions
  // (built-in, temp, and external).
  if (name.database.isEmpty && functionRegistry.functionExists(name.funcName)) {
    // This function has been already loaded into the function registry.
    return functionRegistry.lookupFunction(name.funcName, children)
  }

  // If the name itself is not qualified, add the current database to it.
  val database = name.database.orElse(Some(currentDb)).map(formatDatabaseName)
  val qualifiedName = name.copy(database = database)

  if (functionRegistry.functionExists(qualifiedName.unquotedString)) {
    // This function has been already loaded into the function registry.
    // Unlike the above block, we find this function by using the qualified name.
    return functionRegistry.lookupFunction(qualifiedName.unquotedString, children)
  }

  // The function has not been loaded to the function registry, which means
  // that the function is a permanent function (if it actually has been registered
  // in the metastore). We need to first put the function in the FunctionRegistry.
  // TODO: why not just check whether the function exists first?
  val catalogFunction = try {
    externalCatalog.getFunction(currentDb, name.funcName)
  } catch {
    case e: AnalysisException => failFunctionLookup(name.funcName)
    case e: NoSuchPermanentFunctionException => failFunctionLookup(name.funcName)
  }
  loadFunctionResources(catalogFunction.resources)
  // Please note that qualifiedName is provided by the user. However,
  // catalogFunction.identifier.unquotedString is returned by the underlying
  // catalog. So, it is possible that qualifiedName is not exactly the same as
  // catalogFunction.identifier.unquotedString (difference is on case-sensitivity).
  // At here, we preserve the input from the user.
  val info = new ExpressionInfo(catalogFunction.className, qualifiedName.unquotedString)
  val builder = makeFunctionBuilder(qualifiedName.unquotedString, catalogFunction.className)
  createTempFunction(qualifiedName.unquotedString, info, builder, ignoreIfExists = false)
  // Now, we need to create the Expression.
  functionRegistry.lookupFunction(qualifiedName.unquotedString, children)
}
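The lookup order above can be summarized in a small Spark-free sketch:

- unqualified name in the registry;
- then the name qualified with the current database;
- otherwise fetch the definition from the metastore, load its resources, and cache it in the registry under the qualified name.

Everything here is hypothetical and simplified. PermanentDef stands in for CatalogFunction, a Map stands in for the external catalog, and "loading resources" just records the URIs. Unlike the quoted 2.0.2 code, which passes currentDb to externalCatalog.getFunction, this sketch queries the metastore with the resolved database.

```scala
import scala.collection.mutable

// Hypothetical stand-in for CatalogFunction: implementing class plus resource URIs.
final case class PermanentDef(className: String, resourceUris: Seq[String])

class LookupSketch(currentDb: String, metastore: Map[(String, String), PermanentDef]) {
  private def norm(s: String): String = s.toLowerCase(java.util.Locale.ROOT)
  private val registry = mutable.HashMap.empty[String, String] // name -> class, in memory
  val loadedResources = mutable.ArrayBuffer.empty[String]       // stands in for jars on the class path
  var metastoreCalls = 0                                        // counts round trips to the metastore

  def registerTemp(name: String, className: String): Unit = synchronized {
    registry.put(norm(name), className)
  }

  def lookup(database: Option[String], funcName: String): String = synchronized {
    val db = database.getOrElse(currentDb)
    val unqualified = norm(funcName)
    val qualified = norm(s"$db.$funcName")
    if (database.isEmpty && registry.contains(unqualified)) {
      // 1. Built-in or temporary function registered under its bare name.
      registry(unqualified)
    } else if (registry.contains(qualified)) {
      // 2. Permanent function that was already loaded in an earlier lookup.
      registry(qualified)
    } else {
      // 3. First use: fetch from the metastore, load resources, cache in memory.
      metastoreCalls += 1
      val fn = metastore.getOrElse((db, norm(funcName)),
        throw new NoSuchElementException(s"Undefined function: $funcName"))
      loadedResources ++= fn.resourceUris
      registry.put(qualified, fn.className)
      fn.className
    }
  }
}

val catalog = new LookupSketch("default", Map(
  ("default", "trans_array") -> PermanentDef("com.test.spark.udf.TransArray",
    Seq("hdfs://namenodeIP:9000/libs/spark-test-udf-1.0.0.jar"))))

catalog.lookup(None, "trans_array") // first call: goes to the metastore
catalog.lookup(None, "trans_array") // second call: served from the in-memory registry
println(catalog.metastoreCalls)     // prints 1
```

The second lookup finds the function through the qualified name ("default.trans_array"), because that is the key under which the first lookup registered it. This mirrors the second if block in the Spark code.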

