Hive UDTF開發指南


 

在這篇文章中,我們將深入了解用戶定義表函數(UDTF),該函數的實現是通過繼承org.apache.Hadoop.hive.ql.udf.generic.GenericUDTF這個抽象通用類,UDTF相對UDF更為復雜,但是通過它,我們讀入一個數據域,輸出多行多列,而UDF只能輸出單行單列。

 

代碼

文章中所有的代碼可以在這里找到:hive examplesGitHub repository

 

示例數據

首先先創建一張包含示例數據的表:people,該表只有name一列,該列中包含了一個或多個名字,該表數據保存在people.txt文件中。
[plain]  view plain  copy
 
  1. ~$ cat ./people.txt  
  2.   
  3. John Smith  
  4. John and Ann White  
  5. Ted Green  
  6. Dorothy  
把該文件上載到hdfs目錄/user/matthew/people中:
[plain]  view plain  copy
 
  1. hadoop fs -mkdir people  
  2. hadoop fs -put ./people.txt people  

下面要創建hive外部表,在hive shell中執行
[sql]  view plain  copy
 
  1. CREATE EXTERNAL TABLE people (name string)  
  2. ROW FORMAT DELIMITED FIELDS   
  3.     TERMINATED BY '\t'   
  4.     ESCAPED BY ''   
  5.     LINES TERMINATED BY '\n'  
  6. STORED AS TEXTFILE   
  7. LOCATION '/user/matthew/people';  

UDTF的輸出值

上一文章講解的UDF與GenericUDF函數是操作單個數據域。它們必須要返回一個值。但是這並不適用於所用的數據處理任務。Hive可以存儲許多類型的數據,而有時候我們並不想單數據域輸入、單數據域輸出。對於每一行的輸入,可能我們想輸出多行,又或是不輸出,舉個例子,想一下函數explode(一個hive內置函數)的作用。
同樣,可能我們也想輸出多列,而不是輸出單列。
以上所有的要求我們可以用UDTF去完成。
 

實例

首先我們先假設我們想清洗people這張表中的人名,這個新的表有:
1、姓和名 兩個分開的列
2、所有記錄都包含姓名
3、每條記錄或有包含多個人名(eg Nick and Nicole Smith)
為了達到這個實例目的,我們將實現以下API:
[java]  view plain  copy
 
  1. org.apache.hadoop.hive.ql.udf.generic.GenericUDTF  
我們將覆蓋以下三個方法:
[java]  view plain  copy
 
  1. //該方法中,我們將指定輸入輸出參數:輸入參數的ObjectInspector與輸出參數的StructObjectInspector  
  2. abstract StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException;   
  3.   
  4. //我們將處理一條輸入記錄,輸出若干條結果記錄  
  5. abstract void process(Object[] record) throws HiveException;  
  6.   
  7. //當沒有記錄處理的時候該方法會被調用,用來清理代碼或者產生額外的輸出  
  8. abstract void close() throws HiveException;  

代碼實現

 

完整代碼

[java]  view plain  copy
 
  1. public class NameParserGenericUDTF extends GenericUDTF {  
  2.   
  3.       private PrimitiveObjectInspector stringOI = null;  
  4.   
  5.       @Override  
  6.       public StructObjectInspector initialize(ObjectInspector[] args) UDFArgumentException {  
  7.   
  8.         if (args.length != 1) {  
  9.           throw new UDFArgumentException("NameParserGenericUDTF() takes exactly one argument");  
  10.         }  
  11.   
  12.         if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE  
  13.             && ((PrimitiveObjectInspector) args[0]).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) {  
  14.           throw new UDFArgumentException("NameParserGenericUDTF() takes a string as a parameter");  
  15.         }  
  16.           
  17.         // 輸入格式(inspectors)  
  18.         stringOI = (PrimitiveObjectInspector) args[0];  
  19.   
  20.         // 輸出格式(inspectors) -- 有兩個屬性的對象  
  21.         List<String> fieldNames = new ArrayList<String>(2);  
  22.         List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(2);  
  23.         fieldNames.add("name");  
  24.         fieldNames.add("surname");  
  25.         fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);  
  26.         fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);  
  27.         return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);  
  28.       }  
  29.             
  30.       public ArrayList<Object[]> processInputRecord(String name){  
  31.             ArrayList<Object[]> result = new ArrayList<Object[]>();  
  32.             
  33.             // 忽略null值與空值  
  34.             if (name == null || name.isEmpty()) {  
  35.               return result;  
  36.             }  
  37.               
  38.             String[] tokens = name.split("\\s+");  
  39.               
  40.             if (tokens.length == 2){  
  41.                 result.add(new Object[] { tokens[0], tokens[1] });  
  42.             }else if (tokens.length == 4 && tokens[1].equals("and")){  
  43.                 result.add(new Object[] { tokens[0], tokens[3] });  
  44.                 result.add(new Object[] { tokens[2], tokens[3] });  
  45.             }  
  46.               
  47.             return result;  
  48.       }  
  49.         
  50.       @Override  
  51.       public void process(Object[] record) throws HiveException {  
  52.   
  53.         final String name = stringOI.getPrimitiveJavaObject(record[0]).toString();  
  54.   
  55.         ArrayList<Object[]> results = processInputRecord(name);  
  56.   
  57.         Iterator<Object[]> it = results.iterator();  
  58.           
  59.         while (it.hasNext()){  
  60.             Object[] r = it.next();  
  61.             forward(r);  
  62.         }  
  63.       }  
  64.   
  65.       @Override  
  66.       public void close() throws HiveException {  
  67.         // do nothing  
  68.       }  
  69. }  
以上代碼可以從:github目錄 check 下來。
 

代碼走讀

該UDTF以string類型作為參數,返回一個擁有兩個屬性的對象,與GenericUDF比較相似,指定輸入輸出數據格式(objectinspector),以便hive能識別輸入與輸出。

我們為輸入的string參數定義了數據格式PrimitiveObjectInspector
[java]  view plain  copy
 
  1. stringOI = (PrimitiveObjectInspector) args[0]  

定義輸出數據格式(objectinspectors) 需要我們先定義兩個屬性名稱,因為(objectinspectors)需要讀取每一個屬性(在這個實例中,兩個屬性都是string類型)。
[java]  view plain  copy
 
  1. List<String> fieldNames = new ArrayList<String>(2);  
  2. fieldNames.add("name");  
  3. fieldNames.add("surname");  
  4.   
  5. List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(2);  
  6. fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);  
  7. fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);  
  8.   
  9. return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);  

我們主要的處理邏輯放在這個比較直觀的processInputRecord方法當中。分開邏輯處理有利我們進行更簡單的單元測試,而不用涉及到繁瑣的objectinspector。
最后,一旦得到結果就可以對其進行forward,把基注冊為hive處理后的輸出記錄對象。
[java]  view plain  copy
 
  1. while (it.hasNext()){  
  2.             Object[] r = it.next();  
  3.             forward(r);  
  4.     }  
  5. }  

使用該UDTF函數

我們可以在hive中創建我們自己的函數
[plain]  view plain  copy
 
  1. mvn package  
  2. cp target/hive-extensions-1.0-SNAPSHOT-jar-with-dependencies.jar ./ext.jar  
然后在hive中使用
[sql]  view plain  copy
 
  1. ADD JAR ./ext.jar;  
  2.   
  3. CREATE TEMPORARY FUNCTION process_names as 'com.matthewrathbone.example.NameParserGenericUDTF';   
  4.   
  5. SELECT   
  6.     adTable.name,  
  7.     adTable.surname   
  8. FROM people   
  9.     lateral view process_names(name) adTable as name, surname;  
輸出
[plain]  view plain  copy
 
  1. OK  
  2. John    Smith  
  3. John    White  
  4. Ann     White  
  5. Ted     Green  

原文鏈接


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM