數倉面試重災區-Generic User-defined Table Generating Function
UDTF 這玩意對數倉同學來講,熟悉又陌生,主要一方面是大量接觸,另一方面是理解上有誤導,還一個就是不是太明白里頭到底咋回事。
場景切入
關於UDTF面試場景大概有以下的問題:1、hive的udf你了解么,常用都有哪些類型 2、行專列操作了解么,里頭是怎么實現的 3、比較直白的問法,udtf你了解么 4、關於hive的優化方式,udtf其實是考察之一 背后的原因:1、首先實際線上濫用很多,數據膨脹、傾斜等,導數很嚴重的問題,實際點的例子,線上碰到因為udtf膨脹4-5個小時的運行時間,優化之后直接提到2分鍾,是很吐槽的一點 2、期望對udf了解的全面性,因為實際考察的時候大家覺得知道了解的時候,還以為面試官為啥問那么簡單的問題,還有點暗喜 3、相對來說是比較實用的etl中的操作
進場
UDTF其實在源碼中就有直接的描述:
1 org.apache.hadoop.hive.ql.udf.generic.GenericUDTF 2 3 /** 4 * A Generic User-defined Table Generating Function (UDTF) 5 * Generates a variable number of output rows for a single input row. Useful for 6 * explode(array)... 7 */
其實字面上的含義就是用戶定義表的一個函數,這個函數實現的是單行輸入的情況下輸出多行的結果,udtf中有兩個最基礎的接口定義,那就是定義輸出表中的的字段,輸出的數據內容
public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException
public void process(Object[] args) throws HiveException
我們期望udf直接幫我們產生這么一個表格為了封裝表格的列信息,我們進行封裝
public Schema(String column, JavaStringObjectInspector type) {
this.column = column;
this.type = type;
}
這樣一來,我們就可以對輸出列進行定義了
public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
Schema[] schema={
new Schema("student_id",PrimitiveObjectInspectorFactory.javaStringObjectInspector),
new Schema("student_name",PrimitiveObjectInspectorFactory.javaStringObjectInspector),
new Schema("age",PrimitiveObjectInspectorFactory.javaStringObjectInspector),
new Schema("gender",PrimitiveObjectInspectorFactory.javaStringObjectInspector)
};
ArrayList<String> fieldNames = new ArrayList<String>();
ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
for(int i = 0 ; i < schema.length; i++){
fieldNames.add(schema[i].column);
fieldOIs.add(schema[i].type);
}
return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
}
其實就是定義出結構的對象。下一步,我們進行數據輸出
public void process(Object[] args) throws HiveException {
String[][] results={
{"S001","李強","12","男"},
{"S002","李強","15","男"},
{"S003","李軍","13","男"},
{"S004","王倩雪","12","女"},
{"S005","汪玉珍","12","女"},
};
for(int i = 0; i < results.length; i ++){
forward(results[i]);
}
}
來點簡單又不失風度的小代碼測試一下
create temporary function show as 'org.apache.spark.udf.UDTF01';
select show();
結果如下:
+----------+------------+---+------+
|student_id|student_name|age|gender|
+----------+------------+---+------+
| S001| 李強| 12| 男|
| S002| 李強| 15| 男|
| S003| 李軍| 13| 男|
| S004| 王倩雪| 12| 女|
| S005| 汪玉珍| 12| 女|
+----------+------------+---+------+
可不是咋滴,這玩意不就是一個表么,這樣也可以表達出這類函數本來的功能了。
進擊
當然,前面為了表達式udtf的本來作用,直接數據上定死了,這樣的做法就是永遠都是輸出5行一樣的數據,這是表示一行輸入之后的結果,當我們有多行查詢的時候,結果就不斷重復了。我們再來點小測試:
drop table if exists t;
create table if not exists t (id int);
insert into t select 1;
insert into t select 2;
select id,show() from t;
結果如下:
+---+----------+------------+---+------+
| id|student_id|student_name|age|gender|
+---+----------+------------+---+------+
| 2| S001| 李強| 12| 男|
| 2| S002| 李強| 15| 男|
| 2| S003| 李軍| 13| 男|
| 2| S004| 王倩雪| 12| 女|
| 2| S005| 汪玉珍| 12| 女|
| 1| S001| 李強| 12| 男|
| 1| S002| 李強| 15| 男|
| 1| S003| 李軍| 13| 男|
| 1| S004| 王倩雪| 12| 女|
| 1| S005| 汪玉珍| 12| 女|
+---+----------+------------+---+------+
這個結果就是為每一行輸出了udtf中的數據。實際的結果我們是基於原有的數據進行加工,再輸出,由於加工的結果是一行輸出多行的,所以這類的需求底層就得udtf的實現。我們看一下hive源碼中為我們提供了哪些udtf函數,位於:org.apache.hadoop.hive.ql.exec.FunctionRegistry 中,我們定位到這么幾行 Generic UDTF's:
// Generic UDTF's
system.registerGenericUDTF("explode", GenericUDTFExplode.class);
system.registerGenericUDTF("replicate_rows", GenericUDTFReplicateRows.class);
system.registerGenericUDTF("inline", GenericUDTFInline.class);
system.registerGenericUDTF("json_tuple", GenericUDTFJSONTuple.class);
system.registerGenericUDTF("parse_url_tuple", GenericUDTFParseUrlTuple.class);
system.registerGenericUDTF("posexplode", GenericUDTFPosExplode.class);
system.registerGenericUDTF("stack", GenericUDTFStack.class);
system.registerGenericUDTF("get_splits", GenericUDTFGetSplits.class);
這部分就是我們內置的udtf定義了,我們很常見的explode也在里面,我們進行一下explode源碼閱讀 initialize部分如下:
if (args.length != 1) {
throw new UDFArgumentException("explode() takes only one argument");
}
ArrayList<String> fieldNames = new ArrayList<String>();
ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
switch (args[0].getCategory()) {
case LIST:
inputOI = args[0];
fieldNames.add("col");
fieldOIs.add(((ListObjectInspector)inputOI).getListElementObjectInspector());
break;
case MAP:
inputOI = args[0];
fieldNames.add("key");
fieldNames.add("value");
fieldOIs.add(((MapObjectInspector)inputOI).getMapKeyObjectInspector());
fieldOIs.add(((MapObjectInspector)inputOI).getMapValueObjectInspector());
break;
default:
throw new UDFArgumentException("explode() takes an array or a map as a parameter");
這里我們可以了解兩個信息:1、explode可以接收兩種類型,一種其實是list,還一種是map,這里可以解答為什么我們使用explode的時候常常看到的是組合的使用,因為我們實際的數據類型比較少是直接定義成LIST或者Map類型的,例如和字符串的切割組合 hexplode(split(...)) 2、傳遞LIST的時候我們的結果列名就是col,Map類型的則是key和value兩個列
我們再看看process的邏輯,其實相對比較簡單,因為輸入的是LIST或者MAP,輸出結果便是把結果打平輸出
switch (inputOI.getCategory()) {
case LIST:
ListObjectInspector listOI = (ListObjectInspector)inputOI;
List<?> list = listOI.getList(o[0]);
if (list == null) {
return;
}
for (Object r : list) {
forwardListObj[0] = r;
forward(forwardListObj);
}
break;
case MAP:
MapObjectInspector mapOI = (MapObjectInspector)inputOI;
Map<?,?> map = mapOI.getMap(o[0]);
if (map == null) {
return;
}
for (Entry<?,?> r : map.entrySet()) {
forwardMapObj[0] = r.getKey();
forwardMapObj[1] = r.getValue();
forward(forwardMapObj);
}
break;
default:
throw new TaskExecutionException("explode() can only operate on an array or a map");
}
到此為止,我們了解了explode工作的全部過程了。
進一步的思考
udtf本質上就是為我們每一行生成一個表,這個數據膨脹是完全是N*M的等級,在大量數據規模下運算,如果一行有10行輸出的話,整個表的數據會變成10倍,另外如果一行中的LITS過長的話,勢必是更加嚴重的情形和擴展。怎么去規避呢?
其實了解底層原理就很好說了,減少數據規模,過濾無用的膨脹,LIST切割邏輯進行分段,辦法總比困難多,只是底層跑的內容真正了解的情況下,辦法也才是有效的
轉自