一、UDF介紹
UDF(User Define Function),即用戶自定義函數,Spark的官方文檔中沒有對UDF做過多介紹,猜想可能是認為比較簡單吧。
幾乎所有sql數據庫的實現都為用戶提供了擴展接口來增強sql語句的處理能力,這些擴展稱之為UDXXX,即用戶定義(User Define)的XXX,這個XXX可以是對單行操作的UDF,或者是對多行操作的UDAF,或者是UDTF,本次主要介紹UDF。
UDF的UD表示用戶定義,既然有用戶定義,就會有系統內建(built-in),一些系統內建的函數比如abs,接受一個數字返回它的絕對值,比如substr對字符串進行截取,它們的特點就是在執行sql語句的時候對每行記錄調用一次,每調用一次傳入一些參數,這些參數通常是表的某一列或者某幾列在當前行的值,然后產生一個輸出作為結果。
適用場景:UDF使用頻率極高,對於單條記錄進行比較復雜的操作,使用內置函數無法完成或者比較復雜的情況都比較適合使用UDF。
二、使用UDF
2.1 在SQL語句中使用UDF
在sql語句中使用UDF指的是在spark.sql("select udf_foo(…)")這種方式使用UDF,套路大致有以下幾步:
1. 實現UDF,可以是case class,可以是匿名類
2. 注冊到spark,將類綁定到一個name,后續會使用這個name來調用函數
3. 在sql語句中調用注冊的name調用UDF
下面是一個簡單的示例:
package cc11001100.spark.sql.udf import org.apache.spark.sql.SparkSession object SparkUdfInSqlBasicUsageStudy { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().master("local[*]").appName("SparkUdfStudy").getOrCreate() import spark.implicits._ // 注冊可以在sql語句中使用的UDF spark.udf.register("to_uppercase", (s: String) => s.toUpperCase()) // 創建一張表 Seq((1, "foo"), (2, "bar")).toDF("id", "text").createOrReplaceTempView("t_foo") spark.sql("select id, to_uppercase(text) from t_foo").show() } }
運行結果:
2.2 直接對列應用UDF(脫離sql)
在sql語句中使用比較麻煩,還要進行注冊什么的,可以定義一個UDF然后將它應用到某個列上:
package cc11001100.spark.sql.udf import org.apache.spark.sql.{SparkSession, functions} object SparkUdfInFunctionBasicUsageStudy { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().master("local[*]").appName("SparkUdfStudy").getOrCreate() import spark.implicits._ val ds = Seq((1, "foo"), (2, "bar")).toDF("id", "text") val toUpperCase = functions.udf((s: String) => s.toUpperCase) ds.withColumn("text", toUpperCase('text)).show() } }
運行效果:
需要注意的是受Scala limit 22限制,自定義UDF最多接受22個參數,不過正常情況下完全夠用了。
.