UDAF有兩種,第一種是比較簡單的形式,利用抽象類UDAF和UDAFEvaluator,暫不做討論。主要說一下第二種形式,利用接口GenericUDAFResolver2(或者抽象類AbstractGenericUDAFResolver)和抽象類GenericUDAFEvaluator。
這里用AbstractGenericUDAFResolver做說明。
public abstract class AbstractGenericUDAFResolver implements GenericUDAFResolver2 { @SuppressWarnings("deprecation") @Override public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) throws SemanticException { if (info.isAllColumns()) { throw new SemanticException( "The specified syntax for UDAF invocation is invalid."); } return getEvaluator(info.getParameters()); } @Override public GenericUDAFEvaluator getEvaluator(TypeInfo[] info) throws SemanticException { throw new SemanticException( "This UDAF does not support the deprecated getEvaluator() method."); } }
可以看到,該抽象類有兩個方法,其中一個已經被棄用,所以只需要實現參數類型為TypeInfo的getEvaluator方法即可。
該方法其實相當於一個工廠,TypeInfo表示在使用時傳入該UDAF的參數的類型。該方法主要做的工作有:
- 檢查參數長度和類型
- 根據參數返回對應的實際處理對象
返回的對象類型為GenericUDAFEvaluator,這是一個抽象類:
public abstract class GenericUDAFEvaluator implements Closeable { ...... public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException { // This function should be overriden in every sub class // And the sub class should call super.init(m, parameters) to get mode set. mode = m; return null; } public abstract AggregationBuffer getNewAggregationBuffer() throws HiveException; public abstract void reset(AggregationBuffer agg) throws HiveException; public abstract void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException; public abstract Object terminatePartial(AggregationBuffer agg) throws HiveException; public abstract void merge(AggregationBuffer agg, Object partial) throws HiveException; public abstract Object terminate(AggregationBuffer agg) throws HiveException; ...... }
說明上述方法的之前,需要提一個GenericUDAFEvaluator的內部枚舉類Mode
public static enum Mode { /** * 相當於map階段,調用iterate()和terminatePartial() */ PARTIAL1, /** * 相當於combiner階段,調用merge()和terminatePartial() */ PARTIAL2, /** * 相當於reduce階段調用merge()和terminate() */ FINAL, /** * COMPLETE: 相當於沒有reduce階段map,調用iterate()和terminate() */ COMPLETE };
可以看到,UDAF將任務分成了幾種類型,PARTIAL1相當於MR程序的map階段,負責迭代處理記錄並返回該階段的中間結果。PARTIAL2相當於Combiner,對map階段的結果進行一次聚合。FINAL是reduce階段,進行整體聚合以及返回最終結果。COMPLETE有點特殊,是一個沒有reduce階段的map過程,所以在進行記錄迭代之后,直接返回最終結果。
再來看GenericUDAFEvaluator中的各方法
public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {...}
初始化方法,在Mode的每一個階段啟動時會執行init方法。該方法有兩個參數,第一個參數是Mode,可以根據此參數判斷當前執行的是哪個階段,進行該階段相應的初始化工作。ObjectInspector是一個抽象的類型描述,例如:當參數類型是原生類型時,可以轉化為PrimitiveObjectInspector,除此之外還有StructObjectInspector等等。ObjectInspector只是描述類型,並不存儲實際數據。后面的具體例子中會有一些使用說明。
ObjectInspector[]的長度不是固定的,要看當前是處於哪個階段。如果是PARTIAL1,那么與使用時傳入該UDAF的參數個數一致;如果是FINAL階段,長度就是1了,因為map階段返回的結果只有一個對象。
public abstract AggregationBuffer getNewAggregationBuffer() throws HiveException; public abstract void reset(AggregationBuffer agg) throws HiveException;
AggregationBuffer是一個標識接口,沒有任何需要實現的方法。實現該接口的類被用於暫存中間結果。reset是為了重置AggregationBuffer,但是在實際應用場景中沒有發現單獨調用該方法進行重置,有可能是聚合key的數據量還不夠大,在后面會再說一下這個問題。
iterate方法存在於MR的M階段,用於處理每一條輸入記錄。Object[]作為輸入傳入UFAF,AggregationBuffer作為中間緩存暫存結果。需要注意的是,每次調用iterate傳入的AggregationBuffer並不一定是同一個對象。Hive調用UDAF的時候會用一個Map來管理AggregationBuffer,Map的key即為需要聚合的key。就通過實際運行過程來看,在每一次iterate調用之前,會根據聚合key從Map中查找對應的AggregationBuffer,若能找到則直接返回AggregationBuffer對象,找不到則調用getNewAggregationBuffer方法新建並插入Map中並返回結果。
terminatePartial方法在iterate處理完所有輸入后調用,用於返回初步的聚合結果。
merge方法存在於MR的R階段(也同樣存在於Combine階段),用於最后的聚合。Object類型的partial參數與terminatePartial返回值一致,AggregationBuffer參數與上述一致。 terminate方法在merge方法執行完畢之后調用,用於進行最后的處理,並返回最后結果。
像上面提到的Mode一樣,這些方法並不一定都會被調用,與Hive解析成的MR程序類型有關。例如解析后的MR程序只有M階段,則只會調用iterate和terminate。實際使用過程中,由於聚合key數據量有限,內存可以承載,所以沒有發現reset單獨調用的情況。每次遇到一個不同的key,則新建一個AggregationBuffer,沒有看源碼,不知道當聚合key很大的時候,是否會調用reset進行對象重用。
轉載地址:http://paddy-w.iteye.com/blog/2081409