單機跑一個腳本做數據處理,但是由於輸入數據實在太大,處理過程中占用大量內存經常被系統殺死,所以考慮放在hive中做數據聚合。借此機會研究下UDAF怎么寫,把踏坑的經驗寫出來,希望可以幫助大家少走彎路!嗯。。。就醬紫。
經常聽UDF,那么UDAF是什么鬼? 就是聚合功能的UDF啦~ 比如hive內置的 count、sum、max、min、avg等。 但是內置的函數其實並不能滿足我們復雜的統計需求,就需要自己去實現一個方法。
有兩種實現方法,一種簡單的,一種通用的,簡單的方法據說有性能問題,我們就直接看通用的實現方法吧~
實現一個Generic UDAF有兩部分:
- resolver
- evaluator
這倆貨分別對應以下兩個抽象類:
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
resolver 主要用來做參數檢查和操作符重載,我們可以根據輸入參數的不同選擇相應的evaluator
evaluator 則是實現主要邏輯的地方,以靜態內部類的形式存在
#!Java public class GenericUDAFHistogramNumeric extends AbstractGenericUDAFResolver { static final Log LOG = LogFactory.getLog(GenericUDAFHistogramNumeric.class.getName()); @Override public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) throws SemanticException { // 參數檢查 return new GenericUDAFHistogramNumericEvaluator(); } /** *這個靜態內部類就是寫我們自己邏輯的地方,這個類名根據需要改,這個是官方文檔寫的一個條形圖的例子 */ public static class GenericUDAFHistogramNumericEvaluator extends GenericUDAFEvaluator { // UDAF 邏輯 } }
這里需要介紹下這個例子的功能:hIve中的histogram_numeric函數,用來做直方圖的,比如我們要把年齡分30個桶構建直方圖就是SELECT histogram_numeric(age, 30) FROM employees;
下面我們繼續看例子
#!Java /** * 這個方法的參數新版的已經發生變化,直接就是TypeInfo [] parameters */ public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) throws SemanticException { TypeInfo [] parameters = info.getParameters(); if (parameters.length != 2) { throw new UDFArgumentTypeException(parameters.length - 1, "Please specify exactly two arguments."); } // 檢查第一個參數類型,如果不是原始類型(基本類型)拋異常 if (parameters[0].getCategory() != ObjectInspector.Category.PRIMITIVE) { throw new UDFArgumentTypeException(0, "Only primitive type arguments are accepted but " + parameters[0].getTypeName() + " was passed as parameter 1."); } switch (((PrimitiveTypeInfo) parameters[0]).getPrimitiveCategory()) { case BYTE: case SHORT: case INT: case LONG: case FLOAT: case DOUBLE: break; case STRING: case BOOLEAN: default: throw new UDFArgumentTypeException(0, "Only numeric type arguments are accepted but " + parameters[0].getTypeName() + " was passed as parameter 1."); } // 檢查第二個參數類型,條形圖桶編號,假設這里要求是整型數 if (parameters[1].getCategory() != ObjectInspector.Category.PRIMITIVE) { throw new UDFArgumentTypeException(1, "Only primitive type arguments are accepted but " + parameters[1].getTypeName() + " was passed as parameter 2."); } // 如果不是整型,拋異常 if( ((PrimitiveTypeInfo) parameters[1]).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.INT) { throw new UDFArgumentTypeException(1, "Only an integer argument is accepted as parameter 2, but " + parameters[1].getTypeName() + " was passed instead."); } //返回對應的處理類 return new GenericUDAFHistogramNumericEvaluator(); }
然后我們看看evaluator
#!Java public static class GenericUDAFHistogramNumericEvaluator extends GenericUDAFEvaluator { // For PARTIAL1 and COMPLETE: ObjectInspectors for original data,這倆貨是用來做類型轉換的 private PrimitiveObjectInspector inputOI; private PrimitiveObjectInspector nbinsOI; // For PARTIAL2 and FINAL: ObjectInspectors for partial aggregations (list of doubles) private StandardListObjectInspector loi; @Override public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException { super.init(m, parameters); // return type goes here } @Override public Object terminatePartial(AggregationBuffer agg) throws HiveException { // return value goes here } @Override public Object terminate(AggregationBuffer agg) throws HiveException { // final return value goes here } @Override public void merge(AggregationBuffer agg, Object partial) throws HiveException { } @Override public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException { } // Aggregation buffer definition and manipulation methods static class StdAgg implements AggregationBuffer { }; @Override public AggregationBuffer getNewAggregationBuffer() throws HiveException { } @Override public void reset(AggregationBuffer agg) throws HiveException { } }
理解這個類我們首先需要了解一些事情,寫過Hadoop MapReduce 的同學應該知道,一個MapReduce job 分為 map、combine、reduce三個階段,map階段是把函數應用於輸入數據的每一條,構建key-value供后續聚合;combine階段是在mapper端局部進行聚合,聚合后的中間結果傳給reduce函數,輸入和reduce函數是一致的,被稱為mapper端的reduce。了解了這個過程后,我們來看evaluator的幾個方法,基本上是對應這幾個階段。
| 方法 |
作用 |
| init |
初始化函數 |
| getNewAggregationBuffer |
用來生成一個緩存對象,記錄臨時聚合結果 |
| iterate |
一條一條處理數據,將結果存入緩存 |
| terminatePartial |
這個方法意味着map階段結束,將緩存中的數據持久化存儲。這里返回的數據類型僅支持java基本類型、基本類型包裝類、數組以及Hadoop的Writables, Lists和Map,不要使用自定義類型 |
| merge |
接收terminatePartial返回的結果,合並局部聚合結果 |
| terminate |
返回最終結果,可以在這里實現最后的求值,比如計算平均值 |
在hive中,用一個枚舉類Mode來表示不同階段
/** * Mode. *官方的注釋寫的挺詳細了^_^ */ public static enum Mode { /** * PARTIAL1: from original data to partial aggregation data: iterate() and * terminatePartial() will be called. */ PARTIAL1, /** * PARTIAL2: from partial aggregation data to partial aggregation data: * merge() and terminatePartial() will be called. */ PARTIAL2, /** * FINAL: from partial aggregation to full aggregation: merge() and * terminate() will be called. */ FINAL, /** * COMPLETE: from original data directly to full aggregation: iterate() and * terminate() will be called. */ COMPLETE };
嗯。。。 寫完后打個jar包出來,創建個臨時函數來使用既可以了
add jar hiveUDF.jar; create temporary function test_udf as 'com.test.xxxx';
select test_udf(a,b) from table2 groupy by xxx.
好啦,先寫這么多,我寫的時候數據類型用的大部分是java的,所以產生了各種類型轉換錯誤,后面打算看看Hadoop的內置類型~ 希望能幫到大家~
