UDF/UDAF開發總結


參考文章:

1.UDF,UDAF,UDTF區別

UDF:最簡單的自定義,實現一對一,輸入一行數據輸出一行數據  
UDAF:自定義聚合函數,實現多對一,輸入多行數據輸出一行數  
UDTF:用來實現一行輸入多行輸出,這次先不講 

2.UDF開發

要點:1.UDF類需要繼承org.apache.hadoop.hive.ql.exec.UDF.

2.UDF類需要實現evaluate類.

UDF開發實例:

開發一個udf getdate以返回當前系統時間

package udf.test;
import org.apache.hadoop.hive.ql.exec.UDF;

import java.text.SimpleDateFormat;
import java.util.Date;

public class Getdate extends UDF {
    public String evaluate(){
        return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
    }
}

然后maven打包:mvn clean compile.package
接着把包放到服務器上,比如放到/home/azkaban/UDF/udf-jar.1.1.0
進入hive shell,執行add jar /home/azkaban/UDF/udf-jar.1.1.0
接着執行create tempopary function getdate as 'udf.test.Getdate';
這里的getdate就是function名稱。在hive shell中執行select getdate()就會返回當前的系統時間。

待解決:hive中類似於bigint的類型,在udf的evaluate方法中如何返回,改成long?

3.UDAF開發

Hive的UDAF分為兩種:

  • Simple。即繼承org.apache.hadoop.hive.ql.exec.UDAF類,並在派生類中以靜態內部類的方式實現 org.apache.hadoop.hive.ql.exec.UDAFEvaluator接口。這種方式簡單直接,但是在使用過程中需要依賴JAVA反射機制,因此性能相對較低。在Hive源碼包org.apache.hadoop.hive.contrib.udaf.example中包含幾個示例。可以直接參閱。但是這些接口已經被注解為Deprecated,建議不要使用這種方式開發新的UDAF函數。
  • Generic。這是Hive社區推薦的新的寫法,以抽象類代替原有的接口。新的抽象類 org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver替代老的UDAF接口,新的抽象類 org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator替代老的UDAFEvaluator接口。

UDAF的運行流程簡介

其實hive就是對MapReduce的一層包裝,所以我們寫UDAF的時候可以通過對應到Map Reduce進行理解。

UDAF的四個階段

  • PARTIAL1:原始數據到部分聚合,調用iterate和terminatePartial --> map階段
  • PARTIAL2: 部分聚合到部分聚合,調用merge和terminatePartial --> combine階段
  • FINAL: 部分聚合到完全聚合,調用merge和terminate --> reduce階段
  • COMPLETE: 從原始數據直接到完全聚合 --> map階段,並且沒有reduce
    除了上面提到的iterate,merge,terminatePartial以外,還有init(初始化並返回,返回值的類型) ,getNewAggregationBuffer(獲取新的buffer,也就是方法間傳遞參數的對象),reset(重置buffer對象)

UDAF需要實現的方法

在四個階段中,我們可以得知,需要實現7個方法

  • init:這個方法不寫會報錯:fatal: nullpointexception null
  • getNewAggregationBuffer:我們定義一個對象,在這個方法里面實現該對象以用於參數傳遞
  • reset:重置buffer對象
  • iterate:類似於map()
  • merge:類似於Reduce()
  • terminatePartial:返回部分聚合數據的持久化對象。因為調用這個方法時,說明已經是map或者combine的結束了,必須將數據持久化以后交給reduce(也就是調用merge)進行處理。
  • terminate:結束,生成最終結果。

對象實例

  • 現要求實現某個字段以","進行提取的函數wm_concat,比如.

table:customers

name gender age
張三 23
李氏 26
王婆 54
尼古拉斯-趙六 43

select wm_concat(name) from customers;
返回的是 "張三,李氏,王婆,尼古拉斯-趙六"

  • 代碼如下:
package com.maihaoche.baiyan.UDF;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.Text;

public class Wm_concat extends AbstractGenericUDAFResolver{
    @Override
    public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) throws SemanticException {
        return new GenericUDAFWmconcatEvaluator();
    }



    public static class GenericUDAFWmconcatEvaluator extends GenericUDAFEvaluator{

        static class stringagg implements AggregationBuffer{
            StringBuffer stringBuffer=new StringBuffer();
            String flag=null;
            boolean empty;
        }

        @Override
        /*
        init方法不寫的話會報nullpointexception null 的錯誤
         */
            public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
            super.init(m, parameters);
            if(parameters.length!=1){
                throw new UDFArgumentException("Argument Exception");
            }
            return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
        }

        /*
        獲取存放中間結果的對象
         */
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
                stringagg sa=new stringagg();
                String str=null;
                return sa;
        }
        public void reset(AggregationBuffer aggregationBuffer) throws HiveException {
                stringagg sa=(stringagg)aggregationBuffer;
                sa.empty=true;
                sa.stringBuffer.delete(0,sa.stringBuffer.length());
        }

        public void iterate(AggregationBuffer aggregationBuffer, Object[] objects) throws HiveException {
            if(objects.length !=1 ){
                throw new UDFArgumentException("Argument Exception");
            }
            this.merge(aggregationBuffer,objects[0]);
        }

        public Object terminatePartial(AggregationBuffer aggregationBuffer) throws HiveException {
            return this.terminate(aggregationBuffer);
        }

        public void merge(AggregationBuffer aggregationBuffer, Object o) throws HiveException {
                stringagg sa=(stringagg)aggregationBuffer;
                if(o!=null){
                    sa.stringBuffer.append(o.toString());
                    sa.empty=false;
                }
        }

        public Object terminate(AggregationBuffer aggregationBuffer) throws HiveException {
            stringagg sa=(stringagg)aggregationBuffer;
            if(sa.empty==true) return null;
            int length=sa.stringBuffer.toString().length();
            return new Text(sa.stringBuffer.toString().substring(0,length-1));//通過substring解決最后一個字段跟着的分隔符
        }
    }
}

很明顯,我們可以看出來,AbstractGenericUDAFResolver就是一層皮,我們可以在里面加一寫驗證條件,比如:
檢測下面就進行檢測是否有2個參數以及判斷數據類型

 public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo parameters) throws SemanticException {
    if (parameters.length != 2) {
      throw new UDFArgumentTypeException(parameters.length - 1,
          "Please specify exactly two arguments.");
    }

    // validate the first parameter, which is the expression to compute over
    if (parameters[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
      throw new UDFArgumentTypeException(0,
          "Only primitive type arguments are accepted but "
          + parameters[0].getTypeName() + " was passed as parameter 1.");
    }
    switch (((PrimitiveTypeInfo) parameters[0]).getPrimitiveCategory()) {
    case BYTE:
    case SHORT:
    case INT:
    case LONG:
    case FLOAT:
    case DOUBLE:
    case TIMESTAMP:
    case DECIMAL:
      break;
    case STRING:
    case BOOLEAN:
    case DATE:
    default:
      throw new UDFArgumentTypeException(0,
          "Only numeric type arguments are accepted but "
          + parameters[0].getTypeName() + " was passed as parameter 1.");
    }

待解決:如何寫希望輸入的是兩個參數的,比如現在希望自己指定wm_concat的分割符。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM