案例解析丨 Spark Hive 自定義函數應用


摘要:Spark目前支持UDF,UDTF,UDAF三種類型的自定義函數。

 

1. 簡介

 

Spark目前支持UDF,UDTF,UDAF三種類型的自定義函數。UDF使用場景:輸入一行,返回一個結果,一對一,比如定義一個函數,功能是輸入一個IP地址,返回一個對應的省份。UDTF使用場景: 輸入一行,返回多行(hive),一對多, 而sparkSQL中沒有UDTF, spark中用flatMap即可實現該功能。UDAF: 輸入多行,返回一行, aggregate(主要用於聚合功能,比如groupBy,count,sum), 這些是spark自帶的聚合函數,但是復雜相對復雜。

 

Spark底層其實以CatalogFunction結構封裝了一個函數,其中FunctionIdentifier描述了函數名字等基本信息,FunctionResource描述了文件類型(jar或者file)和文件路徑;Spark的SessionCatalog提供了函數注冊,刪除,獲取等一些列接口,Spark的Executor在接收到函數執行sql請求時,通過緩存的CatalogFunction信息,找到CatalogFunction中對應的jar地址以及ClassName, JVM動態加載jar,並通過ClassName反射執行對應的函數。

 

 

圖1. CatalogFunction結構體

 

 

圖2. 注冊加載函數邏輯

 

Hive的HiveSessionCatalog是繼承Spark的SessionCatalog,對Spark的基本功能做了一層裝飾以適配Hive的基本功能,其中包括函數功能。HiveSimpleUDF對應UDF,HiveGenericUDF對應GenericUDF,HiveUDAFFunction對應AbstractGenericUDAFResolve以及UDAF,HiveGenericUDTF對應GenericUDTF

 

 

圖3. Hive裝飾spark函數邏輯

 

2. UDF

 

UDF是最常用的函數,使用起來相對比較簡單,主要分為兩類UDF:簡單數據類型,繼承UDF接口;復雜數據類型,如Map,List,Struct等數據類型,繼承GenericUDF接口。

 

簡單類型實現UDF時,可自定義若干個名字evaluate為的方法,參數和返回類型根據需要自己設置。因為UDF接口默認使用DefaultUDFMethodResolver去方法解析器獲取方法,解析器是根據用戶輸入參數和寫死的名字evaluate去反射尋找方法元數據。當然用戶也可以自定義解析器解析方法。

 

 

圖4. 自定義UDF簡單示例

 

 

圖5.默認UDF方法解析器

 

3. UDAF

 

UDAF是聚合函數,目前實現方式主要有三種:實現UDAF接口,比較老的簡答實現方式,目前已經被廢棄;實現UserDefinedAggregateFunction,目前使用比較普遍方式,按階段實現接口聚集數據;實現AbstractGenericUDAFResolver,實現相對UserDefinedAggregateFunction方式稍微復雜點,還需要實現一個計算器Evaluator(如通用計算器GenericUDAFEvaluator),UDAF的邏輯處理主要發生在Evaluator。

 

UserDefinedAggregateFunction定義輸入輸出數據結構,實現初始化緩沖區(initialize),聚合單條數據(update),聚合緩存區(merge)以及計算最終結果(evaluate)。

 

 

 

圖6.自定義UDAF簡單示例

 

4. UDTF

 

UDTF簡單粗暴的理解是一行生成多行的自動函數,可以生成多行多列,又被稱為表生成函數。目前實現方式是實現GenericUDTF接口,實現2個接口,initialize接口參數校驗,列的定義,process接口接受一行數據,切割數據。

 

 

 

圖7.自定義UDTF簡單示例


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM