在這篇文章中,我們將深入了解用戶定義表函數(UDTF),該函數的實現是通過繼承org.apache.Hadoop.hive.ql.udf.generic.GenericUDTF這個抽象通用類,UDTF相對UDF更為復雜,但是通過它,我們讀入一個數據域,輸出多行多列,而UDF只能輸出單行單列。
代碼
文章中所有的代碼可以在這里找到:hive examples、GitHub repository
示例數據
首先先創建一張包含示例數據的表:people,該表只有name一列,該列中包含了一個或多個名字,該表數據保存在people.txt文件中。
- ~$ cat ./people.txt
- John Smith
- John and Ann White
- Ted Green
- Dorothy
- hadoop fs -mkdir people
- hadoop fs -put ./people.txt people
下面要創建hive外部表,在hive shell中執行
- CREATE EXTERNAL TABLE people (name string)
- ROW FORMAT DELIMITED FIELDS
- TERMINATED BY '\t'
- ESCAPED BY ''
- LINES TERMINATED BY '\n'
- STORED AS TEXTFILE
- LOCATION '/user/matthew/people';
UDTF的輸出值
上一文章講解的UDF與GenericUDF函數是操作單個數據域。它們必須要返回一個值。但是這並不適用於所用的數據處理任務。Hive可以存儲許多類型的數據,而有時候我們並不想單數據域輸入、單數據域輸出。對於每一行的輸入,可能我們想輸出多行,又或是不輸出,舉個例子,想一下函數explode(一個hive內置函數)的作用。
同樣,可能我們也想輸出多列,而不是輸出單列。
以上所有的要求我們可以用UDTF去完成。
以上所有的要求我們可以用UDTF去完成。
實例
首先我們先假設我們想清洗people這張表中的人名,這個新的表有:1、姓和名 兩個分開的列
2、所有記錄都包含姓名
3、每條記錄或有包含多個人名(eg Nick and Nicole Smith)
為了達到這個實例目的,我們將實現以下API:
- org.apache.hadoop.hive.ql.udf.generic.GenericUDTF
- //該方法中,我們將指定輸入輸出參數:輸入參數的ObjectInspector與輸出參數的StructObjectInspector
- abstract StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException;
- //我們將處理一條輸入記錄,輸出若干條結果記錄
- abstract void process(Object[] record) throws HiveException;
- //當沒有記錄處理的時候該方法會被調用,用來清理代碼或者產生額外的輸出
- abstract void close() throws HiveException;
代碼實現
完整代碼
- public class NameParserGenericUDTF extends GenericUDTF {
- private PrimitiveObjectInspector stringOI = null;
- @Override
- public StructObjectInspector initialize(ObjectInspector[] args) UDFArgumentException {
- if (args.length != 1) {
- throw new UDFArgumentException("NameParserGenericUDTF() takes exactly one argument");
- }
- if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE
- && ((PrimitiveObjectInspector) args[0]).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) {
- throw new UDFArgumentException("NameParserGenericUDTF() takes a string as a parameter");
- }
- // 輸入格式(inspectors)
- stringOI = (PrimitiveObjectInspector) args[0];
- // 輸出格式(inspectors) -- 有兩個屬性的對象
- List<String> fieldNames = new ArrayList<String>(2);
- List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(2);
- fieldNames.add("name");
- fieldNames.add("surname");
- fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
- fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
- return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
- }
- public ArrayList<Object[]> processInputRecord(String name){
- ArrayList<Object[]> result = new ArrayList<Object[]>();
- // 忽略null值與空值
- if (name == null || name.isEmpty()) {
- return result;
- }
- String[] tokens = name.split("\\s+");
- if (tokens.length == 2){
- result.add(new Object[] { tokens[0], tokens[1] });
- }else if (tokens.length == 4 && tokens[1].equals("and")){
- result.add(new Object[] { tokens[0], tokens[3] });
- result.add(new Object[] { tokens[2], tokens[3] });
- }
- return result;
- }
- @Override
- public void process(Object[] record) throws HiveException {
- final String name = stringOI.getPrimitiveJavaObject(record[0]).toString();
- ArrayList<Object[]> results = processInputRecord(name);
- Iterator<Object[]> it = results.iterator();
- while (it.hasNext()){
- Object[] r = it.next();
- forward(r);
- }
- }
- @Override
- public void close() throws HiveException {
- // do nothing
- }
- }
代碼走讀
該UDTF以string類型作為參數,返回一個擁有兩個屬性的對象,與GenericUDF比較相似,指定輸入輸出數據格式(objectinspector),以便hive能識別輸入與輸出。
我們為輸入的string參數定義了數據格式PrimitiveObjectInspector
我們為輸入的string參數定義了數據格式PrimitiveObjectInspector
- stringOI = (PrimitiveObjectInspector) args[0]
定義輸出數據格式(objectinspectors) 需要我們先定義兩個屬性名稱,因為(objectinspectors)需要讀取每一個屬性(在這個實例中,兩個屬性都是string類型)。
- List<String> fieldNames = new ArrayList<String>(2);
- fieldNames.add("name");
- fieldNames.add("surname");
- List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(2);
- fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
- fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
- return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
我們主要的處理邏輯放在這個比較直觀的processInputRecord方法當中。分開邏輯處理有利我們進行更簡單的單元測試,而不用涉及到繁瑣的objectinspector。
最后,一旦得到結果就可以對其進行forward,把基注冊為hive處理后的輸出記錄對象。
- mvn package
- cp target/hive-extensions-1.0-SNAPSHOT-jar-with-dependencies.jar ./ext.jar
- ADD JAR ./ext.jar;
- CREATE TEMPORARY FUNCTION process_names as 'com.matthewrathbone.example.NameParserGenericUDTF';
- SELECT
- adTable.name,
- adTable.surname
- FROM people
- lateral view process_names(name) adTable as name, surname;