Hive UDF函數構建


1. 概述

   UDF函數其實就是一個簡單的函數,執行過程就是在Hive轉換成MapReduce程序后,執行java方法,類似於像MapReduce執行過程中加入一個插件,方便擴展。UDF只能實現一進一出的操作,如果需要實現多進一出,則需要實現UDAF。

  Hive可以允許用戶編寫自己定義的函數UDF,來在查詢中使用。

2. UDF類型

  Hive中有3種UDF:

  UDF:操作單個數據行,產生單個數據行;

  UDAF:操作多個數據行,產生一個數據行;

  UDTF:操作一個數據行,產生多個數據行一個表作為輸出;

3. 如何構建UDF

  用戶構建的UDF使用過程如下:

  1. 繼承UDF或者UDAF或者UDTF,實現特定的方法;
  2. 將寫好的類打包為jar,如LowerUDF.jar;
  3. 進入到Hive shell環境中,輸入命令add jar /home/hadoop/LowerUDF.jar注冊該jar文件;或者把LowerUDF.jar上傳到hdfs,hadoop fs -put LowerUDF.jar /home/hadoop/LowerUDF.jar,再輸入命令add jar hdfs://hadoop01:8020/user/home/LowerUDF.jar;
  4. 為該類起一個別名,create temporary function lower_udf as 'UDF.lowerUDF';注意,這里UDF只是為這個Hive會話臨時定義的;
  5. 在select中使用lower_udf();

4. 自定義UDF

4.1 pom.xml依賴

<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>1.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.7.3</version>
</dependency>

4.2 編寫UDF代碼

package UDF;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

public class LowerUDF extends UDF{
    /**
     * 1. Implement one or more methods named "evaluate" which will be called by Hive.
     *
     * 2. "evaluate" should never be a void method. However it can return "null" if needed.
     */
    public Text evaluate(Text str){
        // input parameter validate
        if(null == str){
            return null ;
        }

        // validate
        if(StringUtils.isBlank(str.toString())){
            return null ;
        }

        // lower
        return new Text(str.toString().toLowerCase()) ;
    }

}

4.3 打包

  注意:工程所用的jdk要與Hadoop集群使用的jdk是同一個版本。

4.4 注冊UDF

hive> add jar /home/hadoop/LowerUDF.jar
hive> create temporary function lower_udf as "UDF.LowerUDF";

4.5 測試

hive> create table test (id int ,name string);
hive> insert into test values(1,'TEST');
hive> select lower_udf(name) from test;
OK
test

注意事項:

  1. 一個用戶UDF必須org.apache.hadoop.hive.ql.exec.UDF;
  2. 一個UDF必須要包含有evaluate()方法,但是該方法並不存在於UDF中。evaluate的參數個數以及類型都是用戶自定義的。在使用的時候,Hive會調用UDF的evaluate()方法。

5. 自定義UDAF

  UDAF是聚合函數,相當於reduce,將表中多行數據聚合成一行結果

  UDAF是需要在hive的sql語句和group by聯合使用,hive的group by 對於每個分組,只能返回一條記錄,這點和mysql不一樣。

   開發通用UDAF有兩個步驟:

  1. resolver負責類型檢查,操作符重載,里面創建evaluator類對象;
  2. evaluator真正實現UDAF的邏輯;

5.1 繼承AbstractAggregationBuffer和實現evaluator

package cn.wisec.meerkat.analyseOnHive;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
import org.apache.hadoop.hive.ql.util.JavaDataModel;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;

/**
 * @Author: Yang JianQiu
 * @Date: 2019/7/16 10:48
 *
 * 開發通用UDAF有兩個步驟:
 * 第一個是編寫resolver類,第二個是編寫evaluator類
 * resolver負責類型檢查,操作符重載
 * evaluator真正實現UDAF的邏輯。
 * 通常來說,頂層UDAF類繼承{@link org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2}
 * 里面編寫嵌套類evaluator實現UDAF的邏輯
 *
 * resolver通常繼承org.apache.hadoop.hive.ql.udf.GenericUDAFResolver2,但是更建議繼承AbstractGenericUDAFResolver,隔離將來hive接口的變化
 * GenericUDAFResolver和GenericUDAFResolver2接口的區別是:  后面的允許evaluator實現利用GenericUDAFParameterInfo可以訪問更多的信息,例如DISTINCT限定符,通配符(*)。
 */
public class CountUDAF extends AbstractGenericUDAFResolver {

    /**
     * 構建方法,傳入的是函數指定的列
     * @param params
     * @return
     * @throws SemanticException
     */
    @Override
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] params) throws SemanticException {
        if (params.length > 1){
            throw new UDFArgumentLengthException("Exactly one argument is expected");
        }
        return new CountUDAFEvaluator();
    }

    /**
     * 這個構建方法可以判輸入的參數是*號或者distinct
     * @param info
     * @return
     * @throws SemanticException
     */
    @Override
    public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) throws SemanticException {

        ObjectInspector[] parameters = info.getParameterObjectInspectors();
        boolean isAllColumns = false;
        if (parameters.length == 0){
            if (!info.isAllColumns()){
                throw new UDFArgumentException("Argument expected");
            }

            if (info.isDistinct()){
                throw new UDFArgumentException("DISTINCT not supported with");
            }
            isAllColumns = true;
        }else if (parameters.length != 1){
            throw new UDFArgumentLengthException("Exactly one argument is expected.");
        }
        return new CountUDAFEvaluator(isAllColumns);
    }

    /**
     * GenericUDAFEvaluator類實現UDAF的邏輯
     *
     * enum Mode運行階段枚舉類
     * PARTIAL1;
     * 這個是mapreduce的map階段:從原始數據到部分數據聚合
     * 將會調用iterate()和terminatePartial()
     *
     * PARTIAL2:
     * 這個是mapreduce的map端的Combiner階段,負責在map端合並map的數據:部分數據聚合
     * 將會調用merge()和terminatePartial()
     *
     * FINAL:
     * mapreduce的reduce階段:從部分數據的聚合到完全聚合
     * 將會調用merge()和terminate()
     *
     * COMPLETE:
     * 如果出現了這個階段,表示mapreduce只有map,沒有reduce,所以map端就直接出結果了;從原始數據直接到完全聚合
     * 將會調用iterate()和terminate()
     */
    public static class CountUDAFEvaluator extends GenericUDAFEvaluator{

        private boolean isAllColumns = false;

        /**
         * 合並結果的類型
         */
        private LongObjectInspector aggOI;

        private LongWritable result;

        public CountUDAFEvaluator() {
        }

        public CountUDAFEvaluator(boolean isAllColumns) {
            this.isAllColumns = isAllColumns;
        }

        /**
         * 負責初始化計算函數並設置它的內部狀態,result是存放最終結果的
         * @param m 代表此時在map-reduce哪個階段,因為不同的階段可能在不同的機器上執行,需要重新創建對象partial1,partial2,final,complete
         * @param parameters partial1或complete階段傳入的parameters類型是原始輸入數據的類型
         *                   partial2和final階段(執行合並)的parameters類型是partial-aggregations(既合並返回結果的類型),此時parameters長度肯定只有1了
         * @return ObjectInspector
         *  在partial1和partial2階段返回局部合並結果的類型,既terminatePartial的類型
         *  在complete或final階段返回總結果的類型,既terminate的類型
         * @throws HiveException
         */
        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
            super.init(m, parameters);
            //當是combiner和reduce階段時,獲取合並結果的類型,因為需要執行merge方法
            //merge方法需要部分合並的結果類型來取得值
            if (m == Mode.PARTIAL2 || m == Mode.FINAL){
                aggOI = (LongObjectInspector) parameters[0];
            }

            //保存總結果
            result = new LongWritable(0);
            //局部合並結果的類型和總合並結果的類型都是long
            return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
        }

        /**
         * 定義一個AbstractAggregationBuffer類來緩存合並值
         */
        static class CountAgg extends AbstractAggregationBuffer{
            long value;

            /**
             * 返回類型占的字節數,long為8
             * @return
             */
            @Override
            public int estimate() {
                return JavaDataModel.PRIMITIVES2;
            }
        }

        /**
         * 創建緩存合並值的buffer
         * @return
         * @throws HiveException
         */
        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
            CountAgg countAgg = new CountAgg();
            reset(countAgg);
            return countAgg;
        }

        /**
         * 重置合並值
         * @param agg
         * @throws HiveException
         */
        @Override
        public void reset(AggregationBuffer agg) throws HiveException {
            ((CountAgg) agg).value = 0;
        }

        /**
         * map時執行,迭代數據
         * @param agg
         * @param parameters
         * @throws HiveException
         */
        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
            //parameters為輸入數據
            //parameters == null means the input table/split is empty
            if (parameters == null){
                return;
            }
            if (isAllColumns){
                ((CountAgg) agg).value ++;
            }else {
                boolean countThisRow = true;
                for (Object nextParam: parameters){
                    if (nextParam == null){
                        countThisRow = false;
                        break;
                    }
                }
                if (countThisRow){
                    ((CountAgg) agg).value++;
                }
            }
        }

        /**
         * 返回buffer中部分聚合結果,map結束和combiner結束執行
         * @param agg
         * @return
         * @throws HiveException
         */
        @Override
        public Object terminatePartial(AggregationBuffer agg) throws HiveException {
            return terminate(agg);
        }

        /**
         * 合並結果,combiner或reduce時執行
         * @param agg
         * @param partial
         * @throws HiveException
         */
        @Override
        public void merge(AggregationBuffer agg, Object partial) throws HiveException {
            if (partial != null){
                //累加部分聚合的結果
                ((CountAgg) agg).value += aggOI.get(partial);
            }
        }

        /**
         * 返回buffer中總結果,reduce結束執行或者沒有reduce時map結束執行
         * @param agg
         * @return
         * @throws HiveException
         */
        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
            //每一組執行一次(group by)
            result.set(((CountAgg) agg).value);
            //返回writable類型
            return result;
        }
    }
}

  使用:

hive> add jar /root/udf.jar
hive> create temporary function mycount as 'udf.CountUDAF'
hive> select call, mycount(*) as cn from beauty group by call order by cn desc
hive> select tag, mycount(tag) as cn from beauty lateral view explode(tags) lve_beauty as tag group by tag order by cn desc

6. 自定義UDTF

  UDTF用來解決輸入一行輸出多行的需求。

  限制:

  1. No other expressions are allowed in SELECT不能和其他字段一起使用:SELECT pageid,explode(adid_list) AS myCol... is not supported
  2. UDTF's can't be nested 不能嵌套:SELECT explode(explode(adid_list)) AS myCol..... is not supported
  3. GROUP BY/ CLUSTER BY/ DISTRIBUTE BY/ SORT BY is not supported:SELECT explode(adid_list) AS myCol.....GROUP BY myCol is not supported

  繼承org.apache.hadoop.hive.ql.udf.generic.GenericUDTF,實現initialize,process,close三個方法。

  執行過程:

  1. UDTF首先會調用initialize方法,此方法返回UDTF的輸出行的信息(輸出列個數與類型);
  2. 初始化完成后,會調用process方法,真正的處理過程在process函數中:在process中,每一次forward()調用產生一行;如果產生多列可以將多個列的值放在一個數組中,然后將該數組傳入到forward()函數。
  3. 最后close()方法調用,對需要清理的方法進行清理。

下面是實現一個explode函數的例子:

  explode會將一個數組中每個元素都輸出一行,map中每對key-value都輸出一行,實現對數據展開

package cn.wisec.meerkat.analyseOnHive;

import org.apache.hadoop.hive.ql.exec.TaskExecutionException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.*;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * @Author: Yang JianQiu
 * @Date: 2019/7/16 17:08
 */
public class MyExplodeUDTF extends GenericUDTF {

    private transient ObjectInspector inputOI = null;

    /**
     * 初始化
     * 構建一個StructObjectInspector類型用於輸出
     * 其中struct的字段構成輸出的一行
     * 字段名稱不重要,因為它們將被用戶提供的列別名覆蓋
     * @param argOIs
     * @return
     * @throws UDFArgumentException
     */
    @Override
    public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
        //得到結構體的字段
        List<? extends StructField> inputFields = argOIs.getAllStructFieldRefs();
        ObjectInspector[] udfInputOIs = new ObjectInspector[inputFields.size()];
        for (int i = 0; i < inputFields.size(); i++){
            //字段類型
            udfInputOIs[i] = inputFields.get(i).getFieldObjectInspector();
        }

        if (udfInputOIs.length != 1){
            throw new UDFArgumentLengthException("explode() takes only one argument");
        }

        List<String> fieldNames = new ArrayList<>();
        List<ObjectInspector> fieldOIs = new ArrayList<>();
        switch (udfInputOIs[0].getCategory()){
            case LIST:
                inputOI = udfInputOIs[0];
                //指定list生成的列名,可在as后覆寫
                fieldNames.add("col");
                //獲取list元素的類型
                fieldOIs.add(((ListObjectInspector) inputOI).getListElementObjectInspector());
                break;
            case MAP:
                inputOI = udfInputOIs[0];
                //指定map中key的生成的列名,可在as后覆寫
                fieldNames.add("key");
                //指定map中value的生成的列名,可在as后覆寫
                fieldNames.add("value");
                //得到map中key的類型
                fieldOIs.add(((MapObjectInspector)inputOI).getMapKeyObjectInspector());
                //得到map中value的類型
                fieldOIs.add(((MapObjectInspector)inputOI).getMapValueObjectInspector());
                break;
            default:
                throw new UDFArgumentException("explode() takes an array or a map as a parameter");
        }
        //創建一個Struct類型返回
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    //輸出list
    private transient Object[] forwardListObj = new Object[1];
    //輸出map
    private transient Object[] forwardMapObj = new Object[2];

    /**
     * 每行執行一次,輸入數據args
     * 每調用forward,輸出一行
     * @param args
     * @throws HiveException
     */
    @Override
    public void process(Object[] args) throws HiveException {
        switch (inputOI.getCategory()){
            case LIST:
                ListObjectInspector listOI = (ListObjectInspector) inputOI;
                List<?> list = listOI.getList(args[0]);
                if (list == null){
                    return;
                }

                //list中每個元素輸出一行
                for (Object o: list){
                    forwardListObj[0] = o;
                    forward(forwardListObj);
                }
                break;
            case MAP:
                MapObjectInspector mapOI = (MapObjectInspector) inputOI;
                Map<?, ?> map = mapOI.getMap(args[0]);
                if (map == null){
                    return;
                }
                //map中每一對輸出一行
                for (Map.Entry<?, ?> entry: map.entrySet()){
                    forwardMapObj[0] = entry.getKey();
                    forwardMapObj[1] = entry.getValue();
                    forward(forwardMapObj);
                }
                break;
            default:
                throw new TaskExecutionException("explode() can only operate on an array or a map");
        }
    }

    @Override
    public void close() throws HiveException {

    }
}

  使用:

hive> add jar /root/udtf.jar
hive> create temporary function myexplode as 'udf.MyExplodeUDTF'
hive> select myexplode(tags) as tag from beauty
hive> select myexplode(props) as (k,v) from beauty
hive> select tag, count(tag) as cn from beauty lateral view myexplode(tags) lve_beauty as tag group by tag order by cn desc 

 

【參考資料】

https://blog.csdn.net/wypersist/article/details/80314352

https://blog.csdn.net/zmywei_20160707/article/details/81698542

https://imcoder.site/article/detail?aid=131


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM