我們知道,spark中每個分片都代表着一部分數據,那么分片數量如何被確認的呢?
首先我們使用最常見的HDFS+Spark,sparkDeploy的方式來討論,spark讀取HDFS數據使用的是sparkcontext.textfile(Path, minPartitions):
1 def textFile( 2 path: String, 3 minPartitions: Int = defaultMinPartitions): RDD[String] = withScope { 4 assertNotStopped() 5 hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], 6 minPartitions).map(pair => pair._2.toString) 7 }
在用戶指定minPartitions時,便會使用用戶指定的分片數量來划分,否則使用defaultMinPartitions。那么defaultMinPartitions是怎么來的?
def defaultMinPartitions: Int = math.min(defaultParallelism, 2) ... def defaultParallelism: Int = { assertNotStopped() taskScheduler.defaultParallelism } ... override def defaultParallelism(): Int = backend.defaultParallelism() ... override def defaultParallelism(): Int = { conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2)) }
可以看到這個參數是通過SparkConf中的spark.default.parallelism指定的。如果兩邊都沒指定,那么分片數就為2。
在內存小,分片數少而數據量較大的情況下,會產生GC error,因為內存占用過大,java的垃圾回收無法完成,所以在出現內存錯誤的時候不妨試試將默認的分片數量加大,或者干脆在textfile中指定。這樣有助於數據的處理完成。