Hive一些參數設置


在使用union all的時候,系統資源足夠的情況下,為了加快hive處理速度,可以設置如下參數實現並發執行

set mapred.job.priority=VERY_HIGH;
set hive.exec.parallel=true;

 設置map reduce個數

-- 設置map capacity
set mapred.job.map.capacity=2000;
set mapred.job.reduce.capacity=2000;
-- 設置每個reduce的大小 set hive.exec.reducers.bytes.per.reducer=500000000; -- 直接設置個數 set mapred.reduce.tasks = 15;

 設置任務名稱

-- 設置名稱
set mapred.job.name=my_job_{DATE};

 Hive文件合並

-- 設置文件合並
set abaci.is.dag.job=false;
set hive.merge.mapredfiles=true;
set mapred.combine.input.format.local.only=false;
set hive.merge.smallfiles.avgsize=100000000;

-- 在map only的情況下,如上的參數如果沒有生效,可以設置如下
-- 在HQL的最外層增加distribute by rand()
select * from XXX distribute by rand()

 

use namespace udw_ns;
set mapred.job.name=job_name_{DATE};
set hive.mapred.mode=nonstrict;
set mapred.reduce.tasks = 600;

set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.dynamic.partition=true;

set hive.exec.compress.output=true;
set mapred.output.compress=true;
set mapred.output.compression.codec=org.apache.hadoop.io.compress.LzoCodec;

 

dfs.block.size

  決定HDFS文件block數量的多少(文件個數),它會間接的影響Job Tracker的調度和內存的占用(更影響內存的使用),

mapred.map.tasks.speculative.execution=true 

mapred.reduce.tasks.speculative.execution=true

這是兩個推測式執行的配置項,默認是true

所謂的推測執行,就是當所有task都開始運行之后,Job Tracker會統計所有任務的平均進度,如果某個task所在的task node機器配

置比較低或者CPU load很高(原因很多),導致任務執行比總體任務的平均執行要慢,此時Job Tracker會啟動一個新的任務

(duplicate task),原有任務和新任務哪個先執行完就把另外一個kill掉,這也是我們經常在Job Tracker頁面看到任務執行成功,但

是總有些任務被kill,就是這個原因。

mapred.child.java.opts

一般來說,都是reduce耗費內存比較大,這個選項是用來設置JVM堆的最大可用內存,但不要設置過大,如果超過2G(這是數字有

待考證),就應該考慮一下優化程序。

Input Split的大小,決定了一個Job擁有多少個map,默認64M每個Split,如果輸入的數據量巨大,那么默認的64M的block會有特

別多Map Task,集群的網絡傳輸會很大,給Job Tracker的調度、隊列、內存都會帶來很大壓力。

mapred.min.split.size

這個配置決定了每個Input Split 的最小值,也間接決定了一個job的map數量

HDFS塊大小是在job寫入時決定的,而分片的大小,是由三個元素決定的(在3各種去最大的那個)

(1) 輸入的塊數 (2) Mapred.min.split.size (3) Job.setNumMapTasks()

mapred.compress.map.output

壓縮Map的輸出,這樣做有兩個好處:
a)壓縮是在內存中進行,所以寫入map本地磁盤的數據就會變小,大大減少了本地IO次數
b) Reduce從每個map節點copy數據,也會明顯降低網絡傳輸的時間
注:數據序列化其實效果會更好,無論是磁盤IO還是數據大小,都會明顯的降低。

io.sort.mb
以MB為單位,默認100M,這個值比較小
map節點沒運行完時,內存的數據過多,要將內存中的內容寫入洗盤,這個設置就是設置內存緩沖的大小,在suffle之前
這個選項定義了map輸出結果在內存里占用buffer的大小,當buffer達到某個閾值(后面那條配置),會啟動一個后台線程來對buffer

的內容進行排序,然后寫入本地磁盤(一個spill文件)

根據map輸出數據量的大小,可以適當的調整buffer的大小,注意是適當的調整,並不是越大越好,假設內存無限大,

io.sort.mb=1024(1G), 和io.sort.mb=300 (300M),前者未必比后者快:
(1)1G的數據排序一次
(2)排序3次,每次300MB
一定是后者快(歸並排序)

io.sort.spill.percent
這個值就是上面提到的buffer的閾值,默認是0.8,既80%,當buffer中的數據達到這個閾值,后台線程會起來對buffer中已有的數

據進行排序,然后寫入磁盤,此時map輸出的數據繼續往剩余的20% buffer寫數據,如果buffer的剩余20%寫滿,排序還沒結束,

map task被block等待。
如果你確認map輸出的數據基本有序,排序時間很短,可以將這個閾值適當調高,更理想的,如果你的map輸出是有序的數據,那

么可以把buffer設的更大,閾值設置為1.

Io.sort.factor
同時打開的文件句柄的數量,默認是10
當一個map task執行完之后,本地磁盤上(mapred.local.dir)有若干個spill文件,map task最后做的一件事就是執行merge sort,

把這些spill文件合成一個文件(partition,combine階段)。
執行merge sort的時候,每次同時打開多少個spill文件,就是由io.sort.factor決定的。打開的文件越多,不一定merge sort就越

快,也要根據數據情況適當的調整。
注:merge排序的結果是兩個文件,一個是index,另一個是數據文件,index文件記錄了每個不同的key在數據文件中的偏移量(即partition)。
在map節點上,如果發現map所在的子節點的機器io比較重,原因可能是io.sort.factor這個設置的比較小,io.sort.factor設置小的

話,如果spill文件比較多,merge成一個文件要很多輪讀取操作,這樣就提升了io的負載。io.sort.mb小了,也會增加io的負載。
如果設置了執行combine的話,combine只是在merge的時候,增加了一步操作,不會改變merge的流程,所以combine不會減少

或者增加文件個數。另外有個min.num.spills.for.combine的參數,表示執行一個merge操作時,如果輸入文件數小於這個數字,就

不調用combiner。如果設置了combiner,在寫spill文件的時候也會調用,這樣加上merge時候的調用,就會執行兩次combine。

提高Reduce的執行效率,除了在Hadoop框架方面的優化,重點還是在代碼邏輯上的優化.比如:對Reduce接受到的value可能有重

復的,此時如果用Java的Set或者STL的Set來達到去重的目的,那么這個程序不是擴展良好的(non-scalable),受到數據量的限制,

當數據膨脹,內存勢必會溢出

mapred.reduce.parallel.copies
Reduce copy數據的線程數量,默認值是5
Reduce到每個完成的Map Task 拷貝數據(通過RPC調用),默認同時啟動5個線程到map節點取數據。這個配置還是很關鍵的,

如果你的map輸出數據很大,有時候會發現map早就100%了,reduce卻在緩慢的變化,那就是copy數據太慢了,比如5個線程

copy 10G的數據,確實會很慢,這時就要調整這個參數,但是調整的太大,容易造成集群擁堵,所以 Job tuning的同時,也是個權

衡的過程,要熟悉所用的數據!
mapred.job.shuffle.input.buffer.percent
當指定了JVM的堆內存最大值以后,上面這個配置項就是Reduce用來存放從Map節點取過來的數據所用的內存占堆內存的比例,默

認是0.7,既70%,通常這個比例是夠了,但是對於大數據的情況,這個比例還是小了一些,0.8-0.9之間比較合適。(前提是你的

reduce函數不會瘋狂的吃掉內存)
mapred.job.shuffle.merge.percent(默認值0.66)
mapred.inmem.merge.threshold(默認值1000)

第一個指的是從Map節點取數據過來,放到內存,當達到這個閾值之后,后台啟動線程(通常是Linux native process)把內存中的

數據merge sort,寫到reduce節點的本地磁盤;
第二個指的是從map節點取過來的文件個數,當達到這個個數之后,也進行merger sort,然后寫到reduce節點的本地磁盤;這兩

個配置項第一個優先判斷,其次才判斷第二個thresh-hold。
從實際經驗來看,mapred.job.shuffle.merge.percent默認值偏小,完全可以設置到0.8左右;第二個默認值1000,完全取決於

map輸出數據的大小,如果map輸出的數據很大,默認值1000反倒不好,應該小一些,如果map輸出的數據不大(light

weight),可以設置2000或者以上。
mapred.reduce.slowstart.completed.maps (map完成多少百分比時,開始shuffle)

當map運行慢,reduce運行很快時,如果不設置mapred.reduce.slowstart.completed.maps會使job的shuffle時間變的很長,

map運行完很早就開始了reduce,導致reduce的slot一直處於被占用狀態。mapred.reduce.slowstart.completed.maps 這個值是

和“運行完的map數除以總map數”做判斷的,當后者大於等於設定的值時,開始reduce的shuffle。所以當map比reduce的執行

時間多很多時,可以調整這個值(0.75,0.80,0.85及以上)

下面從流程里描述一下各個參數的作用:
當map task開始運算,並產生中間數據時,其產生的中間結果並非直接就簡單的寫入磁盤。這中間的過程比較復雜,並且利用到了

內存buffer來進行已經產生的部分結果的緩存,並在內存buffer中進行一些預排序來優化整個map的性能。每一個map都會對應存

在一個內存buffer(MapOutputBuffer),map會將已經產生的部分結果先寫入到該buffer中,這個buffer默認是100MB大小,但

是這個大小是可以根據job提交時的參數設定來調整的,該參數即為:io.sort.mb。當map的產生數據非常大時,並且把io.sort.mb

調大,那么map在整個計算過程中spill的次數就勢必會降低,map task對磁盤的操作就會變少,如果map tasks的瓶頸在磁盤上,

這樣調整就會大大提高map的計算性能。
map在運行過程中,不停的向該buffer中寫入已有的計算結果,但是該buffer並不一定能將全部的map輸出緩存下來,當map輸出

超出一定閾值(比如100M),那么map就必須將該buffer中的數據寫入到磁盤中去,這個過程在mapreduce中叫做spill。map並

不是要等到將該buffer全部寫滿時才進行spill,因為如果全部寫滿了再去寫spill,勢必會造成map的計算部分等待buffer釋放空間的

情況。所以,map其實是當buffer被寫滿到一定程度(比如80%)時,就開始進行spill。這個閾值也是由一個job的配置參數來控

制,即io.sort.spill.percent,默認為0.80或80%。這個參數同樣也是影響spill頻繁程度,進而影響map task運行周期對磁盤的讀寫

頻率的。但非特殊情況下,通常不需要人為的調整。調整io.sort.mb對用戶來說更加方便。
當map task的計算部分全部完成后,如果map有輸出,就會生成一個或者多個spill文件,這些文件就是map的輸出結果。map在正

常退出之前,需要將這些spill合並(merge)成一個,所以map在結束之前還有一個merge的過程。merge的過程中,有一個參數

可以調整這個過程的行為,該參數為:io.sort.factor。該參數默認為10。它表示當merge spill文件時,最多能有多少並行的stream

向merge文件中寫入。比如如果map產生的數據非常的大,產生的spill文件大於10,而io.sort.factor使用的是默認的10,那么當

map計算完成做merge時,就沒有辦法一次將所有的spill文件merge成一個,而是會分多次,每次最多10個stream。這也就是說,

當map的中間結果非常大,調大io.sort.factor,有利於減少merge次數,進而減少map對磁盤的讀寫頻率,有可能達到優化作業的

目的。
當job指定了combiner的時候,我們都知道map介紹后會在map端根據combiner定義的函數將map結果進行合並。運行combiner

函數的時機有可能會是merge完成之前,或者之后,這個時機可以由一個參數控制,即min.num.spill.for.combine(default 3),

當job中設定了combiner,並且spill數最少有3個的時候,那么combiner函數就會在merge產生結果文件之前運行。通過這樣的方

式,就可以在spill非常多需要merge,並且很多數據需要做conbine的時候,減少寫入到磁盤文件的數據數量,同樣是為了減少對磁

盤的讀寫頻率,有可能達到優化作業的目的。
減少中間結果讀寫進出磁盤的方法不止這些,還有就是壓縮。也就是說map的中間,無論是spill的時候,還是最后merge產生的結

果文件,都是可以壓縮的。壓縮的好處在於,通過壓縮減少寫入讀出磁盤的數據量。對中間結果非常大,磁盤速度成為map執行瓶

頸的job,尤其有用。控制map中間結果是否使用壓縮的參數為:mapred.compress.map.output(true/false)。將這個參數設置為

true時,那么map在寫中間結果時,就會將數據壓縮后再寫入磁盤,讀結果時也會采用先解壓后讀取數據。這樣做的后果就是:寫

入磁盤的中間結果數據量會變少,但是cpu會消耗一些用來壓縮和解壓。所以這種方式通常適合job中間結果非常大,瓶頸不在

cpu,而是在磁盤的讀寫的情況。說的直白一些就是用cpu換IO。根據觀察,通常大部分的作業cpu都不是瓶頸,除非運算邏輯異常

復雜。所以對中間結果采用壓縮通常來說是有收益的。
當采用map中間結果壓縮的情況下,用戶還可以選擇壓縮時采用哪種壓縮格式進行壓縮,現在hadoop支持的壓縮格式有:

GzipCodec,LzoCodec,BZip2Codec,LzmaCodec等壓縮格式。通常來說,想要達到比較平衡的cpu和磁盤壓縮比,LzoCodec

比較適合。但也要取決於job的具體情況。用戶若想要自行選擇中間結果的壓縮算法,可以設置配置參數:

mapred.map.output.compression.codec=org.apache.hadoop.io.compress.DefaultCodec或者其他用戶自行選擇的壓縮方式。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM