HIVE UDAF開發上手,你一看就懂!


單機跑一個腳本做數據處理,但是由於輸入數據實在太大,處理過程中占用大量內存經常被系統殺死,所以考慮放在hive中做數據聚合。借此機會研究下UDAF怎么寫,把踏坑的經驗寫出來,希望可以幫助大家少走彎路!嗯。。。就醬紫。

經常聽UDF,那么UDAF是什么鬼? 就是聚合功能的UDF啦~  比如hive內置的 count、sum、max、min、avg等。 但是內置的函數其實並不能滿足我們復雜的統計需求,就需要自己去實現一個方法。

有兩種實現方法,一種簡單的,一種通用的,簡單的方法據說有性能問題,我們就直接看通用的實現方法吧~

實現一個Generic UDAF有兩部分:

  1. resolver
  2. evaluator

這倆貨分別對應以下兩個抽象類:

import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;

resolver 主要用來做參數檢查和操作符重載,我們可以根據輸入參數的不同選擇相應的evaluator

evaluator 則是實現主要邏輯的地方,以靜態內部類的形式存在

#!Java
public class GenericUDAFHistogramNumeric extends AbstractGenericUDAFResolver {
  static final Log LOG = LogFactory.getLog(GenericUDAFHistogramNumeric.class.getName());
 
  @Override
  public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) throws SemanticException {
    // 參數檢查
 
    return new GenericUDAFHistogramNumericEvaluator();
  }
  /**
   *這個靜態內部類就是寫我們自己邏輯的地方,這個類名根據需要改,這個是官方文檔寫的一個條形圖的例子
   */
  public static class GenericUDAFHistogramNumericEvaluator extends GenericUDAFEvaluator {
    // UDAF 邏輯
  }
}

這里需要介紹下這個例子的功能:hIve中的histogram_numeric函數,用來做直方圖的,比如我們要把年齡分30個桶構建直方圖就是SELECT histogram_numeric(age, 30) FROM employees;

下面我們繼續看例子

#!Java
  /**
  * 這個方法的參數新版的已經發生變化,直接就是TypeInfo [] parameters
  */
  public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) throws SemanticException {
    TypeInfo [] parameters = info.getParameters();
    if (parameters.length != 2) {
      throw new UDFArgumentTypeException(parameters.length - 1,
          "Please specify exactly two arguments.");
    }
     
    // 檢查第一個參數類型,如果不是原始類型(基本類型)拋異常
    if (parameters[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
      throw new UDFArgumentTypeException(0,
          "Only primitive type arguments are accepted but "
          + parameters[0].getTypeName() + " was passed as parameter 1.");
    }
    switch (((PrimitiveTypeInfo) parameters[0]).getPrimitiveCategory()) {
    case BYTE:
    case SHORT:
    case INT:
    case LONG:
    case FLOAT:
    case DOUBLE:
      break;
    case STRING:
    case BOOLEAN:
    default:
      throw new UDFArgumentTypeException(0,
          "Only numeric type arguments are accepted but "
          + parameters[0].getTypeName() + " was passed as parameter 1.");
    }
 
    // 檢查第二個參數類型,條形圖桶編號,假設這里要求是整型數
    if (parameters[1].getCategory() != ObjectInspector.Category.PRIMITIVE) {
      throw new UDFArgumentTypeException(1,
          "Only primitive type arguments are accepted but "
          + parameters[1].getTypeName() + " was passed as parameter 2.");
    }
    // 如果不是整型,拋異常
    if( ((PrimitiveTypeInfo) parameters[1]).getPrimitiveCategory()
        != PrimitiveObjectInspector.PrimitiveCategory.INT) {
      throw new UDFArgumentTypeException(1,
          "Only an integer argument is accepted as parameter 2, but "
          + parameters[1].getTypeName() + " was passed instead.");
    }
    //返回對應的處理類
    return new GenericUDAFHistogramNumericEvaluator();
  }

然后我們看看evaluator

#!Java
  public static class GenericUDAFHistogramNumericEvaluator extends GenericUDAFEvaluator {
 
    // For PARTIAL1 and COMPLETE: ObjectInspectors for original data,這倆貨是用來做類型轉換的
    private PrimitiveObjectInspector inputOI;
    private PrimitiveObjectInspector nbinsOI;
 
    // For PARTIAL2 and FINAL: ObjectInspectors for partial aggregations (list of doubles)
    private StandardListObjectInspector loi;
 
 
    @Override
    public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
      super.init(m, parameters);
      // return type goes here
    }
 
    @Override
    public Object terminatePartial(AggregationBuffer agg) throws HiveException {
      // return value goes here
    }
 
    @Override
    public Object terminate(AggregationBuffer agg) throws HiveException {
      // final return value goes here
    }
 
    @Override
    public void merge(AggregationBuffer agg, Object partial) throws HiveException {
    }
 
    @Override
    public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
    }
 
    // Aggregation buffer definition and manipulation methods
    static class StdAgg implements AggregationBuffer {
    };
 
    @Override
    public AggregationBuffer getNewAggregationBuffer() throws HiveException {
    }
 
    @Override
    public void reset(AggregationBuffer agg) throws HiveException {
    }   
  }

 理解這個類我們首先需要了解一些事情,寫過Hadoop MapReduce 的同學應該知道,一個MapReduce job 分為  map、combine、reduce三個階段,map階段是把函數應用於輸入數據的每一條,構建key-value供后續聚合;combine階段是在mapper端局部進行聚合,聚合后的中間結果傳給reduce函數,輸入和reduce函數是一致的,被稱為mapper端的reduce。了解了這個過程后,我們來看evaluator的幾個方法,基本上是對應這幾個階段。

方法

作用

init

初始化函數

getNewAggregationBuffer

用來生成一個緩存對象,記錄臨時聚合結果

iterate

一條一條處理數據,將結果存入緩存

terminatePartial

這個方法意味着map階段結束,將緩存中的數據持久化存儲。這里返回的數據類型僅支持java基本類型、基本類型包裝類、數組以及Hadoop的Writables, Lists和Map,不要使用自定義類型

merge

接收terminatePartial返回的結果,合並局部聚合結果

terminate

返回最終結果,可以在這里實現最后的求值,比如計算平均值

在hive中,用一個枚舉類Mode來表示不同階段

  /**
   * Mode.
   *官方的注釋寫的挺詳細了^_^
   */
  public static enum Mode {
    /**
     * PARTIAL1: from original data to partial aggregation data: iterate() and
     * terminatePartial() will be called.
     */
    PARTIAL1,
        /**
     * PARTIAL2: from partial aggregation data to partial aggregation data:
     * merge() and terminatePartial() will be called.
     */
    PARTIAL2,
        /**
     * FINAL: from partial aggregation to full aggregation: merge() and
     * terminate() will be called.
     */
    FINAL,
        /**
     * COMPLETE: from original data directly to full aggregation: iterate() and
     * terminate() will be called.
     */
    COMPLETE
  };

嗯。。。 寫完后打個jar包出來,創建個臨時函數來使用既可以了

add jar hiveUDF.jar;
create temporary function test_udf as 'com.test.xxxx';
select test_udf(a,b) from table2 groupy by xxx.

好啦,先寫這么多,我寫的時候數據類型用的大部分是java的,所以產生了各種類型轉換錯誤,后面打算看看Hadoop的內置類型~ 希望能幫到大家~


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM