1、前置知識:
(1)sc.defaultMinPartitions
sc.defaultMinPartitions=min(sc.defaultParallelism,2)
也就是sc.defaultMinPartitions只有兩個值1和2,當sc.defaultParallelism>1時值為2,當sc.defaultParallelism=1時,值為1
上面的公式是在源碼里定義的(均在類SparkContext里):
def defaultMinPartitions: Int = math.min(defaultParallelism, 2) def defaultParallelism: Int = { assertNotStopped() taskScheduler.defaultParallelism }
(2)sc.defaultParallelism
a、首先可通過spark.default.parallelism設置sc.defaultParallelism的值
- 在文件中配置
在文件spark-defaults.conf添加一行
spark.default.parallelism=20
驗證:
在spark-shell里輸入sc.defaultParallelism,輸出結果為20。
- 在代碼中配置
val spark = SparkSession.builder() .appName("TestPartitionNums") .master("local") .config("spark.default.parallelism", 20) .getOrCreate() val sc = spark.sparkContext println(sc.defaultParallelism) spark.stop
b、sc.defaultParallelism 沒有配置時候,會有默認值
- spark-shell:spark-shell里的值等於cpu的核數,比如我的windows的cpu的核數為
再比如測試機的核數為8。
- 指定master為local:在spark-shell里通過–master local和在代碼里通過.master(“local”)的結果是一樣的,這里以spark-shell為例當master為local時,值為1,當master為local[n]時,值為n
- master為local[*]和不指定master一樣,都為cpu核數
- master為yarn模式時為分配的所有的Executor的cpu核數的總和或者2,兩者取最大值,將2.1.2的代碼的master注釋掉並打包,然后用下面的腳本執行測試。
test.sh
spark-submit --num-executors $1 --executor-cores 1 --executor-memory 640M --master yarn --class com.dkl.leanring.spark.TestPartitionNums spark-scala_2.11-1.0.jar
2、HDFS文件創建RDD時的默認分區數:
這里及后面討論的是rdd和dataframe的分區,也就是讀取hdfs文件並不會改變前面講的sc.defaultParallelism和sc.defaultMinPartitions的值。
(1)sc.textFile(): rdd的分區數 = max(hdfs文件的block數目, sc.defaultMinPartitions)
- 測試大文件(block的數量大於2):上傳了一個1.52G的txt到hdfs上用來測試,其中每個block的大小為默認的128M,那么該文件有13個分區* 1.52*1024/128=12.16。
用下面代碼可以測試讀取hdfs文件的分區數
val rdd = sc.textFile("hdfs://ambari.master.com/data/egaosu/txt/20180416.txt")
rdd.rdd.getNumPartitions
這種方式無論是sc.defaultParallelism大於block數還是sc.defaultParallelism小於block數,rdd的默認分區數都為block數。
* 注:之所以說是默認分區,因為textFile可以指定分區數,sc.textFile(path, minPartitions),通過第二個參數可以指定分區數
sc.defaultMinPartitions大於block數
sc.defaultMinPartitions小於block數
當用參數指定分區數時,有兩種情況,當參數大於block數時,則rdd的分區數為指定的參數值,否則分區數為block數。
- 測試小文件(block數量等於1):默認分區數為sc.defaultMinPartitions,下面是對應的hdfs文件:
將上面的hdfs路徑改為:hdfs://ambari.master.com/tmp/dkl/data.txt,結果:

當用參數指定分區數時,rdd的分區數大於等於參數值,本次測試為等於參數值或參數值+1。
(2)讀取hive表創建的DataFrame的分區數:
hdfs文件的block的數目為10(2*5)。
//切換數據庫 spark.sql("use route_analysis") //讀取該數據庫下的egaosu表為df val df = spark.table("egaosu") //打印df對應的rdd的分區數 df.rdd.getNumPartitions
測試發現,當sc.defaultParallelism大於block時,df的分區是等於sc.defaultParallelism,當小於block時,df的分區數介於sc.defaultParallelism和block之間,至於詳細的分配策略,我還沒有查到~
用spark.sql(“select * from egaosu”)這種方式得到df和上面的分區數是一樣的
3、從代碼里的內部數據集創建RDD時的默認分區數:
(1)sc.parallelize()創建RDD:默認分區數等於sc.defaultParallelism,指定參數時分區數值等於參數值。
(2)spark.createDataFrame(data)創建DataFrame:當data的長度小於sc.defaultParallelism,分區數等於data長度,否則分區數等於sc.defaultParallelism。
