udf函數


數倉面試重災區-Generic User-defined Table Generating Function

UDTF 這玩意對數倉同學來講,熟悉又陌生,主要一方面是大量接觸,另一方面是理解上有誤導,還一個就是不是太明白里頭到底咋回事。

場景切入

關於UDTF面試場景大概有以下的問題:1、hive的udf你了解么,常用都有哪些類型 2、行專列操作了解么,里頭是怎么實現的 3、比較直白的問法,udtf你了解么 4、關於hive的優化方式,udtf其實是考察之一 背后的原因:1、首先實際線上濫用很多,數據膨脹、傾斜等,導數很嚴重的問題,實際點的例子,線上碰到因為udtf膨脹4-5個小時的運行時間,優化之后直接提到2分鍾,是很吐槽的一點 2、期望對udf了解的全面性,因為實際考察的時候大家覺得知道了解的時候,還以為面試官為啥問那么簡單的問題,還有點暗喜 3、相對來說是比較實用的etl中的操作

進場

UDTF其實在源碼中就有直接的描述:

1 org.apache.hadoop.hive.ql.udf.generic.GenericUDTF
2 
3 /**
4  * A Generic User-defined Table Generating Function (UDTF)
5  * Generates a variable number of output rows for a single input row. Useful for
6  * explode(array)...
7  */
 

其實字面上的含義就是用戶定義表的一個函數,這個函數實現的是單行輸入的情況下輸出多行的結果,udtf中有兩個最基礎的接口定義,那就是定義輸出表中的的字段,輸出的數據內容

    public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException
public void process(Object[] args) throws HiveException

我們期望udf直接幫我們產生這么一個表格圖片為了封裝表格的列信息,我們進行封裝

public Schema(String column, JavaStringObjectInspector type) {
this.column = column;
this.type = type;
}

這樣一來,我們就可以對輸出列進行定義了

 public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
Schema[] schema={
new Schema("student_id",PrimitiveObjectInspectorFactory.javaStringObjectInspector),
new Schema("student_name",PrimitiveObjectInspectorFactory.javaStringObjectInspector),
new Schema("age",PrimitiveObjectInspectorFactory.javaStringObjectInspector),
new Schema("gender",PrimitiveObjectInspectorFactory.javaStringObjectInspector)
};

ArrayList<String> fieldNames = new ArrayList<String>();
ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
for(int i = 0 ; i < schema.length; i++){
fieldNames.add(schema[i].column);
fieldOIs.add(schema[i].type);
}

return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
}

其實就是定義出結構的對象。下一步,我們進行數據輸出

public void process(Object[] args) throws HiveException {
String[][] results={
{"S001","李強","12","男"},
{"S002","李強","15","男"},
{"S003","李軍","13","男"},
{"S004","王倩雪","12","女"},
{"S005","汪玉珍","12","女"},

};

for(int i = 0; i < results.length; i ++){
forward(results[i]);
}
}

來點簡單又不失風度的小代碼測試一下

create temporary function  show as 'org.apache.spark.udf.UDTF01';
select show();

結果如下:

+----------+------------+---+------+
|student_id|student_name|age|gender|
+----------+------------+---+------+
| S001| 李強| 12| 男|
| S002| 李強| 15| 男|
| S003| 李軍| 13| 男|
| S004| 王倩雪| 12| 女|
| S005| 汪玉珍| 12| 女|
+----------+------------+---+------+

可不是咋滴,這玩意不就是一個表么,這樣也可以表達出這類函數本來的功能了。

進擊

當然,前面為了表達式udtf的本來作用,直接數據上定死了,這樣的做法就是永遠都是輸出5行一樣的數據,這是表示一行輸入之后的結果,當我們有多行查詢的時候,結果就不斷重復了。我們再來點小測試:

drop table if  exists t;
create table if not exists t (id int);
insert into t select 1;
insert into t select 2;
select id,show() from t;

結果如下:

+---+----------+------------+---+------+
| id|student_id|student_name|age|gender|
+---+----------+------------+---+------+
| 2| S001| 李強| 12| 男|
| 2| S002| 李強| 15| 男|
| 2| S003| 李軍| 13| 男|
| 2| S004| 王倩雪| 12| 女|
| 2| S005| 汪玉珍| 12| 女|
| 1| S001| 李強| 12| 男|
| 1| S002| 李強| 15| 男|
| 1| S003| 李軍| 13| 男|
| 1| S004| 王倩雪| 12| 女|
| 1| S005| 汪玉珍| 12| 女|
+---+----------+------------+---+------+

這個結果就是為每一行輸出了udtf中的數據。實際的結果我們是基於原有的數據進行加工,再輸出,由於加工的結果是一行輸出多行的,所以這類的需求底層就得udtf的實現。我們看一下hive源碼中為我們提供了哪些udtf函數,位於:org.apache.hadoop.hive.ql.exec.FunctionRegistry 中,我們定位到這么幾行 Generic UDTF's:

 // Generic UDTF's
system.registerGenericUDTF("explode", GenericUDTFExplode.class);
system.registerGenericUDTF("replicate_rows", GenericUDTFReplicateRows.class);
system.registerGenericUDTF("inline", GenericUDTFInline.class);
system.registerGenericUDTF("json_tuple", GenericUDTFJSONTuple.class);
system.registerGenericUDTF("parse_url_tuple", GenericUDTFParseUrlTuple.class);
system.registerGenericUDTF("posexplode", GenericUDTFPosExplode.class);
system.registerGenericUDTF("stack", GenericUDTFStack.class);
system.registerGenericUDTF("get_splits", GenericUDTFGetSplits.class);

這部分就是我們內置的udtf定義了,我們很常見的explode也在里面,我們進行一下explode源碼閱讀 initialize部分如下:

    if (args.length != 1) {
throw new UDFArgumentException("explode() takes only one argument");
}

ArrayList<String> fieldNames = new ArrayList<String>();
ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();

switch (args[0].getCategory()) {
case LIST:
inputOI = args[0];
fieldNames.add("col");
fieldOIs.add(((ListObjectInspector)inputOI).getListElementObjectInspector());
break;
case MAP:
inputOI = args[0];
fieldNames.add("key");
fieldNames.add("value");
fieldOIs.add(((MapObjectInspector)inputOI).getMapKeyObjectInspector());
fieldOIs.add(((MapObjectInspector)inputOI).getMapValueObjectInspector());
break;
default:
throw new UDFArgumentException("explode() takes an array or a map as a parameter");

這里我們可以了解兩個信息:1、explode可以接收兩種類型,一種其實是list,還一種是map,這里可以解答為什么我們使用explode的時候常常看到的是組合的使用,因為我們實際的數據類型比較少是直接定義成LIST或者Map類型的,例如和字符串的切割組合 hexplode(split(...)) 2、傳遞LIST的時候我們的結果列名就是col,Map類型的則是key和value兩個列

我們再看看process的邏輯,其實相對比較簡單,因為輸入的是LIST或者MAP,輸出結果便是把結果打平輸出


switch (inputOI.getCategory()) {
case LIST:
ListObjectInspector listOI = (ListObjectInspector)inputOI;
List<?> list = listOI.getList(o[0]);
if (list == null) {
return;
}
for (Object r : list) {
forwardListObj[0] = r;
forward(forwardListObj);
}
break;
case MAP:
MapObjectInspector mapOI = (MapObjectInspector)inputOI;
Map<?,?> map = mapOI.getMap(o[0]);
if (map == null) {
return;
}
for (Entry<?,?> r : map.entrySet()) {
forwardMapObj[0] = r.getKey();
forwardMapObj[1] = r.getValue();
forward(forwardMapObj);
}
break;
default:
throw new TaskExecutionException("explode() can only operate on an array or a map");
}

到此為止,我們了解了explode工作的全部過程了。

進一步的思考

udtf本質上就是為我們每一行生成一個表,這個數據膨脹是完全是N*M的等級,在大量數據規模下運算,如果一行有10行輸出的話,整個表的數據會變成10倍,另外如果一行中的LITS過長的話,勢必是更加嚴重的情形和擴展。怎么去規避呢?

其實了解底層原理就很好說了,減少數據規模,過濾無用的膨脹,LIST切割邏輯進行分段,辦法總比困難多,只是底層跑的內容真正了解的情況下,辦法也才是有效的 

轉自 敏叔的技術札記  這塊 我也不行 我也得加油多看


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM