Spark SQL 小文件問題處理


在生產中,無論是通過SQL語句或者Scala/Java等代碼的方式使用Spark SQL處理數據,在Spark SQL寫數據時,往往會遇到生成的小文件過多的問題,而管理這些大量的小文件,是一件非常頭疼的事情。

大量的小文件會影響Hadoop集群管理或者Spark在處理數據時的穩定性:

1. Spark SQL寫Hive或者直接寫入HDFS,過多的小文件會對NameNode內存管理等產生巨大的壓力,會影響整個集群的穩定運行

2. 容易導致task數過多,如果超過參數spark.driver.maxResultSize的配置(默認1g),會拋出類似如下的異常,影響任務的處理

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 478 tasks (2026.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)

 

當然可以通過調大spark.driver.maxResultSize的默認配置來解決問題,但如果不能從源頭上解決小文件問題,以后還可能遇到類似的問題。

此外,Spark在處理任務時,一個分區分配一個task進行處理,多個分區並行處理,雖然並行處理能夠提高處理效率,但不是意味着task數越多越好。如果數據量不大,過多的task運行反而會影響效率。

下面通過一個例子,Spark SQL寫數據時,導致產生分區數"劇增"的典型場景,通過分區數"劇增",以及Spark中task數和分區數的關系等,來倒推小文件過多的可能原因(這里的分區數是指生成的DataSet/RDD的分區數,不是Hive分區表的分區概念):

1. 現象

1) 對表test_tab進行寫入操作
2) t1的分區數是100,t2的分區數是200,union all后生成的tmp分區數是300
3) test_tab產生的小文件數基本也在300左右
select * from t1 union all select * from t2 as tmp;insert overwrite table test_tab select * from tmp;

 

2. 分析

1)執行上述insert操作時的分區並行度,主要受tmp的分區數(對應一個DataSet)影響,

2)tmp的分區數主要受t1、t2以及union all的影響

3)暫且不考慮t1或t2是物理表還是經過其他處理生成的臨時表,它們的分區數是確定的,這里主要看經過union all處理后,生成的tmp的分區數和t1、t2的分區數有何關系?

4)Spark SQL語句中的union all對應到DataSet中即為unionAll算子,底層調用union算子

在之前的文章《重要|Spark分區並行度決定機制》中已經對Spark RDD中的union算子對union產生的新的RDD的分區數是如何受被union的多個RDD的影響的,做過詳細介紹,這里直接給出結論:

同樣的這種機制也可以套用到Spark SQL中的DataSet上,那么就很好解釋了tmp的分區數為什么等於t1和t2的分區數的和。

最后,Spark中一個task處理一個分區從而也會影響最終生成的文件數。

當然上述只是以Spark SQL中的一個場景闡述了小文件產生過多的原因之一(分區數過多)。在數倉建設中,產生小文件過多的原因有很多種,比如:

1. 流式處理中,每個批次的處理執行保存操作也會產生很多小文件

2. 為了解決數據更新問題,同一份數據保存了不同的幾個狀態,也容易導致文件數過多

那么如何解決這種小文件的問題呢?

1. 通過repartition或coalesce算子控制最后的DataSet的分區數

注意repartition和coalesce的區別,具體可以參考文章《重要|Spark分區並行度決定機制》

2. 將Hive風格的Coalesce and Repartition Hint 應用到Spark SQL需要注意這種方式對Spark的版本有要求,建議在Spark2.4.X及以上版本使用,示例:

INSERT ... SELECT /*+ COALESCE(numPartitions) */ ...
INSERT ... SELECT /*+ REPARTITION(numPartitions) */ ...

 

3. 小文件定期合並

可以定時通過異步的方式針對Hive分區表的每一個分區中的小文件進行合並操作

上述只是給出3種常見的解決辦法,並且要結合實際用到的技術和場景去具體處理,比如對於HDFS小文件過多,也可以通過生成HAR 文件或者Sequence File來解決。

推薦文章:

Spark SQL | 目前Spark社區最活躍的組件之一

Spark存儲Parquet數據到Hive,對map、array、struct字段類型的處理Spark SQL解析查詢parquet格式Hive表獲取分區字段和查詢條件

關於HDFS應知應會的幾個問題Spark RDD詳解

Spark和Spring整合處理離線數據Spark流式狀態管理


關注微信公眾號:大數據學習與分享,獲取更對技術干貨

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM