一、Hive的壓縮和存儲
1,MapReduce支持的壓縮編碼
| 壓縮格式 |
工具 |
算法 |
文件擴展名 |
是否可切分 |
對應的編碼/解碼器 |
| DEFLATE |
無 |
DEFLATE |
.deflate |
否 |
org.apache.hadoop.io.compress.DefaultCodec |
| Gzip |
gzip |
DEFLATE |
.gz |
否 |
org.apache.hadoop.io.compress.GzipCodec |
| bzip2 |
bzip2 |
bzip2 |
.bz2 |
是 |
org.apache.hadoop.io.compress.BZip2Codec |
| LZO |
lzop |
LZO |
.lzo |
是 |
com.hadoop.compression.lzo.LzopCodec |
| Snappy |
無 |
Snappy |
.snappy |
否 |
org.apache.hadoop.io.compress.SnappyCodec |
2,文件壓縮格式:
TEXTFILE和SEQUENCEFILE的存儲格式都是基於行式存儲的;
ORC和PARQUET是基於列式存儲的。
a>TextFile格式:
默認格式,數據不做壓縮,磁盤開銷大,數據解析開銷大。可結合Gzip、Bzip2使用,但使用Gzip這種方式,hive不會對數據進行切分,從而無法對數據進行並行操作。
b>Orc格式:
Hive 0.11版里引入的新的存儲格式,數據按行分塊 每塊按照列存儲 ,壓縮快 快速列存取,效率比rcfile高,是rcfile的改良版本,相比RC能夠更好的壓縮,能夠更快的查詢,但還是不支持模式演進。
c>parquent格式:
Parquet文件是以二進制方式存儲的,所以是不可以直接讀取的,文件中包括該文件的數據和元數據,因此Parquet格式文件是自解析的。
在實際的項目開發當中,hive表的數據存儲格式一般選擇:orc或parquet。壓縮方式一般選擇snappy,lzo。
二、Hive的企業級調優:
1,Fetch抓取:
默認開啟。Fetch抓取是指,Hive中對某些情況的查詢可以不必使用MapReduce計算。(hive-default.xml.template文件中hive.fetch.task.conversion默認是more)例如:SELECT * FROM person;在這種情況下,Hive可以簡單地讀取person對應的存儲目錄下的文件,然后輸出查詢結果到控制台。
2,本地模式:
大多數的Hadoop Job是需要Hadoop提供的完整的可擴展性來處理大數據集的。不過,有時Hive的輸入數據量是非常小的。在這種情況下,為查詢觸發執行任務消耗的時間可能會比實際job的執行時間要多的多。對於大多數這種情況,Hive可以通過本地模式在單台機器上處理所有的任務。對於小數據集,執行時間可以明顯被縮短。
set hive.exec.mode.local.auto=true; //開啟本地mr //設置local mr的最大輸入數據量,當輸入數據量小於這個值時采用local mr的方式,默認為134217728,即128M set hive.exec.mode.local.auto.inputbytes.max=50000000; //設置local mr的最大輸入文件個數,當輸入文件個數小於這個值時采用local mr的方式,默認為4 set hive.exec.mode.local.auto.input.files.max=10;
3,表的優化:
a>大小表的join:
新版的hive已經對小表JOIN大表和大表JOIN小表進行了優化。小表放在左邊和右邊已經沒有明顯區別。
b>大表join大表:
空key過濾:空key對應的數據無意義
select n.* from (select * from nullidtable where id is not null ) n left join ori o on n.id = o.id;
空key轉換:空key對應的數據還是有意義,需要保留
#為空key賦予隨機值,在進入reduce的時候防止空key太多而造成數據傾斜
select n.* from nullidtable n full join ori o on case when n.id is null then concat('hive', rand()) else n.id end = o.id;
c>MapJoin(小表join大表)
#默認開啟 set hive.auto.convert.join = true; #大表小表的閾值設置(默認25M以下認為是小表)可以調整 set hive.mapjoin.smalltable.filesize=25000000;
d>group by:
默認情況下,Map階段同一個key數據分發到同一個reduce,當一個key的數據過大時就會出現數據傾斜。
#是否在Map端進行聚合,默認為True set hive.map.aggr = true
#在Map端進行聚合操作的條目數目 set hive.groupby.mapaggr.checkinterval = 100000
#有數據傾斜的時候進行負載均衡(默認是false) set hive.groupby.skewindata = true
當設置為負載均衡之后,生成的計划會有兩個MR的job。第一個MRjob,Map的輸出結果可能會隨機分布到Reduce中,每個Reduce做部分聚合操作,並輸出結果,這樣處理的結果是相同的Group By Key有可能被分發到不同的Reduce中,從而達到負載均衡的目的;第二個MRjob再根據預處理的數據結果按照Group By Key分布到Reduce中(這個過程可以保證相同的Group By Key被分布到同一個Reduce中),最后完成最終的聚合操作。
e>Count(distinct)去重統計
數據量小的時候無所謂,數據量大的情況下,由於COUNT DISTINCT的全聚合操作,即使設定了reduce task個數,set mapred.reduce.tasks=100;hive也只會啟動一個reducer,這就造成一個Reduce處理的數據量太大,導致整個Job很難完成,一般COUNT DISTINCT使用先GROUP BY再COUNT的方式替換:
#設置reduce個數為5 set mapreduce.job.reduces = 5; #采用distinct去重 select count(distinct id) from bigtable; #采用group by去重 select count(id) from (select id from bigtable group by id) a;
雖然會多用一個Job來完成,但在數據量大的情況下,這個絕對是值得的。
f>笛卡爾積
盡量避免笛卡爾積,join的時候不加on條件,或者無效的on條件,Hive只能使用1個reducer來完成笛卡爾積。
g>行列過濾
列處理:在select中,只拿需要的列,如果有,盡量使用分區過濾,少用select *。
行處理:在分區剪裁中,當使用外關聯時,如果將副表的過濾條件寫在where后面,那么就會先全表關聯,之后再過濾。
#先關聯再過濾 select o.id from bigtable b join ori o on o.id = b.id #先過濾再關聯 select b.id from bigtable b join (select id from ori where id <= 10 ) o on b.id = o.id;
h>動態分區
(1)開啟動態分區功能(默認true,開啟) set hive.exec.dynamic.partition=true (2)設置為非嚴格模式(動態分區的模式,默認strict,表示必須指定至少一個分區為靜態分區,nonstrict模式表示允許所有的分區字段都可以使用動態分區。) set hive.exec.dynamic.partition.mode=nonstrict (3)在所有執行MR的節點上,最大一共可以創建多少個動態分區。默認1000 set hive.exec.max.dynamic.partitions=1000 (4)在每個執行MR的節點上,最大可以創建多少個動態分區。該參數需要根據實際的數據來設定。比如:源數據中包含了一年的數據,即day字段有365個值,那么該參數就需要設置成大於365,如果使用默認值100,則會報錯。 set hive.exec.max.dynamic.partitions.pernode=100 (5)整個MR Job中,最大可以創建多少個HDFS文件。默認100000 set hive.exec.max.created.files=100000 (6)當有空分區生成時,是否拋出異常。一般不需要設置。默認false set hive.error.on.empty.partition=false
i>分區
分區表實際上就是對應一個HDFS文件系統上的獨立的文件夾,該文件夾下是該分區所有的數據文件。Hive中的分區就是分目錄,把一個大的數據集根據業務需要分割成小的數據集。在查詢時通過WHERE子句中的表達式選擇查詢所需要的指定的分區,這樣的查詢效率會提高很多。
j>分桶
分區提供一個隔離數據和優化查詢的便利方式。不過,並非所有的數據集都可形成合理的分區。對於一張表或者分區,Hive 可以進一步組織成桶,也就是更為細粒度的數據范圍划分。分區針對的是數據的存儲路徑;分桶針對的是數據文件。
#創建分桶表
create table stu_buck(id int, name string) clustered by(id) into 4 buckets row format delimited fields terminated by '\t';
#創建中間表
create table stu(id int, name string) row format delimited fields terminated by '\t';
#導入數據到中間表
load data local inpath '/opt/module/datas/student.txt' into table stu;
#開啟分桶
set hive.enforce.bucketing=true;
#設置reduce數量為-1
set mapreduce.job.reduces=-1;
#向分桶表中導入數據
insert into table stu_buck select id, name from stu;
4,合理設置Map的數量和Reduce的數量
a>復雜文件增加map數量
增加map的方法為:根據computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M公式,調整maxSize最大值。
讓maxSize最大值低於blocksize就可以增加map的個數。
set mapreduce.input.fileinputformat.split.maxsize=100;
b>小文件進行合並
#設置為CombineHiveInputFormat合並小文件 set hive.input.format= org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; #在map-only任務結束時合並小文件,默認true set hive.merge.mapfiles = true; #在map-reduce任務結束時合並小文件,默認false set hive.merge.mapredfiles = true; #合並文件的大小,默認256M set hive.merge.size.per.task = 268435456; #當輸出文件的平均大小小於該值時,啟動一個獨立的map-reduce任務進行文件merge set hive.merge.smallfiles.avgsize = 16777216;
c>合理設置reduce的個數
1.調整reduce個數方法一
(1)每個Reduce處理的數據量默認是256MB hive.exec.reducers.bytes.per.reducer=256000000 (2)每個任務最大的reduce數,默認為1009 hive.exec.reducers.max=1009 (3)計算reducer數的公式 N=min(參數2,總輸入數據量/參數1)
2.調整reduce個數方法二
在hadoop的mapred-default.xml文件中修改 設置每個job的Reduce個數 set mapreduce.job.reduces = 15;
3.reduce個數並不是越多越好
1)過多的啟動和初始化reduce也會消耗時間和資源; 2)另外,有多少個reduce,就會有多少個輸出文件,如果生成了很多個小文件,那么如果這些小文件作為下一個任務的輸入,則也會出現小文件過多的問題; 在設置reduce個數的時候也需要考慮這兩個原則:處理大數據量利用合適的reduce數;使單個reduce任務處理數據量大小要合適;
5,並行執行
Hive會將一個查詢轉化成一個或者多個階段。這樣的階段可以是MapReduce階段、抽樣階段、合並階段、limit階段。或者Hive執行過程中可能需要的其他階段。默認情況下,Hive一次只會執行一個階段。不過,某個特定的job可能包含眾多的階段,而這些階段可能並非完全互相依賴的,也就是說有些階段是可以並行執行的,這樣可能使得整個job的執行時間縮短。不過,如果有更多的階段可以並行執行,那么job可能就越快完成。
set hive.exec.parallel=true; //打開任務並行執行 set hive.exec.parallel.thread.number=16; //同一個sql允許最大並行度,默認為8。
在共享集群中,需要注意下,如果job中並行階段增多,那么集群利用率就會增加。當然,得是在系統資源比較空閑的時候才有優勢,否則,沒資源,並行也起不來。
6,嚴格模式
通過設置屬性hive.mapred.mode值為默認是非嚴格模式nonstrict 。開啟嚴格模式需要修改hive.mapred.mode值為strict,開啟嚴格模式可以禁止3種類型的查詢。
1)對於分區表,除非where語句中含有分區字段過濾條件來限制范圍,否則不允許執行。(就是用戶不允許掃描所有分區)
2)對於使用了order by語句的查詢,要求必須使用limit語句。 因為order by為了執行排序過程會將所有的結果數據分發到同一個Reducer中進行處理,強制要求用戶增加這個LIMIT語句可以防止Reducer額外執行很長一段時間。
3)限制笛卡爾積的查詢。
7,JVM重用
JVM重用是Hadoop調優參數的內容,其對Hive的性能具有非常大的影響,特別是對於很難避免小文件的場景或task特別多的場景,這類場景大多數執行時間都很短。
在Hadoop的mapred-site.xml文件中進行配置
<property> <name>mapreduce.job.jvm.numtasks</name> <value>10</value> <description>How many tasks to run per jvm. If set to -1, there is no limit</description> </property>
8,推測執行
Hadoop的mapred-site.xml文件中進行配置,默認是true
<property> <name>mapreduce.map.speculative</name> <value>true</value> <description>If true, then multiple instances of some map tasks may be executed in parallel.</description> </property> <property> <name>mapreduce.reduce.speculative</name> <value>true</value> <description>If true, then multiple instances of some reduce tasks may be executed in parallel.</description> </property>
不過hive本身也提供了配置項來控制reduce-side的推測執行:默認是true
<property>
<name>hive.mapred.reduce.tasks.speculative.execution</name>
<value>true</value>
<description>Whether speculative execution for reducers should be turned on. </description>
</property>
不建議開啟的情況:(1)任務間存在嚴重的負載傾斜;(2)特殊任務,比如任務向數據庫中寫數據。
9,壓縮
見第一章
10,explain執行計划
利用explain查看sql的執行計划
