大數據篇:Hive


大數據篇:Hive

hive.apache.org

Hive是什么?

Hive是Facebook開源的用於解決海量結構化日志的數據統計,是基於Hadoop的一個數據倉庫工具,可以將結構化的數據文件映射為一張表,並且提供類SQL查詢功能,本質是將HQL轉化成MapReduce程序。

數據存儲在HDFS,分析數據底層實現默認是MapReduce,執行程序運行在Yarn上。

如果沒有Hive

想象一下數據統計的時候寫大量的MapReduce程序,那會是多么痛苦。如果是寫SQL就開心多了,尤其是離線數據倉庫方面廣泛應用。

1 Hive和數據庫的區別

由於 Hive 采用了類似SQL 的查詢語言 HQL(hive query language),因此很容易將 Hive 理解為數據庫。其實從結構上來看,Hive 和數據庫除了擁有類似的查詢語言,再無類似之處。

  • 1 查詢語言
    • 由於SQL被廣泛的應用在數據倉庫中,因此,專門針對Hive的特性設計了類SQL的查詢語言HQL。熟悉SQL開發的開發者可以很方便的使用Hive進行開發。
  • 2 數據存儲位置
    • Hive 是建立在 Hadoop 之上的,所有 Hive 的數據都是存儲在 HDFS 中的。而數據庫則可以將數據保存在塊設備或者本地文件系統中。
  • 3 數據更新
    • 由於Hive是針對數據倉庫應用設計的,而數據倉庫的內容是讀多寫少的。因此,Hive中不支持對數據的改寫和添加,所有的數據都是在加載的時候中確定好的。而數據庫中的數據通常是需要經常進行修改的,因此可以使用 INSERT INTO … VALUES 添加數據,使用 UPDATE … SET修改數據。
  • 4 索引
    • Hive在加載數據的過程中不會對數據進行任何處理,甚至不會對數據進行掃描,因此也沒有對數據中的某些Key建立索引。Hive要訪問數據中滿足條件的特定值時,需要暴力掃描整個數據,因此訪問延遲較高。由於 MapReduce 的引入, Hive 可以並行訪問數據,因此即使沒有索引,對於大數據量的訪問,Hive 仍然可以體現出優勢。數據庫中,通常會針對一個或者幾個列建立索引,因此對於少量的特定條件的數據的訪問,數據庫可以有很高的效率,較低的延遲。由於數據的訪問延遲較高,決定了 Hive 不適合在線數據查詢。
  • 5 執行
    • Hive中大多數查詢的執行是通過 Hadoop 提供的 MapReduce 來實現的。而數據庫通常有自己的執行引擎。
  • 6 執行延遲
    • Hive 在查詢數據的時候,由於沒有索引,需要掃描整個表,因此延遲較高。另外一個導致 Hive 執行延遲高的因素是 MapReduce框架。由於MapReduce 本身具有較高的延遲,因此在利用MapReduce 執行Hive查詢時,也會有較高的延遲。相對的,數據庫的執行延遲較低。當然,這個低是有條件的,即數據規模較小,當數據規模大到超過數據庫的處理能力的時候,Hive的並行計算顯然能體現出優勢。
  • 7 可擴展性
    • 由於Hive是建立在Hadoop之上的,因此Hive的可擴展性是和Hadoop的可擴展性是一致的(世界上最大的Hadoop 集群在 Yahoo!,2009年的規模在4000 台節點左右)。而數據庫由於 ACID 語義的嚴格限制,擴展行非常有限。目前最先進的並行數據庫 Oracle 在理論上的擴展能力也只有100台左右。
  • 8 數據規模
    • 由於Hive建立在集群上並可以利用MapReduce進行並行計算,因此可以支持很大規模的數據;對應的,數據庫可以支持的數據規模較小。

2 Hive架構

Hive通過給用戶提供的一系列交互接口,接收到用戶的指令(SQL),使用自己的Driver,結合元數據(MetaStore),將這些指令翻譯成MapReduce,提交到Hadoop中執行,最后,將執行返回的結果輸出到用戶交互接口。

  • 1)用戶接口:Client
    • CLI(hive shell)、JDBC/ODBC(java訪問hive)、WEBUI(瀏覽器訪問hive,如:Hue)
  • 2)元數據:Metastore
    • 元數據包括:表名、表所屬的數據庫(默認是default)、表的擁有者、列/分區字段、表的類型(是否是外部表)、表的數據所在目錄等;
    • 默認存儲在自帶的derby數據庫中,推薦使用MySQL存儲Metastore
  • 3)Hadoop
    • 使用HDFS進行存儲,使用MapReduce進行計算。
  • 4)驅動器:Driver
    • (1)解析器(SQL Parser):將SQL字符串轉換成抽象語法樹AST,這一步一般都用第三方工具庫完成,比如antlr;對AST進行語法分析,比如表是否存在、字段是否存在、SQL語義是否有誤。
    • (2)編譯器(Physical Plan):將AST編譯生成邏輯執行計划。
    • (3)優化器(Query Optimizer):對邏輯執行計划進行優化。
    • (4)執行器(Execution):把邏輯執行計划轉換成可以運行的物理計划。對於Hive來說,就是MR/TEZ/Spark。

3 hive常用數據類型

mysql字段類型 hive:ods字段類型 hive:dwd字段類型
tinyint tinyint tinyint
int int int
bigint bigint bigint
varchar string string
datetime bigint string
bit boolean int
double double double

4 Hive常用命令

  • 鏈接hive的三種常用方式
  1. 使用hive腳本可以進入客戶端。我這里使用Hue進行演示。

  1. 使用hue直接操作hive

  1. hiveserver2 beeline鏈接方式
  • 啟動hiveserver2服務
hiveserver2
  • 啟動beeline
beeline
  • beeline鏈接hiveserver2,輸入用戶名密碼
!connect jdbc:hive2://cdh01.cm:10000

4.1 數據庫操作

  • 創建數據庫
CREATE DATABASE 數據庫名字

  • 查詢數據庫
show databases

  • 刪除數據庫
drop database 數據庫名字 
drop database 數據庫名字 cascade
  • 顯示數據庫信息
desc database 數據庫名字
desc database extended 數據庫名字
  • 切換數據庫
use 數據庫名字
  • 查詢數據庫擁有表
show tables
  • 查詢表信息
desc formatted 表名

4.2 創建刪除表

  • 建表語法

    • CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name 
          [(col_name data_type [COMMENT col_comment], ...)] 
          [COMMENT table_comment] 
          [PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)] 
          [CLUSTERED BY (col_name, col_name, ...) 
          [SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS] 
          [ROW FORMAT row_format] 
          [STORED AS file_format] 
          [LOCATION hdfs_path]
      
  • 建表常用參數

    • EXTERNAL:外部表關鍵字,Hive分內部表(元數據和數據會被一起刪除,一般不太重要的表如:中間臨時表)和外部表(只刪除元數據,不刪除數據,一般而言都采用外部表,因為數據安全些),一般而言采用外部表。

    • PARTITIONED BY:分區指定字段為'dt'。

      PARTITIONED BY (
        `dt` String COMMENT 'partition'
      )
      
    • row format delimited fields terminated by '\t' :數據使用什么做切分,這里使用制表符。

    • stored as parquet:文件存儲格式,推薦parquet(spark天然支持parquet)。

    • location '/warehouse/層名/庫名/表名' :文件存儲位置。

    • tblproperties ("parquet.compression"="snappy") :文件壓縮策略,推薦snappy。

    • CLUSTERED BY創建分桶表(抽樣場景使用)

    • SORTED BY排序(一般情況查詢語句都有排序,故不常用)

  • 建表例子

#刪除表
drop table if exists dwd.student

#創建表(庫名.表名)
CREATE EXTERNAL TABLE `dwd.student`(
  `ID` bigint COMMENT '',
  `CreatedBy` string COMMENT '創建人',
  `CreatedTime` string COMMENT '創建時間',
  `UpdatedBy`  string COMMENT '更新人',
  `UpdatedTime` string COMMENT '更新時間',
  `Version` int COMMENT '版本號',
  `name` string COMMENT '姓名'
  ) COMMENT '學生表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
location '/warehouse/dwd/test/student/'
stored as parquet
tblproperties ("parquet.compression"="snappy") 

創建內部表就是去掉EXTERNAL關鍵字,使用插入數據案例測試一下刪除表后重新建立的效果吧。

4.3 插入數據

INSERT INTO TABLE dwd.student partition(dt='2020-04-05') VALUES(1,"heaton","2020-04-05","","","1","zhangsan") 
INSERT INTO TABLE dwd.student partition(dt='2020-04-06') VALUES(2,"heaton","2020-04-06","","","1","lisi") 

4.4 查詢數據

SELECT * FROM dwd.student
SELECT * FROM dwd.student WHERE dt="2020-04-05"
SELECT * FROM dwd.student WHERE dt="2020-04-06"

4.4.1 分區解釋

  • 如上圖,2個不同的分區對應的都是HDFS上的獨立文件夾,該文件夾下是該分區所有的數據文件。Hive中的分區就是分目錄,把一個大的數據集根據業務需要分割成小的數據集。在查詢時通過WHERE字句中的表達式查詢指定的分區(如上面的WHERE dt="2020-04-05"),這樣的查詢效率會提高很多。
  • 多級分區,就是繼續分文件夾,在PARTITIONED BY 中用“,”號隔開分區字段即可,不建議超過2級分區。

4.5 本地load到hive

  • 根據上面建表語句建立student1表
  • 將student表2020-04-05分區中的數據加載到student1表2020-04-05分區中
LOAD DATA INPATH '/warehouse/dwd/test/student/dt=2020-04-05/000000_0' INTO TABLE dwd.student1 partition(dt='2020-04-05')

4.6 清除表數據

  • 注意:Truncate只能刪除內部表,不能刪除外部表數據
  • 注意:hive不支持delete,update(需要通過配置文件修改支持)
  • Truncate不經過事務,delete經過事務
TRUNCATE TABLE dwd.student1

4.7 將查詢結果插入到表中

  • INSERT INTO追加到表中
insert into table dwd.student1 partition(dt='2020-04-05') select id,createdby,createdtime,updatedby,updatedtime,version,name from dwd.student where dt="2020-04-06"

由於student1表2020-04-05分區中已經有一條張三數據,這時插入李四數據,查詢結果為2條數據

  • INSERT OVERWRITE復寫表數據
insert overwrite table dwd.student1 partition(dt='2020-04-05') select id,createdby,createdtime,updatedby,updatedtime,version,name from dwd.student where dt="2020-04-06"

由上面的例子使student1表2020-04-05分區中有2條數據,這時候在插入一條,發現只有一條,因為數據被復寫。

4.8 通過臨時表插入數據

  • with 臨時表名 as 查詢語句,如下例子為查詢兩張表數據,字段缺失可以補"0"或者"",插入結果表中
with

s1 as
(
 select id,createdby,createdtime,version,name from dwd.student where dt="2020-04-05"
),
s2 as
(
 select id,updatedby,updatedtime,version,name from dwd.student where dt="2020-04-06"
)

insert overwrite table dwd.student1 partition(dt='2020-04-06')
select 
	s1.id as id,
	s1.createdby as createdby,
	s1.createdtime as createdtime,
	"" as updatedby,
	"0" as updatedtime,
	s1.version as version,
	s1.name as name
from 
s1

union all

select 
	s2.id as id,
	"" as createdby,
	"0" as createdtime,
	s2.updatedby as updatedby,
	s2.updatedtime as updatedtime,
	s2.version as version,
	s2.name as name
from 
s2

4.9 Hive常用交互命令

  • hive -e “select * from xx”:hive命令行SQL語句

  • hive -f xx.hql:hive執行sql文件

4.10 分桶及抽樣查詢

  1. 分區針對的是數據的存儲路徑,分桶針對的是數據文件,分桶是將數據集分解成更容易管理的若干部分的一個技術。
  2. 創建表時使用 CLUSTERED BY(字段名) into 分桶數 buckets 指定分桶條件。
  3. load命令導入分桶表,數據不會分桶。
  4. 使用分桶功能需要設置hive屬性:set hive.enforce.bucketing=true; set mapreduce.job.reduces=-1;
  5. 對於非常大的數據集,有時用戶需要使用的是一個具有代表性的查詢結果,而不是全部結果,這時可以使用分桶來進行抽樣。抽樣語句:TABLESAMPLE(BUCKET x OUT OF y ON 分桶字段)

y必須是table總bucket數的倍數或者因子,根據Y值決定抽樣的比例,如:table共4個桶,當y=2時,抽取4/2=2個桶數據,等y=8時,抽取4/8=1/2的bucket數據。

x是從哪個桶開始抽,然后依次抽取x+y的桶數據,如:table共4個桶,當y=2,x=1時,抽取4/2=2個桶數據,桶數為1號,1+2=3號。

  • 案例
#建表sql
CREATE EXTERNAL TABLE `dwd.student_buck`(
  `ID` bigint COMMENT '',
  `name` string COMMENT '姓名'
  ) COMMENT '學生表'
CLUSTERED BY(ID) into 4 buckets
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/test/student_buck/'
tblproperties ("parquet.compression"="snappy") 

#插入數據
INSERT INTO TABLE dwd.student_buck VALUES(1,"zhangsan"),(2,"lisi") ,(3,"wangwu") ,(4,"zhaoliu") ,(5,"haqi") ,(6,"xiba") ,(7,"heijiu") ,(8,"washi") 

#抽樣查詢
SELECT * from dwd.student_buck TABLESAMPLE(BUCKET 1 OUT OF 4 ON id)

5 Hive 查詢操作特殊解釋

https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Select

5.1 join解釋

  1. Join語句只支持等值鏈接,不支持非等值鏈接。
  2. 除左、右、內鏈接外還支持全連接full join,但是效率較差,一般使用union all

5.2 排序分組操作

Order By 全局排序(全局一個Reducer)

Group By 分組(會根據跟的字段,發到不同的Reducer中)

Distribute By 類似MR中的partition進行分區,需結合Sort By使用,而且要寫在前面,如:根據日期分區,在根據年齡排序

#多reduce才有效果
select * from student distribute by createtime sort by age desc

Sort By每個Reducer內部排序,對全局結果來說不是排序,結合Distribute By使用。(多Reducer區內排序)

Cluster By 當分區字段和區內排序字段相同,也就是distribute by和sort by需要字段相同可以使用其代替,但是只能升序排列。

5.3 空字段賦值函數

NVL(string,替換值),如果string為NULL,則返回替換值,否則返回string值

5.4 時間轉換

# 1 格式化時間->  2020-04-05 00:00:00
SELECT date_format("2020-04-05","yyyy-MM-dd HH:mm:ss")
# 2 時間天數相加->  2020-04-05  ->2020-04-01
SELECT date_add("2020-04-05",4)
SELECT date_add("2020-04-05",-4)
# 3 時間天數相減->2020-04-01(一般直接用date_add了,這個不怎么用)
SELECT date_sub("2020-04-05",4)
# 4 兩個時間相減-> 4
SELECT datediff("2020-04-09","2020-04-05")
# 5 時間轉時間戳->  1586016000
SELECT unix_timestamp("2020-04-05","yyyy-MM-dd")
# 6 時間戳轉時間->  2020-04-05
SELECT from_unixtime(cast("1586016000" as BIGINT),"yyyy-MM-dd")
# 7 將日期轉為星期->  1 (星期一)
SELECT pmod(datediff("2020-04-06","1920-01-01")-3,7)

5.5 CASE WHEN 和 IF

財務部
財務部
財務部
科技部
人事部
人事部

如上圖假設表 emp 中有6個人,求每個部門下面的男女個數。

#使用CASE WHEN
select 
	department,
	sum(case sex when '男' then 1 else 0 end) man_count,
	sum(case sex when '女' then 1 else 0 end) woman_count,
from 
	emp
group by
	department
	
#還可以使用IF
select 
	department,
	sum(if(sex='男',1,0)) man_count,
	sum(if(sex='女',1,0)) woman_count,
from 
	emp
group by
	department

5.6 聚合拼接

# 1 concat拼接->  2020-04-05
SELECT concat("2020","-","04","-","05")
# 2 concat_ws拼接->  2020-04-05 注意 concat_ws可以傳collect_set,需要字段為string
SELECT concat_ws("-","2020","04","05")
# 3 匯總成Array類型字段-> [8,4,5,1,6,2,7,3]
SELECT collect_set(id) from dwd.student_buck

5.7 炸裂

EXPLODE(字段名):將一列復雜的array或者map結構拆分成多行

炸裂函數必須配合側寫視圖函數:LATERAL VIEW udtf(expression) 側寫視圖別名 AS 側寫結果列別名

  • 例子
# 建表
CREATE EXTERNAL TABLE `dwd.hobby`(
  `id` bigint COMMENT '',
  `name` string COMMENT '姓名',
  `hobby_name` array<string> COMMENT '愛好名字'
  ) COMMENT '愛好表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
collection items terminated by ','
stored as parquet
location '/warehouse/dwd/test/hobby/'
tblproperties ("parquet.compression"="snappy") 

# 插入3條數據
INSERT INTO TABLE dwd.hobby partition(dt='2020-04-05') 
SELECT 1,"zhangsan",array("籃球","足球","羽毛球","惡作劇")

INSERT INTO TABLE dwd.hobby partition(dt='2020-04-05') 
SELECT 2,"lisi",array("足球","惡作劇")

INSERT INTO TABLE dwd.hobby partition(dt='2020-04-05') 
SELECT 3,"wangwu",array("羽毛球","惡作劇")

# 使用側寫視圖將炸裂結果作為臨時視圖,聚合需要的結果
select 
    name,hobby_name_col 
from 
    dwd.hobby
lateral view explode(hobby_name) hobby_tmp as hobby_name_col

5.8 窗口函數

主要解決1行和n行數據無法聚合在一起展示的問題。

  • OVER():指定分析函數工作的數據窗口大小,這個數據窗口大小會隨着行的變化而變化
    • CURRENT ROW:當前行
    • n PRECEDING:往前n行數據
    • n FOLLOWING:往后n行數據
    • UNBOUNDED:起點,UNBOUNDED PRECEDING表示從前面的起點,UNBOUNDED FOLLOWING表示到后面的終點。
    • Partition By:分區
    • Order By:排序
    • Distribute By:區內分組
    • Sort By:區內排序
    • Group By:不可使用
  • LAG(col,n,默認值):往前n行數據
  • LEAD(col,n,默認值):往后n行數據
  • NTILE(n):類似於分桶,把數據有序分到n個桶里,序號從1開始。
  • RANK():排序相同時會重復,總數不會變
  • DENSE_RANK():排序相同時會重復,總數會減小
  • ROW_NUMBER():會根據順序計算

5.8.0 案例數據准備

#建表
CREATE EXTERNAL TABLE `dwd.order`(
  `name` string COMMENT '姓名',
  `order_date` string COMMENT '購買日期',
  `price` bigint  COMMENT '消費金額'
  ) COMMENT '訂單表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/test/order/'
tblproperties ("parquet.compression"="snappy") 


#插入數據
INSERT INTO TABLE `dwd.order` partition(dt='2020-04-05') VALUES("zhangsan","2020-01-01",15),("lisi","2020-01-02",22),("zhangsan","2020-04-01",34),("lisi","2020-04-01",15),("zhangsan","2020-04-04",42),("zhangsan","2020-03-01",24),("lisi","2020-02-01",65),("wangwu","2020-04-01",33),("zhangsan","2020-05-01",43),("zhangsan","2020-07-01",12),("wangwu","2020-02-05",32),("zhangsan","2020-03-06",22),("lisi","2020-04-07",14)

5.8.1 OVER()無參數

  • 查詢在2020年4月購買過的顧客名字及總人數
SELECT name,count(*) OVER() FROM dwd.`order` WHERE order_date BETWEEN "2020-04-01" AND "2020-04-30" GROUP BY name

5.8.2 OVER()無參數

  • 查詢顧客的購買明細及購買總額
SELECT *,sum(price) OVER() FROM dwd.`order` 

5.8.3 OVER()排序參數

  • 查詢顧客的購買明細及購買總額並按月累加展示
SELECT *,sum(price) OVER(ORDER BY order_date) FROM dwd.`order` 
SELECT *,sum(price) OVER(ORDER BY order_date DESC) FROM dwd.`order` 

over函數傳參,還是給所有結果集進行開窗,但是根據參數限定窗口大小,上面sql的意思為:

1號zhangsan數據窗口內只含有 1號張三

2號lisi數據窗口內含有 1號張三,2號李四

3號lisi數據窗口內含有 1號張三,2號李四,3號李四

依次類推

5.8.4 OVER()分組參數

  • 查詢顧客的購買明細及每個顧客的購買總額
SELECT *,sum(price) OVER(distribute by name) FROM dwd.`order` 

5.8.5 OVER()分組排序參數

  • 查詢顧客的購買明細及每個顧客的購買總額並按月累加展示
SELECT *,sum(price) OVER(distribute by name sort by order_date) FROM dwd.`order` 

5.8.6 CURRENT ROW->UNBOUNDED->PRECEDING->FOLLOWING窗口大小指定

  • 查詢顧客的購買明細及每個顧客的購買總額並直接累加展示
SELECT *,sum(price) OVER(distribute by name rows between UNBOUNDED PRECEDING and CURRENT ROW) FROM dwd.`order` 

5.8.7 LAG->LEAD 窗口值指定

  • 查詢顧客的購買明細並且追加客戶上次購買日期
SELECT *,lag(order_date,1,"1970-01-01") OVER(distribute by name sort by order_date) FROM dwd.`order` 

5.8.8 NTILE 分桶

SELECT *,ntile(5) OVER() FROM dwd.`order` 

  • 查詢當前20%的訂單信息
SELECT * FROM (SELECT *,ntile(5) OVER(ORDER BY order_date) ntile_5 FROM dwd.`order`) t1 WHERE t1.ntile_5=1 

5.8.9 ROW_NUMBER 排名

SELECT *,row_number() OVER(order by order_date) FROM dwd.`order` 

5.8.10 RANK 排名

SELECT *,rank() OVER(order by order_date) FROM dwd.`order` 

5.8.11 DENSE_RANK 排名

SELECT *,dense_rank() OVER(order by order_date) FROM dwd.`order` 

6 自定義函數

  • 用戶自定義函數類別可以分為三種:
    • UDF(一行進一行出),如:concat,split
    • UDAF(多行進一行出),如:count,sum,max
    • UDTF(一行進多行出),如:explode

https://cwiki.apache.org/confluence/display/Hive/HivePlugins

  • 添加jar包及函數的方式請參考6.1案例

6.1 自建UDF

必須有返回值,可以返回null,但是類型不能是void。

需要繼承UDF類,並且方法名為evaluate

  • maven-pom.xml
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>2.1.1</version>
    </dependency>
  • java代碼
import org.apache.hadoop.hive.ql.exec.UDF;

//模擬concat
public class MyUDF extends UDF {
    public String evaluate(String... args) {
        StringBuilder s = new StringBuilder();
        for (String arg : args) {
            s.append(arg);
        }
        return s.toString();
    }
}
  • 打包上傳服務器,進入hive命令行
# 1 添加jar包,注意服務器本地路徑(hive客戶端關閉則消失,需要重新添加,如果不想重新添加,可以直接使用4將jar包放入hive/lib下)
add jar /root/test/function-1.0-SNAPSHOT.jar
# 2 添加函數,注意hive中的函數別名,還有jar包全類名,如下為永久創建和臨時創建temporary(hive客戶端關閉則消失)
create function myconcat as "com.hive.function.udf.MyUDF"
create temporary function myconcat as "com.hive.function.udf.MyUDF"
# 3 加載jar包的第二種方式,上傳jar包至hdfs集群
# create temporary function myconcat as "com.hive.function.udf.MyUDF" using jar 'hdfs:///hive_jar/function-1.0-SNAPSHOT.jar';
# 4  加載jar包的第三種方式,直接放在hive的lib目錄下,啟動hive,使用2添加函數即可。下面是cdh環境的jars包路徑,配置軟連接到hive/lib下
# ln -s /opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/jars/function-1.0-SNAPSHOT.jar /opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/hive/lib
# 5 刪除函數
# drop function myconcat
  • 可以在hive中使用自建函數
SELECT myconcat("a","-","b","-","c")

6.2 自建UDAF

用戶自定義UDAF必須繼承UDAF,必須提供一個實現了UDAFEvaluator接口的內部類

  • java代碼
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;

//模擬avg
public class MyUDAF extends UDAF {
    public static class AvgState {
        private long mCount;
        private double mSum;
    }

    public static class AvgEvaluator implements UDAFEvaluator {
        AvgState state;

        public AvgEvaluator() {
            super();
            state = new AvgState();
            init();
        }

        /**
         * init函數類似於構造函數,用於UDAF的初始化
         */
        public void init() {
            state.mSum = 0;
            state.mCount = 0;
        }

        /**
         * iterate接收傳入的參數,並進行內部的輪轉。其返回類型為boolean * * @param o * @return
         */

        public boolean iterate(Double o) {
            if (o != null) {
                state.mSum += o;
                state.mCount++;
            }
            return true;
        }

        /**
         * terminatePartial無參數,其為iterate函數遍歷結束后,返回輪轉數據, * terminatePartial類似於hadoop的Combiner * * @return
         */

        public AvgState terminatePartial() {
            // combiner
            return state.mCount == 0 ? null : state;
        }

        /**
         * merge接收terminatePartial的返回結果,進行數據merge操作,其返回類型為boolean * * @param o * @return
         */

        public boolean merge(AvgState avgState) {
            if (avgState != null) {
                state.mCount += avgState.mCount;
                state.mSum += avgState.mSum;
            }
            return true;
        }

        /**
         * terminate返回最終的聚集函數結果 * * @return
         */
        public Double terminate() {
            return state.mCount == 0 ? null : Double.valueOf(state.mSum / state.mCount);
        }
    }
}
  • 根據6.1創建函數進行函數創建,並使用

6.3 自建UDTF

用戶自定義UDAF必須繼承GenericUDTF,重寫initialize(),process(),close()方法。

  • java代碼
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import java.util.ArrayList;
import java.util.List;

//模擬EXPLODE和split
public class MyUDTF extends GenericUDTF {
    private List<String> dataList = new ArrayList<>();

    //初始化方法,返回對象結構校驗器
    @Override
    public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
        //列名,會被用戶傳遞的覆蓋
        List<String> fieldNames = new ArrayList<>();
        fieldNames.add("word1");

        //返回列以什么格式輸出,這里是string,添加幾個就是幾個列,和上面的名字個數對應個數。
        List<ObjectInspector> fieldOIs = new ArrayList<>();
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    @Override
    public void process(Object[] objects) throws HiveException {
        //獲取數據
        String data = objects[0].toString();
        //獲取分隔符
        String splitKey = objects[1].toString();
        //切分數據
        String[] words = data.split(splitKey);
        //遍歷寫出
        for (String word : words) {
            //將數據放入集合
            dataList.clear();
            dataList.add(word);
            //寫出數據到緩沖區
            forward(dataList);
        }

    }

    @Override
    public void close() throws HiveException {
    //沒有流操作
    }
}
  • 根據6.1創建函數進行函數創建,並使用

7 優化

7.1 調整hive執行引擎

hive.execution.engine配置為spark

7.2 Fetch抓取

Fetch抓取修改為more,可以使全局查找,字段查找,limit查找等都不走計算引擎,而是直接讀取表對應儲存目錄下的文件,大大普通查詢速度。

hive.fetch.task.conversion配置為more

7.3 開啟本地模式

hive可以通過本地模式在單台機器上處理所有的任務,對於小的數據集,執行時間可以明顯被縮短。

hive-site.xml調整下面3個參數,開啟本地模式,文件不超過50M,個數不超過10個。

hive.exec.mode.local.auto=true

hive.exec.mode.local.auto.inputbytes.max=50000000 (50M左右,默認128M->134217728,機器資源足建議使用默認值)

hive.exec.mode.local.auto.input.files.max=10 (模式4個)

<property><name>hive.exec.mode.local.auto</name><value>true</value></property>
<property><name>hive.exec.mode.local.auto.inputbytes.max</name><value>50000000</value></property>
<property><name>hive.exec.mode.local.auto.input.files.max</name><value>10</value></property>

7.4 join優化

在join問題上,讓小表放在左邊 去左鏈接(left join)大表,這樣可以有效的減少內存溢出錯誤發生的幾率。

hive.auto.convert.join開啟。(默認開啟)

hive.mapjoin.smalltable.filesize(默認25000000->接近24M,如果機器內存足可以適當調大,需在hive-site.xml中設置,如7.3)

大表和大表join,空key會打到同一個reduce上,造成數據傾斜,任務緩慢,內存泄漏。(reduce任務某個非常慢,其他很快,及發生數據傾斜)

  • 空key過濾:使用子查詢過濾掉空key,可以有效的提升查詢速率。
  • 空key轉換:附隨機值,使其可以隨機均勻的分布在不同的reduce上,使用case when then或if判斷null值,賦予rand()函數。

7.5 Group By優化

默認情況下map階段同一個key發送給一個reduce,當一個key數據過大時就發生數據傾斜。

那么把某些聚合操作提到Map端進行部分聚合,最后在reduce端得出最終結果,也可以有效的提升執行效率。

hive.map.aggr開啟。(默認開啟)

hive.groupby.mapaggr.checkinterval=100000(默認100000條,在map端進行聚合操作的條目數目,需在hive-site.xml中設置,如7.3)

hive.groupby.skewindata=true(默認false,有數據傾斜時進行負載均衡,需在hive-site.xml中設置,如7.3)

hive.groupby.skewindata當選項設置為true時,生成的查詢計划會有兩個MR Job,第一個MR Job會將key加隨機數均勻的分布到Reduce中,做部分聚合操作(預處理),第二個MR Job在根據預處理結果還原原始key,按照Group By Key分布到Reduce中進行聚合運算,完成最終操作。

7.6 Count(Distinct)去重統計

數據量小的時候沒關系,大數據量下,由於Count Distinct操作需要用一個Reduce任務來完成,這一個Reduce需要處理的數據量太大,會導致Job緩慢,可以使用子查詢Group By再Count的方式替換。

7.7 行列過濾

列處理:不使用Select *,使用什么字段就寫什么字段,就算是所有字段,也要一一列出,養成好習慣。

行處理:在join操作中,不要直接關聯表后使用where條件,這樣會使全表關聯后在過濾,使用子查詢過濾后在join來替換,使查詢效率提高。如下

#錯誤寫法
select id from student s left join class c on s.cid=c.id where c.id<=10
#正確寫法
select id from student s left join (select id from class where id<=10 ) c on s.cid=c.id

7.8 動態分區

當數據會根據情況變化的時候,先有數據,再想分區的情況。

設計表時盡量避免動態分區,速度比靜態分區(也就是直接指定分區慢很多)。

  • 開啟動態分區參數

  • 必要參數

    • hive.exec.dynamic.partition=true; 默認:true。
    • hive.exec.dynamic.partition.mode=nostrict; 默認:strict至少有一個分區列是靜態分區,nostrict所有分區都可以是動態分區。
  • 相關參數

    • hive.exec.max.dynamic.partitions.pernode=100; 默認100,每一個執行mr節點上,允許創建的動態分區的最大數量
    • hive.exec.max.dynamic.partitions=1000; 默認1000,所有執行mr節點上,允許創建的所有動態分區的最大數量
    • hive.exec.max.created.files=100000; 默認100000,所有的mr job允許創建的文件的最大數量
    • hive.error.on.empty.partition=false; 默認false,當有空分區生成時,是否拋出異常。
  • 建表方式同靜態分區,插入方式要注意,查詢結果最后一個字段即為分區字段(不管名字是否一樣,會將最后一個字段的值,直接拿來分區)

#錯誤寫法:會將name值直接插入dt分區字段。
insert into table dwd.student1 partition(dt) 
select id,createdby,createdtime,updatedby,updatedtime,version,name from dwd.student 
#正確寫法
insert into table dwd.student1 partition(dt) 
select id,createdby,createdtime,updatedby,updatedtime,version,name,dt from dwd.student 

7.9 開啟並行計算

hive.exec.parallel=true;默認false。需在hive-site.xml中設置,如7.3

hive.exec.parallel.thread.number=16;默認8,同一個sql允許的最大並行度,針對集群資源適當增加。需在hive-site.xml中設置,如7.3

7.10 多用Explain

在執行的查詢sql前加上Explain指令,查詢分析sql執行過程。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM