spark 解決大文件造成的分區數據量過大的問題


背景

在使用spark處理文件時,經常會遇到要處理的文件大小差別的很大的情況。如果不加以處理的話,特別大的文件就可能產出特別大的spark 分區,造成分區數據傾斜,嚴重影響處理效率。

解決方案

Spark RDD

spark在讀取文件構建RDD的時候(調用spark.SparkContext.TextFile(FILENAME, [minPartition]), spark.SparkContext.SequenceFile(FILENAME) ,因為這兩個都實現了FileInputFormat),每個RDD分區的大小是由下面的幾個參數控制的。

spark.hadoop.mapreduce.input.fileinputformat.split.minsize #(單位字節,默認值:0)
dfs.blocksize #(單位字節, 默認值: 128M,  在hdfs-site.xml中配置,這個會影響到hadoop,非常不建議修改)
minPartition #(最小分區數,默認值2)

spark在調用這兩個方法讀取文件為RDD的時候,會經歷如下步驟

  1. 計算要讀取的所有文件的總大小 TOTAL_SIZE
  2. 計算平均每個文件的大小 AVERAGE_SIZE = TOTAL_SIZE/minPartition
  3. 獲取文件所在HDFS上的BLOCK_SIZE (即:dfs.blocksize)
  4. 讀取spark.hadoop.mapreduce.input.fileinputformat.split.minsize,獲取文件的最小值 MIN_SIZE
  5. 計算要產出RDD的分區大小 PARTITION_SIZE = max(MIN_SIZE, min(AVERAGE_SIZE, BLOCK_SIZE))

由上可知,如果調用TextFile時不設置minPartition,且不設置split.minsize,那么產出的RDD每個分區最大大小為 BLOCK_SIZE。

如果希望產出分區的大小小於BLOCK_SIZE,就需要設置minPartition為非常大一個值,使得AVERAGE_SIZE變小,然后通過split.minsize來控制產出的分區大小。

備注:這兩種方法對於大文件可以切分成小文件,但是對於輸入的小文件,即使小於split.minsize也不會合並。不過相比大文件,小文件對spark性能沒有太大影響。

參考資料

  1. org.apache.hadoop.mapred.FileInputFormat.java
  2. SparkContext.scala
Spark SQL

適用於使用spark.sql讀取文件/hive的場景

spark.sql.files.maxPartitionBytes  #單位字節  默認128M   每個分區最大的文件大小,針對於大文件切分
spark.sql.files.openCostInBytes   #單位字節  默認值4M   小於該值的文件將會被合並,針對於小文件合並


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM