A bug in Spark 2.3's SparkPlanGraphWrapper handling that causes driver OOM



Background

Long story short: a colleague in my department came to me saying his Spark 2.3 Structured Streaming job kept hitting OOM and had never stayed up longer than three or four days, and asked me to take a look.
I'm usually reluctant to take these on, because in most cases a Spark OOM comes down to just a few possibilities:

  • Pulling in too much data at once and blowing up executor memory;
  • Too much data in a shuffle with too few shuffle partitions, blowing up memory again;
  • Needlessly calling collect() or the like, aggregating the data onto the driver and blowing up driver memory;
  • Needlessly calling persist() and caching the results in memory, blowing it up yet again.

These are basically all fixable by limiting how much data is pulled each time, adding memory, or repartitioning where appropriate; a quick sketch of those knobs follows below.
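
A minimal sketch of those knobs, assuming a Kafka-backed Structured Streaming job (the conf values, broker address, and topic name here are purely illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .config("spark.executor.memory", "8g")           // more executor headroom
  .config("spark.sql.shuffle.partitions", "400")   // more, smaller shuffle partitions
  .getOrCreate()

// Cap how much each micro-batch pulls from the source:
val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "events")
  .option("maxOffsetsPerTrigger", "100000")
  .load()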

But when I looked at this one, it genuinely wasn't any of those everyday problems, so it's worth writing down.

Investigation

I walked through the program and the data, and nothing looked wrong to the naked eye, so I had to bring out the big gun:

-XX:+HeapDumpOnOutOfMemoryError

and while I was at it I added the full set of GC-logging flags.
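
For reference, something like the following on the driver gets you both the heap dump and the GC log. These are standard HotSpot flags for JDK 8, which is what Spark 2.3 typically runs on; the paths are illustrative:

--conf "spark.driver.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/driver.hprof -XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:/tmp/driver-gc.log"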

After the program died again, I looked at the GC log first: old-generation usage kept growing and barely changed across full GCs. So it wasn't that the data was simply too big to fit; this had to be a memory leak. On to the dump file.

Opening the dump in MAT made it easy to pinpoint the leak, as shown below:
[MAT screenshot: the leak suspect]
Straight to the source:

public class InMemoryStore implements KVStore {

  private Object metadata;
  // this is the map that had grown to over 5 GB
  private ConcurrentMap<Class<?>, InstanceList> data = new ConcurrentHashMap<>();
  
  ......
}

It checks out. So the problem should now be fairly clear: somewhere, on every batch, Spark puts a lot of data into this map but forgets to remove the entries that have expired, which is why GC can never reclaim it.
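
The failure mode in miniature (a contrived sketch, not Spark's actual code):

import scala.collection.mutable

// lives as long as the driver does, like InMemoryStore.data
val data = mutable.Map[Class[_], mutable.Buffer[AnyRef]]()

def write(value: AnyRef): Unit = {
  // every batch appends...
  data.getOrElseUpdate(value.getClass, mutable.Buffer.empty[AnyRef]) += value
  // ...but no matching delete ever runs for finished executions, so the map
  // grows without bound and survives every full GC
}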

The next question, then: where does a put happen without a matching remove? Let's look at who actually holds the reference to this 5 GB InMemoryStore:
[MAT screenshot: reference chain holding the InMemoryStore]

The figure makes it plain: next we need to look at the ElementTrackingStore implementation. I'll paste the class's doc comment here as well:

/**
 * A KVStore wrapper that allows tracking the number of elements of specific types, and triggering
 * actions once they reach a threshold. This allows writers, for example, to control how much data
 * is stored by potentially deleting old data as new data is added.
 *
 * This store is used when populating data either from a live UI or an event log. On top of firing
 * triggers when elements reach a certain threshold, it provides two extra bits of functionality:
 *
 * - a generic worker thread that can be used to run expensive tasks asynchronously; the tasks can
 *   be configured to run on the calling thread when more determinism is desired (e.g. unit tests).
 * - a generic flush mechanism so that listeners can be notified about when they should flush
 *   internal state to the store (e.g. after the SHS finishes parsing an event log).
 *
 * The configured triggers are run on a separate thread by default; they can be forced to run on
 * the calling thread by setting the `ASYNC_TRACKING_ENABLED` configuration to `false`.
 */
 private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) extends KVStore {

  import config._

  private val triggers = new HashMap[Class[_], Seq[Trigger[_]]]()
  private val flushTriggers = new ListBuffer[() => Unit]()
  private val executor = if (conf.get(ASYNC_TRACKING_ENABLED)) {
    ThreadUtils.newDaemonSingleThreadExecutor("element-tracking-store-worker")
  } else {
    MoreExecutors.sameThreadExecutor()
  }

  @volatile private var stopped = false

  /**
   * Register a trigger that will be fired once the number of elements of a given type reaches
   * the given threshold.
   *
   * @param klass The type to monitor.
   * @param threshold The number of elements that should trigger the action.
   * @param action Action to run when the threshold is reached; takes as a parameter the number
   *               of elements of the registered type currently known to be in the store.
   */
  def addTrigger(klass: Class[_], threshold: Long)(action: Long => Unit): Unit = {
    val existing = triggers.getOrElse(klass, Seq())
    triggers(klass) = existing :+ Trigger(threshold, action)
  }
  
  ......
 }
 

Of this class's methods, the one we care about is addTrigger. Its doc comment spells it out: it registers an action to run once the number of stored elements of a given type reaches a threshold.
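
A hypothetical registration (MyUIData and retained are made-up names, but the shape mirrors how Spark's listeners use it):

kvstore.addTrigger(classOf[MyUIData], retained) { count =>
  val countToDelete = count - retained
  if (countToDelete > 0) {
    // look up the oldest countToDelete entries and kvstore.delete(...) each one
  }
}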

My hunch at this point was that a trigger somewhere had been written wrong, so let's see where this method is used:
[IDE screenshot: call sites of addTrigger]
Since the objects flooding the heap are all SparkPlanGraphNode, start with the bottom row I've highlighted in blue:

kvstore.addTrigger(classOf[SQLExecutionUIData], conf.get(UI_RETAINED_EXECUTIONS)) { count =>
  cleanupExecutions(count)
}

private def cleanupExecutions(count: Long): Unit = {
  val countToDelete = count - conf.get(UI_RETAINED_EXECUTIONS)
  if (countToDelete <= 0) {
    return
  }

  val view = kvstore.view(classOf[SQLExecutionUIData]).index("completionTime").first(0L)
  val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt)(_.completionTime.isDefined)
  // this is the offending line
  toDelete.foreach { e => kvstore.delete(e.getClass(), e.executionId) }
}

See it? When this trigger fires, there is no logic whatsoever to delete the corresponding SparkPlanGraphWrapper entries. No wonder it OOMs!
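
If I read the upstream patch correctly, the fix boils down to one extra delete per retired execution, roughly like this (a sketch, not a verbatim quote of the patch):

toDelete.foreach { e =>
  kvstore.delete(e.getClass(), e.executionId)
  // also drop the plan graph stored for this execution
  kvstore.delete(classOf[SparkPlanGraphWrapper], e.executionId)
}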

Result

By rights that would be the end of it. This OOM really can't be pinned on my colleague; it is genuinely a Spark bug. But I was curious: could a problem this big really have gone unnoticed by the Spark community? So I went and searched, and found this:
Memory leak of SparkPlanGraphWrapper in sparkUI
So that confirms it: this really is a latent bug in Spark 2.3, and it was fixed in 2.3.1 and 2.4.0. If you're curious, follow the link and take a look.


That's all.

