摘要:Spark目前支持UDF,UDTF,UDAF三種類型的自定義函數。
1. 簡介
Spark目前支持UDF,UDTF,UDAF三種類型的自定義函數。UDF使用場景:輸入一行,返回一個結果,一對一,比如定義一個函數,功能是輸入一個IP地址,返回一個對應的省份。UDTF使用場景: 輸入一行,返回多行(hive),一對多, 而sparkSQL中沒有UDTF, spark中用flatMap即可實現該功能。UDAF: 輸入多行,返回一行, aggregate(主要用於聚合功能,比如groupBy,count,sum), 這些是spark自帶的聚合函數,但是復雜相對復雜。
Spark底層其實以CatalogFunction結構封裝了一個函數,其中FunctionIdentifier描述了函數名字等基本信息,FunctionResource描述了文件類型(jar或者file)和文件路徑;Spark的SessionCatalog提供了函數注冊,刪除,獲取等一些列接口,Spark的Executor在接收到函數執行sql請求時,通過緩存的CatalogFunction信息,找到CatalogFunction中對應的jar地址以及ClassName, JVM動態加載jar,並通過ClassName反射執行對應的函數。
圖1. CatalogFunction結構體
圖2. 注冊加載函數邏輯
Hive的HiveSessionCatalog是繼承Spark的SessionCatalog,對Spark的基本功能做了一層裝飾以適配Hive的基本功能,其中包括函數功能。HiveSimpleUDF對應UDF,HiveGenericUDF對應GenericUDF,HiveUDAFFunction對應AbstractGenericUDAFResolve以及UDAF,HiveGenericUDTF對應GenericUDTF
圖3. Hive裝飾spark函數邏輯
2. UDF
UDF是最常用的函數,使用起來相對比較簡單,主要分為兩類UDF:簡單數據類型,繼承UDF接口;復雜數據類型,如Map,List,Struct等數據類型,繼承GenericUDF接口。
簡單類型實現UDF時,可自定義若干個名字evaluate為的方法,參數和返回類型根據需要自己設置。因為UDF接口默認使用DefaultUDFMethodResolver去方法解析器獲取方法,解析器是根據用戶輸入參數和寫死的名字evaluate去反射尋找方法元數據。當然用戶也可以自定義解析器解析方法。
圖4. 自定義UDF簡單示例
圖5.默認UDF方法解析器
3. UDAF
UDAF是聚合函數,目前實現方式主要有三種:實現UDAF接口,比較老的簡答實現方式,目前已經被廢棄;實現UserDefinedAggregateFunction,目前使用比較普遍方式,按階段實現接口聚集數據;實現AbstractGenericUDAFResolver,實現相對UserDefinedAggregateFunction方式稍微復雜點,還需要實現一個計算器Evaluator(如通用計算器GenericUDAFEvaluator),UDAF的邏輯處理主要發生在Evaluator。
UserDefinedAggregateFunction定義輸入輸出數據結構,實現初始化緩沖區(initialize),聚合單條數據(update),聚合緩存區(merge)以及計算最終結果(evaluate)。
圖6.自定義UDAF簡單示例
4. UDTF
UDTF簡單粗暴的理解是一行生成多行的自動函數,可以生成多行多列,又被稱為表生成函數。目前實現方式是實現GenericUDTF接口,實現2個接口,initialize接口參數校驗,列的定義,process接口接受一行數據,切割數據。
圖7.自定義UDTF簡單示例
