Hive UDAF介紹與開發
本文參考Hive社區wiki文檔中UDAF而來。原文鏈接。采用Hive 1.2.1版本進行說明與測試。
UDAF簡介
UDAF是用戶自定義聚合函數。Hive支持其用戶自行開發聚合函數完成業務邏輯。
通俗點說,就是你可能需要做一些特殊的甚至是非常扭曲的邏輯聚合,但是Hive自帶的聚合函數不夠玩,同時也還找不到高效的等價玩法,那么,這時候就該自己寫一個UDAF了。
而從實現上來看,Hive的UDAF分為兩種:
- Simple。即繼承
org.apache.hadoop.hive.ql.exec.UDAF
類,並在派生類中以靜態內部類的方式實現org.apache.hadoop.hive.ql.exec.UDAFEvaluator
接口。這種方式簡單直接,但是在使用過程中需要依賴JAVA反射機制,因此性能相對較低。在Hive源碼包org.apache.hadoop.hive.contrib.udaf.example
中包含幾個示例。可以直接參閱。但是這些接口已經被注解為Deprecated,建議不要使用這種方式開發新的UDAF函數。 - Generic。這是Hive社區推薦的新的寫法,以抽象類代替原有的接口。新的抽象類
org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver
替代老的UDAF接口,新的抽象類org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator
替代老的UDAFEvaluator接口。
產生這兩種方式的原因並不高深,就是結構演進,歷史遺留。原文鏈接最后一段說明了一下演進的版本以及原因。
UDAF相關類和接口簡介
AbstractGenericUDAFResolver
:該抽象類實現了GenericUDAFResolver2
的接口。UDAF主類須繼承該抽象類,其主要作用是實現參數類型檢查和操作符重載。可以為同一個函數實現不同入參的版本。org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator
:該抽象類為UDAF具體的邏輯處理,包括幾個必須實現的抽象方法,這幾個方法負責完成UDAF所需要處理的邏輯。
UDAF的運行流程簡介
抽象類GenericUDAFEvaluator
中,包含一個靜態內部枚舉類,和一系列抽象方法。這個枚舉類的注釋中,解釋了各個枚舉值的運行階段和運行內容。按照時間先后順序,分別有:
- PARTIAL1:原始數據到部分聚合,調用iterate和terminatePartial --> map階段
- PARTIAL2: 部分聚合到部分聚合,調用merge和terminatePartial --> combine階段
- FINAL: 部分聚合到完全聚合,調用merge和terminate --> reduce階段
- COMPLETE: 從原始數據直接到完全聚合 --> map階段,並且沒有reduce
那么,這幾個方法分別干了些啥呢?
-
init
: 實例化Evaluator類的時候調用的,在不同的階段需要返回不同的OI。其入參和返回值,以及Mode階段的關系如下表:入參 返回值的使用者 P1 原始數據 P2 部分聚合數據 F 部分聚合數據 C 原始數據 -
getNewAggregationBuffer
: 獲取存放中間結果的對象 -
iterate
:處理一行數據 -
terminatePartial
:返回部分聚合數據的持久化對象。因為調用這個方法時,說明已經是map或者combine的結束了,必須將數據持久化以后交給reduce進行處理。只支持JAVA原始數據類型及其封裝類型、HADOOP Writable類型、List、Map,不能返回自定義的類,即使實現了Serializable也不行,否則會出現問題或者錯誤的結果。 -
merge
:將terminatePartial
返回的部分聚合數據進行合並,需要使用到對應的OI。 -
terminate
:結束,生成最終結果。
兩類UDAF基本原理相同,下面以histogram_numeric
這個系統自帶的Generic UDAF為例,描述一下UDAF的運行和開發過程。這個函數涵蓋了UDAF多個特性,比如入參類型檢查並返回復雜數據類型。
UDAF開發
1. 構造UDAF代碼骨架部分
先搭建好代碼骨架,完成需要繼承的類和接口結構。
public class GenericUDAFHistogramNumeric extends AbstractGenericUDAFResolver {
static final Log LOG = LogFactory.getLog(GenericUDAFHistogramNumeric.class.getName());
@Override
public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) throws SemanticException {
// TODO: 1. Type-checking goes here!
return new GenericUDAFHistogramNumericEvaluator();
}
public static class GenericUDAFHistogramNumericEvaluator extends GenericUDAFEvaluator {
// UDAF logic goes here!
}
}
2.實現getEvaluator方法
該方法非常簡單,其主要目的是校驗UDAF的入參個數和入參類型並返回Evaluator對象。調用者傳入不同的參數時,向其返回不同的Evaluator或者直接拋出異常。這部分代碼可以寫入骨架代碼中的TODO:1
處。例如本例中的實現,該UDAF不支持多種參數的版本,限定參數個數必須為2,並且第一個參數必須是簡單數據類型,第二個參數必須是int。
@Override
public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException {
if (parameters.length != 2) {
throw new UDFArgumentTypeException(parameters.length - 1,
"Please specify exactly two arguments.");
}
// validate the first parameter, which is the expression to compute over
if (parameters[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
throw new UDFArgumentTypeException(0,
"Only primitive type arguments are accepted but "
+ parameters[0].getTypeName() + " was passed as parameter 1.");
}
switch (((PrimitiveTypeInfo) parameters[0]).getPrimitiveCategory()) {
case BYTE:
case SHORT:
case INT:
case LONG:
case FLOAT:
case DOUBLE:
case TIMESTAMP:
case DECIMAL:
break;
case STRING:
case BOOLEAN:
case DATE:
default:
throw new UDFArgumentTypeException(0,
"Only numeric type arguments are accepted but "
+ parameters[0].getTypeName() + " was passed as parameter 1.");
}
// validate the second parameter, which is the number of histogram bins
if (parameters[1].getCategory() != ObjectInspector.Category.PRIMITIVE) {
throw new UDFArgumentTypeException(1,
"Only primitive type arguments are accepted but "
+ parameters[1].getTypeName() + " was passed as parameter 2.");
}
if( ((PrimitiveTypeInfo) parameters[1]).getPrimitiveCategory()
!= PrimitiveObjectInspector.PrimitiveCategory.INT) {
throw new UDFArgumentTypeException(1,
"Only an integer argument is accepted as parameter 2, but "
+ parameters[1].getTypeName() + " was passed instead.");
}
return new GenericUDAFHistogramNumericEvaluator();
}
3.實現Evaluator
從骨架代碼中,可以看到一個靜態內部類實現了Evaluator的抽象類,並且必須實現它的幾個抽象方法。這些方法的調用時機即意義參見上面的表格以及GenericUDAFEvaluator
類的靜態內部枚舉類Mode
。
4. 注冊函數
將函數直接寫入FunctionRegistry
類的靜態代碼塊中,system.registerGenericUDAF("histogram_numeric", new GenericUDAFHistogramNumeric());
,或者將UDAF代碼單獨打包成jar,采用CREATE FUNCTION
語句創建函數。