什么是小文件?
生產上,我們往往將Spark SQL作為Hive的替代方案,來獲得SQL on Hadoop更出色的性能。因此,本文所講的是指存儲於HDFS中小文件,即指文件的大小遠小於HDFS上塊(dfs.block.size)大小的文件。
小文件問題的影響
-
一方面,大量的小文件會給Hadoop集群的擴展性和性能帶來嚴重的影響。NameNode在內存中維護整個文件系統的元數據鏡像,用戶HDFS的管理;其中每個HDFS文件元信息(位置,大小,分塊等)對象約占150字節,如果小文件過多,會占用大量內存,直接影響NameNode的性能。相對的,HDFS讀寫小文件也會更加耗時,因為每次都需要從NameNode獲取元信息,並與對應的DataNode建立連接。如果NameNode在宕機中恢復,也需要更多的時間從元數據文件中加載。
-
另一方面,也會給Spark SQL等查詢引擎造成查詢性能的損耗,大量的數據分片信息以及對應產生的Task元信息也會給Spark Driver的內存造成壓力,帶來單點問題。此外,入庫操作最后的commit job操作,在Spark Driver端單點做,很容易出現單點的性能問題。
Spark小文件產生的過程
-
數據源本身就是就含大量小文件
-
動態分區插入數據,沒有Shuffle的情況下,輸入端有多少個邏輯分片,對應的
HadoopRDD
就會產生多少個HadoopPartition
,每個Partition對應於Spark作業的Task(個數為M),分區數為N。最好的情況就是(M=N) && (M中的數據也是根據N來預先打散的),那就剛好寫N個文件;最差的情況下,每個Task中都有各個分區的記錄,那文件數最終文件數將達到M * N個。這種情況下是極易產生小文件的。
比如我們拿TPCDS測試集中的store_sales進行舉例, sql如下所示
use tpcds_1t_parquet; INSERT overwrite table store_sales partition ( ss_sold_date_sk ) SELECT ss_sold_time_sk, ss_item_sk, ss_customer_sk, ss_cdemo_sk, ss_hdemo_sk, ss_addr_sk, ss_store_sk, ss_promo_sk, ss_ticket_number, ss_quantity, ss_wholesale_cost, ss_list_price, ss_sales_price, ss_ext_discount_amt, ss_ext_sales_price, ss_ext_wholesale_cost, ss_ext_list_price, ss_ext_tax, ss_coupon_amt, ss_net_paid, ss_net_paid_inc_tax, ss_net_profit, ss_sold_date_sk FROM tpcds_1t_ext.et_store_sales;
首先我們得到其執行計划,如下所示,
== Physical Plan == InsertIntoHiveTable MetastoreRelation tpcds_1t_parquet, store_sales, Map(ss_sold_date_sk -> None), true, false +- HiveTableScan [ss_sold_time_sk#4L, ss_item_sk#5L, ss_customer_sk#6L, ss_cdemo_sk#7L, ss_hdemo_sk#8L, ss_addr_sk#9L, ss_store_sk#10L, ss_promo_sk#11L, ss_ticket_number#12L, ss_quantity#13, ss_wholesale_cost#14, ss_list_price#15, ss_sales_price#16, ss_ext_discount_amt#17, ss_ext_sales_price#18, ss_ext_wholesale_cost#19, ss_ext_list_price#20, ss_ext_tax#21, ss_coupon_amt#22, ss_net_paid#23, ss_net_paid_inc_tax#24, ss_net_profit#25, ss_sold_date_sk#3L], MetastoreRelation tpcds_1t_ext, et_store_sales
store_sales的原生文件包含1616邏輯分片,對應生成1616 個Spark Task,插入動態分區表之后生成1824個數據分區加一個NULL值的分區,每個分區下都有可能生成1616個文件,這種情況下,最終的文件數量極有可能達到2949200。1T的測試集store_sales也就大概300g,這種情況每個文件可能就零點幾M。
- 動態分區插入數據,有Shuffle的情況下,上面的M值就變成了
spark.sql.shuffle.partitions(默認值200)
這個參數值,文件數的算法和范圍和2中基本一致。
比如,為了防止Shuffle階段的數據傾斜我們可以在上面的sql中加上 distribute by rand()
,這樣我們的執行計划就變成了,
InsertIntoHiveTable MetastoreRelation tpcds_1t_parquet, store_sales, Map(ss_sold_date_sk -> None), true, false +- *Project [ss_sold_time_sk#4L, ss_item_sk#5L, ss_customer_sk#6L, ss_cdemo_sk#7L, ss_hdemo_sk#8L, ss_addr_sk#9L, ss_store_sk#10L, ss_promo_sk#11L, ss_ticket_number#12L, ss_quantity#13, ss_wholesale_cost#14, ss_list_price#15, ss_sales_price#16, ss_ext_discount_amt#17, ss_ext_sales_price#18, ss_ext_wholesale_cost#19, ss_ext_list_price#20, ss_ext_tax#21, ss_coupon_amt#22, ss_net_paid#23, ss_net_paid_inc_tax#24, ss_net_profit#25, ss_sold_date_sk#3L] +- Exchange(coordinator id: 1080882047) hashpartitioning(_nondeterministic#49, 2048), coordinator[target post-shuffle partition size: 67108864] +- *Project [ss_sold_time_sk#4L, ss_item_sk#5L, ss_customer_sk#6L, ss_cdemo_sk#7L, ss_hdemo_sk#8L, ss_addr_sk#9L, ss_store_sk#10L, ss_promo_sk#11L, ss_ticket_number#12L, ss_quantity#13, ss_wholesale_cost#14, ss_list_price#15, ss_sales_price#16, ss_ext_discount_amt#17, ss_ext_sales_price#18, ss_ext_wholesale_cost#19, ss_ext_list_price#20, ss_ext_tax#21, ss_coupon_amt#22, ss_net_paid#23, ss_net_paid_inc_tax#24, ss_net_profit#25, ss_sold_date_sk#3L, rand(4184439864130379921) AS _nondeterministic#49] +- HiveTableScan [ss_sold_date_sk#3L, ss_sold_time_sk#4L, ss_item_sk#5L, ss_customer_sk#6L, ss_cdemo_sk#7L, ss_hdemo_sk#8L, ss_addr_sk#9L, ss_store_sk#10L, ss_promo_sk#11L, ss_ticket_number#12L, ss_quantity#13, ss_wholesale_cost#14, ss_list_price#15, ss_sales_price#16, ss_ext_discount_amt#17, ss_ext_sales_price#18, ss_ext_wholesale_cost#19, ss_ext_list_price#20, ss_ext_tax#21, ss_coupon_amt#22, ss_net_paid#23, ss_net_paid_inc_tax#24, ss_net_profit#25], MetastoreRelation tpcds_1t_ext, et_store_sales
這種情況下,這樣我們的文件數妥妥的就是spark.sql.shuffle.partitions * N,因為rand函數一般會把數據打散的非常均勻。當spark.sql.shuffle.partitions設置過大時,小文件問題就產生了;當spark.sql.shuffle.partitions設置過小時,任務的並行度就下降了,性能隨之受到影響。
最理想的情況,當然是根據分區字段進行shuffle,在上面的sql中加上distribute by ss_sold_date_sk
。 把同一分區的記錄都哈希到同一個分區中去,由一個Spark的Task進行寫入,這樣的話只會產生N個文件,在我們的case中store_sales,在1825個分區下各種生成了一個數據文件。
但是這種情況下也容易出現數據傾斜的問題,比如雙11的銷售數據就很容易在這種情況下發生傾斜。

如上圖所示,在我們插入store_sales時,就發生了null值的傾斜,大大的拖慢的數據入庫的時間。
如何解決Spark SQL產生小文件問題
前面已經提到根據分區字段進行分區,除非每個分區下本身的數據較少,分區字段選擇不合理,那么小文件問題基本上就不存在了,但是也有可能由於shuffle引入新的數據傾斜問題。
我們首先可以嘗試是否可以將兩者結合使用, 在之前的sql上加上distribute by ss_sold_date_sk,cast(rand() * 5 as int)
, 這個類似於我們處理數據傾斜問題時候給字段加上后綴的形式。如,
use tpcds_1t_parquet; INSERT overwrite table store_sales partition ( ss_sold_date_sk ) SELECT ss_sold_time_sk, ss_item_sk, ss_customer_sk, ss_cdemo_sk, ss_hdemo_sk, ss_addr_sk, ss_store_sk, ss_promo_sk, ss_ticket_number, ss_quantity, ss_wholesale_cost, ss_list_price, ss_sales_price, ss_ext_discount_amt, ss_ext_sales_price, ss_ext_wholesale_cost, ss_ext_list_price, ss_ext_tax, ss_coupon_amt, ss_net_paid, ss_net_paid_inc_tax, ss_net_profit, ss_sold_date_sk FROM tpcds_1t_ext.et_store_sales distribute by ss_sold_date_sk, cast(rand() * 5 as int);
按照之前的推算,每個分區下將產生5個文件,同時null值傾斜部分的數據也被打散成五份進行計算,緩解了數據傾斜的問題 ,我們最終將得到1825 *5=9105個文件,如下所示
1825 9105 247111074494 /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales
如果我們將5改得更小,文件數也會越少,但相應的傾斜key的計算時間也會上去。
在我們知道那個分區鍵傾斜的情況下,我們也可以將入庫的SQL拆成幾個部分,比如我們store_sales是因為null值傾斜,我們就可以通過where ss_sold_date_sk is not null
和 where ss_sold_date_sk is null
將原始數據分成兩個部分。前者可以基於分區字段進行分區,如distribute by ss_sold_date_sk
;后者可以基於隨機值進行分區,distribute by cast(rand() * 5 as int)
, 這樣可以靜態的將null值部分分成五個文件。
FROM tpcds_1t_ext.et_store_sales
where ss_sold_date_sk is not null
distribute by ss_sold_date_sk;
FROM tpcds_1t_ext.et_store_sales where ss_sold_date_sk is null distribute by distribute by cast(rand() * 5 as int);
對於傾斜部分的數據,我們可以開啟Spark SQL的自適應功能,spark.sql.adaptive.enabled=true
來動態調整每個相當於Spark的reduce端task處理的數據量,這樣我們就不需要人為的感知隨機值的規模了,我們可以直接
FROM tpcds_1t_ext.et_store_sales where ss_sold_date_sk is null distribute by distribute by rand() ;
然后Spark在Shuffle 階段會自動的幫我們將數據盡量的合並成spark.sql.adaptive.shuffle.targetPostShuffleInputSize
(默認64m)的大小,以減少輸出端寫文件線程的總量,最后減少個數。
對於spark.sql.adaptive.shuffle.targetPostShuffleInputSize
參數而言,我們也可以設置成為dfs.block.size
的大小,這樣可以做到和塊對齊,文件大小可以設置的最為合理。
Spark SQL 小文件實驗
在我們的猛獁大數據平台上面,隨便的建立幾個SQL作業,不用會Spark也可以用SQL把大數據玩得666!

從左到右依次為
- 建表 - 按分區字段插入非空集合到分區表 - 按rand插入空集到分區表,並開啟自Spark SQL適應
- 建表 - 不shuffle 按原始分片直接插入分區表
- 建表 - 全集按照分區字段插入到分區表
- 建表 - 全局按分區字段+cast(rand() * 5 as int)方式插入分區表
雙擊每個工作節點,我們也可以對我們的SQL作業進行參數的調整

選中我們對應的實驗組,點擊執行后,可以查看任務的運行狀態。

從各組的實驗結果來看
bin/hadoop fs -count /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/
1825 1863 192985051585 /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales
bin/hadoop fs -du -h /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__
183.1 M /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00000
183.1 M /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00001
183.3 M /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00002
183.1 M /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00003
183.1 M /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00004
183.2 M /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00005
183.2 M /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00006
183.1 M /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00007
183.0 M /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00008
183.1 M /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00009
183.1 M /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00010
183.1 M /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00011
183.2 M /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00012
183.0 M /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00013
183.1 M /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00014
183.1 M /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00015
183.1 M /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00016
183.2 M /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00017
183.1 M /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00018
183.1 M /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00019
183.2 M /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00020
183.1 M /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00021
183.0 M /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00022
183.2 M /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00023
183.2 M /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00024
183.1 M /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00025
183.2 M /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00026
182.9 M /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00027
183.3 M /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00028
183.0 M /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00029
183.1 M /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00030
183.0 M /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00031
183.2 M /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00032
183.2 M /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00033
182.9 M /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00034
183.1 M /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00035
183.0 M /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00036
183.3 M /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00037
183.0 M /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00038
70.5 M /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00039
bin/hadoop fs -du -h /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=2452642
194.5 M /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=2452642/part-00369
實驗組一的小文件控制還是可喜可賀的。對於我們1t的tpcds測試數據,null值分區字段下只有40個文件,其他每個數據分區也只有一個數據文件,總目錄1825,總文件數1863. 在解決數據傾斜問題的基礎上,也只比純按照分區字段進行distibute by多了39個文件。
總結
本文講述的是如何在純寫SQL的場景下,如何用Spark SQL做數據導入時候,控制小文件的數量。
- 對於原始數據進行按照分區字段進行shuffle,可以規避小文件問題。但有可能引入數據傾斜的問題;
- 可以通過distribute by ss_sold_date_sk, cast(rand() * N as int),N值可以在文件數量和傾斜度之間做權衡
- 知道傾斜鍵的情況下,可以將原始數據分成幾個部分處理,不傾斜的按照分區鍵shuffle,傾斜部分可以按照rand函數來shuffle
- 活用Spark SQL自適應功能,目前Spark 的各版本的Release中其實也就兩個參數,設
spark.sql.adaptive.enabled=true
即可開啟該功能,spark.sql.adaptive.shuffle.targetPostShuffleInputSize
設置reduce任務處理文件的上限,配合結論3使用,解決小文件問題事半功倍。 - 對於Spark 2.4的用戶,也可以使用HINT 詳情請看 https://issues.apache.org/jira/browse/SPARK-24940
- 猛獁大數據平台是一站式大數據管理和應用開發平台,具有敏捷易用,成熟穩定,安全可靠,開放靈活的特點,提供7*24小時專業服務。
作者:Kent_Yao
鏈接:https://www.jianshu.com/p/ddd2382a738a
來源:簡書
簡書著作權歸作者所有,任何形式的轉載都請聯系作者獲得授權並注明出處。