1. Writing a UDF to convert the English country names in the previously created buck_ip_test table into Chinese
Contents of the iptest.txt data file:
1 張三 192.168.1.1 china
2 李四 192.168.1.2 china
3 王五 192.168.1.3 china
4 makjon 192.168.1.4 china
1 aa 192.168.1.1 japan
2 bb 192.168.1.2 japan
3 cc 192.168.1.3 japan
4 makjon 192.168.1.4 japan
(Screenshot of the loaded table data omitted.)
The code for UdfTest.java is as follows:
import java.util.HashMap;

import org.apache.hadoop.hive.ql.exec.UDF;

public class UdfTest extends UDF {
    // Lookup table mapping English country names to their Chinese equivalents.
    private static HashMap<String, String> countryMap = new HashMap<String, String>();
    static {
        countryMap.put("china", "中國");
        countryMap.put("japan", "日本");
    }

    // Converts an English country name to Chinese; unknown names map to "其他" ("other").
    public String evaluate(String str) {
        String country = countryMap.get(str);
        if (country == null) {
            return "其他";
        } else {
            return country;
        }
    }

    // A UDF may define several evaluate methods as overloads.
    // This overload concatenates country and IP, purely to demonstrate overloading.
    public String evaluate(String country, String ip) {
        return country + "_" + ip;
    }

    /*
     * A quick local test of the methods written above:
    public static void main(String[] args) {
        UdfTest ut = new UdfTest();
        String aa = ut.evaluate("AAAAAA");
        System.out.println(aa);
    }
    */
}
After verifying the class in Eclipse, export it as udftest.jar and upload it to the /opt directory on the server.
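If you prefer the command line to Eclipse's export wizard, the jar can also be built by hand. A minimal sketch, assuming hive-exec is available locally (the jar path below is an assumption, not from the original):

# compile against Hive's UDF API (adjust the hive-exec path for your installation)
javac -cp /opt/hive/lib/hive-exec.jar UdfTest.java
# package the compiled class into the jar
jar cf udftest.jar UdfTest.class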
Enter the Hive CLI, load the jar, and register the function:
add jar /opt/udftest.jar;
create temporary function convert as 'UdfTest';
The first statement adds the jar to Hive's classpath; the second creates a function named convert backed by the UdfTest class.
(Screenshot of the output omitted.)
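Note that a temporary function is registered only for the current Hive session; a new session needs both statements again. Within a session it can also be removed with standard HiveQL:

drop temporary function convert;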
Then run a query in Hive:
select country,convert(country,ip),convert(country) from buck_ip_test;
(Screenshot of the query result omitted.)
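The screenshot is no longer available, but from the sample data and the UDF logic the output should look roughly like this (first and last rows shown; derived by hand, not taken from the original):

china	china_192.168.1.1	中國
...
japan	japan_192.168.1.4	日本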
And with that, a simple UDF is complete.
2. Using a UDF in Hive to process JSON
The data file movie.txt contains:
{"movie":"2797","rate":"4","timeStamp":"978302039","uid":"1"}
{"movie":"2321","rate":"3","timeStamp":"978302205","uid":"1"}
{"movie":"720","rate":"3","timeStamp":"978300760","uid":"1"}
{"movie":"1270","rate":"5","timeStamp":"978300055","uid":"1"}
{"movie":"527","rate":"5","timeStamp":"978824195","uid":"1"}
{"movie":"2340","rate":"3","timeStamp":"978300103","uid":"1"}
{"movie":"48","rate":"5","timeStamp":"978824351","uid":"1"}
{"movie":"1097","rate":"4","timeStamp":"978301953","uid":"1"}
{"movie":"1721","rate":"4","timeStamp":"978300055","uid":"1"}
{"movie":"1545","rate":"4","timeStamp":"978824139","uid":"1"}
Load the data into a rating table in Hive:
create table rating(rate string);
load data local inpath '/opt/movie.txt' overwrite into table rating;
select * from rating;
(Screenshot omitted; each row of rating stores one complete JSON line in its single rate column.)
In this example we use Jackson's ObjectMapper to parse the JSON data.
First create MovierateBean.java with the following code:
import java.sql.Timestamp;

public class MovierateBean {
    private String movie;
    private String rate;
    private Timestamp timeStamp;
    private String uid;

    public String getMovie() {
        return movie;
    }
    public void setMovie(String movie) {
        this.movie = movie;
    }
    public String getRate() {
        return rate;
    }
    public void setRate(String rate) {
        this.rate = rate;
    }
    public Timestamp getTimeStamp() {
        return timeStamp;
    }
    public void setTimeStamp(Timestamp timeStamp) {
        this.timeStamp = timeStamp;
    }
    public String getUid() {
        return uid;
    }
    public void setUid(String uid) {
        this.uid = uid;
    }

    @Override
    public String toString() {
        // Render the bean as tab-separated fields.
        return movie + "\t" + rate + "\t" + timeStamp + "\t" + uid;
    }
}
Then create MovieJsonTest.java with the following code:
import org.apache.hadoop.hive.ql.exec.UDF;
import org.codehaus.jackson.map.ObjectMapper;

public class MovieJsonTest extends UDF {
    public String evaluate(String jsonline) {
        ObjectMapper om = new ObjectMapper();
        try {
            // Deserialize the JSON line into the bean, then emit it as tab-separated fields.
            MovierateBean bean = om.readValue(jsonline, MovierateBean.class);
            return bean.toString();
        } catch (Exception e) {
            // On malformed input, fall back to returning the raw line.
            return jsonline;
        }
    }

    /*
     * A quick local test of the evaluate method:
    public static void main(String[] args) {
        MovieJsonTest mt = new MovieJsonTest();
        String jsonline = "{\"movie\":\"527\",\"rate\":\"5\",\"timeStamp\":\"978824195\",\"uid\":\"1\"}";
        System.out.println(mt.evaluate(jsonline));
    }
    */
}
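Both compiled classes have to go into the jar, since the UDF instantiates the bean at runtime. A minimal command-line packaging sketch (the org.codehaus.jackson classes are normally already on Hive's classpath because Hadoop bundles them; if they are missing, their jars would need to be added with add jar as well):

jar cf movie.jar MovieJsonTest.class MovierateBean.class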
Package the two classes above into movie.jar, upload it to the /opt directory on the server, and run:
add jar /opt/movie.jar;
create temporary function movie_convert as 'MovieJsonTest';
select movie_convert(rate) from rating;
(Screenshot of the output omitted.)
As you can see, the original JSON has now been parsed into its corresponding fields.
3. A brief introduction to Hive Transform
Hive UDFs and UDAFs must be written in Java. Hive offers another, simpler way to achieve the same ends: TRANSFORM, which lets you implement UDF-like functionality in a variety of languages.
Hive also provides the MAP and REDUCE keywords, but these can generally be understood as mere aliases for TRANSFORM; they do not mean the call happens in the map or the reduce stage. See the official documentation for details.
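As a sketch of that equivalence, the two statements below differ only in the keyword, and neither forces the script into a particular execution stage (my_script.py, col, out1, out2, and t are placeholder names, not from the original):

SELECT TRANSFORM (col) USING 'python my_script.py' AS (out1, out2) FROM t;
SELECT MAP (col) USING 'python my_script.py' AS (out1, out2) FROM t;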
We can replace the UDF above with the following Python script.
Contents of /opt/movie_trans.py on the server:
import sys
import datetime
import json

for line in sys.stdin:
    # Example input line:
    # line = '{"movie":"2797","rate":"4","timeStamp":"978302039","uid":"1"}'
    line = line.strip()
    hjson = json.loads(line)
    movie = hjson['movie']
    rate = hjson['rate']
    timeStamp = hjson['timeStamp']
    uid = hjson['uid']
    # Convert the epoch-seconds string into a human-readable datetime.
    timeStamp = datetime.datetime.fromtimestamp(float(timeStamp))
    # Hive expects the output columns tab-separated on stdout.
    print '\t'.join([movie, rate, str(timeStamp), uid])
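Before wiring the script into Hive, it can be sanity-checked from the shell by piping the data file through it (this assumes a Python 2 interpreter, since the script uses the print statement):

cat /opt/movie.txt | python movie_trans.py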
Run the following in Hive:
ADD FILE /opt/movie_trans.py;
SELECT TRANSFORM (rate) USING 'python movie_trans.py' AS (movie, rate, timeStamp, uid) FROM rating;
(Screenshot of the output omitted.)
As you can see, TRANSFORM reproduces the functionality of the UDF implemented above.
