hive語法和運行參數層面,主要寫出高效運行SQL,並且利用一些運行參數進行調優SQL執行
查看hive執行計划
hive的SQL語句在執行之前需要將SQL語句轉換成MapReduce任務,因此需要了解轉換過程,可以再SQL語句中輸入如下命令查看具體的執行計划。
explain [extended] query --查看執行計划,添加extended關鍵字可以查看更詳細的查詢計划
示例如下
explain select department, count(*) as total from student where age >= 18 group
by department order by total desc limit 3;
關於 Hive 的執行計划中的 Operator 的概念:(邏輯執行計划:Operator Tree)
select ... from ... where ... group by ... having ... order by .... limit ....
select: FetchOperator
from: TableScanOperator
where+having: FilterOperator
.....
列裁剪
列裁剪就是在查詢時只讀取需要列,當列很多或者數據量很大時,如果查詢時不指定字段(如:select * from table)執行全表掃描效率很低。
hive在讀取數據時,可以只讀取計算中所需要用到的列的數據,忽略不需要計算使用數據列,這樣可以節省數據讀取開銷、中間表存儲開銷和數據整合開銷。
set hive.optimize.op = true; --開啟列裁剪,讀取數據只加載所需要計算數據,此設置默認為true
謂詞下推
將SQL語句中的where謂詞邏輯都盡可能提前執行,減少下游處理的數據量。對應邏輯優化器是PredicatePushDown。
set hive.optimize.ppd = true; --此設置默認為true
示例如下
select a.*,b.* from a join b on a.id = b.id where b.age > 20;
優化后:
select a.*,c.* from a join (select * from b where age > 20)c on a.id = c.id
分區裁剪
分區裁剪就是只讀取所需要的分區。當分區很多且數量特別大時,如果不指定分區執行全表掃描效率就很低。所以在查詢過程中只選擇所需分區數據可以減少讀入數據量提高查詢效率。
set hive.optimize.pruner = true; --此設置默認為true
在hive SQL解析階段對應的則是ColumnPruner邏輯優化器。
合並小文件
大數據技術組件主要存在問題
- 存儲組件主要是海量小文件
- 計算組件主要是數據傾斜
如果MapReduce job 碰到一堆小文件作為輸入,一個文件啟動一個Task任務,在MapReduce編程模型中CombineFileInputFormat 可以將一個節點或者同一個機架上的多個小文件划分為同一個切片,hive默認使用InputFormat的TextInputFormat處理。
-
Map 輸入合並
在執行 MapReduce 程序的時候,一般情況是一個文件的一個數據分塊需要一個 mapTask 來處理。但是如果數據源是大量的小文件,這樣就會啟動大量的 mapTask 任務,這樣會浪費大量資源。可以將輸入的小文件進行合並,從而減少 mapTask 任務數量。
--Map端輸入合並文件之后按照block的大小分割(默認設置) set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; --Map端輸入不合並小文件 set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
-
Map和Reduce輸出合並
大量的小文件會給 HDFS 帶來壓力,影響處理效率。可以通過合並 Map 和 Reduce 的結果文件來消除影響。如果多個節點都是一個小文件,這種情況主要是在輸出並沒有進行文件合並,且輸入切片為256M,則需要通過進行合並小文件進行輸出合並。
set hive.merge.mapfiles=true; --Map輸出文件合並,默認設置為true set hive.merge.mapredfiles=true; --Reduce端輸出文件合並,默認設置為false set hive.merge.size.per.task=256000000; --合並文件的大小,默認值為256000000(256M) set mapred.max.split.size=256000000; --每個Map 最大分割大小 set mapred.min.split.size.per.node=1; --節點上split的最少值,服務器節點 set mapred.min.split.size.per.rack=1; // 服務器機架
hive。merge.size.per.task 和 mapred.min.split.size.per.node聯合配置
- 默認情況把這個節點上的所有數據進行合並,如果合並的那個文件超過了256M就需要開啟另外一個文件繼續合並
- 如果當前節點上的數據不足256M,那么久合並成一個邏輯切片。
MapTask並行度
當Mapper階段計算數據來源
-
MapReduce中的MapTask的並行度機制
現有100個節點,每個任務512M的數據,但是只有40個節點運行一個任務
- Map數過大:當輸入文件特別大,MapTask 特別多,每個計算節點分配執行的MapTask都很多,這時候可以考慮減少MapTask的數量,增大MapTask處理的數據量,而且MapTask過多最終生成的結果文件數也會增多。
- Map 階段輸出文件太小,產生大量小文件
- 初始化和創建Map開銷很大
- Map數過小:當輸入文件很大,任務邏輯復雜,MapTask執行非常慢時,可以考慮增加MapTask數,來使得每個MapTask處理的數據量減少,從而提高任務的執行效率。
- 文件處理或查詢並發度小,Job 執行時間過長
- 大量作業時,容易阻塞集群
在MapReduce編程實例中,一個MapReduce Job的MapTask數量是有輸入切片InputSplit 決定的,而輸入分片是由FileInputFormat.getSplit() 決定,一個輸入分片對應一個MapTask,而輸入分片是由下列參數決定
參數 默認值 意義 dfs.blocksize 128M HDFS默認數據塊大小 mapreduce.input.fileinputformat.split.minsize 1M 最小分片大小(MR) mapreduce.input.fileinputformat.split.maxsize 256M 最大分片大小(MR) 輸入分片大小計算:針對數據是原始數據情況,最終調整splitsize的大小最好是blocksize的整數倍,計算規則如下
long split = Math.max(minsize,Math.min(maxsize,blocksize))
默認情況下,輸入分片大小和HDFS集群默認數據塊大小一致,即默認一個數據塊啟動一個MapTask處理,可以避免服務器節點之間數據傳輸提高Job處理效率。
控制MapTask數方案
- 減少MapTask 數通過合並小文件,主要是針對數據源做處理
- 增加MapTask數可以控制Job的ReduceTask個數
- 推薦使用HDFS默認切塊大小,若要調整需將調整成為HDFS默認切塊大小的N倍
- Map數過大:當輸入文件特別大,MapTask 特別多,每個計算節點分配執行的MapTask都很多,這時候可以考慮減少MapTask的數量,增大MapTask處理的數據量,而且MapTask過多最終生成的結果文件數也會增多。
-
合理控制MapTask數量
- 減少MapTask數可以通過合並小文件來實現
- 增加MapTask數可以通過控制ReduceTask默認MapTask個數
計算方式如下:
輸入文件總數為:total_size HDFS設置數據塊大小:dfs_block_size 默認MapTask個數為 default_mapper_num = total_size/dfs_block_size
MapReduce 中提供了如下參數來控制 map 任務個數,從字面上看,貌似是可以直接設置 MapTask 個數的樣子,但是很遺憾不行,這個參數設置只有在大於 default_mapper_num 的時候,才會生效。
set mapred.map.tasks=10; --默認值為2
如果要減少MapTask數量可以通過設置mapred.min.split.size設置每個任務處理文件的大小,設置文件大小只有大於dfs_block_size時才會生效。計算方式如下:
split_size = max(mapred.min.split.size, dfs_block_size) split_num = total_size / split_size compute_map_num = Math.min(split_num, Math.max(default_mapper_num,mapred.map.tasks))
控制Mapper個數方法
- 若想增加MapTask個數,設置mapred.map.tasks為一個較大值
- 若想減少MapTask個數,設置map.min.split.size為一個較大值
- 如果輸入大量小文件想減少Mapper個數,可以通過設置hive.input.format 合並小文件
如果想要調整 mapper 個數,在調整之前,需要確定處理的文件大概大小以及文件的存在形式(是大量小文件,還是單個大文件),然后再設置合適的參數。不能盲目進行暴力設置,不然適得其反。
MapTask 數量與輸入文件的 split 數息息相關,在 Hadoop 源碼org.apache.hadoop.mapreduce.lib.input.FileInputFormat 類中可以看到 split 划分的具體邏輯。可以直接通過參數 mapred.map.tasks (默認值2)來設定 MapTask 數的期望值,但它不一定會生效。
ReduceTask並行度
如果 ReduceTask 數量過多,一個 ReduceTask 會產生一個結果文件,這樣就會生成很多小文件,那么如果這些結果文件會作為下一個 Job 的輸入,則會出現小文件需要進行合並的問題,而且啟動和初始化ReduceTask 需要耗費資源。
如果 ReduceTask 數量過少,這樣一個 ReduceTask 就需要處理大量的數據,並且還有可能會出現數據傾斜的問題,使得整個查詢耗時長。 默認情況下,Hive 分配的 reducer 個數由下列參數決定:
Hadoop MapReduce 程序中,ReducerTask 個數的設定極大影響執行效率,ReducerTask 數量與輸出文件的數量相關。如果 ReducerTask 數太多,會產生大量小文件,對HDFS造成壓力。如果ReducerTask 數太少,每個ReducerTask 要處理很多數據,容易拖慢運行時間或者造成 OOM。這使得Hive 怎樣決定 ReducerTask 個數成為一個關鍵問題。遺憾的是 Hive 的估計機制很弱,不指定ReducerTask 個數的情況下,Hive 會猜測確定一個ReducerTask 個數,基於以下兩個設定:
參數1:hive.exec.reducers.bytes.per.reducer --默認為256M
參數2:mapreduce.exec.reduces.max --默認為1009
參數3:mapreduce.job.reduces --默認值為-1,標識未進行設置,按照設置為2執行
ReduceTask計算公式
N = Math.min(參數2,輸入數據大小/參數1)
可以通過改變上述兩個參數的值來控制 ReduceTask 的數量。 也可以通過下列參數進行設置
set mapred.map.tasks=10;
set mapreduce.job.reduces=10;
通常情況下,有必要手動指定 ReduceTask 個數。考慮到 Mapper 階段的輸出數據量通常會比輸入有大幅減少,因此即使不設定 ReduceTask 個數,重設 參數2 還是必要的。
依據經驗,可以將 參數2 設定為 M * (0.95 * N) (N為集群中 NodeManager 個數)。一般來說,NodeManage 和 DataNode 的個數是一樣的。
Join優化
-
Join優化整體原則
- 優先考慮再進行Join操作,最大限度地減少參與Join的數據量(where能用就用)
- 小表join大表,最好啟動mapjoin,hive自動啟動mapjoin,小表不能超過25M(默認25M可以進行更改)
- join on 條件相同最好放入同一個Job中,並且join表的排列順序從小到大
- 多張表join操作,多個連接條件都相同會進行轉化到同一個Job中
-
有限過濾數據
盡量減少每個階段數據量,對於分區表能用分區字段過濾盡量使用,同時只選擇后面需要使用到的列,最大限度減少參與join的數據量。
-
小表join大表原則
小表join大表時應遵循小表join大表原則,因為join操作reduce階段位於join左邊表的內容會加載到內存中,將數據量小的表放在左邊,可以有效減少發生內存溢出幾率,join執行是從左到右生成join,應該保證連續查詢中標的大小從左到右依次遞增的順序。
-
使用相同連接鍵
在hive中,當對 3 個或更多張表進行 join 時,如果 on 條件使用相同字段,那么它們會合並為一個MapReduce Job,利用這種特性,可以將相同的 join on 放入一個 job 來節省執行時間。
-
盡量原子操作
盡量避免一個SQL包含復雜的邏輯,可以使用中間表來完成復雜的邏輯。
-
大表join大表
- 空key過濾:有時join超時是因為某些key對應的數據太多,而相同key對應的數據都會發送到相同的reducer上,從而導致內存不夠。此時我們應該仔細分析這些異常的key,很多情況下,這些key對應的數據是異常數據,我們需要在SQL語句中進行過濾。
- 空key轉換:有時雖然某個key為空對應的數據很多,但是相應的數據不是異常數據,必須要包含在join的結果中,此時我們可以表a中key為空的字段賦一個隨機的值,使得數據隨機均勻地分不到不同的reducer上
啟動MapJoin
關於hive調優中,能用就用的原則和方法:
- where 能用則用原則,數據join執行能進行數據過濾就進行數據過濾
- mapjoin 在資源充足情況下進行開啟
- 再不改變實際業務計算的情況下,可以進行局部combiner
MapJoin 是將 join 雙方比較小的表直接分發到各個 map 進程的內存中,在 map 進程中進行 join 操作,這樣就不用進行 reduce 步驟,從而提高了速度。只有 join 操作才能啟用 MapJoin。參數設置如下:
--是否根據輸入小表的大小,自動將reduce端的common join 轉化為map join,將小表刷入內存中
set hive.auto.convert.join = true; --對應邏輯優化器是MapJoinProcessor
set hive.mapjoin.smalltable.filesize = 25000000; --刷入內存表的大小(字節) 25M = 2G
set hive.auto.convert.join.noconditionaltask=true; --hive會基於表的size自動的將普通join轉換成mapjoin
set hive.auto.convert.join.noconditionaltask.size=10000000; --多大的表可以自動觸發放到內層LocalTask中,默認大小10M
Hive 可以進行多表 Join。Join 操作尤其是 Join 大表的時候代價是非常大的。MapJoin 特別適合大小表join的情況。在Hive join場景中,一般總有一張相對小的表和一張相對大的表,小表叫 build table,大表叫 probe table。Hive 在解析帶 join 的 SQL 語句時,會默認將最后一個表作為 probe table,將前面的表作為 build table 並試圖將它們讀進內存。如果表順序寫反,probe table 在前面,引發 OOM 的風險就高了。在維度建模數據倉庫中,事實表就是 probe table,維度表就是 build table。這種 Join 方式在 map 端直接完成 join 過程,消滅了 reduce,效率很高。而且 MapJoin 還支持非等值連接。
當 Hive 執行 Join 時,需要選擇哪個表被流式傳輸(stream),哪個表被緩存(cache)。Hive 將JOIN 語句中的最后一個表用於流式傳輸,因此我們需要確保這個流表在兩者之間是最大的。如果要在不同的 key 上 join 更多的表,那么對於每個 join 集,只需在 ON 條件右側指定較大的表。可以進行手動開啟MapJoin如下:
--SQL方式,在SQL語句中添加MapJoin標記(mapjoin hint),將小表放到內存中,省去shffle操作
SELECT /*MAPJOIN(smalltable)*/ smalltable.key,bigtable.value FROM smalltable JOIN bigtable ON smalltable.key = bigtable.key
Sort-Merge-Bucket(SMB)Map Join是一種Hive Join 優化技術,使用這個技術的前提是所有的表都必須是分桶表(bucket)和分桶排序的(sort),並對分桶表進行優化。具體實現如下:
- 針對參與join的這兩張做相同的hash散列,每個桶里面的數據還要排序
- 兩張表數據個數必須成倍數
- 開啟SMB Join
具體參數設置如下:
--當用戶執行bucket map join的時候,發現不能執行時,禁止查詢
set hive.enforce.sortmergebucketmapjoin=false;
-- join的表通過sort merge join的條件,join是否會自動轉換為sort merge join
set hive.auto.convert.sortmerge.join=true;
--當兩個分桶表 join 時,如果 join on的是分桶字段,小表的分桶數是大表的倍數時,可以啟用mapjoin 來提高效率。
set hive.optimize.bucketmapjoin=false; --bucket map join優化,默認值是 false
set hive.optimize.bucketmapjoin.sortedmerge=false; --bucket map join 優化,默認值是 false
join數據傾斜優化
在編寫join查詢語句時,如果確定是由於join出現的數據傾斜,那么請做如下設置
--join的鍵對應的記錄條數超過這個值則會進行分拆,值根據具體數據量設置
set hive.skewjoin.key=100000;
set hive.optimize.skewjoin=false; --如果是join過程出現傾斜應該設置為true
如果開啟了,在 Join 過程中 Hive 會將計數超過閾值 hive.skewjoin.key(默認100000)的傾斜 key 對應的行臨時寫進文件中,然后再啟動另一個 job 做 map join 生成結果。
通過 hive.skewjoin.mapjoin.map.tasks 參數還可以控制第二個 job 的 mapper 數量,默認10000。
set hive.skewjoin.mapjoin.map.tasks=10000;
CBO優化
join的時候表的順序關系:前面的表都會被夾在到內存中,后面的表進行磁盤掃描
select a.*,b.*,c.* from a join b on a.id = b.id join c on a.id = c.id;
Hive 自 0.14.0 開始,加入了一項 "Cost based Optimizer" 來對 HQL 執行計划進行優化,這個功能通過 "hive.cbo.enable" 來開啟。在 Hive 1.1.0 之后,這個 feature 是默認開啟的,它可以 自動優化 HQL中多個 Join 的順序,並選擇合適的 Join 算法。
CBO成本優化器,代價最小的執行計划就是最好的執行計划。傳統的數據庫,成本優化器做出最優化的執行計划是依據統計信息來計算的。Hive 的成本優化器也一樣。
Hive 在提供最終執行前,優化每個查詢的執行邏輯和物理執行計划。這些優化工作是交給底層來完成的。根據查詢成本執行進一步的優化,從而產生潛在的不同決策:如何排序連接,執行哪種類型的連接,並行度等等。
要使用基於成本的優化(也稱為 CBO),請在查詢開始設置以下參數:
set hive.cbo.enable=true;
set hive.compute.query.using.stats=true;
set hive.stats.fetch.column.stats=true;
set hive.stats.fetch.partition.stats=true;
笛卡爾積處理
需求:笛卡爾積操作,默認情況下是不允許使用的!底層使用一個ReduceTask來做。笛卡爾積實現方式如下:
select a.*, b.* from a, b;
select a.*, b.* from a, b where a.id = b.id;
select a.*, b.* from a join b on a.id = b.id;
select a.*, b.* from a cross join b
當 Hive 設定為嚴格模式(hive.mapred.mode=strict,nonstrict)時,不允許在 HQL 語句中出現笛卡爾積,這實際說明了 Hive 對笛卡爾積支持較弱。因為找不到 Join key,Hive 只能使用 1 個 reducer 來完成笛卡爾積。
當然也可以使用 limit 的辦法來減少某個表參與 join 的數據量,但對於需要笛卡爾積語義的需求來說,經常是一個大表和一個小表的 Join 操作,結果仍然很大(以至於無法用單機處理),這時 MapJoin 才是最好的解決辦法。MapJoin,顧名思義,會在 Map 端完成 Join 操作。這需要將 Join 操作的一個或多個表完全讀入內存。
PS:MapJoin 在子查詢中可能出現未知 BUG。在大表和小表做笛卡爾積時,規避笛卡爾積的方法是,給 Join 添加一個 Join key,原理很簡單:將小表擴充一列 join key,並將小表的條目復制數倍,joinkey 各不相同;將大表擴充一列 join key 為隨機數。
精髓就在於復制幾倍,最后就有幾個 reduce 來做,而且大表的數據是前面小表擴張 key 值范圍里面隨機出來的,所以復制了幾倍 n,就相當於這個隨機范圍就有多大 n,那么相應的,大表的數據就被隨機的分為了 n 份。並且最后處理所用的 reduce 數量也是 n,而且也不會出現數據傾斜。
兩張表:
table_a(小表):a,b,c,d
table_b(大表):1,2,3,4,5,6,7,8,9
group by 優化
默認情況下,Map 階段同一個 Key 的數據會分發到一個 Reduce 上,當一個 Key 的數據過大時會產生數據傾斜。進行 group by 操作時可以從以下兩個方面進行優化:
-
Map端部分預聚合
事實上並不是所有的聚合操作都需要在 Reduce 部分進行,很多聚合操作都可以先在 Map 端進行部分聚合,然后在 Reduce 端的得出最終結果。
set hive.map.aggr=true; //開啟Map端聚合參數設置 //設置Map端聚合的行數閾值,超過此值時則會拆分job,默認值為100000 set hive.groupby.mapaggr.checkinterval=100000;
-
有數據傾斜的時候進行負載均衡
當HQL語句使用group by 時數據出現傾斜,如果該設置為true,那么hive會自動進行負載均衡。策略就是把MapReduce任務拆分兩個:
- 第一個做預匯總
- 第二個做最終匯總
// 自動優化,有數據傾斜的時候進行負載均衡(默認false) set hive.groupby.skewindata=false;
當設定為true時,生成查詢計划有兩個MapReduce任務
- 在第一個MapReduce任務中,map 的輸出結果會隨機分布到 reduce 中,每個 reduce 做部分聚合操作,並輸出結果,這樣處理的結果是相同的
group by key
有可能分發到不同的 reduce 中,從而達到負載均衡的目的; - 第二個 MapReduce 任務再根據預處理的數據結果按照
group by key
分布到各個 reduce 中,最后完成最終的聚合操作。
Map 端部分聚合:並不是所有的聚合操作都需要在 Reduce 端完成,很多聚合操作都可以先在 Map 端進行部分聚合,最后在 Reduce 端得出最終結果,對應的優化器為 GroupByOptimizer。例如:
select t.a, sum(t.b), count(t.c), count(t.d) from table t group by t.a;
優化后SQL:
select t.a, sum(t.b), count(t.c), count(t.d) from ( select a,b,null c,null d from table union all select a,0 b,c,null d from table group by a,c union all select a,0 b,null c,d from table group by a,d ) t;
order by優化
cluster by = distribute by + sort by 可以使用多個ReduceTask運行
order by 只能使用一個ReduceTask運行
order by 只能是在一個 reduce 進程中進行,所以如果對一個大數據集進行 order by ,會導致一個reduce 進程中處理的數據相當大,造成查詢執行緩慢。
- 在最終結果上進行order by,不要在中間的大數據集上進行排序。如果最終結果較少,可以在一個reduce上進行排序時,那么就在最后的結果集上進行order by。
- 如果是取排序后的前N條數據,可以使用distribute by和sort by在各個reduce上進行排序后前N條,然后再對各個reduce的結果集合合並后在一個reduce中全局排序,再取前N條,因為參與全局排序的order by的數據量最多是reduce個數 * N,所以執行效率會有很大提升。
在Hive中,關於數據排序,提供了四種語法,一定要區分這四種排序的使用方式和適用場景。
- order by:全局排序,缺陷是只能使用一個reduce
- sort by:單機排序,單個reduce結果有序
- cluster by:對同一字段分桶並排序,不能和sort by連用
- distribute by + sort by:分桶,保證同一字段值只存在一個結果文件當中,結合sort by保證每個reduceTask結果有序
Hive HQL 中的 order by 與其他 SQL 方言中的功能一樣,就是將結果按某字段全局排序,這會導致所有 map 端數據都進入一個 reducer 中,在數據量大時可能會長時間計算不完。
如果使用 sort by,那么還是會視情況啟動多個 reducer 進行排序,並且保證每個 reducer 內局部有序。為了控制map 端數據分配到 reducer 的 key,往往還要配合 distribute by 一同使用。如果不加distribute by 的話,map 端數據就會隨機分配到 reducer,根據優化提供兩種全局排序方式如下
建表導入數據
create table if not exists student(id int, name string, sex string, age int,
department string) row format delimited fields terminated by ",";
load data local inpath "/home/bigdata/students.txt" into table student;
-
第一種直接使用order by來做。如果結果數據量很大,這個任務的執行效率會非常低,實現SQL如下
select id,name,age from student order by age desc limit 3;
-
使用distribute by + sort by 多個reduceTask,每個reduceTask分別有序
set mapreduce.job.reduces=3; create table student_orderby_result as select * from student distribute by (case when age > 20 then 0 when age < 18 then 2 else 1 end) sort by (age desc);
關於上述實現SQL中分界值的確定可以通過采樣的方式來估計數據分布規律。
Count Distinct 優化
當要統計某一列去重數時,如果數據量很大,count(distinct) 就會非常慢,原因與 group by 類似,count(distinct) 邏輯只會有很少的 reducer 來處理。這時可以用 group by 來改寫:
// 直接使用 count(distinct xx)
select count(distinct id) from student;
//使用group by進行統計替代distinct
select count(1) from (
select age from student
where department >= "MA"
group by age
) t;
具體示例如下:
優化前 ,一個普通的只使用一個reduceTask來進行count(distinct) 操作,實現SQL如下
// 優化前(只有一個reduce,先去重再count負擔比較大)
select count(distinct id) from tablename;
優化后 ,但是這樣寫會啟動兩個MR job(單純 distinct 只會啟動一個),所以要確保數據量大到啟動job 的 overhead 遠小於計算耗時,才考慮這種方法。當數據集很小或者 key 的傾斜比較明顯時,group by 還可能會比 distinct 慢。
-- 優化后(啟動兩個job,一個job負責子查詢(可以有多個reduce),另一個job負責count(1)):
select count(1) from (select distinct id from tablename) tmp;
select count(1) from (select id from tablename group by id) tmp; // 推薦使用這種
怎樣寫in/exists語句
在hive的早期版本中,in/exists語法是不支持的,但是從hive-0.8X以后的版本開始支持此語法,但是不推薦使用。雖然經過測驗,Hive-2.3.6 也支持 in/exists 操作,但還是推薦使用 Hive 的一個高效替代方案:left semi join,例如下列SQL實現
-- in / exists 實現
select a.id, a.name from a where a.id in (select b.id from b);
select a.id, a.name from a where exists (select id from b where a.id = b.id);
使用join改寫后SQL實現
select a.id, a.name from a join b on a.id = b.id;
最終使用left semi join 實現
select a.id, a.name from a left semi join b on a.id = b.id;
vectorization技術
vectorization:矢量計算技術,在計算類似 scan, filter, aggregation 的時候, vectorization 技術以設置批處理的增量大小為 1024 行單次來達到比單條記錄單次獲得更高的效率。開啟設置如下
set hive.vectorized.execution.enabled=true ;
set hive.vectorized.execution.reduce.enabled=true;
多重模式
如果 碰到一堆SQL且這一堆SQL的模式一樣,都是從同一個表進行掃描做不通邏輯。
可優化的地方:如果有n條SQL,每個SQL執行都會掃描一次這張表,如下執行SQL多次掃描同一張表進行存儲
insert .... select id,name,sex, age from student where age > 17;
insert .... select id,name,sex, age from student where age > 18;
insert .... select id,name,sex, age from student where age > 19;
上述SQL執行插入會掃描多次同一張表,導致執行效率下降,針對此類型SQL執行優化過程如下
// 原SQL執行根據不同值插入不同分區
insert .... select id,name,sex, age from student where age > 17;
insert .... select id,name,sex, age from student where age > 18;
insert .... select id,name,sex, age from student where age > 19;
優化后如下
from student
insert int t_ptn partition(city=A) select id,name,sex, age where city= A
insert int t_ptn partition(city=B) select id,name,sex, age where city= B
insert int t_ptn partition(city=C) select id,name,sex, age where city= C
如果一個 HQL 底層要執行 10 個 Job,那么能優化成 8 個一般來說,肯定能有所提高,多重插入就是一個非常實用的技能。一次讀取,多次插入,有些場景是從一張表讀取數據后,要多次利用,這時可以使用 multi insert 語法,執行SQL示例如下:
from sale_detail
insert overwrite table sale_detail_multi partition (sale_date='2019',region='china' )
select shop_name, customer_id, total_price where .....
insert overwrite table sale_detail_multi partition (sale_date='2020',region='china' )
select shop_name, customer_id, total_price where .....;
說明:multi insert語法有一些限制
- 一般情況下,單個SQL中最多可以寫128路輸出,超過128路,則報語法錯誤。
- 在一個multi insert中,對於分區表,同一個目標分區不允許出現多次;對於未分區表,該表不能出現多次。
- 對於同一張分區表的不同分區,不能同時有insert overwrite和insert into操作,否則報錯返回。
Multi-Group by 是 Hive 的一個非常好的特性,它使得 Hive 中利用中間結果變得非常方便。例如:
FROM (SELECT a.status, b.school, b.gender FROM status_updates a JOIN profiles b
ON (a.userid = b.userid and a.ds='2019-03-20' )) subq1
INSERT OVERWRITE TABLE gender_summary PARTITION(ds='2019-03-20')
SELECT subq1.gender, COUNT(1) GROUP BY subq1.gender
INSERT OVERWRITE TABLE school_summary PARTITION(ds='2019-03-20')
SELECT subq1.school, COUNT(1) GROUP BY subq1.school;
上述查詢語句使用了 Multi-Group by 特性連續 group by 了 2 次數據,使用不同的 Multi-Group by。這一特性可以減少一次 MapReduce 操作。
中間結果壓縮
map輸出壓縮設置參數如下:
set mapreduce.map.output.compress=true;
set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
中檢結果壓縮
中間數據壓縮就是對 hive 查詢的多個 Job 之間的數據進行壓縮。最好是選擇一個節省CPU耗時的壓縮方式。可以采用 snappy 壓縮算法,該算法的壓縮和解壓效率都非常高。參數設置如下
set hive.exec.compress.intermediate=true;
set hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
set hive.intermediate.compression.type=BLOCK;
結果數據壓縮
最終的結果數據(Reducer輸出數據)也是可以進行壓縮的,可以選擇一個壓縮效果比較好的,可以減少數據的大小和數據的磁盤讀寫時間; 注:常用的 gzip,snappy 壓縮算法是不支持並行處理的,如果數據源是 gzip/snappy壓縮文件大文件,這樣只會有有個 mapper 來處理這個文件,會嚴重影響查詢效率。 所以如果結果數據需要作為其他查詢任務的數據源,可以選擇支持 splitable 的 LZO 算法,這樣既能對結果文件進行壓縮,還可以並行的處理,這樣就可以大大的提高 job 執行的速度了。參數設置如下
set hive.exec.compress.output=true;
set mapreduce.output.fileoutputformat.compress=true;
set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;
set mapreduce.output.fileoutputformat.compress.type=BLOCK;
在Hadoop集群中支持的壓縮算法如下
org.apache.hadoop.io.compress.DefaultCodec
org.apache.hadoop.io.compress.GzipCodec
org.apache.hadoop.io.compress.BZip2Codec
org.apache.hadoop.io.compress.DeflateCodec
org.apache.hadoop.io.compress.SnappyCodec
org.apache.hadoop.io.compress.Lz4Codec
com.hadoop.compression.lzo.LzoCodec
com.hadoop.compression.lzo.LzopCodec