Dpark內存溢出
Spark內存溢出
堆內內存溢出
堆外內存溢出
堆內內存溢出
java.lang.OutOfMemoryError: GC overhead limit execeeded
java.lang.OutOfMemoryError: Java heap space
具體說明
Heap size JVM堆的設置是指java程序運行過程中JVM可以調配使用的內存空間的設置.
JVM在啟動的時候會自動設置Heap size的值,
Heap size 的大小是Young Generation 和Tenured Generaion 之和。
提示:在JVM中如果98%的時間是用於GC且可用的Heap size 不足2%的時候將拋出此異常信息
Driver heap OOM的三大原因:
(1).用戶在Driver端口生成大對象, 比如創建了一個大的集合數據結構
(2).從Executor端收集數據回Driver端
Executor heap OOM
堆外內存溢出
報錯情況
Container killed by YARN for exceeding memory limits. 1*.4 GB of 1* GB physical memory used.
Consider boosting spark.yarn.executor.memoryOverhead.
基本內容介紹:
1.executor 和 container
01.Spark中的 executor 進程是跑在 container 中,所以container的最大內存會直接影響到executor的最大可用內存
02. yarn.nodemanager.pmem-check-enabled 該參數默認是true,也就是會由它來控制監控container的內存使用
03. yarn.scheduler.maximum-allocation-mb 設置值6114,也就是說單個container申請的最大內存是這么多,
執行任務的時候你的executer需要的內存超出這個值,那么就會被殺掉
container超過了內存的限制從而被kill掉
04.executor執行的時候,用的內存可能會超過 executor-memory
所以會為executor額外預留一部分內存。spark.yarn.executor.memoryOverhead 代表了這部分內存
即實際的內存
val executorMem = args.executorMemory + executorMemoryOverhead
05.memoryOverhead
如果沒有設置 spark.yarn.executor.memoryOverhead ,則這部分的內存大小為
math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN))
其中 MEMORY_OVERHEAD_FACTOR 默認為0.1, MEMORY_OVERHEAD_MIN 默認為384m executorMemory 為設置的 executor-memory
實際 executorMem= X+max(X*0.1,384)
設置了的話
executorMem=X +spark.yarn.executor.memoryOverhead 其中 X 是值 args.executorMemory
06. executorMem 需要滿足的條件: executorMem< yarn.scheduler.maximum-allocation-mb
2.Yarn 中 contaimer 和 Spark中 partition 之間的關系
job會被切分成stages,每個stage切分成task,每個task單獨調度,可以把executor的jvm進程看做task執行池
spark.executor.memory 每個executor使用的內存
一個executor可以並行執行多個task,實際上一個executor是一個進程,task是executor里的一個線程。
一個task至少要獨占executor里的一個虛擬核心vcore, 一個task要占用幾個核心,可以由.config("spark.task.cpus", 1)配置,默認是1即一個task占用一個vcore
同時並行執行的task最大數量 = executor數目 * (每個executor核數 / 每個task占用核心數)
總核數= executor-cores * num-executor
例如: 每個 executor具有3個 cores 理論上每個executor可以處理1-4個task
3.分區與Task的情況
讀取階段
01.從內存中創建 RDD:sc.parallelize(...),那么默認的分區數量為該程序所分配的資源的 CPU 數量。
02.如果是讀取hdfs的文件,
一般來說,partition的數量等於文件的數量。
如果單個文件的大小大於hdfs的分塊大小,partition的數量就等於 “文件大小/分塊大小”。
同時,也可以使用rdd的repartition方法重新划分partition。
運算階段
經過不同的算子計算后,分區數目又會變化
Task 的數量是由 Partition 決定的
在Spark中有兩類task,一類是shuffleMapTask,一類是resultTask,
第一類task的輸出是shuffle所需數據,
第二類task的輸出是result,
可能的原因:
1、數據出現了傾斜等原因導致其中一個 contaimer 內存負載太大 運行失敗
2. Spark的shuffle部分使用了netty框架進行網絡傳輸,但netty會申請堆外內存緩存 Shuffle時,
每個Reduce都需要獲取每個map對應的輸出,
當一個reduce需要獲取的一個map數據比較大 超出配置的限制就報了這個錯。
通過spark.sql.adaptive.shuffle.targetPostShuffleInputSize 可設置每個 Reducer 讀取的目標數據量,其單位是字節,默認值為 64 MB。
解決內存overhead的問題的方法是:
1.將"spark.executor.memory" 從8g設置為12g。將內存調大
2.將"spark.executor.cores" 從8設置為4。 將core的個數調小。
3.將rdd/dateframe進行重新分區 。 重新分區(repartition)
4.將"spark.yarn.executor.memoryOverhead"設置為最大值,可以考慮一下4096。這個數值一般都是2的次冪。
具體參數配置
set spark.sql.adaptive.repartition.enabled=true;
set spark.sql.shuffle.partitions=2000;
set spark.sql.adaptive.shuffle.targetPostShuffleInputSize=67108864;
數據傾斜
數據傾斜解決
1. 熱點數據
01.熱點數據已知
過濾: 過濾無關數據以及影響可接受范圍內的熱點數據: NULL值以及空字符串等值
分離:
02.熱點數據未知:
打散: 添加隨機字段前綴等方式 隨機key實現雙重聚合
2.執行方式
數據源數據文件不均勻
Shuffle過程中數據分布不均
緩解 - 處理shuffle的情況:
join中有數據傾斜:
一大一小: 將reduce join轉換為map join
兩個數據都比較大: 隨機數以及擴容表進行join
專有名詞解釋
1.常用配置
配置任務可用executor數量
每個Executor占用內存
每個Executor的core數目 spark.executor.cores
The maximum memory size of container to running driver
is determined by
the sum of
spark.driver.memoryOverhead
spark.driver.memory.
The maximum memory size of container to running executor
is determined by
the sum of
spark.executor.memory,
spark.executor.memoryOverhead,
spark.memory.offHeap.size
spark.executor.pyspark.memory.
Shuffle Behavior
Memory Management
spark.memory.fraction
在Spark中,執行和存儲共享一個統一的區域M
代表整體JVM堆內存中M的百分比(默認0.6)。
剩余的空間(40%)是為用戶數據結構、Spark內部metadata預留的,並在稀疏使用和異常大記錄的情況下避免OOM錯誤
spark.memory.storageFraction
Note: Non-heap memory includes off-heap memory (when spark.memory.offHeap.enabled=true)
and memory used by other driver processes (e.g. python process that goes with a PySpark driver)
and memory used by other non-driver processes running in the same container
spark.executor.memoryOverhead
This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc.
spark.memory.offHeap.size
spark.memory.offHeap.enabled
源碼
package org.apache.spark.deploy.yarn
DRIVER_MEMORY_OVERHEAD
EXECUTOR_MEMORY : Amount of memory to use per executor process
EXECUTOR_MEMORY_OVERHEAD: The amount of off-heap memory to be allocated per executor in cluster mode
EXECUTOR_CORES = ConfigBuilder("spark.executor.cores")
EXECUTOR_MEMORY_OVERHEAD = ConfigBuilder("spark.yarn.executor.memoryOverhead")
// Executor memory in MB.
protected val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt
// Additional memory overhead.
protected val memoryOverhead: Int = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN)).toInt
// Resource capability requested for each executors
private[yarn] val resource = Resource.newInstance(executorMemory + memoryOverhead, executorCores)
package org.apache.spark.memory;
public enum MemoryMode { ON_HEAP, OFF_HEAP}
private[spark] abstract class MemoryManager(
conf: SparkConf,
numCores: Int,
onHeapStorageMemory: Long,
onHeapExecutionMemory: Long) extends Logging {
# Tracks whether Tungsten memory will be allocated on the JVM heap or off-heap using sun.misc.Unsafe.
final val tungstenMemoryMode: MemoryMode = {
if (conf.getBoolean("spark.memory.offHeap.enabled", false)) {
require(conf.getSizeAsBytes("spark.memory.offHeap.size", 0) > 0,
"spark.memory.offHeap.size must be > 0 when spark.memory.offHeap.enabled == true")
require(Platform.unaligned(),
"No support for unaligned Unsafe. Set spark.memory.offHeap.enabled to false.")
MemoryMode.OFF_HEAP
} else {
MemoryMode.ON_HEAP
}
}
參考:
https://spark.apache.org/docs/latest/configuration.html
https://spark.apache.org/docs/latest/running-on-yarn.html#configuration