Spark開發-Spark內存溢出原因以及解決方式


Dpark內存溢出

 Spark內存溢出
   堆內內存溢出
   堆外內存溢出

堆內內存溢出

java.lang.OutOfMemoryError: GC overhead limit execeeded
java.lang.OutOfMemoryError: Java heap space
 具體說明
  Heap size  JVM堆的設置是指java程序運行過程中JVM可以調配使用的內存空間的設置.
  JVM在啟動的時候會自動設置Heap size的值,
  Heap size 的大小是Young Generation 和Tenured Generaion 之和。
  提示:在JVM中如果98%的時間是用於GC且可用的Heap size 不足2%的時候將拋出此異常信息
  Driver heap OOM的三大原因:
   (1).用戶在Driver端口生成大對象, 比如創建了一個大的集合數據結構
(2).從Executor端收集數據回Driver端
 Executor heap OOM

堆外內存溢出

報錯情況

Container killed by YARN for exceeding memory limits. 1*.4 GB of 1* GB physical memory used. 
 Consider boosting spark.yarn.executor.memoryOverhead.

基本內容介紹:

1.executor 和 container
  01.Spark中的 executor 進程是跑在 container 中,所以container的最大內存會直接影響到executor的最大可用內存
  02. yarn.nodemanager.pmem-check-enabled 該參數默認是true,也就是會由它來控制監控container的內存使用
  03. yarn.scheduler.maximum-allocation-mb 設置值6114,也就是說單個container申請的最大內存是這么多,
	   執行任務的時候你的executer需要的內存超出這個值,那么就會被殺掉
	    container超過了內存的限制從而被kill掉
   04.executor執行的時候,用的內存可能會超過 executor-memory
        所以會為executor額外預留一部分內存。spark.yarn.executor.memoryOverhead 代表了這部分內存
		即實際的內存
		  val executorMem = args.executorMemory + executorMemoryOverhead
	05.memoryOverhead
    如果沒有設置 spark.yarn.executor.memoryOverhead ,則這部分的內存大小為
        math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN))
	   其中 MEMORY_OVERHEAD_FACTOR 默認為0.1, MEMORY_OVERHEAD_MIN 默認為384m executorMemory 為設置的 executor-memory
	    實際 executorMem= X+max(X*0.1,384)
		設置了的話 
		   executorMem=X +spark.yarn.executor.memoryOverhead  其中 X 是值 args.executorMemory
	06. executorMem 需要滿足的條件: executorMem< yarn.scheduler.maximum-allocation-mb 	

2.Yarn 中 contaimer 和 Spark中 partition 之間的關系   
   job會被切分成stages,每個stage切分成task,每個task單獨調度,可以把executor的jvm進程看做task執行池
   spark.executor.memory  每個executor使用的內存
    一個executor可以並行執行多個task,實際上一個executor是一個進程,task是executor里的一個線程。
    一個task至少要獨占executor里的一個虛擬核心vcore, 一個task要占用幾個核心,可以由.config("spark.task.cpus", 1)配置,默認是1即一個task占用一個vcore
    同時並行執行的task最大數量 = executor數目 * (每個executor核數 / 每個task占用核心數)
	總核數= executor-cores * num-executor
	 例如: 每個 executor具有3個 cores 理論上每個executor可以處理1-4個task
3.分區與Task的情況
     讀取階段
         01.從內存中創建 RDD:sc.parallelize(...),那么默認的分區數量為該程序所分配的資源的 CPU 數量。
         02.如果是讀取hdfs的文件,
             一般來說,partition的數量等於文件的數量。
             如果單個文件的大小大於hdfs的分塊大小,partition的數量就等於 “文件大小/分塊大小”。
             同時,也可以使用rdd的repartition方法重新划分partition。
     運算階段
        經過不同的算子計算后,分區數目又會變化
        Task 的數量是由 Partition 決定的
	在Spark中有兩類task,一類是shuffleMapTask,一類是resultTask,
	     第一類task的輸出是shuffle所需數據,
		 第二類task的輸出是result,

可能的原因:

1、數據出現了傾斜等原因導致其中一個 contaimer 內存負載太大 運行失敗
2.	Spark的shuffle部分使用了netty框架進行網絡傳輸,但netty會申請堆外內存緩存 Shuffle時,
    每個Reduce都需要獲取每個map對應的輸出,
    當一個reduce需要獲取的一個map數據比較大 超出配置的限制就報了這個錯。
	  通過spark.sql.adaptive.shuffle.targetPostShuffleInputSize 可設置每個 Reducer 讀取的目標數據量,其單位是字節,默認值為 64 MB。

解決內存overhead的問題的方法是:

 1.將"spark.executor.memory" 從8g設置為12g。將內存調大
 2.將"spark.executor.cores"  從8設置為4。   將core的個數調小。
 3.將rdd/dateframe進行重新分區 。           重新分區(repartition)
 4.將"spark.yarn.executor.memoryOverhead"設置為最大值,可以考慮一下4096。這個數值一般都是2的次冪。

具體參數配置

set spark.sql.adaptive.repartition.enabled=true;
set spark.sql.shuffle.partitions=2000;
set spark.sql.adaptive.shuffle.targetPostShuffleInputSize=67108864;

數據傾斜

數據傾斜解決
1. 熱點數據
  01.熱點數據已知
    過濾: 過濾無關數據以及影響可接受范圍內的熱點數據: NULL值以及空字符串等值
    分離:
  02.熱點數據未知:
     打散: 添加隨機字段前綴等方式 隨機key實現雙重聚合
 2.執行方式
    數據源數據文件不均勻
    Shuffle過程中數據分布不均
       緩解 - 處理shuffle的情況:
         join中有數據傾斜: 
              一大一小:    將reduce join轉換為map join
              兩個數據都比較大: 隨機數以及擴容表進行join

專有名詞解釋

1.常用配置
   配置任務可用executor數量
   每個Executor占用內存
   每個Executor的core數目  spark.executor.cores
  
  The maximum memory size of container to running driver 
    is determined  by 
  the sum of 
      spark.driver.memoryOverhead 
      spark.driver.memory.

  The maximum memory size of container to running executor
   is determined by 
  the sum of 
      spark.executor.memory, 
      spark.executor.memoryOverhead, 
      spark.memory.offHeap.size 
	  spark.executor.pyspark.memory.
 Shuffle Behavior
 Memory Management
    spark.memory.fraction
	 在Spark中,執行和存儲共享一個統一的區域M
	   代表整體JVM堆內存中M的百分比(默認0.6)。
	    剩余的空間(40%)是為用戶數據結構、Spark內部metadata預留的,並在稀疏使用和異常大記錄的情況下避免OOM錯誤
	spark.memory.storageFraction

Note: Non-heap memory includes off-heap memory (when spark.memory.offHeap.enabled=true)
   and memory used by other driver processes (e.g. python process that goes with a PySpark driver) 
   and memory used by other non-driver processes running in the same container

spark.executor.memoryOverhead
    This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc.

spark.memory.offHeap.size
spark.memory.offHeap.enabled

源碼

package org.apache.spark.deploy.yarn
    DRIVER_MEMORY_OVERHEAD
	EXECUTOR_MEMORY : Amount of memory to use per executor process
    EXECUTOR_MEMORY_OVERHEAD: The amount of off-heap memory to be allocated per executor in cluster mode
	EXECUTOR_CORES = ConfigBuilder("spark.executor.cores")
	EXECUTOR_MEMORY_OVERHEAD = ConfigBuilder("spark.yarn.executor.memoryOverhead")
     // Executor memory in MB.
      protected val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt
      // Additional memory overhead.
      protected val memoryOverhead: Int = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
        math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN)).toInt

	// Resource capability requested for each executors
     private[yarn] val resource = Resource.newInstance(executorMemory + memoryOverhead, executorCores)

package org.apache.spark.memory;
    public enum MemoryMode { ON_HEAP, OFF_HEAP}
	private[spark] abstract class MemoryManager(
      conf: SparkConf,
      numCores: Int,
      onHeapStorageMemory: Long,
      onHeapExecutionMemory: Long) extends Logging {
     # Tracks whether Tungsten memory will be allocated on the JVM heap or off-heap using sun.misc.Unsafe.
       final val tungstenMemoryMode: MemoryMode = {
         if (conf.getBoolean("spark.memory.offHeap.enabled", false)) {
           require(conf.getSizeAsBytes("spark.memory.offHeap.size", 0) > 0,
             "spark.memory.offHeap.size must be > 0 when spark.memory.offHeap.enabled == true")
           require(Platform.unaligned(),
             "No support for unaligned Unsafe. Set spark.memory.offHeap.enabled to false.")
           MemoryMode.OFF_HEAP
         } else {
           MemoryMode.ON_HEAP
         }
       }

參考:

  https://spark.apache.org/docs/latest/configuration.html
  https://spark.apache.org/docs/latest/running-on-yarn.html#configuration


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM