原始RDD或數據集中的每一個分區都映射一個或多個數據文件, 該映射是在文件的一部分或者整個文件上完成的。
Spark Job RDD/datasets在執行管道中,通過根據分區到數據文件的映射讀取數據輸入到RDD/dataset。
如何根據某些參數確定spark的分區數?
使用Dataset APIs讀取數據的分區數:
functions:
https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/sql/DataFrameReader.html
*文件格式 APIs* Dataset<Row> = SparkSession.read.csv(...) Dataset<Row> = SparkSession.read.json(...) Dataset<Row> = SparkSession.read.text(...) Dataset<Row> = SparkSession.read.parquet(...) Dataset<Row> = SparkSession.read.orc(...) *通用格式 API* Dataset<Row> = SparkSession.read.format(String fileformat).load(...)
影響數據分區數的參數:
(a)spark.default.parallelism (default: Total No. of CPU cores)
(b)spark.sql.files.maxPartitionBytes (default: 128 MB) 【讀取文件時打包到單個分區中的最大字節數。】
(c)spark.sql.files.openCostInBytes (default: 4 MB) 【 該參數默認4M,表示小於4M的小文件會合並到一個分區中,用於減小小文件,防止太多單個小文件占一個分區情況。這個參數就是合並小文件的閾值,小於這個閾值的文件將會合並。】
使用這些配置參數值,一個名為maxSplitBytes的最大分割准則被計算如下:
maxSplitBytes = Minimum(maxPartitionBytes, bytesPerCore)
bytesPerCore = (文件總大小 + 文件個數 * openCostInBytes)/ default.parallelism
maxSplitBytes:
for each_file in files: if each_file is can split: if each_file.size() > maxSplitBytes: # file 被切分為 block_number 塊其中block_number-1大小為 maxSplitBytes,1塊<=maxSplitBytes block_number = ceil(each_file.size() / maxSplitBytes) else: block_number = 1 else: #文件不可分 block_number = 1
數據文件計算文件塊之后,將一個或多個文件塊打包到一個分區中。
打包過程從初始化一個空分區開始,然后對每個文件塊進行迭代:
1. 如果沒有當前分區要打包,請初始化要打包的新分區,然后將迭代的文件塊分配給該分區。 分區大小成為塊大小與“ openCostInBytes”的額外開銷的總和。
2.如果塊大小的增加不超過當前分區(正在打包)的大小超過' maxSplitBytes ',那么文件塊將成為當前分區的一部分。分區大小是由塊大小和“openCostInBytes”額外開銷的總和增加的。
3.如果塊大小的增加超過了當前分區被打包的大小超過了' maxSplitBytes ',那么當前分區被聲明為完整並啟動一個新分區。迭代的文件塊成為正在初始化的新分區的一部分,而新分區大小成為塊大小和‘openCostInBytes’額外開銷的總和。
打包過程結束后,將獲得用於讀取相應數據文件的數據集的分區數。
盡管獲得分區數量的過程似乎有點復雜,但基本的思想是,如果文件是可分拆的,那么首先在maxSplitBytes邊界處拆分單個文件。
在此之后,將文件的分割塊或不可分割的文件打包到一個分區中,這樣,在將塊打包到一個分區中期間,
如果分區大小超過maxSplitBytes,則認為該分區已經打包完成,然后采用一個新分區進行打包。因此,最終從包裝過程中得到一定數量的分區。
e.g:
core設置為10
(a) 54 parquet files, 65 MB each, 默認參數 。
bytesPerCore = (54*65 + 54 * 4)/ 10 = 372M
maxSplitBytes = Minimum(maxPartitionBytes, bytesPerCore) = min(128M,372M)=128
65 < 128 && 2*65 > 128 ==> 54分區
(b)54 parquet files, 63 MB each, 默認參數。
bytesPerCore = (54*63 + 54 * 4)/ 10 = 361M
maxSplitBytes = Minimum(maxPartitionBytes, bytesPerCore) = min(128M,361M)=128
63 < 128 && 4 + 2*63=126+4=130 > 128=maxPartitionBytes ==> 54 (看起來 1分區可以容納2個塊,但是存在一個openCostInBytes開銷4M,2個63+4大於了 128M,故一個分區只能一個塊)
(c)54 parquet files, 40 MB each, 默認參數。
bytesPerCore = (54*40 + 54 * 4)/ 10 = 237M
maxSplitBytes = Minimum(maxPartitionBytes, bytesPerCore) = min(128M,237M)=128
40 < 128 && (4+3* 40) = 124 < 128 (故一個分區可以裝3個塊) = 54/3 = 18分區
(d)54 parquet files, 40 MB each, maxPartitionBytes=88M 其余默認
bytesPerCore = (54*40 + 54 * 4)/ 10 = 237M
maxSplitBytes = Minimum(maxPartitionBytes, bytesPerCore) = min(88M,237M)=88
40 < 88 && (4+2*40) = 84 < 88 (一個分區2個) = 27個分區
(e) 54 parquet files, 40 MB each ; spark.default.parallelism set to 400
bytesPerCore = (54*40 + 54 * 4)/ 400 = 5.94M maxSplitBytes = Minimum(maxPartitionBytes, bytesPerCore) = min(128M,5.94M)=5.94 每個文件塊數:ceil(40 / 5.94) = 7個 5.94 + 4M > 5.94 一個分區一個塊 所以總分區數為: 54 * 7 = 378 個分區
使用RDD APIs讀取數據文件的分區數
RDD APIs類似下面的API
*SparkContext.newAPIHadoopFile(String path, Class<F> fClass, Class<K> kClass, Class<V> vClass, org.apache.hadoop.conf.Configuration conf) *SparkContext.textFile(String path, int minPartitions) *SparkContext.sequenceFile(String path, Class<K> keyClass, Class<V> valueClass) *SparkContext.sequenceFile(String path, Class<K> keyClass, Class<V> valueClass, int minPartitions) *SparkContext.objectFile(String path, int minPartitions, scala.reflect.ClassTag<T> evidence$4)
在這些API中,會詢問參數' minPartitions ',而在另一些API中則沒有。如果沒有查詢,則默認值為2或1,1(默認情況下為1)。並行性是1。這個“minPartitions”是決定這些api返回的RDD中分區數量的因素之一。其他因素為Hadoop配置參數的值:
# 關於 mapred.min.split.size see https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/DeprecatedProperties.html
# 關於 dfs.blocksize see https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/hdfs-default.xml 默認: 128M
minSize (mapred.min.split.size - default value 1 MB) or minSize (mapreduce.input.fileinputformat.split.minsize - default value 1 MB)
blockSize (dfs.blocksize - default 128 MB)
goalSize = Sum of all files lengths to be read / minPartitions
splitSize = Math.max(minSize, Math.min(goalSize, blockSize));
現在使用“splitSize”,
for each_file in files: if each_file: if each_file.size() > splitSize: # file 被切分為 block_number 塊其中block_number-1大小為 splitSize,最后一個塊<=splitSize block_number = ceil(each_file.size() / maxSplitBytes) else: # 大小等於文件長度的文件塊 block_number = 1 block_size = 文件長度的文件塊 else: #文件不可分 block_number = 1 block_size = 文件長度的文件塊
每個文件塊(大小大於0)都映射到單個分區。因此,由數據文件上的RDD api返回的RDD中的分區數,等於使用“splitSize”對數據文件進行切片而得到的非零文件塊的數
e.g:
(a). 31 parquet files, 330 MB each, blocksize at default 128 MB, minPartitions not specified, core is 10
splitSize = Math.max(minSize, Math.min(Sum of all files lengths to be read / minPartitions, blockSize)) = max( 0 , 128M) = 128M
一個文件按照splitSize=128M可以分3個,故一共分區數 31*3=93
(b). 54 parquet files, 40 MB each, blocksize at default 128 MB, core is 10
splitSize = Math.max(minSize, Math.min(Sum of all files lengths to be read / minPartitions, blockSize)) = max( 0 , min(2264924160/1,128M)) = 128M
splitSize=128M ,40 <128 1個文件長度的文件塊 故為54個分區
(c) 31 parquet files, 330 MB each, blocksize at default 128 MB, minPartitions specified as 1000
splitSize = Math.max(minSize, Math.min(Sum of all files lengths to be read / minPartitions, blockSize)) = max( 0 , min(31 * 330 * 1024 * 1024/1000 ,128 * 1024 * 1024)) = 10726932 = 10.23M
一個文件分為 ceil(330/10.23) = 33塊 共計:31 * 33 = 1023 共計分區: 1023個
(d) 31 parquet files, 330 MB each, blocksize at default 128 MB, minPartitions not specified, ‘mapred.min.split.size’ set at 256 MB, No. of core equal to 10
splitSize = Math.max(minSize, Math.min(Sum of all files lengths to be read / minPartitions, blockSize)) = max( 256 , min(31 * 330 * 1024 * 1024/1 ,128 * 1024 * 1024)) =256M 330/256.0 = 2 , 31 * 2 = 62個分區
總結:
分區的最佳數量是高效可靠的Spark應用程序的關鍵。