Hive UDF Functions and Transform


1. Writing a UDF to convert the English country names in the previously created buck_ip_test table into Chinese

Contents of the iptest.txt data file:

1	張三	192.168.1.1	china
2	李四	192.168.1.2	china
3	王五	192.168.1.3	china
4	makjon	192.168.1.4	china
1	aa	192.168.1.1	japan
2	bb	192.168.1.2	japan
3	cc	192.168.1.3	japan
4	makjon	192.168.1.4	japan
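
This walkthrough assumes buck_ip_test already exists from an earlier article. If you are following along on a fresh system, a minimal (non-bucketed) stand-in can be created and loaded like this; note that the id and name column names are assumptions, while country and ip match the query used at the end of this section:

create table buck_ip_test(id int, name string, ip string, country string)
row format delimited fields terminated by '\t';
load data local inpath '/opt/iptest.txt' into table buck_ip_test;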

(Screenshot of the table data.)

The code for UdfTest.java is as follows:

import java.util.HashMap;

import org.apache.hadoop.hive.ql.exec.UDF;

public class UdfTest extends UDF {

	// lookup table from English country name to Chinese
	private static HashMap<String, String> countryMap = new HashMap<String, String>();

	static {
		countryMap.put("china", "中國");
		countryMap.put("japan", "日本");
	}

	// converts an English country name to Chinese;
	// unknown names map to "其他" ("other")
	public String evaluate(String str) {
		String country = countryMap.get(str);
		if (country == null) {
			return "其他";
		} else {
			return country;
		}
	}

	// a UDF may define several overloaded evaluate methods;
	// this one concatenates country and IP to demonstrate overloading
	public String evaluate(String country, String ip) {
		return country + "_" + ip;
	}

	/*
	 * Quick local check of the methods above:
	public static void main(String[] args) {
		UdfTest ut = new UdfTest();
		String aa = ut.evaluate("AAAAAA");
		System.out.println(aa);
	}
	*/

}

After verifying it in Eclipse, export the class as udftest.jar and upload it to the /opt directory on the server.

Start hive and run:

add jar /opt/udftest.jar;

to register the jar with the current session, then:

create temporary function convert as 'UdfTest';

to create the convert function ('UdfTest' is the fully qualified class name; the class has no package).

(Screenshot of the execution result.)

Then query it in Hive:

select country, convert(country, ip), convert(country) from buck_ip_test;

(Screenshot of the query result.)

That completes a simple UDF.
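
Note that a function created with create temporary function only exists for the current Hive session. To remove it before the session ends, drop it explicitly:

drop temporary function convert;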


2. Using a Hive UDF to process JSON

The data file movie.txt contains the following:

{"movie":"2797","rate":"4","timeStamp":"978302039","uid":"1"}
{"movie":"2321","rate":"3","timeStamp":"978302205","uid":"1"}
{"movie":"720","rate":"3","timeStamp":"978300760","uid":"1"}
{"movie":"1270","rate":"5","timeStamp":"978300055","uid":"1"}
{"movie":"527","rate":"5","timeStamp":"978824195","uid":"1"}
{"movie":"2340","rate":"3","timeStamp":"978300103","uid":"1"}
{"movie":"48","rate":"5","timeStamp":"978824351","uid":"1"}
{"movie":"1097","rate":"4","timeStamp":"978301953","uid":"1"}
{"movie":"1721","rate":"4","timeStamp":"978300055","uid":"1"}
{"movie":"1545","rate":"4","timeStamp":"978824139","uid":"1"}

Load the data into a rating table in Hive:

create table rating(rate string);
load data local inpath '/opt/movie.txt' overwrite into table rating;
select * from rating;

(Screenshot of the result.)
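
As an aside, Hive ships built-in JSON helpers, so for plain field extraction a custom UDF is not strictly necessary; for example:

select get_json_object(rate, '$.movie'), get_json_object(rate, '$.rate') from rating;

The custom UDF below is still worth walking through as a demonstration of mapping JSON onto a Java bean.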

In this example, though, we use Jackson's ObjectMapper to handle the JSON.

First create MovierateBean.java:

import java.sql.Timestamp;

// bean that mirrors one JSON record from movie.txt
public class MovierateBean {
	private String movie;
	private String rate;
	private Timestamp timeStamp;
	private String uid;
	
	public String getMovie() {
		return movie;
	}

	public void setMovie(String movie) {
		this.movie = movie;
	}

	public String getRate() {
		return rate;
	}

	public void setRate(String rate) {
		this.rate = rate;
	}
	
	public Timestamp getTimeStamp() {
		return timeStamp;
	}

	public void setTimeStamp(Timestamp timeStamp) {
		this.timeStamp = timeStamp;
	}

	public String getUid() {
		return uid;
	}

	public void setUid(String uid) {
		this.uid = uid;
	}

	@Override
	public String toString() {
		// tab-separated so the parsed fields print like columns
		return movie + "\t" + rate + "\t" + timeStamp + "\t" + uid;
	}
	
	
}


Then create MovieJsonTest.java:

import org.apache.hadoop.hive.ql.exec.UDF;
import org.codehaus.jackson.map.ObjectMapper;

public class MovieJsonTest extends UDF {

	public String evaluate(String jsonline) {
		ObjectMapper om = new ObjectMapper();
		try {
			// map the JSON fields onto the bean's setters
			MovierateBean bean = om.readValue(jsonline, MovierateBean.class);
			return bean.toString();
		} catch (Exception e) {
			// on malformed JSON, return the raw line unchanged
			return jsonline;
		}
	}

	/*
	 * Quick local check:
	public static void main(String[] args) {
		MovieJsonTest mt = new MovieJsonTest();
		String jsonline = "{\"movie\":\"527\",\"rate\":\"5\",\"timeStamp\":\"978824195\",\"uid\":\"1\"}";
		System.out.println(mt.evaluate(jsonline));
	}
	*/

}

Package both classes into movie.jar, upload it to the /opt directory on the server, and run the following (the org.codehaus.jackson classes normally already ship with Hive/Hadoop, so they should resolve at runtime; otherwise, bundle them into the jar):

add jar /opt/movie.jar;
create temporary function movie_convert as 'MovieJsonTest';
select movie_convert(rate) from rating;

(Screenshot of the execution result.)

As you can see, the original JSON has been parsed into its corresponding fields.


3. A brief introduction to Hive Transform

Hive UDFs and UDAFs must be written in Java. Hive offers another, simpler way to achieve the same effect: TRANSFORM. With TRANSFORM you can implement UDF-like logic in almost any language; Hive streams the selected columns to your program as tab-separated lines on standard input and reads tab-separated result rows back from its standard output.

Hive also provides the MAP and REDUCE keywords, but these are generally just aliases for TRANSFORM; they do not mean the script runs in the map or the reduce phase. See the official documentation for details.


We can replace the UDF above with a Python script.

The script /opt/movie_trans.py on the server contains:

import sys
import datetime
import json

# Hive streams the selected columns to stdin, one row per line
for line in sys.stdin:
    line = line.strip()
    # each line is one JSON record, e.g. {"movie":"2797","rate":"4",...}
    hjson = json.loads(line)
    movie = hjson['movie']
    rate = hjson['rate']
    timeStamp = hjson['timeStamp']
    uid = hjson['uid']
    # the timeStamp field holds epoch seconds; convert it to a datetime
    timeStamp = datetime.datetime.fromtimestamp(float(timeStamp))
    # tab-separated stdout becomes the output columns of TRANSFORM
    print('\t'.join([movie, rate, str(timeStamp), uid]))
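
Before wiring the script into Hive, it is worth sanity-checking it from the shell by piping the data file through it, e.g. cat /opt/movie.txt | python movie_trans.py.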

Then run the following in Hive:

ADD FILE /opt/movie_trans.py;

SELECT
  TRANSFORM (rate)
  USING 'python movie_trans.py'
  AS (movie, rate, timeStamp, uid)
FROM rating;

(Screenshot of the execution result.)
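
By default, every column produced by TRANSFORM is treated as a STRING. The AS clause also accepts explicit column types, in which case Hive casts the script's output accordingly; a sketch:

SELECT
  TRANSFORM (rate)
  USING 'python movie_trans.py'
  AS (movie STRING, rate INT, timeStamp STRING, uid STRING)
FROM rating;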

As you can see, TRANSFORM reproduces what the UDF above implemented.
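
And since MAP is just an alias for TRANSFORM, the same query can also be written in Hive's FROM-first form; this is equivalent to the SELECT TRANSFORM above and is not tied to the map phase:

FROM rating
MAP rate
USING 'python movie_trans.py'
AS (movie, rate, timeStamp, uid);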

