(轉)Hive自定義UDAF詳解


UDAF有兩種,第一種是比較簡單的形式,利用抽象類UDAF和UDAFEvaluator,暫不做討論。主要說一下第二種形式,利用接口GenericUDAFResolver2(或者抽象類AbstractGenericUDAFResolver)和抽象類GenericUDAFEvaluator。
        這里用AbstractGenericUDAFResolver做說明。

 
public abstract class AbstractGenericUDAFResolver implements GenericUDAFResolver2 {

  @SuppressWarnings("deprecation")
  @Override
  public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info)
    throws SemanticException {

    if (info.isAllColumns()) {
      throw new SemanticException(
          "The specified syntax for UDAF invocation is invalid.");
    }

    return getEvaluator(info.getParameters());
  }

  @Override
  public GenericUDAFEvaluator getEvaluator(TypeInfo[] info) 
    throws SemanticException {
    throw new SemanticException(
          "This UDAF does not support the deprecated getEvaluator() method.");
  }
}

        可以看到,該抽象類有兩個方法,其中一個已經被棄用,所以只需要實現參數類型為TypeInfo的getEvaluator方法即可。

        該方法其實相當於一個工廠,TypeInfo表示在使用時傳入該UDAF的參數的類型。該方法主要做的工作有:

  • 檢查參數長度和類型
  • 根據參數返回對應的實際處理對象

        返回的對象類型為GenericUDAFEvaluator,這是一個抽象類:

public abstract class GenericUDAFEvaluator implements Closeable {

    ......

    public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
        // This function should be overriden in every sub class
        // And the sub class should call super.init(m, parameters) to get mode set.
        mode = m;
        return null;
    }

    public abstract AggregationBuffer getNewAggregationBuffer() throws HiveException;

    public abstract void reset(AggregationBuffer agg) throws HiveException;

    public abstract void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException;

    public abstract Object terminatePartial(AggregationBuffer agg) throws HiveException;

    public abstract void merge(AggregationBuffer agg, Object partial) throws HiveException;

    public abstract Object terminate(AggregationBuffer agg) throws HiveException;
    ......
}

        說明上述方法的之前,需要提一個GenericUDAFEvaluator的內部枚舉類Mode

 
public static enum Mode {
    /**
     * 相當於map階段,調用iterate()和terminatePartial()
     */
    PARTIAL1,
    /**
     * 相當於combiner階段,調用merge()和terminatePartial()
     */
    PARTIAL2,
    /**
     * 相當於reduce階段調用merge()和terminate()
     */
    FINAL,
    /**
     * COMPLETE: 相當於沒有reduce階段map,調用iterate()和terminate()
     */
    COMPLETE
  };

        可以看到,UDAF將任務分成了幾種類型,PARTIAL1相當於MR程序的map階段,負責迭代處理記錄並返回該階段的中間結果。PARTIAL2相當於Combiner,對map階段的結果進行一次聚合。FINAL是reduce階段,進行整體聚合以及返回最終結果。COMPLETE有點特殊,是一個沒有reduce階段的map過程,所以在進行記錄迭代之后,直接返回最終結果。
        再來看GenericUDAFEvaluator中的各方法

 
public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {...}

        初始化方法,在Mode的每一個階段啟動時會執行init方法。該方法有兩個參數,第一個參數是Mode,可以根據此參數判斷當前執行的是哪個階段,進行該階段相應的初始化工作。ObjectInspector是一個抽象的類型描述,例如:當參數類型是原生類型時,可以轉化為PrimitiveObjectInspector,除此之外還有StructObjectInspector等等。ObjectInspector只是描述類型,並不存儲實際數據。后面的具體例子中會有一些使用說明。

        ObjectInspector[]的長度不是固定的,要看當前是處於哪個階段。如果是PARTIAL1,那么與使用時傳入該UDAF的參數個數一致;如果是FINAL階段,長度就是1了,因為map階段返回的結果只有一個對象。

 
public abstract AggregationBuffer getNewAggregationBuffer() throws HiveException;

public abstract void reset(AggregationBuffer agg) throws HiveException;

        AggregationBuffer是一個標識接口,沒有任何需要實現的方法。實現該接口的類被用於暫存中間結果。reset是為了重置AggregationBuffer,但是在實際應用場景中沒有發現單獨調用該方法進行重置,有可能是聚合key的數據量還不夠大,在后面會再說一下這個問題。        

public abstract void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException;
public abstract Object terminatePartial(AggregationBuffer agg) throws HiveException;
public abstract void merge(AggregationBuffer agg, Object partial) throws HiveException;
public abstract Object terminate(AggregationBuffer agg) throws HiveException;
   

        iterate方法存在於MR的M階段,用於處理每一條輸入記錄。Object[]作為輸入傳入UFAF,AggregationBuffer作為中間緩存暫存結果。需要注意的是,每次調用iterate傳入的AggregationBuffer並不一定是同一個對象。Hive調用UDAF的時候會用一個Map來管理AggregationBuffer,Map的key即為需要聚合的key。就通過實際運行過程來看,在每一次iterate調用之前,會根據聚合key從Map中查找對應的AggregationBuffer,若能找到則直接返回AggregationBuffer對象,找不到則調用getNewAggregationBuffer方法新建並插入Map中並返回結果。

        terminatePartial方法在iterate處理完所有輸入后調用,用於返回初步的聚合結果。

        merge方法存在於MR的R階段(也同樣存在於Combine階段),用於最后的聚合。Object類型的partial參數與terminatePartial返回值一致,AggregationBuffer參數與上述一致。         terminate方法在merge方法執行完畢之后調用,用於進行最后的處理,並返回最后結果。        

像上面提到的Mode一樣,這些方法並不一定都會被調用,與Hive解析成的MR程序類型有關。例如解析后的MR程序只有M階段,則只會調用iterate和terminate。實際使用過程中,由於聚合key數據量有限,內存可以承載,所以沒有發現reset單獨調用的情況。每次遇到一個不同的key,則新建一個AggregationBuffer,沒有看源碼,不知道當聚合key很大的時候,是否會調用reset進行對象重用。

轉載地址:http://paddy-w.iteye.com/blog/2081409


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM