相對於使用MapReduce或者Spark Application的方式進行數據分析,使用Hive SQL或Spark SQL能為我們省去不少的代碼工作量,而Hive SQL或Spark SQL本身內置的各類UDF也為我們的數據處理提供了不少便利的工具,當這些內置的UDF不能滿足於我們的需要時,Hive SQL或Spark SQL還為我們提供了自定義UDF的相關接口,方便我們根據自己的需求進行擴展。
在Hive的世界里使用自定義UDF的過程是比較復雜的。我們需要根據需求使用Java語言開發相應的UDF(UDAF、UDTF),然后將UDF的代碼及其依賴編譯打包為Jar,使用方法有兩種:
(1)臨時函數
在一次會話(Session)中使用如下語句創建臨時函數:
ADD JAR /run/jar/udf_test.jar;
CREATE TEMPORARY FUNCTION my_add AS 'com.hive.udf.Add';
這種方式有一個缺點:每一次會話過程中使用函數時都需要創建,而且僅在當前會話中有效。
(2)永久函數
這個特性需要高版本的Hive支持,它的好處是可以將UDF Jar存放至HDFS,函數僅需要創建一次即可以永久使用,如下:
CREATE FUNCTION func.ipToLocationBySina AS 'com.sina.dip.hive.function.IPToLocationBySina' USING JAR 'hdfs://dip.cdh5.dev:8020/user/hdfs/func/location.jar';
雖然永久函數相對於臨時函數有一定優勢,但Java語言的開發門檻很大程度上妨礙了UDF在實際數據分析過程中使用,畢竟我們的數據分析師多數是以Python、SQL為主要分析工具的,每一次UDF的開發都需要工程師的參與,開發效率與應用效果都是不是很好(可能需要頻繁更新UDF的問題),PySpark的出現確很好地解決了這個問題:它可以非常方便地將一個普通的Python函數注冊為一個UDF。
為了說明如何在Spark(Hive) SQL中的使用Python UDF,我們首先模擬一張數據表,為了簡單起見,該表僅有一行一列數據:

我們模擬了一張數據表temp_table,該表僅有一列,其中列名稱為col,列類型為字符串且不允許包含Null,輸出結果:

我們在表temp_table的基礎之上演示UDF的使用方法:

首先我們定義一個普通的Python函數:func_string,為了簡單起見它沒有任何參數,僅僅返回一個簡單的字符串;
然后我們通過HiveContext registerFunction即可以將函數func_string注冊為UDF,registerFunction接收兩個參數:UDF名稱、UDF關聯的Python函數;
最后我們可以在Spark(Hive) SQL中使用這個UDF,輸出結果:

我們需要注意的是,HiveContext registerFunction實際上有三個參數:

name:UDF名稱;
f:UDF關聯的Python函數;
returnType:UDF(Python函數)返回值類型,默認為StringType()。
上述示例中因為我們的UDF函數的返回值類型為字符串,因此使用Hive registerFunction注冊UDF時省略了參數returnType,即returnType默認值為StringType(),如果UDF(Python函數)的返回值類型不為字符串,則需要顯式為其指定returnType。
我們以類型IntegerType、ArrayType、StructType、MapType為例演示需要顯式指定returnType的情況。
(1)IntegerType


(2)ArrayType


注意:ArrayType(數組)必須確保元素類型的一致性,如指定UDF返回值類型為ArrayType(IntegerType()),則函數func_array的返回值類型必須為list或tuple,其中的元素類型必須為int。
(3)StructType


注意:StructType必須確保函數的返回值類型為tuple,而且使用HiveContext registerFunction注冊UDF時需要依次為其中的元素指定名稱各類型,如上述示例中每一個元素的名稱為first,類型為IntegerType;第二個元素的名稱為second,類型為FloatType;第三個元素的名稱為third,類型為StringType。
(4)MapType


注意:MapType必須確保函數的返回值類型為dict,而且所有的“key”應保持類型一致,“value”也就保持類型一致。