1. 概述
UDF函數其實就是一個簡單的函數,執行過程就是在Hive轉換成MapReduce程序后,執行java方法,類似於像MapReduce執行過程中加入一個插件,方便擴展。UDF只能實現一進一出的操作,如果需要實現多進一出,則需要實現UDAF。
Hive可以允許用戶編寫自己定義的函數UDF,來在查詢中使用。
2. UDF類型
Hive中有3種UDF:
UDF:操作單個數據行,產生單個數據行;
UDAF:操作多個數據行,產生一個數據行;
UDTF:操作一個數據行,產生多個數據行一個表作為輸出;
3. 如何構建UDF
用戶構建的UDF使用過程如下:
- 繼承UDF或者UDAF或者UDTF,實現特定的方法;
- 將寫好的類打包為jar,如LowerUDF.jar;
- 進入到Hive shell環境中,輸入命令add jar /home/hadoop/LowerUDF.jar注冊該jar文件;或者把LowerUDF.jar上傳到hdfs,hadoop fs -put LowerUDF.jar /home/hadoop/LowerUDF.jar,再輸入命令add jar hdfs://hadoop01:8020/user/home/LowerUDF.jar;
- 為該類起一個別名,create temporary function lower_udf as 'UDF.lowerUDF';注意,這里UDF只是為這個Hive會話臨時定義的;
- 在select中使用lower_udf();
4. 自定義UDF
4.1 pom.xml依賴
<dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-exec</artifactId> <version>1.2.1</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.7.3</version> </dependency>
4.2 編寫UDF代碼
package UDF; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.hive.ql.exec.UDF; import org.apache.hadoop.io.Text; public class LowerUDF extends UDF{ /** * 1. Implement one or more methods named "evaluate" which will be called by Hive. * * 2. "evaluate" should never be a void method. However it can return "null" if needed. */ public Text evaluate(Text str){ // input parameter validate if(null == str){ return null ; } // validate if(StringUtils.isBlank(str.toString())){ return null ; } // lower return new Text(str.toString().toLowerCase()) ; } }
4.3 打包
注意:工程所用的jdk要與Hadoop集群使用的jdk是同一個版本。
4.4 注冊UDF
hive> add jar /home/hadoop/LowerUDF.jar hive> create temporary function lower_udf as "UDF.LowerUDF";
4.5 測試
hive> create table test (id int ,name string); hive> insert into test values(1,'TEST'); hive> select lower_udf(name) from test; OK test
注意事項:
- 一個用戶UDF必須org.apache.hadoop.hive.ql.exec.UDF;
- 一個UDF必須要包含有evaluate()方法,但是該方法並不存在於UDF中。evaluate的參數個數以及類型都是用戶自定義的。在使用的時候,Hive會調用UDF的evaluate()方法。
5. 自定義UDAF
UDAF是聚合函數,相當於reduce,將表中多行數據聚合成一行結果
UDAF是需要在hive的sql語句和group by聯合使用,hive的group by 對於每個分組,只能返回一條記錄,這點和mysql不一樣。
開發通用UDAF有兩個步驟:
- resolver負責類型檢查,操作符重載,里面創建evaluator類對象;
- evaluator真正實現UDAF的邏輯;
5.1 繼承AbstractAggregationBuffer和實現evaluator
package cn.wisec.meerkat.analyseOnHive; import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo; import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; /** * @Author: Yang JianQiu * @Date: 2019/7/16 10:48 * * 開發通用UDAF有兩個步驟: * 第一個是編寫resolver類,第二個是編寫evaluator類 * resolver負責類型檢查,操作符重載 * evaluator真正實現UDAF的邏輯。 * 通常來說,頂層UDAF類繼承{@link org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2} * 里面編寫嵌套類evaluator實現UDAF的邏輯 * * resolver通常繼承org.apache.hadoop.hive.ql.udf.GenericUDAFResolver2,但是更建議繼承AbstractGenericUDAFResolver,隔離將來hive接口的變化 * GenericUDAFResolver和GenericUDAFResolver2接口的區別是: 后面的允許evaluator實現利用GenericUDAFParameterInfo可以訪問更多的信息,例如DISTINCT限定符,通配符(*)。 */ public class CountUDAF extends AbstractGenericUDAFResolver { /** * 構建方法,傳入的是函數指定的列 * @param params * @return * @throws SemanticException */ @Override public GenericUDAFEvaluator getEvaluator(TypeInfo[] params) throws SemanticException { if (params.length > 1){ throw new UDFArgumentLengthException("Exactly one argument is expected"); } return new CountUDAFEvaluator(); } /** * 這個構建方法可以判輸入的參數是*號或者distinct * @param info * @return * @throws SemanticException */ @Override public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) throws SemanticException { ObjectInspector[] parameters = info.getParameterObjectInspectors(); boolean isAllColumns = false; if (parameters.length == 0){ if (!info.isAllColumns()){ throw new UDFArgumentException("Argument expected"); } if (info.isDistinct()){ throw new UDFArgumentException("DISTINCT not supported with"); } isAllColumns = true; }else if (parameters.length != 1){ throw new UDFArgumentLengthException("Exactly one argument is expected."); } return new CountUDAFEvaluator(isAllColumns); } /** * GenericUDAFEvaluator類實現UDAF的邏輯 * * enum Mode運行階段枚舉類 * PARTIAL1; * 這個是mapreduce的map階段:從原始數據到部分數據聚合 * 將會調用iterate()和terminatePartial() * * PARTIAL2: * 這個是mapreduce的map端的Combiner階段,負責在map端合並map的數據:部分數據聚合 * 將會調用merge()和terminatePartial() * * FINAL: * mapreduce的reduce階段:從部分數據的聚合到完全聚合 * 將會調用merge()和terminate() * * COMPLETE: * 如果出現了這個階段,表示mapreduce只有map,沒有reduce,所以map端就直接出結果了;從原始數據直接到完全聚合 * 將會調用iterate()和terminate() */ public static class CountUDAFEvaluator extends GenericUDAFEvaluator{ private boolean isAllColumns = false; /** * 合並結果的類型 */ private LongObjectInspector aggOI; private LongWritable result; public CountUDAFEvaluator() { } public CountUDAFEvaluator(boolean isAllColumns) { this.isAllColumns = isAllColumns; } /** * 負責初始化計算函數並設置它的內部狀態,result是存放最終結果的 * @param m 代表此時在map-reduce哪個階段,因為不同的階段可能在不同的機器上執行,需要重新創建對象partial1,partial2,final,complete * @param parameters partial1或complete階段傳入的parameters類型是原始輸入數據的類型 * partial2和final階段(執行合並)的parameters類型是partial-aggregations(既合並返回結果的類型),此時parameters長度肯定只有1了 * @return ObjectInspector * 在partial1和partial2階段返回局部合並結果的類型,既terminatePartial的類型 * 在complete或final階段返回總結果的類型,既terminate的類型 * @throws HiveException */ @Override public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException { super.init(m, parameters); //當是combiner和reduce階段時,獲取合並結果的類型,因為需要執行merge方法 //merge方法需要部分合並的結果類型來取得值 if (m == Mode.PARTIAL2 || m == Mode.FINAL){ aggOI = (LongObjectInspector) parameters[0]; } //保存總結果 result = new LongWritable(0); //局部合並結果的類型和總合並結果的類型都是long return PrimitiveObjectInspectorFactory.writableLongObjectInspector; } /** * 定義一個AbstractAggregationBuffer類來緩存合並值 */ static class CountAgg extends AbstractAggregationBuffer{ long value; /** * 返回類型占的字節數,long為8 * @return */ @Override public int estimate() { return JavaDataModel.PRIMITIVES2; } } /** * 創建緩存合並值的buffer * @return * @throws HiveException */ @Override public AggregationBuffer getNewAggregationBuffer() throws HiveException { CountAgg countAgg = new CountAgg(); reset(countAgg); return countAgg; } /** * 重置合並值 * @param agg * @throws HiveException */ @Override public void reset(AggregationBuffer agg) throws HiveException { ((CountAgg) agg).value = 0; } /** * map時執行,迭代數據 * @param agg * @param parameters * @throws HiveException */ @Override public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException { //parameters為輸入數據 //parameters == null means the input table/split is empty if (parameters == null){ return; } if (isAllColumns){ ((CountAgg) agg).value ++; }else { boolean countThisRow = true; for (Object nextParam: parameters){ if (nextParam == null){ countThisRow = false; break; } } if (countThisRow){ ((CountAgg) agg).value++; } } } /** * 返回buffer中部分聚合結果,map結束和combiner結束執行 * @param agg * @return * @throws HiveException */ @Override public Object terminatePartial(AggregationBuffer agg) throws HiveException { return terminate(agg); } /** * 合並結果,combiner或reduce時執行 * @param agg * @param partial * @throws HiveException */ @Override public void merge(AggregationBuffer agg, Object partial) throws HiveException { if (partial != null){ //累加部分聚合的結果 ((CountAgg) agg).value += aggOI.get(partial); } } /** * 返回buffer中總結果,reduce結束執行或者沒有reduce時map結束執行 * @param agg * @return * @throws HiveException */ @Override public Object terminate(AggregationBuffer agg) throws HiveException { //每一組執行一次(group by) result.set(((CountAgg) agg).value); //返回writable類型 return result; } } }
使用:
hive> add jar /root/udf.jar hive> create temporary function mycount as 'udf.CountUDAF' hive> select call, mycount(*) as cn from beauty group by call order by cn desc hive> select tag, mycount(tag) as cn from beauty lateral view explode(tags) lve_beauty as tag group by tag order by cn desc
6. 自定義UDTF
UDTF用來解決輸入一行輸出多行的需求。
限制:
- No other expressions are allowed in SELECT不能和其他字段一起使用:SELECT pageid,explode(adid_list) AS myCol... is not supported
- UDTF's can't be nested 不能嵌套:SELECT explode(explode(adid_list)) AS myCol..... is not supported
- GROUP BY/ CLUSTER BY/ DISTRIBUTE BY/ SORT BY is not supported:SELECT explode(adid_list) AS myCol.....GROUP BY myCol is not supported
繼承org.apache.hadoop.hive.ql.udf.generic.GenericUDTF,實現initialize,process,close三個方法。
執行過程:
- UDTF首先會調用initialize方法,此方法返回UDTF的輸出行的信息(輸出列個數與類型);
- 初始化完成后,會調用process方法,真正的處理過程在process函數中:在process中,每一次forward()調用產生一行;如果產生多列可以將多個列的值放在一個數組中,然后將該數組傳入到forward()函數。
- 最后close()方法調用,對需要清理的方法進行清理。
下面是實現一個explode函數的例子:
explode會將一個數組中每個元素都輸出一行,map中每對key-value都輸出一行,實現對數據展開
package cn.wisec.meerkat.analyseOnHive; import org.apache.hadoop.hive.ql.exec.TaskExecutionException; import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF; import org.apache.hadoop.hive.serde2.objectinspector.*; import java.util.ArrayList; import java.util.List; import java.util.Map; /** * @Author: Yang JianQiu * @Date: 2019/7/16 17:08 */ public class MyExplodeUDTF extends GenericUDTF { private transient ObjectInspector inputOI = null; /** * 初始化 * 構建一個StructObjectInspector類型用於輸出 * 其中struct的字段構成輸出的一行 * 字段名稱不重要,因為它們將被用戶提供的列別名覆蓋 * @param argOIs * @return * @throws UDFArgumentException */ @Override public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException { //得到結構體的字段 List<? extends StructField> inputFields = argOIs.getAllStructFieldRefs(); ObjectInspector[] udfInputOIs = new ObjectInspector[inputFields.size()]; for (int i = 0; i < inputFields.size(); i++){ //字段類型 udfInputOIs[i] = inputFields.get(i).getFieldObjectInspector(); } if (udfInputOIs.length != 1){ throw new UDFArgumentLengthException("explode() takes only one argument"); } List<String> fieldNames = new ArrayList<>(); List<ObjectInspector> fieldOIs = new ArrayList<>(); switch (udfInputOIs[0].getCategory()){ case LIST: inputOI = udfInputOIs[0]; //指定list生成的列名,可在as后覆寫 fieldNames.add("col"); //獲取list元素的類型 fieldOIs.add(((ListObjectInspector) inputOI).getListElementObjectInspector()); break; case MAP: inputOI = udfInputOIs[0]; //指定map中key的生成的列名,可在as后覆寫 fieldNames.add("key"); //指定map中value的生成的列名,可在as后覆寫 fieldNames.add("value"); //得到map中key的類型 fieldOIs.add(((MapObjectInspector)inputOI).getMapKeyObjectInspector()); //得到map中value的類型 fieldOIs.add(((MapObjectInspector)inputOI).getMapValueObjectInspector()); break; default: throw new UDFArgumentException("explode() takes an array or a map as a parameter"); } //創建一個Struct類型返回 return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs); } //輸出list private transient Object[] forwardListObj = new Object[1]; //輸出map private transient Object[] forwardMapObj = new Object[2]; /** * 每行執行一次,輸入數據args * 每調用forward,輸出一行 * @param args * @throws HiveException */ @Override public void process(Object[] args) throws HiveException { switch (inputOI.getCategory()){ case LIST: ListObjectInspector listOI = (ListObjectInspector) inputOI; List<?> list = listOI.getList(args[0]); if (list == null){ return; } //list中每個元素輸出一行 for (Object o: list){ forwardListObj[0] = o; forward(forwardListObj); } break; case MAP: MapObjectInspector mapOI = (MapObjectInspector) inputOI; Map<?, ?> map = mapOI.getMap(args[0]); if (map == null){ return; } //map中每一對輸出一行 for (Map.Entry<?, ?> entry: map.entrySet()){ forwardMapObj[0] = entry.getKey(); forwardMapObj[1] = entry.getValue(); forward(forwardMapObj); } break; default: throw new TaskExecutionException("explode() can only operate on an array or a map"); } } @Override public void close() throws HiveException { } }
使用:
hive> add jar /root/udtf.jar hive> create temporary function myexplode as 'udf.MyExplodeUDTF' hive> select myexplode(tags) as tag from beauty hive> select myexplode(props) as (k,v) from beauty hive> select tag, count(tag) as cn from beauty lateral view myexplode(tags) lve_beauty as tag group by tag order by cn desc
【參考資料】
https://blog.csdn.net/wypersist/article/details/80314352
https://blog.csdn.net/zmywei_20160707/article/details/81698542