一、UDF介紹
UDF(User Define Function),即用戶自定義函數,Spark的官方文檔中沒有對UDF做過多介紹,猜想可能是認為比較簡單吧。
幾乎所有sql數據庫的實現都為用戶提供了擴展接口來增強sql語句的處理能力,這些擴展稱之為UDXXX,即用戶定義(User Define)的XXX,這個XXX可以是對單行操作的UDF,或者是對多行操作的UDAF,或者是UDTF,本次主要介紹UDF。
UDF的UD表示用戶定義,既然有用戶定義,就會有系統內建(built-in),一些系統內建的函數比如abs,接受一個數字返回它的絕對值,比如substr對字符串進行截取,它們的特點就是在執行sql語句的時候對每行記錄調用一次,每調用一次傳入一些參數,這些參數通常是表的某一列或者某幾列在當前行的值,然后產生一個輸出作為結果。
適用場景:UDF使用頻率極高,對於單條記錄進行比較復雜的操作,使用內置函數無法完成或者比較復雜的情況都比較適合使用UDF。
二、使用UDF
2.1 在SQL語句中使用UDF
在sql語句中使用UDF指的是在spark.sql("select udf_foo(…)")這種方式使用UDF,套路大致有以下幾步:
1. 實現UDF,可以是case class,可以是匿名類
2. 注冊到spark,將類綁定到一個name,后續會使用這個name來調用函數
3. 在sql語句中調用注冊的name調用UDF
下面是一個簡單的示例:
package cc11001100.spark.sql.udf
import org.apache.spark.sql.SparkSession
object SparkUdfInSqlBasicUsageStudy {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().master("local[*]").appName("SparkUdfStudy").getOrCreate()
import spark.implicits._
// 注冊可以在sql語句中使用的UDF
spark.udf.register("to_uppercase", (s: String) => s.toUpperCase())
// 創建一張表
Seq((1, "foo"), (2, "bar")).toDF("id", "text").createOrReplaceTempView("t_foo")
spark.sql("select id, to_uppercase(text) from t_foo").show()
}
}
運行結果:
2.2 直接對列應用UDF(脫離sql)
在sql語句中使用比較麻煩,還要進行注冊什么的,可以定義一個UDF然后將它應用到某個列上:
package cc11001100.spark.sql.udf
import org.apache.spark.sql.{SparkSession, functions}
object SparkUdfInFunctionBasicUsageStudy {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().master("local[*]").appName("SparkUdfStudy").getOrCreate()
import spark.implicits._
val ds = Seq((1, "foo"), (2, "bar")).toDF("id", "text")
val toUpperCase = functions.udf((s: String) => s.toUpperCase)
ds.withColumn("text", toUpperCase('text)).show()
}
}
運行效果:
需要注意的是受Scala limit 22限制,自定義UDF最多接受22個參數,不過正常情況下完全夠用了。
.
