Hive sql與我們普通使用的sql基本差異不大,但在大數據領域往往存在很多未知的需求,所以往往都有一個支持自定義功能函數編寫的口子,讓用戶實現其特定的需求。(這往往並非hive獨有,幾乎都是標配)
而要寫udf往往也是比較簡單,看幾個例子,依葫蘆畫瓢總能搞幾個。
今天我們就來簡單寫一個“自然周差異計算”week_diff函數吧。
1. pom依賴
依賴是環境必備。實際上,hive udf 分為幾種類型,我們本文就來看看最簡單的一種實現, 繼承 UDF 類。
pom.xml 必備依賴:
<dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-exec</artifactId> <version>1.2.1</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.7.3</version> </dependency>
以上依賴,也就是一些接口定義,以及必備環境的類庫引入,然后你就可以進行編寫自己的UDF了。
2. 編寫UDF實現
這是用戶要做的一件事也是唯一件可做的事,本篇是實現 UDF 功能。 UDF 是hive中一對一關系的函數調用,即給一個輸入,給出一個輸出。樣例如下:
import com.y.udf.exception.UdfDataException; import org.apache.hadoop.hive.ql.exec.Description; import org.apache.hadoop.hive.ql.exec.UDF; import java.time.DayOfWeek; import java.time.LocalDate; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; import java.util.Date; /** * 功能描述: 自然周偏移計算函數 * <p>周偏移計算</p> * */ @Description(name = "week_diff", value = "_FUNC_(week_diff(date dayForJudge [, date dateReferer]) - Returns day1 與 day2 的自然周差異數, 如 -3, -1, 0, n... \n" + "_FUNC_(week_diff('2020-07-30')) - Returns 0 \n" + "_FUNC_(week_diff('2020-01-01', '2020-01-08 10:00:01')) - Returns -1 \n" + "_FUNC_(week_diff(to_date(from_unixtime(UNIX_TIMESTAMP('2020-01-01','yyyy-MM-dd'))), current_date))") public class WeekDiffUdf extends UDF { /** * 一天的毫秒數常量 */ private static final long ONE_DAY_MILLIS = 3600_000 * 24; /** * 日期格式定義 */ private final DateTimeFormatter dayFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd"); /** * 與當前日期為依據,計算日期偏移 (結果一般都為 -xx) * * @param weekDayForCompare 要比較的日期格式 * @return 周差異(-1, 0, n...) */ public int evaluate(String weekDayForCompare) { if(weekDayForCompare.length() < 10) { throw new UdfDataException("要比較的日期 day1 數據格式錯誤, 請確認是否為 yyyy-MM-dd 格式"); } weekDayForCompare = weekDayForCompare.substring(0, 10); LocalDate day1 = LocalDate.parse(weekDayForCompare, dayFormatter); return evaluate(day1, LocalDate.now()); } /** * 日期格式入參調用計算周偏移 */ public int evaluate(Date weekDayForCompare) { LocalDate day1 = weekDayForCompare.toInstant() .atZone(ZoneOffset.ofHours(8)).toLocalDate(); return evaluate(day1, LocalDate.now()); } /** * 兩個日期比較周差異 (string -> string) * * @param weekDayForCompare 被比較的日期 * @param weekDayRef 參照日期 * @return day1與day2 的周差異 * @throws UdfDataException 格式錯誤時拋出 */ public int evaluate(String weekDayForCompare, String weekDayRef) throws Exception { if(weekDayForCompare.length() < 10) { throw new UdfDataException("要比較的日期 day1 數據格式錯誤, 請確認是否為 yyyy-MM-dd 格式"); } if(weekDayRef.length() < 10) { throw new UdfDataException("參考日期 day2 數據格式錯誤, 請確認是否為 yyyy-MM-dd 格式"); } weekDayForCompare = weekDayForCompare.substring(0, 10); weekDayRef = weekDayRef.substring(0, 10); LocalDate day1 = LocalDate.parse(weekDayForCompare, dayFormatter); LocalDate day2 = LocalDate.parse(weekDayRef); return evaluate(day1, day2); } /** * 兩個日期比較周差異 (date -> date) */ public int evaluate(Date weekDayForCompare, Date weekDayRef) { LocalDate day1 = weekDayForCompare.toInstant() .atZone(ZoneOffset.ofHours(8)).toLocalDate(); LocalDate day2 = weekDayRef.toInstant() .atZone(ZoneOffset.ofHours(8)).toLocalDate(); long day1WeekFirstTimestamp = getDayOfWeekFirstTimestamp(day1); long day2WeekFirstTimestamp = getDayOfWeekFirstTimestamp(day2); // 計算周差異算法很簡單,就是獲取日期所在周的第一天的時間戳相減,然后除以周單位即可得到周差異 long diffWeeks = (day1WeekFirstTimestamp - day2WeekFirstTimestamp) / (ONE_DAY_MILLIS * 7); return (int) diffWeeks; } public int evaluate(LocalDate day1, LocalDate day2) { long day1WeekFirstTimestamp = getDayOfWeekFirstTimestamp(day1); long day2WeekFirstTimestamp = getDayOfWeekFirstTimestamp(day2); long diffWeeks = (day1WeekFirstTimestamp - day2WeekFirstTimestamp) / (ONE_DAY_MILLIS * 7); return (int) diffWeeks; } /** * 獲取指定日期所在自然周的 第一天的時間戳 (周一為第1天) * localDate 的周起始時間計算 * * @param day 指定日期 * @return 1434543543 時間戳 * @see #getDayOfWeekFirstTimestamp(LocalDate) */ private long getDayOfWeekFirstTimestamp(LocalDate day) { DayOfWeek dayOfWeek = day.getDayOfWeek(); // 以周一為起始點 日_周 偏移, 周一: 2, 周三: 4, SUNDAY=7,MONDAY=1 int realOffsetFromMonday = dayOfWeek.getValue() - 1; return day.atStartOfDay(ZoneOffset.ofHours(8)).toInstant().toEpochMilli() - realOffsetFromMonday * ONE_DAY_MILLIS; } }
從上面可以看出,我們寫了n個 evaluate() 方法,而這些方法都是可能被hive作為函數入口調用的,我們可以簡單認為就是evaluate的重載函數。所以,不需要向外暴露的方法,就不要命名為 evaluate了。上面實現了這么多,主要就是考慮外部可能傳入的不同數據類型,做的適配工作。可以適當推測,hive是通過硬反射調用 udf 的。
可以看到,具體的函數實現比較簡單,因為我們的需求就在那兒。倒也不是想炫技什么的,主要是hive也不會支持你這種需求,所以簡單也還得自己來。
3. 編寫udf單元測試
這准確的說,是java基礎知識,但這里的單元測試遠比我們在hive進行函數測試來得容易,所以是有必要的。
import org.junit.Assert; import org.junit.Test; import java.text.SimpleDateFormat; /** * 功能描述: 周函數單元測試 * */ public class WeekDiffUdfTest { @Test public void testEvaluate() throws Exception { SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd"); WeekDiffUdf udf = new WeekDiffUdf(); int weekOffset; // 單入參函數測試 String day1 = "2020-07-18"; weekOffset = udf.evaluate(format.parse(day1)); Assert.assertTrue("__FUNC__(string)周偏移計算錯誤", weekOffset <= -2); int weekOffset2 = udf.evaluate(day1); Assert.assertEquals("__FUNC__(string) != __FUNC__(date)", weekOffset, weekOffset2); day1 = "2020-08-02"; weekOffset = udf.evaluate(format.parse(day1)); Assert.assertTrue("__FUNC__(string)周末邊界偏移計算錯誤", weekOffset <= 0); day1 = "2020-07-27"; weekOffset = udf.evaluate(format.parse(day1)); Assert.assertTrue("__FUNC__(string)周一邊界偏移計算錯誤", weekOffset <= 0); // 兩個函數參數入參測試 day1 = "2020-08-02"; String day2 = "2020-07-25 10:09:01"; weekOffset = udf.evaluate(day1, day2); Assert.assertEquals("__FUNC__(string, string)周偏移計算錯誤", 1, weekOffset); day1 = "2020-07-27"; day2 = "2020-07-30 10:00:01"; weekOffset = udf.evaluate(day1, day2); Assert.assertEquals("__FUNC__(string, string)周偏移計算錯誤", 0, weekOffset); day1 = "2020-07-27"; day2 = "2020-08-02"; weekOffset = udf.evaluate(format.parse(day1), format.parse(day2)); Assert.assertEquals("__FUNC__(date, date)周一周末偏移計算錯誤", 0, weekOffset); day1 = "2019-12-30"; day2 = "2020-01-02"; weekOffset = udf.evaluate(day1, day2); Assert.assertEquals("__FUNC__(string, string)跨年周偏移計算錯誤", 0, weekOffset); day1 = "2019-12-20"; day2 = "2020-01-01"; weekOffset = udf.evaluate(day1, day2); Assert.assertEquals("__FUNC__(string, string)跨年周偏移計算錯誤", -2, weekOffset); System.out.println("ok。offset:" + weekOffset); } }
測試通過,核心功能無誤,可以准備打包發布hive環境了。當然是打jar包了。
4. 注冊udf並測試
將前面打好的包放到hive環境可觸達的地方,運行加載命令!
add jar /home/hadoop/WeekDiffUdf.jar
運行hive測試用命:(即相當於將前面的單元測試,翻譯成sql在hive中進行測試)
# 創建臨時函數,以便進行測試 create temporary function week_diff as "com.y.udf.WeekDiffUdf"; select week_diff('2020-07-29') = 0 from default.dual; select week_diff('2020-07-20') = -1 from default.dual; select week_diff('2020-01-01', '2020-01-08 10:00:01') = -1 from default.dual; select week_diff('2020-01-01', '2019-12-30 10:00:01') = 1 from default.dual; select week_diff(to_date(from_unixtime(UNIX_TIMESTAMP('2020-07-28',"yyyy-MM-dd")))) = 0 from default.dual; select week_diff(to_date(from_unixtime(UNIX_TIMESTAMP('2020-07-28',"yyyy-MM-dd"))), current_date) = 0 from default.dual; # hive 外部會解析好字段值,再代入計算的 select my_date,week_diff(my_date) from default.account_for_test;
如上結果,你應該會得到n個true返回值,否則單測不通過。最后一個sql只是為了驗證實際運行時被代入變量的情況,意義不大。
運行單測完成后,功能就算完成了。我們可以正式發布了,進行永久注冊!如下:
CREATE FUNCTION week_diff AS 'com.y.udf.WeekDiffUdf' USING JAR 'hdfs://hadoop001:9000/lib/hadoop/WeekDiffUdf.jar';
如此,一個自然周偏移函數udf 就完成了,你就可以像使用hive通用sql也一樣去寫業務了。
可以通過 show functions; 查看已經注冊了的函數列表。
要刪除已經注冊的函數:
drop temporary function week_diff; drop function week_diff;