直接上代碼,詳見注釋
import org.apache.spark.sql.hive.HiveContext import org.apache.spark.{SparkContext, SparkConf} /** * Created by zxh on 2016/6/10. */ object UDF_test { def main(args: Array[String]): Unit = { val conf = new SparkConf() implicit val sc = new SparkContext(conf) implicit val sqlContext = new HiveContext(sc) import sqlContext.implicits._ val data = sc.parallelize(Seq(("a", 1), ("bb", 5), ("cccc", 10), ("dddddd", 15))).toDF("a", "b") data.registerTempTable("data") { //函數體采用原生類型(非Column類型),使用udf包裝函數體,將函數體注冊到sqlContext.udf import org.apache.spark.sql.functions._ //函數體 val filter_length_f = (str: String, _length: Int) => { str.length > _length; } //注冊函數體到當前sqlContext,注意,注冊到sqlContext的函數體,參數不能為Column //注冊后,可以在以下地方使用:1、df.selectExpr 2、df.filter ,3、將該df注冊為temptable,之后在sql中使用 sqlContext.udf.register("filter_length", filter_length_f) val filter_length = udf(filter_length_f) //為方便使用Column,我們對函數體進行包裝,包裝后的輸入參數為Column data.select($"*", filter_length($"a", lit(2))).show //使用udf包裝過的,必須傳入Column,注意 lit(2) data.selectExpr("*", " filter_length(a,2) as ax").show //select 若寫表達式調用函數,則需要使用selectExpr data.filter(filter_length($"a", lit(2))).show //同select data.filter("filter_length(a,2)").show //filter調用表達式,可以直接使用df.filter函數, sqlContext.sql("select *,filter_length(a,2) from data").show sqlContext.sql("select *,filter_length(a,2) from data where filter_length(a,2)").show } { //函數體使用Column類型,無法注冊到sqlContext.udf //使用udf包裝后,每列都必須輸入column,能否我們自己定義呢,比如一個參數是Column,一個是其他類型 import org.apache.spark.sql.functions._ import org.apache.spark.sql.Column val filter_length_f2 = (str: Column, _length: Int) => { length(str) > _length } sqlContext.udf.register("filter_length", filter_length_f2) //todo:不好意思,這里注冊不了,注冊到sqlContext.udf的函數,入參不支持Column類型 data.select($"*", filter_length_f2($"a", 2)).show //不用udf包裝,我們就可以完全自定義,這時 length 就可以傳入整型了 data.selectExpr("*", " filter_length_f2(a,2) as ax").show //todo:不好意思,這里用不了了, data.filter(filter_length_f2($"a", 2)).show //同select data.filter("filter_length(a,2)").show //todo:不好意思,這里用不了了 } //最后,我們寫一個相對通用的吧 { //定義兩個函數體,入參一個使用column類型,一個使用原生類型,將原生類型函數注冊到sqlContext.udf import org.apache.spark.sql.functions._ import org.apache.spark.sql.Column //函數體 val filter_length_f = (str: String, _length: Int) => { str.length > _length; } //主函數,下面df.select df.filter 等中使用 val filter_length = (str: Column, _length: Int) => { length(str) > _length } //注冊函數體到當前sqlContext,注意,注冊到sqlContext的函數體,參數不能為Column //注冊后,可以在以下地方使用:1、df.selectExpr 2、df.filter ,3、將該df注冊為temptable,之后在sql中使用 sqlContext.udf.register("filter_length", filter_length_f) //這里我們不使用udf了,直接使用自己定義的支持Column的函數 //val filter_length = udf(filter_length_f) //為方便使用Column,我們對函數體進行包裝,包裝后的輸入參數為Column data.select($"*", filter_length($"a", 2)).show //使用udf包裝過的,必須傳入Column,注意 lit(2) data.selectExpr("*", " filter_length(a,2) as ax").show //select 若寫表達式調用函數,則需要使用selectExpr data.filter(filter_length($"a", 2)).show //同select data.filter("filter_length(a,2)").show //filter調用表達式,可以直接使用df.filter函數, sqlContext.sql("select *,filter_length(a,2) from data").show sqlContext.sql("select *,filter_length(a,2) from data where filter_length(a,2)").show } } }
