第8章 壓縮和存儲(Hive高級)8.1 Hadoop源碼編譯支持Snappy壓縮8.1.1 資源准備8.1.2 jar包安裝8.1.3 編譯源碼8.2 Hadoop壓縮配置8.2.1 MR支持的壓縮編碼8.2.2 壓縮參數配置8.3 開啟Map輸出階段壓縮8.4 開啟Reduce輸出階段壓縮8.5 文件存儲格式8.5.1 列式存儲和行式存儲8.5.2 TextFile格式8.5.3 Orc格式8.5.4 Parquet格式8.5.5 主流文件存儲格式對比實驗8.6 存儲和壓縮結合8.6.1 修改Hadoop集群具有Snappy壓縮方式8.6.2 測試存儲和壓縮第9章 企業級調優(Hive優化)9.1 Fetch抓取9.2 本地模式9.3 表的優化9.3.1 小表join大表、大表join小表9.3.2 大表Join大表9.3.3 MapJoin9.3.4 group by9.3.5 Count(Distinct) 去重統計9.3.6 笛卡爾積9.3.7 行列過濾9.3.8 動態分區調整9.3.9 分桶9.3.10 分區9.4 數據傾斜9.4.1 合理設置 Map 數9.4.2 小文件進行合並9.4.3 復雜文件增加 Map 數9.4.4 合理設置 Reduce 數9.5 並行執行9.6 嚴格模式9.7 JVM重用9.8 推測執行9.9 壓縮9.10 執行計划(Explain)
第8章 壓縮和存儲(Hive高級)
8.1 Hadoop源碼編譯支持Snappy壓縮
8.1.1 資源准備
1、CentOS聯網
配置CentOS能連接外網。Linux虛擬機 ping www.baidu.com 是暢通的。
注意:
采用root角色編譯,減少文件夾權限出現問題。
2、jar包准備(hadoop源碼、JDK8、maven、protobuf)
(1)hadoop-2.7.2-src.tar.gz
(2)jdk-8u144-linux-x64.tar.gz
(3)snappy-1.1.3.tar.gz
(4)apache-maven-3.0.5-bin.tar.gz
(5)protobuf-2.5.0.tar.gz
8.1.2 jar包安裝
注意:
所有操作必須在root用戶下完成。
1、JDK解壓、配置環境變量JAVA_HOME和PATH,驗證java -version(如下都需要驗證是否配置成功)
[root@hadoop101 software]# tar -zxf jdk-8u144-linux-x64.tar.gz -C /opt/module/
[root@hadoop101 software]# vim /etc/profile
#JAVA_HOME
export JAVA_HOME=/opt/module/jdk1.8.0_144
export PATH=$PATH:$JAVA_HOME/bin
[root@hadoop101 software]# source /etc/profile
驗證命令:java -version
2、Maven解壓、配置 MAVEN_HOME 和 PATH
[root@hadoop101 software]# tar -zxvf apache-maven-3.0.5-bin.tar.gz -C /opt/module/
[root@hadoop101 software]# vim /etc/profile
#MAVEN_HOME
export MAVEN_HOME=/opt/module/apache-maven-3.0.5
export PATH=$PATH:$MAVEN_HOME/bin
[root@hadoop101 software]# source /etc/profile
驗證命令:mvn -version
8.1.3 編譯源碼
1、准備編譯環境
[root@hadoop101 software]# yum install svn
[root@hadoop101 software]# yum install autoconf automake libtool cmake
[root@hadoop101 software]# yum install ncurses-devel
[root@hadoop101 software]# yum install openssl-devel
[root@hadoop101 software]# yum install gcc*
2、編譯安裝snappy
[root@hadoop101 software]# tar -zxvf snappy-1.1.3.tar.gz -C /opt/module/
[root@hadoop101 module]# cd snappy-1.1.3/
[root@hadoop101 snappy-1.1.3]# ./configure
[root@hadoop101 snappy-1.1.3]# make
[root@hadoop101 snappy-1.1.3]# make install
# 查看snappy庫文件
[root@hadoop101 snappy-1.1.3]# ls -lh /usr/local/lib | grep snappy
3、編譯安裝protobuf
[root@hadoop101 software]# tar -zxvf protobuf-2.5.0.tar.gz -C /opt/module/
[root@hadoop101 module]# cd protobuf-2.5.0/
[root@hadoop101 protobuf-2.5.0]# ./configure
[root@hadoop101 protobuf-2.5.0]# make
[root@hadoop101 protobuf-2.5.0]# make install
# 查看protobuf版本以測試是否安裝成功
[root@hadoop101 protobuf-2.5.0]# protoc --version
4、編譯hadoop native
[root@hadoop101 software]# tar -zxvf hadoop-2.7.2-src.tar.gz
[root@hadoop101 software]# cd hadoop-2.7.2-src/
[root@hadoop101 software]# mvn clean package -DskipTests -Pdist,native -Dtar -Dsnappy.lib=/usr/local/lib -Dbundle.snappy
執行成功后,/opt/software/hadoop-2.7.2-src/hadoop-dist/target/hadoop-2.7.2.tar.gz 即為新生成的支持snappy壓縮的二進制安裝包。
8.2 Hadoop壓縮配置
8.2.1 MR支持的壓縮編碼

為了支持多種壓縮/解壓縮算法,Hadoop引入了編碼/解碼器,如下表所示。

壓縮性能的比較:

http://google.github.io/snappy/
On a single core of a Core i7 processor in 64-bit mode, Snappy compresses at about 250 MB/sec or more and decompresses at about 500 MB/sec or more.
8.2.2 壓縮參數配置
要在Hadoop中啟用壓縮,可以配置如下參數(mapred-site.xml文件中):

8.3 開啟Map輸出階段壓縮
開啟 map 輸出階段壓縮可以減少 job 中 map 和 Reduce task 間數據傳輸量。具體配置如下:
案例實操:
1、開啟hive中間傳輸數據壓縮功能
hive (default)> set hive.exec.compress.intermediate=true;
2、開啟mapreduce中map輸出壓縮功能
hive (default)> set mapreduce.map.output.compress=true;
3、設置mapreduce中map輸出數據的壓縮方式
hive (default)> set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
4、執行查詢語句
hive (default)> select count(ename) name from emp;
5、瀏覽器端查看歷史服務器中的設置是否生效

8.4 開啟Reduce輸出階段壓縮
當 Hive 將輸出寫入到表中時,輸出內容同樣可以進行壓縮。屬性 hive.exec.compress.output
控制着這個功能。用戶可能需要保持默認設置文件中的默認值 false,這樣默認的輸出就是非壓縮的純文本文件了。用戶可以通過在查詢語句或執行腳本中設置這個值為 true,來開啟輸出結果壓縮功能。
案例實操:
1、開啟hive最終輸出數據壓縮功能
hive (default)> set hive.exec.compress.output=true;
2、開啟mapreduce最終輸出數據壓縮
hive (default)> set set hive.exec.compress.output=true;
3、設置mapreduce最終數據輸出壓縮方式
hive (default)> set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
4、設置mapreduce最終數據輸出壓縮為塊壓縮(默認是行壓縮RECORD)
hive (default)> set mapreduce.output.fileoutputformat.compress.type=BLOCK;
5、瀏覽器端查看歷史服務器中的設置是否生效

6、測試一下輸出結果是否是壓縮文件
hive (default)> insert overwrite local directory '/opt/module/datas/snappy-result' select * from emp distribute by deptno sort by empno desc;
查看,是壓縮文件
[atguigu@hadoop102 snappy-result]$ pwd
/opt/module/datas/snappy-result
[atguigu@hadoop102 snappy-result]$ ll
總用量 4
-rw-r--r--. 1 atguigu atguigu 446 2月 27 22:53 000000_0.snappy
8.5 文件存儲格式
Hive支持的存儲數的格式主要有:TEXTFILE 、SEQUENCEFILE、ORC、PARQUET。
8.5.1 列式存儲和行式存儲

如上圖所示左邊為邏輯表,右邊第一個為行式存儲,第二個為列式存儲。
1、行存儲的特點
查詢滿足條件的一整行數據的時候,列存儲則需要去每個聚集的字段找到對應的每個列的值,行存儲只需要找到其中一個值,其余的值都在相鄰地方,所以此時行存儲查詢的速度更快。
2、列存儲的特點
因為每個字段的數據聚集存儲,在查詢只需要少數幾個字段的時候,能大大減少讀取的數據量;每個字段的數據類型一定是相同的,列式存儲可以針對性的設計更好的壓縮算法。
TEXTFILE和SEQUENCEFILE的存儲格式都是基於行存儲的。
ORC和PARQUET是基於列式存儲的。
8.5.2 TextFile格式
默認格式,數據不做壓縮,磁盤開銷大,數據解析開銷大。可結合Gzip、Bzip2使用,但使用Gzip這種方式,hive不會對數據進行切分
,從而無法對數據進行並行
操作。
8.5.3 Orc格式
Orc (Optimized Row Columnar)是Hive 0.11版里引入的新的存儲格式。
如下圖所示可以看到每個Orc文件由1個或多個stripe組成,每個stripe250MB大小,這個Stripe實際相當於RowGroup概念,不過大小由4MB->250MB,這樣應該能提升順序讀的吞吐率。每個Stripe里有三部分組成,分別是Index Data,Row Data,Stripe Footer:

1)Index Data:一個輕量級的index,默認是
每隔1W行做一個索引
。這里做的索引應該只是記錄某行的各字段在Row Data中的offset(偏移量)。
2)Row Data:存的是具體的數據,
先取部分行,然后對這些行按列進行存儲
。對每個列進行了編碼,分成多個Stream來存儲。
3)Stripe Footer:存的是各個Stream的類型,長度等信息。
每個文件有一個File Footer,這里面存的是每個Stripe的行數,每個Column的數據類型信息等;每個文件的尾部是一個PostScript,這里面記錄了整個文件的壓縮類型以及FileFooter的長度信息等。在讀取文件時,會seek到文件尾部讀PostScript,從里面解析到File Footer長度,再讀FileFooter,從里面解析到各個Stripe信息,再讀各個Stripe,即從后往前讀。
8.5.4 Parquet格式
arquet是面向分析型業務的列式存儲格式,由Twitter和Cloudera合作開發,2015年5月從Apache的孵化器里畢業成為Apache頂級項目。
Parquet文件是以二進制方式存儲
的,所以是不可以直接讀取的,文件中包括該文件的數據和元數據,因此Parquet格式文件是自解析的
。
通常情況下,在存儲Parquet數據的時候會按照Block大小設置行組的大小
,由於一般情況下每一個Mapper任務處理數據的最小單位是一個Block,這樣可以把每一個行組由一個Mapper任務處理,增大任務執行並行度
。Parquet文件的格式如下圖所示。

上圖展示了一個Parquet文件的內容,一個文件中可以存儲多個行組,文件的首位都是該文件的Magic Code,用於校驗它是否是一個Parquet文件,Footer length記錄了文件元數據的大小,通過該值和文件長度可以計算出元數據的偏移量,文件的元數據中包括每一個行組的元數據信息和該文件存儲數據的Schema信息。除了文件中每一個行組的元數據,每一頁的開始都會存儲該頁的元數據,在Parquet中,有三種類型的頁:
數據頁、字典頁和索引頁
。數據頁用於存儲當前行組中該列的值,字典頁存儲該列值的編碼字典,每一個列塊中最多包含一個字典頁,索引頁用來存儲當前行組下該列的索引,
目前Parquet中還不支持索引頁
。
8.5.5 主流文件存儲格式對比實驗
從存儲文件的壓縮比和查詢速度兩個角度對比。
存儲文件的壓縮比測試:
1、測試數據
將log.data上傳至hdfs中的/opt/module/datas目錄下
2、TextFile
(1)創建表,存儲數據格式為TEXTFILE
create table log_text(
track_time string,
url string,
session_id string,
referer string,
ip string,
end_user_id string,
city_id string
)
row format delimited fields terminated by '\t'
stored as textfile;
(2)向表中加載數據(load命令本質是put,存儲的格式是TextFile)
load data local inpath '/opt/module/datas/log.data' into table log_text;
(3)查看表中數據大小
dfs -du -h /user/hive/warehouse/log_text;
+-------------------------------------------------+--+
| DFS Output |
+-------------------------------------------------+--+
| 18.1 M /user/hive/warehouse/log_text/log.data |
+-------------------------------------------------+--+
3、Orc
(1)創建表,存儲數據格式為Orc
create table log_orc(
track_time string,
url string,
session_id string,
referer string,
ip string,
end_user_id string,
city_id string
)
row format delimited fields terminated by '\t'
stored as orc;
(2)向表中加載數據(不能使用load方式加載數據,需要insert into方式,即一定要通過MapReduce任務加載數據)
insert into table log_orc select * from log_text;
(3)查看表中數據大小
dfs -du -h /user/hive/warehouse/log_orc;
+-----------------------------------------------+--+
| DFS Output |
+-----------------------------------------------+--+
| 2.8 M /user/hive/warehouse/log_orc/000000_0 |
+-----------------------------------------------+--+
3、Parquet
(1)創建表,存儲數據格式為parquet
create table log_parquet(
track_time string,
url string,
session_id string,
referer string,
ip string,
end_user_id string,
city_id string
)
row format delimited fields terminated by '\t'
stored as parquet;
(2)向表中加載數據
insert into table log_parquet select * from log_text;
(3)查看表中數據大小
dfs -du -h /user/hive/warehouse/log_parquet;
+----------------------------------------------------+--+
| DFS Output |
+----------------------------------------------------+--+
| 13.1 M /user/hive/warehouse/log_parquet/000000_0 |
+----------------------------------------------------+--+
存儲文件的壓縮比
總結:ORC > Parquet > textFile
存儲文件的查詢速度測試:
1、TextFile
select count(*) from log_text;
+---------+--+
| _c0 |
+---------+--+
| 100000 |
+---------+--+
1 row selected (20.145 seconds)
2、ORC
select count(*) from log_orc;
+---------+--+
| _c0 |
+---------+--+
| 100000 |
+---------+--+
1 row selected (20.44 seconds)
3、Parquet
select count(*) from log_parquet;
+---------+--+
| _c0 |
+---------+--+
| 100000 |
+---------+--+
1 row selected (17.945 seconds)
存儲文件的查詢速度總結
:查詢速度相近。
小結:在公司的Hive中對數據壓縮使用的壓縮格式是snappy,存儲文件的格式使用的ORC格式。
8.6 存儲和壓縮結合
8.6.1 修改Hadoop集群具有Snappy壓縮方式
1、查看hadoop checknative命令使用
[atguigu@hadoop102 hadoop-2.7.2]$ hadoop
Usage: hadoop [--config confdir] [COMMAND | CLASSNAME]
CLASSNAME run the class named CLASSNAME
or
where COMMAND is one of:
fs run a generic filesystem user client
version print the version
jar <jar> run a jar file
note: please use "yarn jar" to launch
YARN applications, not this command.
checknative [-a|-h] check native hadoop and compression libraries availability
distcp <srcurl> <desturl> copy file or directories recursively
archive -archiveName NAME -p <parent path> <src>* <dest> create a hadoop archive
classpath prints the class path needed to get the
credential interact with credential providers
Hadoop jar and the required libraries
daemonlog get/set the log level for each daemon
trace view and modify Hadoop tracing settings
Most commands print help when invoked w/o parameters.
2、查看hadoop支持的壓縮方式
[atguigu@hadoop102 hadoop-2.7.2]$ hadoop checknative
19/02/27 19:18:38 WARN bzip2.Bzip2Factory: Failed to load/initialize native-bzip2 library system-native, will use pure-Java version
19/02/27 19:18:38 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
Native library checking:
hadoop: true /opt/module/hadoop-2.7.2/lib/native/libhadoop.so
zlib: true /lib64/libz.so.1
snappy: false
lz4: true revision:99
bzip2: false
openssl: false Cannot load libcrypto.so (libcrypto.so: 無法打開共享對象文件: 沒有那個文件或目錄)!
3、將編譯好的支持Snappy壓縮的hadoop-2.7.2.tar.gz包導入到hadoop102的/opt/software/中
4、解壓hadoop-2.7.2.tar.gz到當前路徑
[atguigu@hadoop102 hadoop-snappy]$ pwd
/opt/software/hadoop-snappy
[atguigu@hadoop102 hadoop-snappy]$ tar -zxvf hadoop-2.7.2.tar.gz
5、進入到/opt/software/hadoop-snappy/hadoop-2.7.2/lib/native路徑可以看到支持Snappy壓縮的動態鏈接庫
[atguigu@hadoop102 native]$ pwd
/opt/software/hadoop-snappy/hadoop-2.7.2/lib/native
[atguigu@hadoop102 native]$ ll
總用量 5188
-rw-r--r--. 1 atguigu atguigu 1210260 9月 1 2017 libhadoop.a
-rw-r--r--. 1 atguigu atguigu 1487268 9月 1 2017 libhadooppipes.a
lrwxrwxrwx. 1 atguigu atguigu 18 2月 27 20:00 libhadoop.so -> libhadoop.so.1.0.0
-rwxr-xr-x. 1 atguigu atguigu 716316 9月 1 2017 libhadoop.so.1.0.0
-rw-r--r--. 1 atguigu atguigu 582048 9月 1 2017 libhadooputils.a
-rw-r--r--. 1 atguigu atguigu 364860 9月 1 2017 libhdfs.a
lrwxrwxrwx. 1 atguigu atguigu 16 2月 27 20:00 libhdfs.so -> libhdfs.so.0.0.0
-rwxr-xr-x. 1 atguigu atguigu 229113 9月 1 2017 libhdfs.so.0.0.0
-rw-r--r--. 1 atguigu atguigu 472950 9月 1 2017 libsnappy.a
-rwxr-xr-x. 1 atguigu atguigu 955 9月 1 2017 libsnappy.la
lrwxrwxrwx. 1 atguigu atguigu 18 2月 27 20:00 libsnappy.so -> libsnappy.so.1.3.0
lrwxrwxrwx. 1 atguigu atguigu 18 2月 27 20:00 libsnappy.so.1 -> libsnappy.so.1.3.0
-rwxr-xr-x. 1 atguigu atguigu 228177 9月 1 2017 libsnappy.so.1.3.0
6、拷貝/opt/software/hadoop-snappy/hadoop-2.7.2/lib/native里面的所有內容到開發集群的/opt/module/hadoop-2.7.2/lib/native路徑上
[atguigu@hadoop102 native]$ cp ../native/* /opt/module/hadoop-2.7.2/lib/native/
7、分發集群
[atguigu@hadoop102 lib]$ xsync native/
8、再次查看hadoop支持的壓縮類型
[atguigu@hadoop102 lib]$ hadoop checknative
19/02/27 20:07:28 WARN bzip2.Bzip2Factory: Failed to load/initialize native-bzip2 library system-native, will use pure-Java version
19/02/27 20:07:28 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
Native library checking:
hadoop: true /opt/module/hadoop-2.7.2/lib/native/libhadoop.so
zlib: true /lib64/libz.so.1
snappy: true /opt/module/hadoop-2.7.2/lib/native/libsnappy.so.1
lz4: true revision:99
bzip2: false
openssl: false Cannot load libcrypto.so (libcrypto.so: 無法打開共享對象文件: 沒有那個文件或目錄)!
9、重新啟動hadoop集群、zookeeper集群、hive、歷史服務器
[atguigu@hadoop102 hadoop-2.7.2]$ sbin/start-dfs.sh -- 啟動dfs集群
[atguigu@hadoop103 hadoop-2.7.2]$ sbin/start-yarn.sh -- 啟動yarn集群
[atguigu@hadoop102 hive]$ zkstart.sh -- 啟動zookeeper集群
[atguigu@hadoop102 hive]$ bin/hive -- 啟動hive
[atguigu@hadoop102 hadoop-2.7.2]$ sbin/mr-jobhistory-daemon.sh start historyserver -- 啟動歷史服務器
8.6.2 測試存儲和壓縮
官網:https://cwiki.apache.org/confluence/display/Hive/LanguageManual+ORC
ORC存儲方式的壓縮:

1、創建一個非壓縮的的ORC存儲方式
(1)建表語句
create table log_orc_none(
track_time string,
url string,
session_id string,
referer string,
ip string,
end_user_id string,
city_id string
)
row format delimited fields terminated by '\t'
stored as orc tblproperties ("orc.compress"="NONE");
(2)插入數據
insert into table log_orc_none select * from log_text;
(3)查看插入后數據
dfs -du -h /user/hive/warehouse/log_orc_none/;
+----------------------------------------------------+--+
| DFS Output |
+----------------------------------------------------+--+
| 7.7 M /user/hive/warehouse/log_orc_none/000000_0 |
+----------------------------------------------------+--+
2、創建一個SNAPPY壓縮的ORC存儲方式
(1)建表語句
create table log_orc_snappy(
track_time string,
url string,
session_id string,
referer string,
ip string,
end_user_id string,
city_id string
)
row format delimited fields terminated by '\t'
stored as orc tblproperties ("orc.compress"="SNAPPY");
(2)插入數據
insert into table log_orc_snappy select * from log_text;
(3)查看插入后數據
dfs -du -h /user/hive/warehouse/log_orc_snappy/;
+------------------------------------------------------+--+
| DFS Output |
+------------------------------------------------------+--+
| 3.8 M /user/hive/warehouse/log_orc_snappy/000000_0 |
+------------------------------------------------------+--+
3、上一節中默認創建的ORC存儲方式,導入數據后的大小為
2.8 M /user/hive/warehouse/log_orc/000000_0
比Snappy壓縮的還小。原因是orc存儲文件默認采用ZLIB壓縮。ZLIB壓縮率比snappy的小,但是ZLIB解壓縮速率很低
。
4、存儲方式和壓縮總結
在實際的項目開發當中,hive表的數據存儲格式一般選擇:orc或parquet。壓縮方式一般選擇snappy或lzo。
第9章 企業級調優(Hive優化)
9.1 Fetch抓取
Fetch抓取是指,Hive中對某些情況的查詢可以不必使用MapReduce計算
。例如:SELECT * FROM employees; 在這種情況下,Hive可以簡單地讀取employee對應的存儲目錄下的文件,然后輸出查詢結果到控制台。
在hive-default.xml.template文件中 hive.fetch.task.conversion 默認是 more,老版本hive默認是minimal,該屬性修改為more以后,在全局查找、字段查找、limit查找等都不走MapReduce計算
。
<property>
<name>hive.fetch.task.conversion</name>
<value>more</value>
<description>
Expects one of [none, minimal, more].
Some select queries can be converted to single FETCH task minimizing latency.
Currently the query should be single sourced not having any subquery and should not have
any aggregations or distincts (which incurs RS), lateral views and joins.
0. none : disable hive.fetch.task.conversion
1. minimal : SELECT STAR, FILTER on partition columns, LIMIT only
2. more : SELECT, FILTER, LIMIT only (support TABLESAMPLE and virtual columns)
</description>
</property>
案例實操:
1)把hive.fetch.task.conversion設置成none,然后執行查詢語句,都會執行MapReduce程序。
hive (default)> set hive.fetch.task.conversion=none;
hive (default)> select * from emp;
hive (default)> select ename from emp;
hive (default)> select ename from emp limit 3;
9.2 本地模式
大多數的 Hadoop Job 是需要 Hadoop 提供的完整的可擴展性來處理大數據集的。不過,有時 Hive 的輸入數據量是非常小的。在這種情況下,為查詢觸發執行任務消耗的時間可能會比實際job的執行時間要多的多。對於大多數這種情況,Hive可以通過本地模式在單台機器上處理所有的任務。對於小數據集,執行時間可以明顯被縮短
。
用戶可以通過設置 hive.exec.mode.local.auto 的值為 true ,來讓 Hive 在適當的時候自動啟動這個優化。
set hive.exec.mode.local.auto=true; // 開啟本地mr模式
// 設置local mr的最大輸入數據量,當輸入數據量小於這個值時采用local mr的方式,默認為134217728,即128M
set hive.exec.mode.local.auto.inputbytes.max=50000000;
// 設置local mr的最大輸入文件個數,當輸入文件個數小於這個值時采用local mr的方式,默認為4
set hive.exec.mode.local.auto.input.files.max=10;
案例實操:
1)開啟本地模式,並執行查詢語句
set hive.exec.mode.local.auto=true;
select count(*) from log_orc;
+---------+--+
| _c0 |
+---------+--+
| 100000 |
+---------+--+
1 row selected (1.382 seconds)
2)關閉本地模式,並執行查詢語句
set hive.exec.mode.local.auto=false;
select count(*) from log_orc;
+---------+--+
| _c0 |
+---------+--+
| 100000 |
+---------+--+
1 row selected (16.895 seconds)
9.3 表的優化
9.3.1 小表join大表、大表join小表
將key相對分散,並且數據量小的表放在join的左邊,這樣可以有效減少內存溢出錯誤發生的幾率;再進一步,可以使用map join讓小的維度表(1000條以下的記錄條數)先進內存。在map端完成reduce。
實際測試發現:新版的hive已經對小表JOIN大表和大表JOIN小表進行了優化。小表放在左邊和右邊已經沒有明顯區別。
案例實操
1、需求
測試大表JOIN小表和小表JOIN大表的效率。
2、建大表、小表和JOIN后表的語句
// 創建大表
create table bigtable(
id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string
)
row format delimited fields terminated by '\t';
// 創建小表
create table smalltable(
id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string
)
row format delimited fields terminated by '\t';
// 創建join后表的語句
create table jointable(
id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string
)
row format delimited fields terminated by '\t';
3、分別向大表和小表中導入數據
load data local inpath '/opt/module/datas/bigtable' into table bigtable;
load data local inpath '/opt/module/datas/smalltable' into table smalltable;
4、關閉mapjoin功能(默認是打開的)
set hive.auto.convert.join=false;
5、執行小表JOIN大表語句
insert overwrite table jointable
select b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
from smalltable s
left join bigtable b
on b.id = s.id;
No rows affected (31.015 seconds)
6、執行大表JOIN小表語句
insert overwrite table jointable
select b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
from bigtable b
left join smalltable s
on s.id = b.id;
No rows affected (32.59 seconds)
9.3.2 大表Join大表
1、空KEY過濾
有時join超時是因為某些key對應的數據太多,而相同key對應的數據都會發送到相同的reducer上,從而導致內存不夠。此時我們應該仔細分析這些異常的key,很多情況下,這些key對應的數據是異常數據,我們需要在SQL語句中進行過濾。例如:設置key對應的字段為空,操作如下:
案例實操
(1)配置歷史服務器
配置mapred-site.xml
<property>
<name>mapreduce.jobhistory.address</name>
<value>hadoop102:10020</value>
</property>
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>hadoop102:19888</value>
</property>
啟動歷史服務器
[atguigu@hadoop102 hadoop-2.7.2]$ sbin/mr-jobhistory-daemon.sh start historyserver
查看jobhistory
http://hadoop102:19888/jobhistory
(2)創建原始數據表、空id表、合並后的數據表
// 創建原始表
create table ori(
id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string
)
row format delimited fields terminated by '\t';
// 創建空id表
create table nullidtable(
id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string
)
row format delimited fields terminated by '\t';
// 創建join后的表
create table jointable(
id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string
)
row format delimited fields terminated by '\t';
(3)分別加載原始數據和空id數據到對應表中
load data local inpath '/opt/module/datas/ori' into table ori;
load data local inpath '/opt/module/datas/nullid' into table nullidtable;
(4)測試不過濾空id
insert overwrite table jointable
select n.* from nullidtable n left join ori o on n.id=o.id;
No rows affected (40.267 seconds)
(5)測試過濾空id
insert overwrite table jointable
select n.* from (select * from nullidtable where id is not null) n left join ori o on n.id=o.id;
No rows affected (25.801 seconds)
2、空key轉換
有時雖然某個key為空對應的數據很多,但是相應的數據不是異常數據,必須要包含在join的結果中,此時我們可以為表a中key為空的字段賦一個隨機的值,使得數據隨機均勻地分不到不同的reducer上。
案例實操:
不隨機分布空null值的情形:
(1)設置5個reduce個數
set mapreduce.job.reduces=5;
(2)JOIN兩張表
insert overwrite table jointable
select n.* from nullidtable n left join ori b on n.id=b.id;
結果:如下圖所示,可以看出來,出現了數據傾斜
,某些reducer的資源消耗遠大於其他reducer。

隨機分布空null值的情形:
(1)設置5個reduce個數
set mapreduce.job.reduces=5;
(2)JOIN兩張表
insert overwrite table jointable
select n.* from nullidtable n full join ori o on
case when n.id is null then concat('hive', rand()) else n.id end=o.id;
結果:如下圖所示,可以看出來,消除了數據傾斜,負載均衡reducer的資源消耗。

9.3.3 MapJoin
如果不指定MapJoin或者不符合MapJoin的條件,那么Hive解析器會將Join操作轉換成Common Join,即:在Reduce階段完成join。容易發生數據傾斜。可以用MapJoin把小表全部加載到內存
在map端進行join,避免reducer處理。
1、開啟MapJoin參數設置
(1)設置自動選擇Mapjoin
set hive.auto.convert.join=true; -- 默認為true
(2)大表小表的閾值設置(默認25M以下認為是小表):
set hive.mapjoin.smalltable.filesize=25000000;
2、MapJoin工作機制,如下圖所示

案例實操:
(1)開啟Mapjoin功能
set hive.auto.convert.join=true; -- 默認為true
(2)執行小表JOIN大表語句
insert overwrite table jointable
select b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
from smalltable s
join bigtable b
on s.id=b.id;
No rows affected (24.594 seconds)
(3)執行大表JOIN小表語句
insert overwrite table jointable
select b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
from bigtable b
join smalltable s
on s.id=b.id;
No rows affected (24.315 seconds)
9.3.4 group by
默認情況下,Map階段同一key數據分發給一個Reduce,當一個key數據過大時就傾斜了。
並不是所有的聚合操作都需要在Reduce端完成,很多聚合操作都可以先在Map端進行部分聚合(預處理)
,最后在Reduce端得出最終結果。(類似於Combine
)
1、開啟Map端聚合參數設置
(1)是否在Map端進行聚合,默認為true
set hive.map.aggr=true;
(2)設置在Map端進行聚合操作的條目數目
set hive.groupby.mapaggr.checkinterval=100000;
(3)有數據傾斜的時候進行負載均衡(默認是false)
set hive.groupby.skewindata=true;
當選項設定為 true,生成的查詢計划會有兩個MR Job
。第一個MR Job中,Map的輸出結果會隨機分布到Reduce中,每個Reduce做部分聚合操作,並輸出結果,這樣處理的結果是相同的group by key有可能被分發到不同的Reduce中
,從而達到負載均衡
的目的;第二個MR Job再根據預處理
的數據結果按照group by key分布到Reduce中(這個過程可以保證相同的group by key被分布到同一個Reduce中),最后完成最終的聚合操作。
9.3.5 Count(Distinct) 去重統計
數據量小的時候無所謂,數據量大的情況下,由於COUNT DISTINCT操作需要用一個Reduce Task來完成,這一個Reduce需要處理的數據量太大,就會導致整個Job很難完成,一般COUNT DISTINCT使用先GROUP BY再COUNT的方式替換。
案例實操
1、創建一張大表
create table bigtable(
id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string
)
row format delimited fields terminated by '\t';
2、加載數據
load data local inpath '/opt/module/datas/bigtable' into table bigtable;
3.設置5個reduce個數
set mapreduce.job.reduces=5;
4、執行去重id查詢(只能用一個Reduce)
select count(distinct id) from bigtable;
+--------+--+
| c0 |
+--------+--+
| 99947 |
+--------+--+
1 row selected (46.446 seconds)
5、采用GROUP BY去重id(可以使用多個Reduce,子查詢幫我們做去重工作,把數據分發給了5個Reduce進行去重處理,大數據量的情況下效率高)
select count(id) from (select id from bigtable group by id) a;
+--------+--+
| _c0 |
+--------+--+
| 99947 |
+--------+--+
1 row selected (78.134 seconds)
雖然會多用一個Job來完成,但在數據量大的情況下,這個絕對是值得的
。
9.3.6 笛卡爾積
盡量避免笛卡爾積,join的時候不加on條件,或者無效的on條件,Hive只能使用1個reducer來完成笛卡爾積(笛卡爾積過大會撐爆內存)。
9.3.7 行列過濾
列處理:在SELECT中,只拿需要的列,如果有,盡量使用分區過濾,少用SELECT *。
行處理:在分區剪裁中,當使用外關聯時,如果將副表的過濾條件寫在Where后面,那么就會先全表關聯,之后再過濾,比如:
案例實操:
1、測試先關聯兩張表,再用where條件過濾
select o.id from bigtable b
join ori o on o.id=b.id
where o.id<=10;
100 rows selected (23.533 seconds)
2、通過子查詢先過濾后,再關聯表(推薦使用)(謂詞下推:先過濾再關聯
)
select b.id from bigtable b
join (select id from ori where id<=10) o on b.id=o.id;
100 rows selected (22.041 seconds)
9.3.8 動態分區調整
關系型數據庫中,對分區表insert數據時候,數據庫自動會根據分區字段的值,將數據插入到相應的分區中,Hive中也提供了類似的機制,即動態分區(Dynamic Partition),只不過,使用Hive的動態分區,需要進行相應的配置。
1、開啟動態分區參數設置
(1)開啟動態分區功能(默認true,開啟)
set hive.exec.dynamic.partition=true;
(2)設置為非嚴格模式(動態分區的模式,默認strict,表示必須指定至少一個分區為靜態分區,nonstrict模式表示允許所有的分區字段都可以使用動態分區。)
set hive.exec.dynamic.partition.mode=nonstrict;
(3)在所有執行MR的節點上,最大一共可以創建多少個動態分區。
set hive.exec.max.dynamic.partitions=1000;
(4)在每個執行MR的節點上,最大可以創建多少個動態分區
。該參數需要根據實際的數據來設定。比如:源數據中包含了一年的數據,即day字段有365個值,那么該參數就需要設置成大於365,如果使用默認值100,則會報錯。
set hive.exec.max.dynamic.partitions.pernode=100;
(5)整個MR Job中,最大可以創建多少個HDFS文件。
set hive.exec.max.created.files=100000;
(6)當有空分區生成時,是否拋出異常。一般不需要設置,默認值是false,表示不拋出異常。
set hive.error.on.empty.partition=false;
2、案例實操1:將從靜態分區中查詢到的數據按照分區字段(p_time),動態地放置在動態分區中。
需求:將ori中的數據按照時間(如:20111230000008),插入到目標分區表ori_partitioned_target的相應分區中。
(1)創建分區表
create table ori_partitioned(
id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string
)
partitioned by(p_time bigint)
row format delimited fields terminated by '\t';
(2)加載數據到分區表中(靜態分區)
load data local inpath '/opt/module/datas/ds1' into table ori_partitioned partition(p_time='20111230000010');
load data local inpath '/opt/module/datas/ds2' into table ori_partitioned partition(p_time='20111230000011');
(3)創建目標分區表
create table ori_partitioned_target(
id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string
)
partitioned by(p_time string)
row format delimited fields terminated by '\t';
(4)設置動態分區,並向分區動態地插入數據
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions=1000;
set hive.exec.max.dynamic.partitions.pernode=100;
set hive.exec.max.created.files=100000;
set hive.error.on.empty.partition=false;
insert overwrite table ori_partitioned_target partition(p_time)
select id, time, uid, keyword, url_rank, click_num, click_url, p_time from ori_partitioned;
(5)查看目標分區表的分區情況
show partitions ori_partitioned_target;
3、案例實操2:將從普通表中查詢到的數據按照分區字段(deptno),動態地放置在動態分區中。
(1)創建原始表(管理表)
create table if not exists default.emp(
empno int,
ename string,
job string,
mgr int,
hiredate string,
sal double,
comm double,
deptno int
)
row format delimited fields terminated by '\t';
(2)加載數據到管理表中
load data local inpath '/opt/module/datas/emp.txt' into table default.emp;
(3)創建目標分區表
create table if not exists default.emp_partiton1(
empno int,
ename string,
job string,
mgr int,
hiredate string,
sal double,
comm double
)
partitioned by(deptnoooooo int)
row format delimited fields terminated by '\t';
(4)設置動態分區,並向分區動態地插入數據
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions=1000;
set hive.exec.max.dynamic.partitions.pernode=100;
set hive.exec.max.created.files=100000;
set hive.error.on.empty.partition=false;
insert overwrite table emp_partiton1 partition(deptnoooooo)
select empno, ename, job, mgr, hiredate, sal, comm, deptno from emp;
(5)查看目標分區表的分區情況
show partitions emp_partiton1;
+-----------------+--+
| partition |
+-----------------+--+
| deptnoooooo=10 |
| deptnoooooo=20 |
| deptnoooooo=30 |
+-----------------+--+
注意細節1
:原始表字段=分區表字段+分區字段注意細節2
:插入數據的字段順序要與創建原始表的順序一致。注意細節3
:分區字段的名稱可以任意起。分區字段可以任意選取。注意對應關系即可。
9.3.9 分桶
詳見6.6章。
9.3.10 分區
詳見4.6章。
9.4 數據傾斜
9.4.1 合理設置 Map 數
1)通常情況下,作業會通過 input 的目錄產生一個或者多個 map 任務。
主要的決定因素有:input的文件總個數,input的文件大小,集群設置的文件塊大小。
2)是不是 map 數越多越好?
答案是否定的。如果一個任務有很多小文件(遠遠小於塊大小128m),則每個小文件也會被當做一個塊,用一個map任務來完成,而一個map任務啟動和初始化的時間遠遠大於邏輯處理的時間,就會造成很大的資源浪費。而且,同時可執行的map數是受限的。
3)是不是保證每個 map 處理接近128m的文件塊,就高枕無憂了?
答案也是不一定。比如有一個127m的文件,正常會用一個map去完成,但這個文件只有一個或者兩個小字段,卻有幾千萬的記錄,而且如果map處理的邏輯比較復雜,用一個map任務去做(map()方法是按行調用),肯定也比較耗時。
針對上面的問題2和3,我們需要采取兩種方式來解決:即減少map數和增加map數。
9.4.2 小文件進行合並
在map執行前合並小文件,減少map數:CombineHiveInputFormat
具有對小文件進行合並的功能(系統默認的格式)。HiveInputFormat沒有對小文件合並功能。
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
9.4.3 復雜文件增加 Map 數
當input的文件都很大,任務邏輯處理復雜,map 執行非常慢的時候,可以考慮增加 Map 數,來使得每個map處理的數據量減少,從而提高任務的執行效率。
增加map的方法為:根據computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M
公式,調整maxSize最大值。讓maxSize最大值低於blocksize就可以增加map的個數。
案例實操:
1、執行查詢
set mapreduce.job.reduces=-1; -- 設置mapreduce的個數
select count(*) from emp;
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2、設置最大切片值為100個字節
set mapreduce.input.fileinputformat.split.maxsize;
+----------------------------------------------------------+--+
| set |
+----------------------------------------------------------+--+
| mapreduce.input.fileinputformat.split.maxsize=256000000 |
+----------------------------------------------------------+--+
set mapreduce.input.fileinputformat.split.maxsize=100;
select count(*) from emp;
Hadoop job information for Stage-1: number of mappers: 6; number of reducers: 1
9.4.4 合理設置 Reduce 數
1、調整 reduce 個數方法一
(1)每個Reduce處理的數據量默認是256MB
hive.exec.reducers.bytes.per.reducer=256000000
(2)每個任務最大的reduce數,默認為1009
hive.exec.reducers.max=1009
(3)計算reducer數的公式
N=min(參數2,總輸入數據量/參數1)
2、調整 reduce 個數方法二
在hadoop的mapred-default.xml文件中修改
設置每個job的Reduce個數
set mapreduce.job.reduces=15;
3、reduce 個數並不是越多越好
(1)過多的啟動和初始化reduce也會消耗時間和資源。
(2)另外,有多少個reduce,就會有多少個輸出文件,如果生成了很多個小文件,那么如果這些小文件作為下一個任務的輸入,則也會出現小文件過多的問題。
在設置reduce個數的時候也需要考慮這兩個原則:處理大數據量利用合適的reduce數;使單個reduce任務處理數據量大小要合適
。
9.5 並行執行
Hive會將一個查詢轉化成一個或者多個階段。這樣的階段可以是MapReduce階段、抽樣階段、合並階段、limit階段。或者Hive執行過程中可能需要的其他階段。默認情況下,Hive一次只會執行一個階段。不過,某個特定的job可能包含眾多的階段,而這些階段可能並非完全互相依賴的,也就是說有些階段是可以並行執行的,這樣可能使得整個job的執行時間縮短。不過,如果有更多的階段可以並行執行,那么job可能就越快完成。
通過設置參數 hive.exec.parallel 值為 true,就可以開啟並發執行。不過,在共享集群中,需要注意下,如果job中並行階段增多,那么集群利用率就會增加。
set hive.exec.parallel=true; // 打開任務並行執行
set hive.exec.parallel.thread.number=16; // 同一個sql允許最大並行度,默認為8
當然,得是在系統資源比較空閑的時候才有優勢,否則,沒資源,並行也起不來。
9.6 嚴格模式
Hive提供了一個嚴格模式,可以防止用戶執行那些可能意想不到的不好的影響的查詢。
通過設置屬性hive.mapred.mode值為默認是非嚴格模式nonstrict
。開啟嚴格模式需要修改 hive.mapred.mode 值為 strict,開啟嚴格模式可以禁止3種類型的查詢。
hive-default.xml.template
<property>
<name>hive.mapred.mode</name>
<value>strict</value>
<description>
The mode in which the Hive operations are being performed.
In strict mode, some risky queries are not allowed to run. They include:
Cartesian Product.
No partition being picked up for a query.
Comparing bigints and strings.
Comparing bigints and doubles.
Orderby without limit.
</description>
</property>
1) 對於分區表,除非 where 語句中含有分區字段過濾條件來限制范圍,否則不允許執行
。換句話說,就是用戶不允許掃描所有分區
。進行這個限制的原因是,通常分區表都擁有非常大的數據集,而且數據增加迅速。沒有進行分區限制的查詢可能會消耗令人不可接受的巨大資源來處理這個表。
演示示例:
select * from emp_partiton;
2) 對於使用了 order by 語句的查詢,要求必須使用limit語句
。因為 order by 為了執行排序過程會將所有的結果數據分發到同一個Reducer中進行處理,強制要求用戶增加這個 limit 語句可以防止Reducer額外執行很長一段時間。
演示示例:
select * from emp order by sal;
3) 限制笛卡爾積的查詢
。對關系型數據庫非常了解的用戶可能期望在執行 join 查詢的時候不使用 on 語句而是使用where語句,這樣關系數據庫的執行優化器就可以高效地將 where 語句轉化成那個 on 語句。不幸的是,Hive並不會執行這種優化,因此,如果表足夠大,那么這個查詢就會出現不可控的情況。
演示示例:
select e.ename, d.dname from emp e, dept d;
9.7 JVM重用
JVM重用是Hadoop調優參數的內容,其對Hive的性能具有非常大的影響,特別是對於很難避免小文件的場景或task特別多的場景,這類場景大多數執行時間都很短
。
Hadoop的默認配置通常是使用派生JVM來執行map和Reduce任務的。這時JVM的啟動過程可能會造成相當大的開銷,尤其是執行的job包含有成百上千task任務的情況。JVM重用可以使得JVM實例在同一個job中重新使用N次
。N的值可以在Hadoop的mapred-site.xml文件中進行配置。通常在10-20之間,具體多少需要根據具體業務場景測試得出。
<property>
<name>mapreduce.job.jvm.numtasks</name>
<value>10</value>
<description>How many tasks to run per jvm. If set to -1, there is
no limit.
</description>
</property>
這個功能的缺點是,開啟JVM重用將一直占用使用到的task插槽,以便進行重用,直到任務完成后才能釋放。如果某個“不平衡的”job中有某幾個reduce task執行的時間要比其他Reduce task消耗的時間多的多的話,那么保留的插槽就會一直空閑着卻無法被其他的job使用,直到所有的task都結束了才會釋放。
9.8 推測執行
在分布式集群環境下,因為程序Bug(包括Hadoop本身的bug),負載不均衡或者資源分布不均等原因,會造成同一個作業的多個任務之間運行速度不一致,有些任務的運行速度可能明顯慢於其他任務(比如一個作業的某個任務進度只有50%,而其他所有任務已經運行完畢),則這些任務會拖慢作業的整體執行進度。為了避免這種情況發生,Hadoop采用了推測執行(Speculative Execution)機制,它根據一定的法則推測出“拖后腿”的任務,並為這樣的任務啟動一個備份任務,讓該任務與原始任務同時處理同一份數據,並最終選用最先成功運行完成任務的計算結果作為最終結果。
推測執行算法原理:

設置開啟推測執行參數:Hadoop的mapred-site.xml文件中進行配置:
<property>
<name>mapreduce.map.speculative</name>
<value>true</value>
<description>If true, then multiple instances of some map tasks may be executed in parallel.</description>
</property>
<property>
<name>mapreduce.reduce.speculative</name>
<value>true</value>
<description>If true, then multiple instances of some reduce tasks may be executed in parallel.</description>
</property>
不過hive本身也提供了配置項來控制reduce-side的推測執行:
<property>
<name>hive.mapred.reduce.tasks.speculative.execution</name>
<value>true</value>
<description>Whether speculative execution for reducers should be turned on. </description>
</property>
關於調優這些推測執行變量,還很難給一個具體的建議。如果用戶對於運行時的偏差非常敏感的話,那么可以將這些功能關閉掉。
如果用戶因為輸入數據量很大而需要執行長時間的map或者Reduce task的話,那么啟動推測執行造成的浪費是非常巨大大。
9.9 壓縮
詳見第8章。
9.10 執行計划(Explain)
1、基本語法
EXPLAIN [EXTENDED | DEPENDENCY | AUTHORIZATION] query
2、案例實操
(1)查看下面這條語句的執行計划
hive (default)> explain select * from emp;
hive (default)> explain select deptno, avg(sal) avg_sal from emp group by deptno;
(2)查看詳細執行計划
hive (default)> explain extended select * from emp;
hive (default)> explain extended select deptno, avg(sal) avg_sal from emp group by deptno;