問題描述與分析
題目中的問題大致可以描述為:
由於某個 Executor 沒有按時向 Driver 發送心跳,而被 Driver 判斷該 Executor 已掛掉,此時 Driver 要把 該 Executor 上執行的任務發送給另外一個 Executor 重新執行;
默認等待時長為 spark.network.timeout=120s
完整報錯大概如下
17/01/13 09:13:08 WARN spark.HeartbeatReceiver: Removing executor 5 with no recent heartbeats: 161684 ms exceeds timeout 120000 ms 17/01/13 09:13:08 ERROR cluster.YarnClusterScheduler: Lost executor 5 on slave10: Executor heartbeat timed out after 161684 ms 17/01/13 09:13:08 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, slave10): ExecutorLostFailure (executor 5 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 161684 ms 17/01/13 09:13:08 INFO scheduler.DAGScheduler: Executor lost: 5 (epoch 0) 17/01/13 09:13:08 INFO cluster.YarnClusterSchedulerBackend: Requesting to kill executor(s) 5 17/01/13 09:13:08 INFO scheduler.TaskSetManager: Starting task 0.1 in stage 0.0 (TID 5, slave06, partition 0,RACK_LOCAL, 8029 bytes) 17/01/13 09:13:08 INFO storage.BlockManagerMasterEndpoint: Trying to remove executor 5 from BlockManagerMaster. 17/01/13 09:13:08 INFO storage.BlockManagerMasterEndpoint: Removing block manager BlockManagerId(5, slave10, 34439) 17/01/13 09:13:08 INFO storage.BlockManagerMaster: Removed 5 successfully in removeExecutor 17/01/13 09:13:08 INFO scheduler.DAGScheduler: Host added was in lost list earlier: slave10 17/01/13 09:13:08 INFO yarn.ApplicationMaster$AMEndpoint: Driver requested to kill executor(s) 5. 17/01/13 09:13:08 INFO scheduler.TaskSetManager: Finished task 0.1 in stage 0.0 (TID 5) in 367 ms on slave06 (5/5) 17/01/13 09:13:08 INFO scheduler.DAGScheduler: ResultStage 0 (saveAsNewAPIHadoopFile at DataFrameFunctions.scala:55) finished in 162.495 s
Executor 沒有按時向 Driver 發送心跳,原因可能為
1. 真的掛掉
2. 執行某任務由於資源不夠造成死機
3. 其他原因
我們主要解決第二個
解決方案
增加資源---增加 memoryOverhead
簡單來解釋下
spark.yarn.executor.memoryOverhead 的內存是由 spark tungsten 自己的管理機制去管理,用的時候申請,用完釋放,內存利用率高,【鑒於 JVM 的內存管理,GC 效率低,才有了這種管理機制】
而 spark.executor.memory 的內存是由 JVM 管理,分配,回收,涉及多種垃圾回收機制,用不好效率低
原因分析
如果用於存儲 RDD 的空間不足,那么后存儲的 RDD 的 partition 會覆蓋之前的 RDD 的 partition,導致之前的 RDD 丟失,當使用丟失的 RDD partition 時,需要重新計算;
如果 java 堆或者永久代的內存不足,則會產生各種 OOM 情況,executor 會被殺死,spark 會重新申請一個 container 運行 executor,失敗的 task 或者丟失的數據都會在這個 executor 上重新執行;
如果實際運行過程中,ExecutorMemory + MemoryOverhead 之和(JVM 進程總內存) 超過 container 容量,yarn 會直接殺死該 container,executor 日志中不會有記錄,spark 會重新申請 container 運行 executor;
如果 java 堆以外的 JVM 進程占用內存較多,需要將 MemoryOverhead 設置足夠大,否則 executor 將會被殺死
具體操作
spark.yarn.executor.memoryOverhead 的默認配置為 max(executorMemory * 0.10, 384),單位為 M
我們可以手動設置
--conf spark.yarn.executor.memoryOverhead=512
--conf spark.yarn.driver.memoryOverhead=512
減少資源占用---使用 combineByKey
如 reduceByKey,可以有效的減少內存占用
rdd.repartition(20).map(mymap).groupBy(mygroup).mapValues(len).collect() rdd.repartition(20).map(mymap).map(mygroup).reduceByKey(lambda x, y: x+y).collect()
另外還有幾種較為簡單的方法
1. 增加等待時長 spark.network.timeout
2. 在資源不變的情況下,增加 executor 內存,減少 executor 數量,增加 executor cores,這個自己想想,反正就是總的不變,保證每個 task 有足夠內存
參考資料:
https://blog.csdn.net/gangchengzhong/article/details/76474129
https://www.cnblogs.com/RichardYD/p/6281745.html spark yarn任務的executor 無故 timeout之原因分析
http://f.dataguru.cn/thread-906602-1-1.html spark.yarn.executor.memoryOverhead