Hive(十)Hive性能調優總結


一、Fetch抓取

1、理論分析

Fetch抓取是指,Hive中對某些情況的查詢可以不必使用MapReduce計算。例如:SELECT * FROM employees;在這種情況下,Hive可以簡單地讀取employee對應的存儲目錄下的文件,然后輸出查詢結果到控制台。

在hive-default.xml.template文件中hive.fetch.task.conversion默認是more,老版本hive默認是minimal,該屬性修改為more以后,在全局查找、字段查找、limit查找等都不走mapreduce。

<property>
    <name>hive.fetch.task.conversion</name>
    <value>more</value>
    <description>
      Expects one of [none, minimal, more].
      Some select queries can be converted to single FETCH task minimizing latency.
      Currently the query should be single sourced not having any subquery and should not have
      any aggregations or distincts (which incurs RS), lateral views and joins.
      0. none : disable hive.fetch.task.conversion
      1. minimal : SELECT STAR, FILTER on partition columns, LIMIT only
      2. more  : SELECT, FILTER, LIMIT only (support TABLESAMPLE and virtual columns)
    </description>
  </property>

2、案例實操

(1)把hive.fetch.task.conversion設置成none,然后執行查詢語句,都會執行mapreduce程序。

hive (default)> set hive.fetch.task.conversion=none;

hive (default)> select * from emp;

hive (default)> select ename from emp;

hive (default)> select ename from emp limit 3;

(2)把hive.fetch.task.conversion設置成more,然后執行查詢語句,如下查詢方式都不會執行mapreduce程序。

hive (default)> set hive.fetch.task.conversion=more;

hive (default)> select * from emp;

hive (default)> select ename from emp;

hive (default)> select ename from emp limit 3;

二、本地模式

1、理論分析

Hive 在集群上查詢時,默認是在集群上 N 台機器上運行, 需要多個機器進行協調運行,這 個方式很好地解決了大數據量的查詢問題。但是當 Hive 查詢處理的數據量比較小時,其實沒有必要啟動分布式模式去執行,因為以分布式方式執行就涉及到跨網絡傳輸、多節點協調 等,並且消耗資源。這個時間可以只使用本地模式來執行 mapreduce job,只在一台機器上執行,速度會很快。啟動本地模式涉及到三個參數:

set hive.exec.mode.local.auto=true 是打開 hive 自動判斷是否啟動本地模式的開關,但是只 是打開這個參數並不能保證啟動本地模式,要當 map 任務數不超過

hive.exec.mode.local.auto.input.files.max 的個數並且 map 輸入文件大小不超過

hive.exec.mode.local.auto.inputbytes.max 所指定的大小時,才能啟動本地模式。

如下:用戶可以通過設置hive.exec.mode.local.auto的值為true,來讓Hive在適當的時候自動啟動這個優化。

set hive.exec.mode.local.auto=true;  //開啟本地mr

//設置local mr的最大輸入數據量,當輸入數據量小於這個值時采用local  mr的方式,默認為134217728,即128M

set hive.exec.mode.local.auto.inputbytes.max=50000000;

//設置local mr的最大輸入文件個數,當輸入文件個數小於這個值時采用local mr的方式,默認為4

set hive.exec.mode.local.auto.input.files.max=10;

2、案例實操

(1)開啟本地模式,並執行查詢語句

hive (default)> set hive.exec.mode.local.auto=true; 

hive (default)> select * from emp cluster by deptno;

Time taken: 1.328 seconds, Fetched: 14 row(s)

(2)關閉本地模式,並執行查詢語句

hive (default)> set hive.exec.mode.local.auto=false; 

hive (default)> select * from emp cluster by deptno;

Time taken: 20.09 seconds, Fetched: 14 row(s);

三、Hive的壓縮存儲

1、合理利用文件存儲格式 

創建表時,盡量使用 orc、parquet 這些列式存儲格式,因為列式存儲的表,每一列的數據在物理上是存儲在一起的,Hive查詢時會只遍歷需要列數據,大大減少處理的數據量。

2、壓縮的原因

Hive 最終是轉為 MapReduce 程序來執行的,而MapReduce 的性能瓶頸在於網絡 IO 和 磁盤 IO,要解決性能瓶頸,最主要的是減少數據量,對數據進行壓縮是個好的方式。壓縮 雖然是減少了數據量,但是壓縮過程要消耗CPU的,但是在Hadoop中, 往往性能瓶頸不在於CPU,CPU壓力並不大,所以壓縮充分利用了比較空閑的 CPU

3、常用壓縮方法對比

各個壓縮方式所對應的 Class 類:

4、壓縮方式的選擇

壓縮比率

壓縮解壓縮速度

是否支持 Split

5、壓縮使用

Job 輸出文件按照 block 以 GZip 的方式進行壓縮:

set mapreduce.output.fileoutputformat.compress=true // 默認值是 false

set mapreduce.output.fileoutputformat.compress.type=BLOCK // 默認值是 Record

set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec // 默認值是 org.apache.hadoop.io.compress.DefaultCodec

Map 輸出結果也以 Gzip 進行壓縮:

set mapred.map.output.compress=true

set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.GzipCodec // 默認值是 org.apache.hadoop.io.compress.DefaultCodec 

對 Hive 輸出結果和中間都進行壓縮:

set hive.exec.compress.output=true // 默認值是 false,不壓縮

set hive.exec.compress.intermediate=true // 默認值是 false,為 true 時 MR 設置的壓縮才啟用

四、表的優化

1、小表、大表Join

1)理論分析

將key相對分散,並且數據量小的表放在join的左邊,這樣可以有效減少內存溢出錯誤發生的幾率;再進一步,可以使用Group讓小的維度表(1000條以下的記錄條數)先進內存。在map端完成reduce。

實際測試發現:新版的hive已經對小表JOIN大表和大表JOIN小表進行了優化。小表放在左邊和右邊已經沒有明顯區別。

2)案例實操

(0)需求:測試大表JOIN小表和小表JOIN大表的效率

(1)建大表、小表和JOIN后表的語句

create table bigtable(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';

 

create table smalltable(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';

 

create table jointable(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';

(2)分別向大表和小表中導入數據

hive (default)> load data local inpath '/opt/module/datas/bigtable' into table bigtable;

hive (default)>load data local inpath '/opt/module/datas/smalltable' into table smalltable;

(3)關閉mapjoin功能(默認是打開的)

set hive.auto.convert.join = false;

(4)執行小表JOIN大表語句

insert overwrite table jointable

select b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url

from smalltable s

left join bigtable  b

on b.id = s.id;

Time taken: 35.921 seconds

(5)執行大表JOIN小表語句

insert overwrite table jointable

select b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url

from bigtable  b

left join smalltable  s

on s.id = b.id;

Time taken: 34.196 seconds;

2、大表Join大表

1)空KEY過濾

有時join超時是因為某些key對應的數據太多,而相同key對應的數據都會發送到相同的reducer上,從而導致內存不夠。此時我們應該仔細分析這些異常的key,很多情況下,這些key對應的數據是異常數據,我們需要在SQL語句中進行過濾。例如key對應的字段為空,操作如下:

案例實操

(1)配置歷史服務器

配置mapred-site.xml

<property>

<name>mapreduce.jobhistory.address</name>

<value>node21:10020</value>

</property>

<property>

    <name>mapreduce.jobhistory.webapp.address</name>

    <value>node21:19888</value>

</property>

啟動歷史服務器

sbin/mr-jobhistory-daemon.sh start historyserver

查看jobhistory

http://node21:19888/jobhistory

(2)創建原始數據表、空id表、合並后數據表

create table ori(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';

 

create table nullidtable(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';

 

create table jointable(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';

(3)分別加載原始數據和空id數據到對應表中

hive (default)> load data local inpath '/opt/module/datas/ori' into table ori;

hive (default)> load data local inpath '/opt/module/datas/nullid' into table nullidtable;

(4)測試不過濾空id

hive (default)> insert overwrite table jointable

select n.* from nullidtable n left join ori o on n.id = o.id;

Time taken: 42.038 seconds

(5)測試過濾空id

hive (default)> insert overwrite table jointable

select n.* from (select * from nullidtable where id is not null ) n  left join ori o on n.id = o.id;

Time taken: 31.725 seconds

2)空key轉換

有時雖然某個key為空對應的數據很多,但是相應的數據不是異常數據,必須要包含在join的結果中,此時我們可以表a中key為空的字段賦一個隨機的值,使得數據隨機均勻地分不到不同的reducer上。例如:

案例實操:

不隨機分布空null值:

(1)設置5個reduce個數

set mapreduce.job.reduces = 5;

(2)JOIN兩張表

insert overwrite table jointable

select n.* from nullidtable n left join ori b on n.id = b.id;

結果:可以看出來,出現了數據傾斜,某些reducer的資源消耗遠大於其他reducer。

 

隨機分布空null值

(1)設置5個reduce個數

set mapreduce.job.reduces = 5;

(2)JOIN兩張表

insert overwrite table jointable

select n.* from nullidtable n full join ori o on

case when n.id is null then concat('hive', rand()) else n.id end = o.id;

結果:可以看出來,消除了數據傾斜,負載均衡reducer的資源消耗

 

3、Map Join

理論分析

如果不指定MapJoin或者不符合MapJoin的條件,那么Hive解析器會將Join操作轉換成Common Join,即:在Reduce階段完成join。容易發生數據傾斜。可以用MapJoin把小表全部加載到內存在map端進行join,避免reducer處理。

1)開啟MapJoin參數設置:

(1)設置自動選擇Mapjoin

set hive.auto.convert.join = true; 默認為true

(2)大表小表的閥值設置(默認25M一下認為是小表):

set hive.mapjoin.smalltable.filesize=25000000;

2)MapJoin工作機制

 

首先是Task A,它是一個Local Task(在客戶端本地執行的Task),負責掃描小表b的數據,將其轉換成一個HashTable的數據結構,並寫入本地的文件中,之后將該文件加載到DistributeCache中。

接下來是Task B,該任務是一個沒有Reduce的MR,啟動MapTasks掃描大表a,在Map階段,根據a的每一條記錄去和DistributeCache中b表對應的HashTable關聯,並直接輸出結果。

由於MapJoin沒有Reduce,所以由Map直接輸出結果文件,有多少個Map Task,就有多少個結果文件。

案例實操:

(1)開啟Mapjoin功能

set hive.auto.convert.join = true; 默認為true

(2)執行小表JOIN大表語句

insert overwrite table jointable

select b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url

from smalltable s

join bigtable  b

on s.id = b.id;

Time taken: 24.594 seconds

(3)執行大表JOIN小表語句

insert overwrite table jointable

select b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url

from bigtable  b

join smalltable  s

on s.id = b.id;

Time taken: 24.315 seconds

4、Group By

默認情況下,Map階段同一Key數據分發給一個reduce,當一個key數據過大時就傾斜了。並不是所有的聚合操作都需要在Reduce端完成,很多聚合操作都可以先在Map端進行部分聚合,最后在Reduce端得出最終結果。

1)開啟Map端聚合參數設置

(1)是否在Map端進行聚合,默認為True

set hive.map.aggr = true

(2)在Map端進行聚合操作的條目數目

set hive.groupby.mapaggr.checkinterval = 100000

(3)有數據傾斜的時候進行負載均衡(默認是false)

set hive.groupby.skewindata = true

    當選項設定為 true,生成的查詢計划會有兩個MR Job。第一個MR Job中,Map的輸出結果會隨機分布到Reduce中,每個Reduce做部分聚合操作,並輸出結果,這樣處理的結果是相同的Group By Key有可能被分發到不同的Reduce中,從而達到負載均衡的目的;第二個MR Job再根據預處理的數據結果按照Group By Key分布到Reduce中(這個過程可以保證相同的Group By Key被分布到同一個Reduce中),最后完成最終的聚合操作。

5、Count(Distinct) 

數據量小的時候無所謂,數據量大的情況下,由於COUNT DISTINCT操作需要用一個Reduce Task來完成,這一個Reduce需要處理的數據量太大,就會導致整個Job很難完成,一般COUNT DISTINCT使用先GROUP BY再COUNT的方式替換:

案例實操

(1)創建一張大表

hive (default)> create table bigtable(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';

(2)加載數據

hive (default)> load data local inpath '/opt/module/datas/bigtable' into table bigtable;

(3)設置5個reduce個數

set mapreduce.job.reduces = 5;

(4)執行去重id查詢

hive (default)> select count(distinct id) from bigtable;

Stage-Stage-1: Map: 1  Reduce: 1   Cumulative CPU: 7.12 sec   HDFS Read: 120741990 HDFS Write: 7 SUCCESS

Total MapReduce CPU Time Spent: 7 seconds 120 msec

OK

c0

100001

Time taken: 23.607 seconds, Fetched: 1 row(s)

Time taken: 34.941 seconds, Fetched: 1 row(s)

(5)采用GROUP by去重id

hive (default)> select count(id) from (select id from bigtable group by id) a;

Stage-Stage-1: Map: 1  Reduce: 5   Cumulative CPU: 17.53 sec   HDFS Read: 120752703 HDFS Write: 580 SUCCESS

Stage-Stage-2: Map: 3  Reduce: 1   Cumulative CPU: 4.29 sec   HDFS Read: 9409 HDFS Write: 7 SUCCESS

Total MapReduce CPU Time Spent: 21 seconds 820 msec

OK

_c0

100001

Time taken: 50.795 seconds, Fetched: 1 row(s)

雖然會多用一個Job來完成,但在數據量大的情況下,這個絕對是值得的。

6、笛卡爾積

盡量避免笛卡爾積,join的時候不加on條件,或者無效的on條件,Hive只能使用1個reducer來完成笛卡爾積

當 Hive 設定為嚴格模式(hive.mapred.mode=strict)時,不允許在 HQL 語句中出現笛卡爾積, 這實際說明了 Hive 對笛卡爾積支持較弱。因為找不到 Join key,Hive 只能使用 1 個 reducer 來完成笛卡爾積。

當然也可以使用 limit 的辦法來減少某個表參與 join 的數據量,但對於需要笛卡爾積語義的 需求來說,經常是一個大表和一個小表的 Join 操作,結果仍然很大(以至於無法用單機處 理),這時 MapJoin才是最好的解決辦法。MapJoin,顧名思義,會在 Map 端完成 Join 操作。 這需要將 Join 操作的一個或多個表完全讀入內存。

PS:MapJoin 在子查詢中可能出現未知 BUG。在大表和小表做笛卡爾積時,規避笛卡爾積的 方法是,給 Join 添加一個 Join key,原理很簡單:將小表擴充一列 join key,並將小表的條 目復制數倍,join key 各不相同;將大表擴充一列 join key 為隨機數。

精髓就在於復制幾倍,最后就有幾個 reduce 來做,而且大表的數據是前面小表擴張 key 值 范圍里面隨機出來的,所以復制了幾倍 n,就相當於這個隨機范圍就有多大 n,那么相應的, 大表的數據就被隨機的分為了 n 份。並且最后處理所用的 reduce 數量也是 n,而且也不會 出現數據傾斜。

7、行列過濾

列處理:在SELECT中,只拿需要的列,如果有,盡量使用分區過濾,少用SELECT *。

行處理:在分區剪裁中,當使用外關聯時,如果將副表的過濾條件寫在Where后面,那么就會先全表關聯,之后再過濾,比如:

案例實操:

(1)測試先關聯兩張表,再用where條件過濾

hive (default)> select o.id from bigtable b

join ori o on o.id = b.id

where o.id <= 10;

Time taken: 34.406 seconds, Fetched: 100 row(s)

Time taken: 26.043 seconds, Fetched: 100 row(s)

(2)通過子查詢后,再關聯表

hive (default)> select b.id from bigtable b

join (select id from ori where id <= 10 ) o on b.id = o.id;

Time taken: 30.058 seconds, Fetched: 100 row(s)

Time taken: 29.106 seconds, Fetched: 100 row(s)

8、動態分區調整

關系型數據庫中,對分區表Insert數據時候,數據庫自動會根據分區字段的值,將數據插入到相應的分區中,Hive中也提供了類似的機制,即動態分區(Dynamic Partition),只不過,使用Hive的動態分區,需要進行相應的配置。

1)開啟動態分區參數設置

(1)開啟動態分區功能(默認true,開啟)

hive.exec.dynamic.partition=true

(2)設置為非嚴格模式(動態分區的模式,默認strict,表示必須指定至少一個分區為靜態分區,nonstrict模式表示允許所有的分區字段都可以使用動態分區。)

hive.exec.dynamic.partition.mode=nonstrict

(3)在所有執行MR的節點上,最大一共可以創建多少個動態分區。

hive.exec.max.dynamic.partitions=1000

(4)在每個執行MR的節點上,最大可以創建多少個動態分區。該參數需要根據實際的數據來設定。比如:源數據中包含了一年的數據,即day字段有365個值,那么該參數就需要設置成大於365,如果使用默認值100,則會報錯。

hive.exec.max.dynamic.partitions.pernode=100

(5)整個MR Job中,最大可以創建多少個HDFS文件。

hive.exec.max.created.files=100000

(6)當有空分區生成時,是否拋出異常。一般不需要設置。

hive.error.on.empty.partition=false

2)案例實操

需求:將ori中的數據按照時間(如:20111230000008),插入到目標表ori_partitioned_target的相應分區中。

(1)創建分區表

create table ori_partitioned(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string)

partitioned by (p_time bigint)

row format delimited fields terminated by '\t';

(2)加載數據到分區表中

hive (default)> load data local inpath '/opt/module/datas/ds1' into table ori_partitioned partition(p_time='20111230000010') ;

hive (default)> load data local inpath '/opt/module/datas/ds2' into table ori_partitioned partition(p_time='20111230000011') ;

(3)創建目標分區表

create table ori_partitioned_target(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) PARTITIONED BY (p_time STRING) row format delimited fields terminated by '\t';

(4)設置動態分區

set hive.exec.dynamic.partition = true;

set hive.exec.dynamic.partition.mode = nonstrict;

set hive.exec.max.dynamic.partitions = 1000;

set hive.exec.max.dynamic.partitions.pernode = 100;

set hive.exec.max.created.files = 100000;

set hive.error.on.empty.partition = false;

 

hive (default)> insert overwrite table ori_partitioned_target partition (p_time) 

select id, time, uid, keyword, url_rank, click_num, click_url, p_time from ori_partitioned;

(5)查看目標分區表的分區情況

hive (default)> show partitions ori_partitioned_target;

9、優化 in/exists 語句

雖然經過測驗,hive1.2.1 也支持 in/exists 操作,但還是推薦使用 hive 的一個高效替代方案:left semi join

比如說:

select a.id, a.name from a where a.id in (select b.id from b);
select a.id, a.name from a where exists (select id from b where a.id = b.id);

應該轉換成:

select a.id, a.name from a left semi join b on a.id = b.id;

10、排序選擇

cluster by:對同一字段分桶並排序,不能和 sort by 連用

distribute by + sort by:分桶,保證同一字段值只存在一個結果文件當中,結合 sort by 保證 每個 reduceTask 結果有序

sort by:單機排序,單個 reduce 結果有序

order by:全局排序,缺陷是只能使用一個 reduce

11、合並 MapReduce操作

Multi-group by 是 Hive 的一個非常好的特性,它使得 Hive 中利用中間結果變得非常方便。 例如:

FROM (SELECT a.status, b.school, b.gender FROM status_updates a JOIN profiles b ON (a.userid =
b.userid and a.ds='2009-03-20' ) ) subq1
INSERT OVERWRITE TABLE gender_summary PARTITION(ds='2009-03-20')
SELECT subq1.gender, COUNT(1) GROUP BY subq1.gender
INSERT OVERWRITE TABLE school_summary PARTITION(ds='2009-03-20')
SELECT subq1.school, COUNT(1) GROUP BY subq1.school

上述查詢語句使用了 multi-group by 特性連續 group by 了 2 次數據,使用不同的 group by key。 這一特性可以減少一次 MapReduce 操作

12、合理利用分桶:Bucketing 和 Sampling

Bucket 是指將數據以指定列的值為 key 進行 hash,hash 到指定數目的桶中。這樣就可以支持高效采樣了。如下例就是以 userid 這一列為 bucket 的依據,共設置 32 個 buckets

CREATE TABLE page_view(viewTime INT, userid BIGINT,
 page_url STRING, referrer_url STRING,
 ip STRING COMMENT 'IP Address of the User')
 COMMENT 'This is the page view table'
 PARTITIONED BY(dt STRING, country STRING)
 CLUSTERED BY(userid) SORTED BY(viewTime) INTO 32 BUCKETS
 ROW FORMAT DELIMITED
 FIELDS TERMINATED BY '1'
 COLLECTION ITEMS TERMINATED BY '2'
 MAP KEYS TERMINATED BY '3'
 STORED AS SEQUENCEFILE;

通常情況下,Sampling 在全體數據上進行采樣,這樣效率自然就低,它要去訪問所有數據。 而如果一個表已經對某一列制作了 bucket,就可以采樣所有桶中指定序號的某個桶,這就減少了訪問量。

如下例所示就是采樣了 page_view 中 32 個桶中的第三個桶的全部數據:

SELECT * FROM page_view TABLESAMPLE(BUCKET 3 OUT OF 32);

如下例所示就是采樣了 page_view 中 32 個桶中的第三個桶的一半數據:

SELECT * FROM page_view TABLESAMPLE(BUCKET 3 OUT OF 64);

五、數據傾斜

1、Map數

1)通常情況下,作業會通過input的目錄產生一個或者多個map任務。主要的決定因素有:input的文件總個數,input的文件大小,集群設置的文件塊大小。

在 MapReduce 的編程案例中,我們得知,一個MR Job的 MapTask 數量是由輸入分片 InputSplit 決定的。而輸入分片是由 FileInputFormat.getSplit()決定的。一個輸入分片對應一個 MapTask, 而輸入分片是由三個參數決定的:

輸入分片大小的計算是這么計算出來的:

long splitSize = Math.max(minSize, Math.min(maxSize, blockSize))

默認情況下,輸入分片大小和 HDFS 集群默認數據塊大小一致,也就是默認一個數據塊,啟用一個 MapTask 進行處理,這樣做的好處是避免了服務器節點之間的數據傳輸,提高 job 處理效率。

2)是不是map數越多越好?(Map 數過大)

答案是否定的。如果一個任務有很多小文件(遠遠小於塊大小128m),則每個小文件也會被當做一個塊,用一個map任務來完成,而一個map任務啟動和初始化的時間遠遠大於邏輯處理的時間,就會造成很大的資源浪費。而且,同時可執行的map數是受限的。

3)是不是保證每個map處理接近128m的文件塊,就高枕無憂了?(Map 數過小)

答案也是不一定。比如有一個127m的文件,正常會用一個map去完成,但這個文件只有一個或者兩個小字段,卻有幾千萬的記錄,如果map處理的邏輯比較復雜,用一個map任務去做,肯定也比較耗時。

針對上面的問題2和3,我們需要采取兩種方式來解決:即減少map數和增加map數;

2、小文件進行合並

在map執行前合並小文件,減少map數:CombineHiveInputFormat具有對小文件進行合並的功能(系統默認的格式)。HiveInputFormat沒有對小文件合並功能。

set hive.merge.mapfiles = true                   ##在 map only 的任務結束時合並小文件
set hive.merge.mapredfiles = false               ## true 時在 MapReduce 的任務結束時合並小文件 set hive.merge.size.per.task = 256*1000*1000 ##合並文件的大小 set mapred.max.split.size=256000000; ##每個 Map 最大分割大小 set mapred.min.split.size.per.node=1; ##一個節點上 split 的最少值 set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; ##執行Map前進行小文件合並

3、復雜文件增加Map數

當input的文件都很大,任務邏輯復雜,map執行非常慢的時候,可以考慮增加Map數,來使得每個map處理的數據量減少,從而提高任務的執行效率。

增加map的方法為:根據computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M公式,調整maxSize最大值。讓maxSize最大值低於blocksize就可以增加map的個數。

案例實操:

(1)執行查詢

hive (default)> select count(*) from emp;

Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1

(2)設置最大切片值為100個字節

hive (default)> set mapreduce.input.fileinputformat.split.maxsize=100;

hive (default)> select count(*) from emp;

Hadoop job information for Stage-1: number of mappers: 6; number of reducers: 1

4、Reduce數

Hadoop MapReduce 程序中,reducer 個數的設定極大影響執行效率,這使得 Hive 怎樣決定 reducer 個數成為一個關鍵問題。遺憾的是 Hive 的估計機制很弱,不指定 reducer 個數的情況下,Hive 會猜測確定一個 reducer 個數,基於以下兩個設定:

1、hive.exec.reducers.bytes.per.reducer(默認為 256000000)

2、hive.exec.reducers.max(默認為 1009)

3、mapreduce.job.reduces=-1(設置一個常量 reducetask 數量)

計算 reducer 數的公式很簡單: N=min(參數 2,總輸入數據量/參數 1) 通常情況下,有必要手動指定 reducer 個數。考慮到 map 階段的輸出數據量通常會比輸入有 大幅減少,因此即使不設定 reducer 個數,重設參數 2 還是必要的。

依據 Hadoop 的經驗,可以將參數 2 設定為 0.95*(集群中 datanode 個數)。 

1)調整reduce個數方法一

(1)每個Reduce處理的數據量默認是256MB

hive.exec.reducers.bytes.per.reducer=256000000

(2)每個任務最大的reduce數,默認為1009

hive.exec.reducers.max=1009

(3)計算reducer數的公式

N=min(參數2,總輸入數據量/參數1)

2)調整reduce個數方法二

在hadoop的mapred-default.xml文件中修改

設置每個job的Reduce個數

set mapreduce.job.reduces = 15;

3)reduce個數並不是越多越好

1)過多的啟動和初始化reduce也會消耗時間和資源;

2)另外,有多少個reduce,就會有多少個輸出文件,如果生成了很多個小文件,那么如果這些小文件作為下一個任務的輸入,則也會出現小文件過多的問題;

在設置reduce個數的時候也需要考慮這兩個原則:處理大數據量利用合適的reduce數;使單個reduce任務處理數據量大小要合適;

5、並行執行

Hive會將一個查詢轉化成一個或者多個階段。這樣的階段可以是MapReduce階段、抽樣階段、合並階段、limit階段。或者Hive執行過程中可能需要的其他階段。默認情況下,Hive一次只會執行一個階段。不過,某個特定的job可能包含眾多的階段,而這些階段可能並非完全互相依賴的,也就是說有些階段是可以並行執行的,這樣可能使得整個job的執行時間縮短。不過,如果有更多的階段可以並行執行,那么job可能就越快完成。

通過設置參數hive.exec.parallel值為true,就可以開啟並發執行。不過,在共享集群中,需要注意下,如果job中並行階段增多,那么集群利用率就會增加。

set hive.exec.parallel=true;              //打開任務並行執行

set hive.exec.parallel.thread.number=16;  //同一個sql允許最大並行度,默認為8。

當然,得是在系統資源比較空閑的時候才有優勢,否則,沒資源,並行也起不來。

6、嚴格模式

Hive提供了一個嚴格模式,可以防止用戶執行那些可能意向不到的不好的影響的查詢。

通過設置屬性hive.mapred.mode值為默認是非嚴格模式nonstrict 。開啟嚴格模式需要修改hive.mapred.mode值為strict,開啟嚴格模式可以禁止3種類型的查詢。

<property>

    <name>hive.mapred.mode</name>

    <value>strict</value>

    <description>

      The mode in which the Hive operations are being performed.

      In strict mode, some risky queries are not allowed to run. They include:

        Cartesian Product.

        No partition being picked up for a query.

        Comparing bigints and strings.

        Comparing bigints and doubles.

        Orderby without limit.

    </description>

  </property>

1)對於分區表,除非where語句中含有分區字段過濾條件來限制范圍,否則不允許執行。換句話說,就是用戶不允許掃描所有分區。進行這個限制的原因是,通常分區表都擁有非常大的數據集,而且數據增加迅速。沒有進行分區限制的查詢可能會消耗令人不可接受的巨大資源來處理這個表。

2)對於使用了order by語句的查詢,要求必須使用limit語句。因為order by為了執行排序過程會將所有的結果數據分發到同一個Reducer中進行處理,強制要求用戶增加這個LIMIT語句可以防止Reducer額外執行很長一段時間。

3)限制笛卡爾積的查詢。對關系型數據庫非常了解的用戶可能期望在執行JOIN查詢的時候不使用ON語句而是使用where語句,這樣關系數據庫的執行優化器就可以高效地將WHERE語句轉化成那個ON語句。不幸的是,Hive並不會執行這種優化,因此,如果表足夠大,那么這個查詢就會出現不可控的情況。

7、JVM重用

JVM重用是Hadoop調優參數的內容,其對Hive的性能具有非常大的影響,特別是對於很難避免小文件的場景或task特別多的場景,這類場景大多數執行時間都很短。

Hadoop的默認配置通常是使用派生JVM來執行map和Reduce任務的。這時JVM的啟動過程可能會造成相當大的開銷,尤其是執行的job包含有成百上千task任務的情況。JVM重用可以使得JVM實例在同一個job中重新使用N次。N的值可以在Hadoop的mapred-site.xml文件中進行配置。通常在10-20之間,具體多少需要根據具體業務場景測試得出。

<property>

  <name>mapreduce.job.jvm.numtasks</name>

  <value>10</value>

  <description>How many tasks to run per jvm. If set to -1, there is

  no limit.

  </description>

</property>

這個功能的缺點是,開啟JVM重用將一直占用使用到的task插槽,以便進行重用,直到任務完成后才能釋放。如果某個“不平衡的”job中有某幾個reduce task執行的時間要比其他Reduce task消耗的時間多的多的話,那么保留的插槽就會一直空閑着卻無法被其他的job使用,直到所有的task都結束了才會釋放。

8、推測執行

在分布式集群環境下,因為程序Bug(包括Hadoop本身的bug),負載不均衡或者資源分布不均等原因,會造成同一個作業的多個任務之間運行速度不一致,有些任務的運行速度可能明顯慢於其他任務(比如一個作業的某個任務進度只有50%,而其他所有任務已經運行完畢),則這些任務會拖慢作業的整體執行進度。為了避免這種情況發生,Hadoop采用了推測執行(Speculative Execution)機制,它根據一定的法則推測出“拖后腿”的任務,並為這樣的任務啟動一個備份任務,讓該任務與原始任務同時處理同一份數據,並最終選用最先成功運行完成任務的計算結果作為最終結果。

設置開啟推測執行參數:Hadoop的mapred-site.xml文件中進行配置

<property>

  <name>mapreduce.map.speculative</name>

  <value>true</value>

  <description>If true, then multiple instances of some map tasks

               may be executed in parallel.</description>

</property>

 

<property>

  <name>mapreduce.reduce.speculative</name>

  <value>true</value>

  <description>If true, then multiple instances of some reduce tasks

               may be executed in parallel.</description>

</property>

不過hive本身也提供了配置項來控制reduce-side的推測執行:

  <property>

    <name>hive.mapred.reduce.tasks.speculative.execution</name>

    <value>true</value>

    <description>Whether speculative execution for reducers should be turned on. </description>

  </property>

關於調優這些推測執行變量,還很難給一個具體的建議。如果用戶對於運行時的偏差非常敏感的話,那么可以將這些功能關閉掉。如果用戶因為輸入數據量很大而需要執行長時間的map或者Reduce task的話,那么啟動推測執行造成的浪費是非常巨大大。

9、執行計划(Explain)

1)基本語法

EXPLAIN [EXTENDED | DEPENDENCY | AUTHORIZATION] query

2)案例實操

(1)查看下面這條語句的執行計划

hive (default)> explain select * from emp;

hive (default)> explain select deptno, avg(sal) avg_sal from emp group by deptno;

(2)查看詳細執行計划

hive (default)> explain extended select * from emp;

hive (default)> explain extended select deptno, avg(sal) avg_sal from emp group by deptno;


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM