Hive 簡單udf入門--自然周差異計算


  Hive sql與我們普通使用的sql基本差異不大,但在大數據領域往往存在很多未知的需求,所以往往都有一個支持自定義功能函數編寫的口子,讓用戶實現其特定的需求。(這往往並非hive獨有,幾乎都是標配)

  而要寫udf往往也是比較簡單,看幾個例子,依葫蘆畫瓢總能搞幾個。

  今天我們就來簡單寫一個“自然周差異計算”week_diff函數吧。

 

1. pom依賴

  依賴是環境必備。實際上,hive udf 分為幾種類型,我們本文就來看看最簡單的一種實現, 繼承 UDF 類。

  pom.xml 必備依賴:

<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>1.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.7.3</version>
</dependency>

  以上依賴,也就是一些接口定義,以及必備環境的類庫引入,然后你就可以進行編寫自己的UDF了。

 

2. 編寫UDF實現

  這是用戶要做的一件事也是唯一件可做的事,本篇是實現 UDF 功能。 UDF 是hive中一對一關系的函數調用,即給一個輸入,給出一個輸出。樣例如下:

import com.y.udf.exception.UdfDataException;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDF;

import java.time.DayOfWeek;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Date;

/**
 * 功能描述: 自然周偏移計算函數
 *          <p>周偏移計算</p>
 *
 */
@Description(name = "week_diff",
        value = "_FUNC_(week_diff(date dayForJudge [, date dateReferer]) - Returns day1 與 day2 的自然周差異數, 如 -3, -1, 0, n... \n"
                + "_FUNC_(week_diff('2020-07-30')) - Returns 0 \n"
                + "_FUNC_(week_diff('2020-01-01', '2020-01-08 10:00:01')) - Returns -1 \n" +
                "_FUNC_(week_diff(to_date(from_unixtime(UNIX_TIMESTAMP('2020-01-01','yyyy-MM-dd'))), current_date))")
public class WeekDiffUdf extends UDF {

    /**
     * 一天的毫秒數常量
     */
    private static final long ONE_DAY_MILLIS = 3600_000 * 24;

    /**
     * 日期格式定義
     */
    private final DateTimeFormatter dayFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");

    /**
     * 與當前日期為依據,計算日期偏移 (結果一般都為 -xx)
     *
     * @param weekDayForCompare 要比較的日期格式
     * @return 周差異(-1, 0, n...)
     */
    public int evaluate(String weekDayForCompare) {
        if(weekDayForCompare.length() < 10) {
            throw new UdfDataException("要比較的日期 day1 數據格式錯誤, 請確認是否為 yyyy-MM-dd 格式");
        }
        weekDayForCompare = weekDayForCompare.substring(0, 10);
        LocalDate day1 = LocalDate.parse(weekDayForCompare, dayFormatter);
        return evaluate(day1, LocalDate.now());
    }

    /**
     * 日期格式入參調用計算周偏移
     */
    public int evaluate(Date weekDayForCompare) {
        LocalDate day1 = weekDayForCompare.toInstant()
                            .atZone(ZoneOffset.ofHours(8)).toLocalDate();
        return evaluate(day1, LocalDate.now());
    }

    /**
     * 兩個日期比較周差異 (string -> string)
     *
     * @param weekDayForCompare 被比較的日期
     * @param weekDayRef 參照日期
     * @return day1與day2 的周差異
     * @throws UdfDataException 格式錯誤時拋出
     */
    public int evaluate(String weekDayForCompare, String weekDayRef) throws Exception {
        if(weekDayForCompare.length() < 10) {
            throw new UdfDataException("要比較的日期 day1 數據格式錯誤, 請確認是否為 yyyy-MM-dd 格式");
        }
        if(weekDayRef.length() < 10) {
            throw new UdfDataException("參考日期 day2 數據格式錯誤, 請確認是否為 yyyy-MM-dd 格式");
        }
        weekDayForCompare = weekDayForCompare.substring(0, 10);
        weekDayRef = weekDayRef.substring(0, 10);
        LocalDate day1 = LocalDate.parse(weekDayForCompare, dayFormatter);
        LocalDate day2 = LocalDate.parse(weekDayRef);
        return evaluate(day1, day2);
    }

    /**
     * 兩個日期比較周差異 (date -> date)
     */
    public int evaluate(Date weekDayForCompare, Date weekDayRef) {
        LocalDate day1 = weekDayForCompare.toInstant()
                            .atZone(ZoneOffset.ofHours(8)).toLocalDate();
        LocalDate day2 = weekDayRef.toInstant()
                            .atZone(ZoneOffset.ofHours(8)).toLocalDate();
        long day1WeekFirstTimestamp = getDayOfWeekFirstTimestamp(day1);
        long day2WeekFirstTimestamp = getDayOfWeekFirstTimestamp(day2);
        // 計算周差異算法很簡單,就是獲取日期所在周的第一天的時間戳相減,然后除以周單位即可得到周差異
        long diffWeeks = (day1WeekFirstTimestamp - day2WeekFirstTimestamp)
                / (ONE_DAY_MILLIS * 7);
        return (int) diffWeeks;
    }

    public int evaluate(LocalDate day1, LocalDate day2) {
        long day1WeekFirstTimestamp = getDayOfWeekFirstTimestamp(day1);
        long day2WeekFirstTimestamp = getDayOfWeekFirstTimestamp(day2);
        long diffWeeks = (day1WeekFirstTimestamp - day2WeekFirstTimestamp)
                / (ONE_DAY_MILLIS * 7);
        return (int) diffWeeks;
    }

    /**
     * 獲取指定日期所在自然周的 第一天的時間戳 (周一為第1天)
     * localDate 的周起始時間計算
     *
     * @param day 指定日期
     * @return 1434543543 時間戳
     * @see #getDayOfWeekFirstTimestamp(LocalDate)
     */
    private long getDayOfWeekFirstTimestamp(LocalDate day) {
        DayOfWeek dayOfWeek = day.getDayOfWeek();
        // 以周一為起始點 日_周 偏移, 周一: 2, 周三: 4, SUNDAY=7,MONDAY=1
        int realOffsetFromMonday = dayOfWeek.getValue() - 1;
        return day.atStartOfDay(ZoneOffset.ofHours(8)).toInstant().toEpochMilli()
                    - realOffsetFromMonday * ONE_DAY_MILLIS;
    }

}

  從上面可以看出,我們寫了n個 evaluate() 方法,而這些方法都是可能被hive作為函數入口調用的,我們可以簡單認為就是evaluate的重載函數。所以,不需要向外暴露的方法,就不要命名為 evaluate了。上面實現了這么多,主要就是考慮外部可能傳入的不同數據類型,做的適配工作。可以適當推測,hive是通過硬反射調用 udf 的。

  可以看到,具體的函數實現比較簡單,因為我們的需求就在那兒。倒也不是想炫技什么的,主要是hive也不會支持你這種需求,所以簡單也還得自己來。

 

3. 編寫udf單元測試

  這准確的說,是java基礎知識,但這里的單元測試遠比我們在hive進行函數測試來得容易,所以是有必要的。

import org.junit.Assert;
import org.junit.Test;

import java.text.SimpleDateFormat;

/**
 * 功能描述: 周函數單元測試
 *
 */
public class WeekDiffUdfTest {

    @Test
    public void testEvaluate() throws Exception {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
        WeekDiffUdf udf = new WeekDiffUdf();
        int weekOffset;

        // 單入參函數測試
        String day1 = "2020-07-18";
        weekOffset = udf.evaluate(format.parse(day1));
        Assert.assertTrue("__FUNC__(string)周偏移計算錯誤",
                weekOffset <= -2);
        int weekOffset2 = udf.evaluate(day1);
        Assert.assertEquals("__FUNC__(string) != __FUNC__(date)",
                        weekOffset, weekOffset2);

        day1 = "2020-08-02";
        weekOffset = udf.evaluate(format.parse(day1));
        Assert.assertTrue("__FUNC__(string)周末邊界偏移計算錯誤",
                weekOffset <= 0);

        day1 = "2020-07-27";
        weekOffset = udf.evaluate(format.parse(day1));
        Assert.assertTrue("__FUNC__(string)周一邊界偏移計算錯誤",
                weekOffset <= 0);


        // 兩個函數參數入參測試
        day1 = "2020-08-02";
        String day2 = "2020-07-25 10:09:01";
        weekOffset = udf.evaluate(day1, day2);
        Assert.assertEquals("__FUNC__(string, string)周偏移計算錯誤",
                            1, weekOffset);

        day1 = "2020-07-27";
        day2 = "2020-07-30 10:00:01";
        weekOffset = udf.evaluate(day1, day2);
        Assert.assertEquals("__FUNC__(string, string)周偏移計算錯誤",
                        0, weekOffset);

        day1 = "2020-07-27";
        day2 = "2020-08-02";
        weekOffset = udf.evaluate(format.parse(day1), format.parse(day2));
        Assert.assertEquals("__FUNC__(date, date)周一周末偏移計算錯誤",
                        0, weekOffset);

        day1 = "2019-12-30";
        day2 = "2020-01-02";
        weekOffset = udf.evaluate(day1, day2);
        Assert.assertEquals("__FUNC__(string, string)跨年周偏移計算錯誤",
                            0, weekOffset);

        day1 = "2019-12-20";
        day2 = "2020-01-01";
        weekOffset = udf.evaluate(day1, day2);
        Assert.assertEquals("__FUNC__(string, string)跨年周偏移計算錯誤",
                            -2, weekOffset);
        System.out.println("ok。offset:" + weekOffset);
    }
}

  測試通過,核心功能無誤,可以准備打包發布hive環境了。當然是打jar包了。

 

4. 注冊udf並測試

  將前面打好的包放到hive環境可觸達的地方,運行加載命令!

add jar /home/hadoop/WeekDiffUdf.jar

  運行hive測試用命:(即相當於將前面的單元測試,翻譯成sql在hive中進行測試)

# 創建臨時函數,以便進行測試
create temporary function week_diff as "com.y.udf.WeekDiffUdf";    
select week_diff('2020-07-29') = 0 from default.dual;
select week_diff('2020-07-20') = -1 from default.dual;
select week_diff('2020-01-01', '2020-01-08 10:00:01') = -1 from default.dual;
select week_diff('2020-01-01', '2019-12-30 10:00:01') = 1 from default.dual;
select week_diff(to_date(from_unixtime(UNIX_TIMESTAMP('2020-07-28',"yyyy-MM-dd")))) = 0  from default.dual;
select week_diff(to_date(from_unixtime(UNIX_TIMESTAMP('2020-07-28',"yyyy-MM-dd"))), current_date) = 0 from default.dual;
# hive 外部會解析好字段值,再代入計算的
select my_date,week_diff(my_date) from default.account_for_test;

  如上結果,你應該會得到n個true返回值,否則單測不通過。最后一個sql只是為了驗證實際運行時被代入變量的情況,意義不大。

  運行單測完成后,功能就算完成了。我們可以正式發布了,進行永久注冊!如下:

CREATE FUNCTION week_diff AS 'com.y.udf.WeekDiffUdf' USING JAR 'hdfs://hadoop001:9000/lib/hadoop/WeekDiffUdf.jar';

  如此,一個自然周偏移函數udf 就完成了,你就可以像使用hive通用sql也一樣去寫業務了。

  可以通過 show functions; 查看已經注冊了的函數列表。

  要刪除已經注冊的函數:

drop temporary function week_diff;
drop function week_diff;

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM