spark數據分區數量的原理


 

 

原始RDD或數據集中的每一個分區都映射一個或多個數據文件, 該映射是在文件的一部分或者整個文件上完成的。

Spark Job RDD/datasets在執行管道中,通過根據分區到數據文件的映射讀取數據輸入到RDD/dataset。

如何根據某些參數確定spark的分區數?

 


 

使用Dataset APIs讀取數據的分區數:

functions:

https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/sql/DataFrameReader.html

*文件格式 APIs*
Dataset<Row> = SparkSession.read.csv(...)
Dataset<Row> = SparkSession.read.json(...)
Dataset<Row> = SparkSession.read.text(...)
Dataset<Row> = SparkSession.read.parquet(...)
Dataset<Row> = SparkSession.read.orc(...)

*通用格式 API*
Dataset<Row> = SparkSession.read.format(String fileformat).load(...)

 

影響數據分區數的參數:

(a)spark.default.parallelism (default: Total No. of CPU cores)
(b)spark.sql.files.maxPartitionBytes (default: 128 MB) 【讀取文件時打包到單個分區中的最大字節數。】
(c)spark.sql.files.openCostInBytes (default: 4 MB)  【 該參數默認4M,表示小於4M的小文件會合並到一個分區中,用於減小小文件,防止太多單個小文件占一個分區情況。這個參數就是合並小文件的閾值,小於這個閾值的文件將會合並。】

 

使用這些配置參數值,一個名為maxSplitBytes的最大分割准則被計算如下:

 

maxSplitBytes = Minimum(maxPartitionBytes, bytesPerCore)

 

bytesPerCore = (文件總大小 + 文件個數 * openCostInBytes)/ default.parallelism

 

maxSplitBytes: 

for each_file in files:
    if each_file is can split:
        if each_file.size() > maxSplitBytes:
            # file 被切分為 block_number 塊其中block_number-1大小為 maxSplitBytes,1塊<=maxSplitBytes
            block_number = ceil(each_file.size() / maxSplitBytes)
        else:
            block_number = 1
    else:
        #文件不可分
        block_number = 1

 

數據文件計算文件塊之后,將一個或多個文件塊打包到一個分區中。

打包過程從初始化一個空分區開始,然后對每個文件塊進行迭代:

1. 如果沒有當前分區要打包,請初始化要打包的新分區,然后將迭代的文件塊分配給該分區。 分區大小成為塊大小與“ openCostInBytes”的額外開銷的總和。

2.如果塊大小的增加不超過當前分區(正在打包)的大小超過' maxSplitBytes ',那么文件塊將成為當前分區的一部分。分區大小是由塊大小和“openCostInBytes”額外開銷的總和增加的。

3.如果塊大小的增加超過了當前分區被打包的大小超過了' maxSplitBytes ',那么當前分區被聲明為完整並啟動一個新分區。迭代的文件塊成為正在初始化的新分區的一部分,而新分區大小成為塊大小和‘openCostInBytes’額外開銷的總和。

打包過程結束后,將獲得用於讀取相應數據文件的數據集的分區數。

 

 

 

 

 

盡管獲得分區數量的過程似乎有點復雜,但基本的思想是,如果文件是可分拆的,那么首先在maxSplitBytes邊界處拆分單個文件。

在此之后,將文件的分割塊或不可分割的文件打包到一個分區中,這樣,在將塊打包到一個分區中期間,

如果分區大小超過maxSplitBytes,則認為該分區已經打包完成,然后采用一個新分區進行打包。因此,最終從包裝過程中得到一定數量的分區。

e.g:

core設置為10

(a) 54 parquet files, 65 MB each, 默認參數 。  

bytesPerCore = (54*65 + 54 * 4)/ 10 = 372M

maxSplitBytes = Minimum(maxPartitionBytes, bytesPerCore) = min(128M,372M)=128

65 < 128 && 2*65 > 128 ==> 54分區

 

(b)54 parquet files, 63 MB each, 默認參數。

bytesPerCore = (54*63 + 54 * 4)/ 10 = 361M

maxSplitBytes = Minimum(maxPartitionBytes, bytesPerCore) = min(128M,361M)=128

63 < 128 &&   4 + 2*63=126+4=130 > 128=maxPartitionBytes  ==> 54 (看起來 1分區可以容納2個塊,但是存在一個openCostInBytes開銷4M,2個63+4大於了 128M,故一個分區只能一個塊)

 

(c)54 parquet files, 40 MB each, 默認參數。

bytesPerCore = (54*40 + 54 * 4)/ 10 = 237M

maxSplitBytes = Minimum(maxPartitionBytes, bytesPerCore) = min(128M,237M)=128

40 < 128 && (4+3*  40) = 124 < 128 (故一個分區可以裝3個塊) = 54/3 = 18分區

 

(d)54 parquet files, 40 MB each, maxPartitionBytes=88M 其余默認

bytesPerCore = (54*40 + 54 * 4)/ 10 = 237M

maxSplitBytes = Minimum(maxPartitionBytes, bytesPerCore) = min(88M,237M)=88

40 < 88 && (4+2*40) = 84 < 88 (一個分區2個) = 27個分區

 

(e) 54 parquet files, 40 MB each ; spark.default.parallelism set to 400

bytesPerCore = (54*40 + 54 * 4)/ 400 = 5.94M

maxSplitBytes = Minimum(maxPartitionBytes, bytesPerCore) = min(128M,5.94M)=5.94

每個文件塊數:ceil(40 / 5.94) = 7個   5.94 + 4M  > 5.94  一個分區一個塊

所以總分區數為:  54 * 7 = 378 個分區 

 

 

 


 使用RDD APIs讀取數據文件的分區數

 

RDD APIs類似下面的API

*SparkContext.newAPIHadoopFile(String path, Class<F> fClass, Class<K> kClass, Class<V> vClass, org.apache.hadoop.conf.Configuration conf)
*SparkContext.textFile(String path, int minPartitions)
*SparkContext.sequenceFile(String path, Class<K> keyClass, Class<V> valueClass)
*SparkContext.sequenceFile(String path, Class<K> keyClass, Class<V> valueClass, int minPartitions)
*SparkContext.objectFile(String path, int minPartitions, scala.reflect.ClassTag<T> evidence$4)

在這些API中,會詢問參數' minPartitions ',而在另一些API中則沒有。如果沒有查詢,則默認值為2或1,1(默認情況下為1)。並行性是1。這個“minPartitions”是決定這些api返回的RDD中分區數量的因素之一。其他因素為Hadoop配置參數的值:

# 關於 mapred.min.split.size see  https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/DeprecatedProperties.html

# 關於 dfs.blocksize  see https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/hdfs-default.xml 默認: 128M

minSize (mapred.min.split.size - default value 1 MB) or  minSize (mapreduce.input.fileinputformat.split.minsize - default value 1 MB) 

blockSize (dfs.blocksize - default 128 MB)

 

goalSize = Sum of all files lengths to be read / minPartitions

splitSize = Math.max(minSize, Math.min(goalSize, blockSize));
 

現在使用“splitSize”,

for each_file in files:
    if each_file:
        if each_file.size() > splitSize:
            # file 被切分為 block_number 塊其中block_number-1大小為 splitSize,最后一個塊<=splitSize
            block_number = ceil(each_file.size() / maxSplitBytes)
        else:
            # 大小等於文件長度的文件塊
            block_number = 1
            block_size  = 文件長度的文件塊
    else:
        #文件不可分
        block_number = 1
        block_size = 文件長度的文件塊

每個文件塊(大小大於0)都映射到單個分區。因此,由數據文件上的RDD api返回的RDD中的分區數,等於使用“splitSize”對數據文件進行切片而得到的非零文件塊的數

 

 

 

e.g:

(a). 31 parquet files, 330 MB each, blocksize at default 128 MB, minPartitions not specified, core is 10

splitSize = Math.max(minSize, Math.min(Sum of all files lengths to be read / minPartitions, blockSize))  = max( 0 , 128M) = 128M 

一個文件按照splitSize=128M可以分3個,故一共分區數 31*3=93

 

(b). 54 parquet files, 40 MB each, blocksize at default 128 MB,  core is 10

splitSize = Math.max(minSize, Math.min(Sum of all files lengths to be read / minPartitions, blockSize))  = max( 0 , min(2264924160/1,128M)) = 128M 

splitSize=128M ,40 <128 1個文件長度的文件塊 故為54個分區

 

(c) 31 parquet files, 330 MB each, blocksize at default 128 MB, minPartitions specified as 1000

splitSize = Math.max(minSize, Math.min(Sum of all files lengths to be read / minPartitions, blockSize))  = max( 0 , min(31 * 330 * 1024 * 1024/1000 ,128 * 1024 * 1024)) = 10726932 = 10.23M

 一個文件分為 ceil(330/10.23) = 33塊  共計:31 * 33 = 1023 共計分區: 1023個

 

(d)  31 parquet files, 330 MB each, blocksize at default 128 MB, minPartitions not specified, ‘mapred.min.split.size’ set at 256 MB, No. of core equal to 10

splitSize = Math.max(minSize, Math.min(Sum of all files lengths to be read / minPartitions, blockSize))  = max( 256 , min(31 * 330 * 1024 * 1024/1 ,128 * 1024 * 1024)) =256M

330/256.0 = 2 , 31 * 2 = 62個分區

 

 


 

總結:

     分區的最佳數量是高效可靠的Spark應用程序的關鍵。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM