根據業務需求,需要對pyspark內存資源進行限制
本文使用的環境為pyspark 3.1.2,standalone模式
不足之處還請指出
pyspark進程說明
首先我們需要知道對pyspark進行內存限制,是限制哪部分的內存。
先看一下執行pyspark任務需要啟動哪些進程
pyspark與原版基於scala的spark啟動的進程大體相似但略有不同。
當啟動一個pyspark任務時,可以看到產生了2個系列的進程,分別是負責driver和executor
driver:
編號 | 說明 | 內存 |
---|---|---|
d1 | spark的driver端,spark-submit進程,運行在jvm,啟動sparkContext,構建dag等 | spark算子在driver端用到的內存,包括collect等 |
d2 | spark的driver端啟動的pyspark的driver端,執行python部分代碼,通過py4j與d1通信 | python代碼中所用到的內存,包括創建一些變量等 |
executor:
編號 | 說明 | 內存 |
---|---|---|
e1 | spark的worker節點 | 不關注 |
e2 | 設備上其他spark任務的executor backend,與本文無關 | 不關注 |
e3 | 本任務對應的spark的executor backend,運行jvm中 | spark在executor端使用的內存,包括collect等算子、shuffle等 |
e4 | 本任務對應的pyspark的executor backend,管理具體執行task的worker | 占用少量內存 |
e5 | 具體執行pyspark中的python task的任務的worker,由e4 fork得到,執行算子中的自定義Python函數等 | pyspark在executor端使用的內存,通過python執行,包括map中的func等 |
可以看到,pyspark任務中,主要需要對4處進行內存限制
- spark的driver
- spark的executor
- pyspark的driver
- pyspark的executor
后兩個是pyspark比spark多出來的。
官方配置
關於spark中的內存,可以關注官方配置文檔
其中,重點關注3條配置信息
Property Name | Default | Meaning | Since Version |
---|---|---|---|
spark.driver.memory | 1g | Amount of memory to use for the driver process, i.e. where SparkContext is initialized, in the same format as JVM memory strings with a size unit suffix ("k", "m", "g" or "t") (e.g. 512m , 2g ). Note: In client mode, this config must not be set through the SparkConf directly in your application, because the driver JVM has already started at that point. Instead, please set this through the --driver-memory command line option or in your default properties file. |
1.1.1 |
spark.executor.memory | 1g | Amount of memory to use per executor process, in the same format as JVM memory strings with a size unit suffix ("k", "m", "g" or "t") (e.g. 512m , 2g ). |
0.7.0 |
spark.executor.pyspark.memory | Not set | The amount of memory to be allocated to PySpark in each executor, in MiB unless otherwise specified. If set, PySpark memory for an executor will be limited to this amount. If not set, Spark will not limit Python's memory use and it is up to the application to avoid exceeding the overhead memory space shared with other non-JVM processes. When PySpark is run in YARN or Kubernetes, this memory is added to executor resource requests. Note: This feature is dependent on Python's resource module; therefore, the behaviors and limitations are inherited. For instance, Windows does not support resource limiting and actual resource is not limited on MacOS. |
2.4.0 |
spark.driver.memory和spark.executor.memory這兩個參數限制就是spark端driver和executor的內存,
對需要在jvm中執行的任務進行了很好的限制,
但如上文所述,還需要對pyspark端的內存進行限制。
pyspark的executor內存限制
spark.executor.pyspark.memory這個參數是對pyspark的executor內存進行了限制
根據pyspark中worker.py
# set up memory limits
memory_limit_mb = int(os.environ.get('PYSPARK_EXECUTOR_MEMORY_MB', "-1"))
if memory_limit_mb > 0 and has_resource_module:
total_memory = resource.RLIMIT_AS
try:
(soft_limit, hard_limit) = resource.getrlimit(total_memory)
msg = "Current mem limits: {0} of max {1}\n".format(soft_limit, hard_limit)
print(msg, file=sys.stderr)
# convert to bytes
new_limit = memory_limit_mb * 1024 * 1024
if soft_limit == resource.RLIM_INFINITY or new_limit < soft_limit:
msg = "Setting mem limits to {0} of max {1}\n".format(new_limit, new_limit)
print(msg, file=sys.stderr)
resource.setrlimit(total_memory, (new_limit, new_limit))
except (resource.error, OSError, ValueError) as e:
# not all systems support resource limits, so warn instead of failing
print("WARN: Failed to set memory limit: {0}\n".format(e), file=sys.stderr)
看到,其實這個參數主要是使用了Python的resource模塊進行了內存限制
然而,這里面設置的resource.RLIMIT_AS是對虛擬內存進行限制
我們通常想限制的是駐留內存。
例如一個小測試
import resource
resource.setrlimit(resource.RLIMIT_AS, (1*1024**3, -1))
def fun():
tmp = []
for i in range(1024**3):
try:
tmp.append('a'*1024)
except MemoryError:
break
return tmp
x = fun(), fun(), fun(), fun()
通過resource.setrlimit限制了1g內存。resource.RLIMIT_AS為虛擬內存的flag,RLIMIT_RSS為駐留內存,但只在老linux內核中生效,現在無法對內核態操作
運行后資源如下
virt達到了限制的1g,但res只有900m。在其他情況下,通常virt遠遠大於res,這樣virt達到了我們限制好的數值,但是res很小,內存遠遠沒得到充分利用,造成資源浪費。
另注:
在standalone模式下,每個worker(e5)限制的virt內存是在application啟動時計算好的。通過spark.executor.pyspark.memory 除以 --executor-cores 得到。
\(workerMemoryMb =memoryMb / execCores\)
減少每個stage的task個數並不能增加每個worker的virt內存限制大小
pyspark的driver內存限制
pyspark的driver負責執行python流程代碼,內存包含Python中創建的各種變量等
spark官方似乎沒有參數對這部分內存進行限制
可以自行使用resource模塊,對virt內存進行限制
報錯信息參考
spark的driver和executor出現oom后,會產生java.lang.OutOfMemoryError: Java heap space報錯信息
pyspark的driver和executor出現oom后,產生MemoryError,附有對應python代碼
cgroup管理內存
Control groups,是一種Linux內核特性,對進程進行分級分組管理,不同組用不同資源限制並監控。
可以對pyspark的駐留內存進行管理
安裝
以centos為例
yum install -y libcgroup libcgroup-tools
分組配置
這里先設置了一個組,用作pyspark的總體控制
再在這個組中設置兩個組,分別對driver端的進程和executor的進程進行了限制
/sys/fs/cgroup/memory這個路徑是cgroup對memory進行控制的配置,在這里建立對應文件夾來建立具體分組
首先是整體分組
mkdir /sys/fs/cgroup/memory/pyspark
再driver和executor分別建組控制
mkdir /sys/fs/cgroup/memory/pyspark/driver
mkdir /sys/fs/cgroup/memory/pyspark/executor
建組后,cgroup會自動生成一些配置文件,如下圖
關於每一項的說明可以參考大佬的文檔
在這里主要關注memory.limit_in_bytes和cgroup.procs
memory.limit_in_bytes為當前限制的內存額度。超過額度的話會對相應進程進行kill
可以使用echo重定向對這個進行限制
echo 10g > /sys/fs/cgroup/memory/ai_pyspark/driver/memory.limit_in_bytes
echo 50g > /sys/fs/cgroup/memory/ai_pyspark/executor/memory.limit_in_bytes
則將這個分組的內存限制為10g和50g
cgroup.procs中包含這個分組中的pid
可將spark-submit和worker的pid追加進這個文件,cgroup便將這個進行拉進這個分組進行管理
echo 160224 >> /sys/fs/cgroup/memory/ai_pyspark/driver/cgroup.procs
echo 167910 >> /sys/fs/cgroup/memory/ai_pyspark/executor/cgroup.procs
cgroup會將進程中新產生的子進程自動加入cgroup.procs
例如當產生新的pyspark.daemon時,cgroup會將對應的pid添加到executor分組中
linux系統中每一個進程都有自己的分組,我們沒配置分組的進程會在/sys/fs/cgroup/memory分組中,如果想將某個分組中的某個pid移除這個分組,只需將他移動到另一個分組,例如
echo 167910 >> /sys/fs/cgroup/memory/cgroup.procs
另注:
如果executor發生oom,當前spark executor backend進程掛掉,spark會啟動一個新的executor backend,不要忘記將新的executor pid再加入cgroup.procs
參考
cgroups(7) — Linux manual page