關鍵字:Hive udf、UDF、GenericUDF
Hive中,除了提供豐富的內置函數(見[一起學Hive]之二–Hive函數大全-完整版)之外,還允許用戶使用Java開發自定義的UDF函數。
開發自定義UDF函數有兩種方式,一個是繼承org.apache.hadoop.hive.ql.exec.UDF,另一個是繼承org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
如果是針對簡單的數據類型(比如String、Integer等)可以使用UDF,如果是針對復雜的數據類型(比如Array、Map、 Struct等),可以使用GenericUDF,另外,GenericUDF還可以在函數開始之前和結束之后做一些初始化和關閉的處理操作。
UDF
使用UDF非常簡單,只需要繼承org.apache.hadoop.hive.ql.exec.UDF,並定義
public Object evaluate(Object args) {} 方法即可。
比如,下面的UDF函數實現了對一個String類型的字符串取HashMD5:
- package com.lxw1234.hive.udf;
- import org.apache.hadoop.hbase.util.Bytes;
- import org.apache.hadoop.hbase.util.MD5Hash;
- import org.apache.hadoop.hive.ql.exec.UDF;
- public class HashMd5 extends UDF {
- public String evaluate(String cookie) {
- return MD5Hash.getMD5AsHex(Bytes.toBytes(cookie));
- }
- }
將上面的HashMd5類打成jar包,udf.jar
使用時候,在Hive命令行執行:
- add jar file:///tmp/udf.jar;
- CREATE temporary function str_md5 as 'com.lxw1234.hive.udf.HashMd5';
- select str_md5(‘lxw1234.com’) from dual;
GenericUDF
繼承org.apache.hadoop.hive.ql.udf.generic.GenericUDF之后,需要重寫幾個重要的方法:
public void configure(MapredContext context) {}
//可選,該方法中可以通過context.getJobConf()獲取job執行時候的Configuration;
//可以通過Configuration傳遞參數值
public ObjectInspector initialize(ObjectInspector[] arguments)
//必選,該方法用於函數初始化操作,並定義函數的返回值類型;
//比如,在該方法中可以初始化對象實例,初始化數據庫鏈接,初始化讀取文件等;
public Object evaluate(DeferredObject[] args){}
//必選,函數處理的核心方法,用途和UDF中的evaluate一樣;
public String getDisplayString(String[] children)
//必選,顯示函數的幫助信息
public void close(){}
//可選,map完成后,執行關閉操作
下面的程序將一個以逗號分隔的字符串,切分成List,並返回:
- package com.lxw1234.hive.udf;
- import java.util.ArrayList;
- import java.util.Date;
- import org.apache.hadoop.hive.ql.exec.MapredContext;
- import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
- import org.apache.hadoop.hive.ql.metadata.HiveException;
- import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
- import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
- import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
- import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
- import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
- /**
- * http://lxw1234.com
- * lxw的大數據田地
- * @author lxw1234
- * 該函數用於將字符串切分成List,並返回
- */
- public class Lxw1234GenericUDF extends GenericUDF {
- private static int mapTasks = 0;
- private static String init = "";
- private transient ArrayList ret = new ArrayList();
- @Override
- public void configure(MapredContext context) {
- System.out.println(new Date() + "######## configure");
- if(null != context) {
- //從jobConf中獲取map數
- mapTasks = context.getJobConf().getNumMapTasks();
- }
- System.out.println(new Date() + "######## mapTasks [" + mapTasks + "] ..");
- }
- @Override
- public ObjectInspector initialize(ObjectInspector[] arguments)
- throws UDFArgumentException {
- System.out.println(new Date() + "######## initialize");
- //初始化文件系統,可以在這里初始化讀取文件等
- init = "init";
- //定義函數的返回類型為java的List
- ObjectInspector returnOI = PrimitiveObjectInspectorFactory
- .getPrimitiveJavaObjectInspector(PrimitiveObjectInspector.PrimitiveCategory.STRING);
- return ObjectInspectorFactory.getStandardListObjectInspector(returnOI);
- }
- @Override
- public Object evaluate(DeferredObject[] args) throws HiveException {
- ret.clear();
- if(args.length < 1) return ret;
- //獲取第一個參數
- String str = args[0].get().toString();
- String[] s = str.split(",",-1);
- for(String word : s) {
- ret.add(word);
- }
- return ret;
- }
- @Override
- public String getDisplayString(String[] children) {
- return "Usage: Lxw1234GenericUDF(String str)";
- }
- }
其中,在configure方法中,獲取了本次任務的Map Task數目;
在initialize方法中,初始化了一個變量init,並定義了返回類型為java的List類型;
getDisplayString方法中顯示函數的用法;
evaluate是核心的邏輯處理;
需要特別注意的是,configure方法,“This is only called in runtime of MapRedTask”,該方法只有在運行map task時候才被執行。它和initialize用法不一樣,如果在initialize時候去使用MapredContext,則會報Null,因為此 時MapredContext還是Null。
上面的函數執行后,在MapReduce的日志中打印出了以下內容:
即在MapReduce階段,GenericUDF幾個方法的執行順序為:
configure–>initialize–>evaluate–>close