Hive中的UDF詳解



hive作為一個sql查詢引擎,自帶了一些基本的函數,比如 count(計數), sum(求和),有時候這些基本函數滿足不了我們的需求,這時候就要寫 hive hdf(user defined funation),又叫用戶自定義函數。

UDF 創建與使用步驟

  • 繼承org.apache.hadoop.hive.ql.exec.UDF類,實現evaluate方法;
  • jar包上傳到集群,通過create temporary function創建臨時函數,不加temporary就創建了一個永久函數;
  • 通過select 語句使用;

例一

下面是一個判斷hive表字段是否包含’100’這個子串的簡單udf:

package com.js.dataclean.hive.udf.hm2

import org.apache.hadoop.hive.ql.exec.UDF;

public class IsContains100 extends UDF{

	public String evaluate(String s){

        if(s == null || s.length() == 0){
        	return "0";
        }

        return s.contains("100") ? "1" : "0";
    }
}

使用maven將其打包,進入hive cli,輸入命令:

add jar /home/hadoop/codejar/flash_format.jar;
create temporary function isContains100 as 'com.js.dataclean.hive.udf.hm2.IsContains100';

創建完臨時函數,即可使用這個函數了:

select isContains100('abc100def') from table limit 1;
1

例二

通過讀取mysql數據庫中的規則,為hive中的workflow返回對應的,類型:

type workflow
a	1
a	2
b	11
b	22
b	33

需求:我們希望,將hive的workflow字段取值為,1,2的變為類型(type)a,取值為11,22,33的全部變為b,就是歸類的意思。

這個udf可以這么實現:

package com.js.dataclean.hive.udf.hm2.workflow;

import org.apache.hadoop.hive.ql.exec.UDF;

import java.sql.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @ Author: keguang
 * @ Date: 2018/12/13 16:24
 * @ version: v1.0.0
 * @ description:
 */
public class GetWorkflow extends UDF{

    private static final String host = "0.0.0.0";
    private static final String port = "3306";
    private static final String database = "root";
    private static final String userName = "root";
    private static final String password = "123456";
    private static String url = "";
    private static final String driver = "com.mysql.jdbc.Driver";
    private static Connection conn = null;
    private static Map<String, List<String>> workflowType = null;

    static {
        url = "jdbc:mysql://" + host + ":" + port + "/" + database;
        try {
            // Class.forName(driver);
            conn = DriverManager.getConnection(url, userName, password);
            workflowType = getWorkflowType(conn);
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    private static Map<String, List<String>> getWorkflowType(Connection conn){
        Map<String, List<String>> workflowType = new HashMap<>();
        String sql = "select * from flash_player_workflow";
        PreparedStatement ps = null;
        try {
            ps = conn.prepareStatement(sql);
            ResultSet rs = ps.executeQuery();
            while (rs.next()){
                String workflow = rs.getString("workflow");
                String type = rs.getString("flag");

                List<String> workflows = workflowType.get(type);
                if(workflows == null){
                    workflows = new ArrayList<>();
                }
                workflows.add(workflow);
                workflowType.put(type, workflows);
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }finally {

            // 關閉鏈接
            if(conn != null){
                try {
                    conn.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
        return workflowType;

    }

    public String evaluate(String s){
        assert workflowType != null;

        for(String type:workflowType.keySet()){
            List<String> workflows = workflowType.get(type);
            if(workflows.contains(s)){
                return type;
            }
        }

        return s;
    }

}

打好jar包,創建函數: workflow2type(省略),然后使用:

select workflow2type(workflow) from table;

a
a
b
b
b

這樣就把很多取值歸為幾個大類了。

查看hive function的用法

查month 相關的函數

show functions like '*month*';

查看 add_months 函數的用法

desc function add_months;

查看 add_months 函數的詳細說明並舉例

desc function extended add_months;

hive 中的 UDAF

可以看出,udf就是一個輸入一個輸出,輸入一個性別,返回’男’或者’女’,如果我們想實現select date,count(1) from table,統計每天的流量呢?這就是一個分組統計,顯然是多個輸入,一個輸出,這時候udf已經不能滿足我們的需要,就需要寫udaf,user defined aggregare function(用戶自定義聚合函數)。

這里寫一個字符串連接函數,相當於concat的功能,將多行輸入,合並為一個字符串,當然了hive中有字符串連接函數,這里是舉例說明UDAF的用法:

package com.js.dataclean.hive.udaf.hm2;

import com.js.dataclean.utils.StringUtil;
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;

/**
 * 實現字符串連接聚合的UDAF
 * @version v1.0.0
 * @Author:keguang
 * @Date:2018/10/22 14:36
 */
public class MutiStringConcat extends UDAF{
    public static class SumState{
        private String sumStr;
    }

    public static class SumEvaluator implements UDAFEvaluator{
        SumState sumState;

        public SumEvaluator(){
            super();
            sumState = new SumState();
            init();
        }

        @Override
        public void init() {
            sumState.sumStr = "";
        }

        /**
         * 來了一行數據
         * @param s
         * @return
         */
        public boolean iterate(String s){
            if(!StringUtil.isNull(s)){
                sumState.sumStr += s;
            }
            return true;
        }

        /**
         * 狀態傳遞
         * @return
         */
        public SumState terminatePartial() {
            return sumState;
        }

        /**
         * 子任務合並
         * @param state
         * @return
         */
        public boolean merge(SumState state){
            if(state != null){
                sumState.sumStr += state.sumStr;
            }
            return true;
        }

        /**
         * 返回最終結果
         * @return
         */
        public String terminate(){
            return sumState.sumStr;
        }
    }
}

用法,與udf一樣,還是需要打包並且到hive cli中注冊使用。

關於UDAF開發注意點:

  • 需要import org.apache.hadoop.hive.ql.exec.UDAF以及org.apache.hadoop.hive.ql.exec.UDAFEvaluator,這兩個包都是必須的
  • 函數類需要繼承UDAF類,內部類Evaluator實現UDAFEvaluator接口
  • Evaluator需要實現 init、iterate、terminatePartial、merge、terminate這幾個函數
    • init函數類似於構造函數,用於UDAF的初始化
    • iterate接收傳入的參數,並進行內部的輪轉。其返回類型為boolean
    • terminatePartial無參數,其為iterate函數輪轉結束后,返回亂轉數據,iterate和terminatePartial類似於hadoop的Combiner
    • merge接收terminatePartial的返回結果,進行數據merge操作,其返回類型為boolean
    • terminate返回最終的聚集函數結果

臨時與永久函數

Hive自定義函數分為臨時與永久函數,顧名思義,分別是臨時使用和永久有效使用的意思。

臨時函數

臨時函數,關閉會話就結束了生命周期,下次要想使用,需要重新注冊。

add jar /path/xx.jar(存儲在本地磁盤)

// 臨時注冊UDF函數(hive會話生效)
create temporary function 函數名 as '包名.類名';

刪除臨時函數:

  • drop temporary function 數據庫名.函數名;

永久函數

永久函數一旦注冊,可以在hive cli,遠程連接hiveserver2等地方永久使用,步驟為:

  • 先上傳jar包到HDFS

  • 永久注冊:

CREATE FUNCTION 函數名 AS '包名.類名' USING JAR 'hdfs:///path/xxxx.jar';

注意:指定jar包路徑需要是hdfs路徑。

  • 刪除永久函數:
drop function 數據庫名.函數名字;

新增的永久函數,比如在hive cli命令行注冊的,可能會在beeline或者hiveserver2遠程連接時,提示不存在該函數。解決辦法是,在無法使用UDF的HiveServer2上,執行reload function命令,將MetaStore中新增的UDF信息同步到HiveServer2內存中。

場景

UDF在hive中使用場景廣泛,這里列舉常用的使用場景。

IP 轉化為地址

分詞

SQL 分析UDF

Hive系列文章

Hive表的基本操作
Hive中的集合數據類型
Hive動態分區詳解
hive中orc格式表的數據導入
Java通過jdbc連接hive
通過HiveServer2訪問Hive
SpringBoot連接Hive實現自助取數
hive關聯hbase表
Hive udf 使用方法
Hive基於UDF進行文本分詞
Hive窗口函數row number的用法
數據倉庫之拉鏈表


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM