Spark 創建RDD、DataFrame各種情況的默認分區數


1、前置知識:

(1)sc.defaultMinPartitions

  sc.defaultMinPartitions=min(sc.defaultParallelism,2)

  也就是sc.defaultMinPartitions只有兩個值1和2,當sc.defaultParallelism>1時值為2,當sc.defaultParallelism=1時,值為1

  上面的公式是在源碼里定義的(均在類SparkContext里):

def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
def defaultParallelism: Int = {
    assertNotStopped()
    taskScheduler.defaultParallelism
  }

(2)sc.defaultParallelism

a、首先可通過spark.default.parallelism設置sc.defaultParallelism的值

  •  在文件中配置

在文件spark-defaults.conf添加一行

spark.default.parallelism=20

驗證:

在spark-shell里輸入sc.defaultParallelism,輸出結果為20。

  • 在代碼中配置
val spark = SparkSession.builder()
  .appName("TestPartitionNums")
  .master("local")
  .config("spark.default.parallelism", 20)
  .getOrCreate()

val sc = spark.sparkContext
println(sc.defaultParallelism)

spark.stop

 

b、sc.defaultParallelism 沒有配置時候,會有默認值

  • spark-shell:spark-shell里的值等於cpu的核數,比如我的windows的cpu的核數為再比如測試機的核數為8。
  • 指定master為local:在spark-shell里通過–master local和在代碼里通過.master(“local”)的結果是一樣的,這里以spark-shell為例當master為local時,值為1,當master為local[n]時,值為n
  • master為local[*]和不指定master一樣,都為cpu核數
  • master為yarn模式時為分配的所有的Executor的cpu核數的總和或者2,兩者取最大值,將2.1.2的代碼的master注釋掉並打包,然后用下面的腳本執行測試。

test.sh

spark-submit --num-executors $1 --executor-cores 1 --executor-memory 640M --master yarn   --class  com.dkl.leanring.spark.TestPartitionNums   spark-scala_2.11-1.0.jar

 

2、HDFS文件創建RDD時的默認分區數:

  這里及后面討論的是rdd和dataframe的分區,也就是讀取hdfs文件並不會改變前面講的sc.defaultParallelism和sc.defaultMinPartitions的值。

(1)sc.textFile():  rdd的分區數 = max(hdfs文件的block數目, sc.defaultMinPartitions)

  • 測試大文件(block的數量大於2):上傳了一個1.52G的txt到hdfs上用來測試,其中每個block的大小為默認的128M,那么該文件有13個分區* 1.52*1024/128=12.16。

用下面代碼可以測試讀取hdfs文件的分區數

val rdd = sc.textFile("hdfs://ambari.master.com/data/egaosu/txt/20180416.txt")
rdd.rdd.getNumPartitions

  這種方式無論是sc.defaultParallelism大於block數還是sc.defaultParallelism小於block數,rdd的默認分區數都為block數。

* 注:之所以說是默認分區,因為textFile可以指定分區數,sc.textFile(path, minPartitions),通過第二個參數可以指定分區數

sc.defaultMinPartitions大於block數

sc.defaultMinPartitions小於block數

當用參數指定分區數時,有兩種情況,當參數大於block數時,則rdd的分區數為指定的參數值,否則分區數為block數。

  • 測試小文件(block數量等於1):默認分區數為sc.defaultMinPartitions,下面是對應的hdfs文件:

將上面的hdfs路徑改為:hdfs://ambari.master.com/tmp/dkl/data.txt,結果:

當用參數指定分區數時,rdd的分區數大於等於參數值,本次測試為等於參數值或參數值+1。

 

(2)讀取hive表創建的DataFrame的分區數:

  hdfs文件的block的數目為10(2*5)。

//切換數據庫
spark.sql("use route_analysis")
//讀取該數據庫下的egaosu表為df
val df = spark.table("egaosu")
//打印df對應的rdd的分區數
df.rdd.getNumPartitions

  測試發現,當sc.defaultParallelism大於block時,df的分區是等於sc.defaultParallelism,當小於block時,df的分區數介於sc.defaultParallelism和block之間,至於詳細的分配策略,我還沒有查到~

  用spark.sql(“select * from egaosu”)這種方式得到df和上面的分區數是一樣的

3、從代碼里的內部數據集創建RDD時的默認分區數:

(1)sc.parallelize()創建RDD:默認分區數等於sc.defaultParallelism,指定參數時分區數值等於參數值。

(2)spark.createDataFrame(data)創建DataFrame:當data的長度小於sc.defaultParallelism,分區數等於data長度,否則分區數等於sc.defaultParallelism。

 

 

參考博客:https://blog.csdn.net/dkl12/article/details/81663018


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM