Spark筆記之使用UDF(User Define Function)


 

一、UDF介紹

UDF(User Define Function),即用戶自定義函數,Spark的官方文檔中沒有對UDF做過多介紹,猜想可能是認為比較簡單吧。

幾乎所有sql數據庫的實現都為用戶提供了擴展接口來增強sql語句的處理能力,這些擴展稱之為UDXXX,即用戶定義(User Define)的XXX,這個XXX可以是對單行操作的UDF,或者是對多行操作的UDAF,或者是UDTF,本次主要介紹UDF。

UDF的UD表示用戶定義,既然有用戶定義,就會有系統內建(built-in),一些系統內建的函數比如abs,接受一個數字返回它的絕對值,比如substr對字符串進行截取,它們的特點就是在執行sql語句的時候對每行記錄調用一次,每調用一次傳入一些參數,這些參數通常是表的某一列或者某幾列在當前行的值,然后產生一個輸出作為結果。

適用場景:UDF使用頻率極高,對於單條記錄進行比較復雜的操作,使用內置函數無法完成或者比較復雜的情況都比較適合使用UDF。

 

二、使用UDF

2.1 在SQL語句中使用UDF

在sql語句中使用UDF指的是在spark.sql("select udf_foo(…)")這種方式使用UDF,套路大致有以下幾步:

1. 實現UDF,可以是case class,可以是匿名類

2. 注冊到spark,將類綁定到一個name,后續會使用這個name來調用函數

3. 在sql語句中調用注冊的name調用UDF

下面是一個簡單的示例:

package cc11001100.spark.sql.udf

import org.apache.spark.sql.SparkSession

object SparkUdfInSqlBasicUsageStudy {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().master("local[*]").appName("SparkUdfStudy").getOrCreate()
    import spark.implicits._
    // 注冊可以在sql語句中使用的UDF
    spark.udf.register("to_uppercase", (s: String) => s.toUpperCase())
    // 創建一張表
    Seq((1, "foo"), (2, "bar")).toDF("id", "text").createOrReplaceTempView("t_foo")
    spark.sql("select id, to_uppercase(text) from t_foo").show()

  }

}

運行結果:

image 

 

2.2 直接對列應用UDF(脫離sql)

在sql語句中使用比較麻煩,還要進行注冊什么的,可以定義一個UDF然后將它應用到某個列上:

package cc11001100.spark.sql.udf

import org.apache.spark.sql.{SparkSession, functions}

object SparkUdfInFunctionBasicUsageStudy {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().master("local[*]").appName("SparkUdfStudy").getOrCreate()

    import spark.implicits._
    val ds = Seq((1, "foo"), (2, "bar")).toDF("id", "text")
    val toUpperCase = functions.udf((s: String) => s.toUpperCase)
    ds.withColumn("text", toUpperCase('text)).show()

  }

}

運行效果:

image

 

需要注意的是受Scala limit 22限制,自定義UDF最多接受22個參數,不過正常情況下完全夠用了。

 

.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM