set hive.execution.engine = tez; --"mr", "tez", "spark"
set tez.queue.name=root.hello;
set tez.grouping.min-size=556000000;
set tez.grouping.max-size=3221225472;
set hive.tez.auto.reducer.parallelism=true;
set mapreduce.map.cpu.vcores=6;
set mapreduce.map.memory.mb=3072;
set mapreduce.map.java.opts=-Xmx2400m -Dfile.encoding=UTF-8;
set mapreduce.reduce.cpu.vcores=6;
set mapreduce.reduce.memory.mb=6144;
set mapreduce.reduce.java.opts=-Xmx4900m -Dfile.encoding=UTF-8;
set hive.exec.max.created.files=900000;
set mapred.job.queue.name=root.ESS-GODDOG-OFFLINE;
set mapred.job.name=t_dm_relation_graph;
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions=3000;
set hive.exec.max.dynamic.partitions.pernode=1000;
set hive.fetch.task.conversion=more;
set hive.exec.parallel=true;
set hive.exec.parallel.thread.number=2048;
set mapreduce.job.jvm.numtasks=-1;
set hive.exec.reducers.bytes.per.reducer=500000000;
--set mapred.reduce.tasks=1024;
set hive.auto.convert.join=true;
set hive.map.aggr=true;
set hive.groupby.skewindata=true;
set mapred.max.split.size=256000000;
set mapred.min.split.size.per.node=134217728;
set mapred.min.split.size.per.rack=134217728;
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
set hive.merge.mapfiles=true;
set hive.merge.mapredfiles=true;
set hive.merge.size.per.task=256000000;
set hive.merge.smallfiles.avgsize=16000000;
set io.file.buffer.size=2097152;
set mapreduce.task.io.sort.mb=300;
set mapreduce.map.sort.spill.percent=0.8;
set mapreduce.task.io.sort.factor=10;
set mapreduce.reduce.shuffle.parallelcopies=8;
set hive.exec.compress.intermediate=true;
set mapreduce.map.output.compress=true;
set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
set mapreduce.reduce.merge.inmem.threshold=0;
set mapreduce.reduce.input.buffer.percent=0.8;
set hive.exec.compress.output=true;
set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
set mapreduce.output.fileoutputformat.compress.type=BLOCK;
# 關閉local hadoop
set hive.exec.mode.local.auto=false;
set mapred.job.priority=HIGH;
set mapreduce.job.reduce.slowstart.completedmaps=0.15
Map端調優
map數控制
對於map運行時間過長或者map數據傾斜比較嚴重時可以通過設置切片大小實現對map數的控制,hive設置如下
set mapred.max.split.size=100000000; set mapred.min.split.size.per.node=100000000; set mapred.min.split.size.per.rack=100000000;
map小文件優化
使用Combinefileinputformat,將多個小文件打包作為一個整體的inputsplit,減少map任務數
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
Map 內存控制
ApplicationMaster執行在獨立的jvm中以child process的方式執行 Mapper/Reducer task
子任務繼承ApplicationMaster的環境,用戶可以通過mapreduce.{map|reduce}.java.opts 指定child-jvm參數
set mapreduce.map.memory.mb=2048; set mapreduce.map.java.opts=-Xmx1536m
MapJoin優化
set hive.auto.convert.join = true; set hive.mapjoin.smalltable.filesize=25000000;
select /*+ MAPJOIN(table_a)*/,a.*,b.* from table_a a join table_b b on a.id = b.id
reduce端優化
reduce數控制
hive1.2.1自己如何確定reduce數: reduce個數的設定極大影響任務執行效率,不指定reduce個數的情況下,Hive會猜測確定一個reduce個數,基於以下兩個設定:hive.exec.reducers.bytes.per.reducer(每個reduce任務處理的數據量,默認為256*1000*1000)。hive.exec.reducers.max(每個任務最大的reduce數,默認為1009)
可以直接通過mapred.reduce.tasks(新mapreduce.job.reduces) 設置map數的大小或者通過hive.exec.reducers.bytes.per.reducer設置每個reduce task處理的數據量大小來間接控制reduce大小
set mapred.reduce.tasks = 15; set hive.exec.reducers.bytes.per.reducer=500000000;
其中
set mapreduce.job.reduces=number的方式優先級最高(舊參數:mapred.reduce.tasks)。
set hive.exec.reducers.max=number優先級次之,
set hive.exec.reducers.bytes.per.reducer=number 優先級最低。從hive0.14開始,一個reducer處理文件的大小的默認值是256M。
reduce內存控制
set mapreduce.reduce.memory.mb=2048; set mapreduce.reduce.java.opts=-Xmx1536m
MapReduce輸出壓縮優化
Map端壓縮
--啟用中間數據壓縮 set hive.exec.compress.intermediate=true -- 啟用最終數據輸出壓縮 set hive.exec.compress.output=true; --開啟mapreduce中map輸出壓縮功能 set mapreduce.map.output.compress=true; --設置mapreduce中map輸出數據的壓縮方式 set mapreduce.map.output.compress.codec= org.apache.hadoop.io.compress.SnappyCodec; --開啟mapreduce最終輸出數據壓縮 set mapreduce.output.fileoutputformat.compress=true; --設置mapreduce最終數據輸出壓縮方式 set mapreduce.output.fileoutputformat.compress.codec = org.apache.hadoop.io.compress.SnappyCodec; --設置mapreduce最終數據輸出壓縮為塊壓縮。三種壓縮選擇:NONE, RECORD, BLOCK。 Record壓縮率低,一般建議使用BLOCK壓縮。 set mapreduce.output.fileoutputformat.compress.type=BLOCK;
hive創建文件限制
Maximum number of HDFS files created by all mappers/reducers in a MapReduce job.
一個MR job中允許創建多少個HDFS文件
MR對文件創建的總數是有限制的,這個限制取決於參數:
hive.exec.max.created.files,默認值是100000,可以根據實際情況調整
hive.error.on.empty.partition false 當動態分區插入產生空值時是否拋出異常
set hive.exec.max.created.files=150000
MapReduce文件合並優化
--設置map端輸出進行合並,默認為true set hive.merge.mapfiles = true --設置MapReduce結果輸出進行合並,默認為false set hive.merge.mapredfiles = true --設置合並文件的大小 set hive.merge.size.per.task = 256*1000*1000 --當輸出文件的平均大小小於該值時,啟動一個獨立的MapReduce任務進行文件merge。 set hive.merge.smallfiles.avgsize=16000000
Group by優化
group by數據傾斜
hive.map.aggr=true //map端是否聚合 hive.map.aggr.hash.force.flush.memory.threshold=0.9 //map側聚合最大內存,超出后flush hive.map.aggr.hash.min.reduction=0.5 //開啟聚合的最小閾值 hive.map.aggr.hash.percentmemory=0.5 //map端hash表聚合內存大小 --group的鍵對應的記錄條數超過這個值則會進行分拆,值根據具體數據量設置 set hive.groupby.mapaggr.checkinterval=100000 ; --如果是group by過程出現傾斜 應該設置為true set hive.groupby.skewindata=true; --join的鍵對應的記錄條數超過這個值則會進行分拆,值根據具體數據量設置 set hive.skewjoin.key=100000; --如果是join 過程出現傾斜 應該設置為true set hive.optimize.skewjoin=true;
空key轉換
有時雖然某個key為空對應的數據很多,但是相應的數據不是異常數據,必須要包含在join的結果中,此時我們可以表a中key為空的字段賦一個隨機的值,使得數據隨機均勻地分不到不同的reducer上
案例
- 不隨機分布空null值
1). 設置5個reduce個數
set mapreduce.job.reduces = 5;
2). JOIN兩張表
insert overwrite table jointable select n.* from nullidtable n left join ori b on n.id = b.id;
可以看出來,出現了數據傾斜,某些reducer的資源消耗遠大於其他reducer。
- 隨機分布空null值
1) . 設置5個reduce個數
set mapreduce.job.reduces = 5;
2). JOIN兩張表
insert overwrite table jointable select n.* from nullidtable n full join ori o on case when n.id is null then concat('hive', rand()) else n.id end = o.id;
可以看出來,消除了數據傾斜,負載均衡reducer的資源消耗。
動態分區優化
hive默認關閉了動態分區,使用動態分區時候,hive.exec.dynamic.partition參數必須設置成true;nonstrict模式表示允許所有的分區字段都可以使用動態分區。一般需要設置為nonstrict;當分區數比較多時hive.exec.max.dynamic.partitions和hive.exec.max.dynamic.partitions.pernode也需要根據時間情況調大
set hive.exec.dynamic.partition=true; set hive.exec.dynamic.partition.mode=nonstrict; set hive.exec.max.dynamic.partitions=3000; set hive.exec.max.dynamic.partitions.pernode=1000;
Fetch優化
我們在執行hive代碼的時候,一條簡單的命令大部分都會轉換成為mr代碼在后台執行,但是有時候我們僅僅只是想獲取一部分數據而已,僅僅是獲取數據,還需要轉化成為mr去執行就比較浪費時間和內存了
hive提供了一個參數hive.fetch.task.conversion,這個參數所對應着"none", "minimal", "more":
1.在none下 : disable hive.fetch.task.conversion。所有查詢都啟動MR。
2.在minimal下,我們執行select * ,limit,filter在一個表所屬的分區表上操作,這三種情況都會直接進行數據的拿去,也就是直接把數據從對應的表格拿出來,不用跑mr代碼
3.在more模式下,運行select,filter,limit,都是運行數據的fetch,不跑mr應用
set hive.fetch.task.conversion=more;
Insert overwrite優化
當動態分區過多時,由於需要創建太多的文件,FileSinkOperator很有可能會觸發GC overlimit問題,可以啟用hive.optimize.sort.dynamic.partition,動態分區列將被全局排序。這樣我們就可以為每個分區值只打開一個文件寫入器,可以極大的減小reduce端內存的壓力
set hive.optimize.sort.dynamic.partition=true;
Hive Sql並行執行優化
hive.exec.parallel參數控制在同一個sql中的不同的job是否可以同時運行,默認為false。有時候一個sql中的有些job是不相關的,設置hive.exec.parallel=true可以讓這些不相關的job同時跑
set hive.exec.parallel=true; set hive.exec.parallel.thread.number=32; //同一個sql允許最大並行度,默認為8。
1.1.1 reduce數據傾斜導致報錯?
常見出錯的原因:
通過INSERT語句插入數據到動態分區表中,也可能會超過HDFS同時打開文件數的限制。
如果沒有join或聚合,INSERT ... SELECT語句會被轉換為只有map任務的作業。mapper任務會讀取輸入記錄然后將它們發送到目標分區目錄。
在這種情況下,每個mapper必須為遇到的每個動態分區創建一個新的文件寫入器(file writer)。
mapper在運行時所需的內存量隨着它遇到的分區數量的增加而增加。
優化動作:
set hive.optimize.sort.dynamic.partition = true
通過這個優化,這個只有map任務的mapreduce會引入reduce過程,這樣動態分區的那個字段比如日期在傳到reducer時會被排序。由於分區字段是排序的,因此每個reducer只需要保持一個文件寫入器(file writer)隨時處於打開狀態,在收到來自特定分區的所有行后,關閉記錄寫入器(record writer),從而減小內存壓力。這種優化方式在寫parquet文件時使用的內存要相對少一些,但代價是要對分區字段進行排序。
類似的詳細說明,鏈接,如下:
https://www.jianshu.com/p/cfb827642905?from=groupmessage&isappinstalled=0