hive學習筆記之十:用戶自定義聚合函數(UDAF)


歡迎訪問我的GitHub

這里分類和匯總了欣宸的全部原創(含配套源碼):https://github.com/zq2599/blog_demos

本篇概覽

  • 本文是《hive學習筆記》的第十篇,前文實踐過UDF的開發、部署、使用,那個UDF適用於一進一出的場景,例如將每條記錄的指定字段轉為大寫;
  • 除了一進一出,在使用group by的SQL中,多進一出也是常見場景,例如hive自帶的avg、sum都是多進一出,這個場景的自定義函數叫做用戶自定義聚合函數(User Defiend Aggregate Function,UDAF),UDAF的開發比一進一出要復雜一些,本篇文章就一起來實戰UDAF開發;
  • 本文開發的UDAF名為udf_fieldlength ,用於group by的時候,統計指定字段在每個分組中的總長度;

准備工作

  1. 在一些舊版的教程和文檔中,都會提到UDAF開發的關鍵是繼承UDAF.java;
  2. 打開hive-exec的1.2.2版本源碼,卻發現UDAF類已被注解為Deprecated
  3. UDAF類被廢棄后,推薦的替代品有兩種:實現GenericUDAFResolver2接口,或者繼承AbstractGenericUDAFResolver類;
  4. 現在新問題來了:上述兩種替代品,咱們在做UDAF的時候該用哪一種呢?
  5. 打開AbstractGenericUDAFResolver類的源碼瞅一眼,如下所示,是否有種恍然大悟的感覺,這個類自身就是GenericUDAFResolver2接口的實現類:
public abstract class AbstractGenericUDAFResolver
    implements GenericUDAFResolver2
{

  @SuppressWarnings("deprecation")
  @Override
  public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info)
    throws SemanticException {

    if (info.isAllColumns()) {
      throw new SemanticException(
          "The specified syntax for UDAF invocation is invalid.");
    }

    return getEvaluator(info.getParameters());
  }

  @Override
  public GenericUDAFEvaluator getEvaluator(TypeInfo[] info) 
    throws SemanticException {
    throw new SemanticException(
          "This UDAF does not support the deprecated getEvaluator() method.");
  }
}
  1. 既然源碼都看了,也就沒啥好糾結的了,繼承父類還是實現接口都可以,您自己看着選吧,我這里選的是繼承AbstractGenericUDAFResolver類;

關於UDAF的四個階段

  • 在編碼前,要先了解UDAF的四個階段,定義在GenericUDAFEvaluator的Mode枚舉中:
  1. COMPLETE:如果mapreduce只有map而沒有reduce,就會進入這個階段;
  2. PARTIAL1:正常mapreduce的map階段;
  3. PARTIAL2:正常mapreduce的combiner階段;
  4. FINAL:正常mapreduce的reduce階段;

每個階段被調用的方法

  • 開發UDAF時,要繼承抽象類GenericUDAFEvaluator,里面有多個抽象方法,在不同的階段,會調用到這些方法中的一個或多個;
  • 下圖對每個階段調用了哪些方法說得很清楚:

在這里插入圖片描述

  • 下圖對順序執行的三個階段和涉及方法做了詳細說明:

在這里插入圖片描述

源碼下載

  1. 如果您不想編碼,可以在GitHub下載所有源碼,地址和鏈接信息如下表所示:
名稱 鏈接 備注
項目主頁 https://github.com/zq2599/blog_demos 該項目在GitHub上的主頁
git倉庫地址(https) https://github.com/zq2599/blog_demos.git 該項目源碼的倉庫地址,https協議
git倉庫地址(ssh) git@github.com:zq2599/blog_demos.git 該項目源碼的倉庫地址,ssh協議
  1. 這個git項目中有多個文件夾,本章的應用在hiveudf文件夾下,如下圖紅框所示:

在這里插入圖片描述

UDAF開發步驟簡述

開發UDAF分為以下幾步:

  1. 新建類FieldLengthAggregationBuffer,用於保存中間結果,該類需繼承AbstractAggregationBuffer;
  2. 新建類FieldLengthUDAFEvaluator,用於實現四個階段中會被調用的方法,該類需繼承GenericUDAFEvaluator;
  3. 新建類FieldLength,用於在hive中注冊UDAF,里面會實例化FieldLengthUDAFEvaluator,該類需繼承AbstractGenericUDAFResolver;
  4. 編譯構建,得到jar;
  5. 在hive添加jar;
  6. 在hive注冊函數;

接下來就按照上述步驟開始操作;

開發

  1. 打開前文新建的hiveudf工程,新建FieldLengthAggregationBuffer.java,這個類的作用是緩存中間計算結果,每次計算的結果都放入這里面,被傳遞給下個階段,其成員變量value用來保存累加數據:
package com.bolingcavalry.hiveudf.udaf;

import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.util.JavaDataModel;

public class FieldLengthAggregationBuffer extends GenericUDAFEvaluator.AbstractAggregationBuffer {

    private Integer value = 0;

    public Integer getValue() {
        return value;
    }

    public void setValue(Integer value) {
        this.value = value;
    }

    public void add(int addValue) {
        synchronized (value) {
            value += addValue;
        }
    }

    /**
     * 合並值緩沖區大小,這里是用來保存字符串長度,因此設為4byte
     * @return
     */
    @Override
    public int estimate() {
        return JavaDataModel.PRIMITIVES1;
    }
}
  1. 新建FieldLengthUDAFEvaluator.java,里面是整個UDAF邏輯實現,關鍵代碼已經添加了注釋,請結合前面的圖片來理解,核心思路是iterate將當前分組的字段處理完畢,merger把分散的數據合並起來,再由terminate決定當前分組計算結果:
package com.bolingcavalry.hiveudf.udaf;

import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;

/**
 * @Description: 這里是UDAF的實際處理類
 * @author: willzhao E-mail: zq2599@gmail.com
 * @date: 2020/11/4 9:57
 */
public class FieldLengthUDAFEvaluator extends GenericUDAFEvaluator {

    PrimitiveObjectInspector inputOI;

    ObjectInspector outputOI;

    PrimitiveObjectInspector integerOI;

    /**
     * 每個階段都會被執行的方法,
     * 這里面主要是把每個階段要用到的輸入輸出inspector好,其他方法被調用時就能直接使用了
     * @param m
     * @param parameters
     * @return
     * @throws HiveException
     */
    @Override
    public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
        super.init(m, parameters);

        // COMPLETE或者PARTIAL1,輸入的都是數據庫的原始數據
        if(Mode.PARTIAL1.equals(m) || Mode.COMPLETE.equals(m)) {
            inputOI = (PrimitiveObjectInspector) parameters[0];
        } else {
            // PARTIAL2和FINAL階段,都是基於前一個階段init返回值作為parameters入參
            integerOI = (PrimitiveObjectInspector) parameters[0];
        }

        outputOI = ObjectInspectorFactory.getReflectionObjectInspector(
                Integer.class,
                ObjectInspectorFactory.ObjectInspectorOptions.JAVA
        );

        // 給下一個階段用的,即告訴下一個階段,自己輸出數據的類型
        return outputOI;
    }

    public AggregationBuffer getNewAggregationBuffer() throws HiveException {
        return new FieldLengthAggregationBuffer();
    }

    /**
     * 重置,將總數清理掉
     * @param agg
     * @throws HiveException
     */
    public void reset(AggregationBuffer agg) throws HiveException {
        ((FieldLengthAggregationBuffer)agg).setValue(0);
    }

    /**
     * 不斷被調用執行的方法,最終數據都保存在agg中
     * @param agg
     * @param parameters
     * @throws HiveException
     */
    public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
        if(null==parameters || parameters.length<1) {
            return;
        }

        Object javaObj = inputOI.getPrimitiveJavaObject(parameters[0]);

        ((FieldLengthAggregationBuffer)agg).add(String.valueOf(javaObj).length());
    }

    /**
     * group by的時候返回當前分組的最終結果
     * @param agg
     * @return
     * @throws HiveException
     */
    public Object terminate(AggregationBuffer agg) throws HiveException {
        return ((FieldLengthAggregationBuffer)agg).getValue();
    }

    /**
     * 當前階段結束時執行的方法,返回的是部分聚合的結果(map、combiner)
     * @param agg
     * @return
     * @throws HiveException
     */
    public Object terminatePartial(AggregationBuffer agg) throws HiveException {
        return terminate(agg);
    }

    /**
     * 合並數據,將總長度加入到緩存對象中(combiner或reduce)
     * @param agg
     * @param partial
     * @throws HiveException
     */
    public void merge(AggregationBuffer agg, Object partial) throws HiveException {

        ((FieldLengthAggregationBuffer) agg).add((Integer)integerOI.getPrimitiveJavaObject(partial));
    }
}
  1. 最后是FieldLength.java,該類注冊UDAF到hive時用到的,負責實例化FieldLengthUDAFEvaluator,給hive使用:
package com.bolingcavalry.hiveudf.udaf;

import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;

public class FieldLength extends AbstractGenericUDAFResolver {
    @Override
    public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) throws SemanticException {
        return new FieldLengthUDAFEvaluator();
    }

    @Override
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] info) throws SemanticException {
        return new FieldLengthUDAFEvaluator();
    }
}

至此,編碼完成,接下來是部署和體驗;

部署和體驗

本次部署的注冊方式是臨時函數,如果您想注冊為永久函數,請參考前文;

  1. 在pom.xml所在目錄執行mvn clean package -U,即可編譯構建;
  2. 在target目錄得到文件hiveudf-1.0-SNAPSHOT.jar
  3. 上傳到hive服務器,我這里是放在/home/hadoop/udf目錄;
  4. 進入hive會話,執行以下命令添加jar:
add jar /home/hadoop/udf/hiveudf-1.0-SNAPSHOT.jar;
  1. 執行以下命令注冊:
create temporary function udf_fieldlength as 'com.bolingcavalry.hiveudf.udaf.FieldLength';
  1. 找一個適合執行group by的表試試,我這里是前面的文章中創建的address表,完整數據如下:
hive> select * from address;
OK
1	guangdong	guangzhou
2	guangdong	shenzhen
3	shanxi	xian
4	shanxi	hanzhong
6	jiangshu	nanjing
  1. 執行下面的SQL:
select province, count(city), udf_fieldlength(city) from address group by province;

執行結果如下,可見guangdong的guangzhou和shenzhen總長度為17,jiangsu的nanjing為7,shanxi的xian和hanzhong總長度12,符合預期:

Total MapReduce CPU Time Spent: 2 seconds 730 msec
OK
guangdong	2	17
jiangshu	1	7
shanxi	2	12
Time taken: 28.484 seconds, Fetched: 3 row(s)

至此,UDAF的學習和實踐就完成了,咱們掌握了多進一出的函數開發,由於涉及到多個階段和外部調用的邏輯,使得UDAF的開發難度略大,接下來的文章是一進多出的開發,會簡單一些。

你不孤單,欣宸原創一路相伴

  1. Java系列
  2. Spring系列
  3. Docker系列
  4. kubernetes系列
  5. 數據庫+中間件系列
  6. DevOps系列

歡迎關注公眾號:程序員欣宸

微信搜索「程序員欣宸」,我是欣宸,期待與您一同暢游Java世界...
https://github.com/zq2599/blog_demos


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM