spark分片個數的確定及Spark內存錯誤(GC error)的迂回解決方式


我們知道,spark中每個分片都代表着一部分數據,那么分片數量如何被確認的呢?

首先我們使用最常見的HDFS+Spark,sparkDeploy的方式來討論,spark讀取HDFS數據使用的是sparkcontext.textfile(Path, minPartitions):

1   def textFile(
2       path: String,
3       minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
4     assertNotStopped()
5     hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
6       minPartitions).map(pair => pair._2.toString)
7   }

在用戶指定minPartitions時,便會使用用戶指定的分片數量來划分,否則使用defaultMinPartitions。那么defaultMinPartitions是怎么來的?

  def defaultMinPartitions: Int = math.min(defaultParallelism, 2)

...

  def defaultParallelism: Int = {
    assertNotStopped()
    taskScheduler.defaultParallelism
  }

...

  override def defaultParallelism(): Int = backend.defaultParallelism()

...

  override def defaultParallelism(): Int = {
    conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
  }

可以看到這個參數是通過SparkConf中的spark.default.parallelism指定的。如果兩邊都沒指定,那么分片數就為2。

在內存小,分片數少而數據量較大的情況下,會產生GC error,因為內存占用過大,java的垃圾回收無法完成,所以在出現內存錯誤的時候不妨試試將默認的分片數量加大,或者干脆在textfile中指定。這樣有助於數據的處理完成。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM