Spark 代碼走讀之 Cache


Spark是基於內存的計算模型,但是當compute chain非常長或者某個計算代價非常大時,能將某些計算的結果進行緩存就顯得很方便了。Spark提供了兩種緩存的方法 Cache 和 checkPoint。本章只關注 Cache (基於spark-core_2.10),在后續的章節中會提到 checkPoint.

主要從以下三方面來看

  1. persist時發生什么
  2. 執行action時如何去緩存及讀取緩存
  3. 如何釋放緩存

定義緩存

spark的計算是lazy的,只有在執行action時才真正去計算每個RDD的數據。要使RDD緩存,必須在執行某個action之前定義RDD.persist(),此時也就定義了緩存,但是沒有真正去做緩存。RDD.persist會調用到SparkContext.persistRDD(rdd),同時將RDD注冊到ContextCleaner中(后面會講到這個ContextCleaner)。

def persist(newLevel: StorageLevel): this.type = {
    // TODO: Handle changes of StorageLevel
    if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) {
      throw new UnsupportedOperationException(
        "Cannot change storage level of an RDD after it was already assigned a level")
    }
    sc.persistRDD(this)
    // Register the RDD with the ContextCleaner for automatic GC-based cleanup
    sc.cleaner.foreach(_.registerRDDForCleanup(this))
    storageLevel = newLevel
    this
  }

sc.persistRDD很簡單,將(rdd.id, rdd)加到persistentRdds中。persistentRDDs一個HashMap,key就是rdd.id,value是一個包含時間戳的對rdd的弱引用。persistentRDDs用來跟蹤已經被標記為persist的RDD的引用的。

所以在定義緩存階段,做了兩件事:一是設置了rdd的StorageLevel,而是將rdd加到了persistentRdds中並在ContextCleaner中注冊。

緩存

當執行到某個action時,真正計算才開始,這時會調用DAGScheduler.submitJob去提交job,通過rdd.iterator()來計算partition。

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    if (storageLevel != StorageLevel.NONE) {
      SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
    } else {
      computeOrReadCheckpoint(split, context)
    }
  }

iterator的邏輯很清楚,如果srorageLevel被標記過了就去CacheManager取,否則自己compute或者從checkPoint讀取。

在cacheManager.getOrCompute中,通過RDDBlockId嘗試去BlockManager中得到緩存的數據。如果緩存得不到(第一次計算),並調用computeOrReadCheckPoint去計算,並將結果cache起來,cache是通過putInBlockManger實現。根據StorageLevel,如果是緩存在內存中,會將結果存在MemoryStore的一個HashMap中,如果是在disk,結果通過DiskStore.put方法存到磁盤的某個文件夾中。這個文件及最終由Utils中的方法確定

private def getOrCreateLocalRootDirsImpl(conf: SparkConf): Array[String] = {
    if (isRunningInYarnContainer(conf)) {
      // If we are in yarn mode, systems can have different disk layouts so we must set it
      // to what Yarn on this system said was available. Note this assumes that Yarn has
      // created the directories already, and that they are secured so that only the
      // user has access to them.
      getYarnLocalDirs(conf).split(",")
    } else if (conf.getenv("SPARK_EXECUTOR_DIRS") != null) {
      conf.getenv("SPARK_EXECUTOR_DIRS").split(File.pathSeparator)
    } else {
      // In non-Yarn mode (or for the driver in yarn-client mode), we cannot trust the user
      // configuration to point to a secure directory. So create a subdirectory with restricted
      // permissions under each listed directory.
      Option(conf.getenv("SPARK_LOCAL_DIRS"))
        .getOrElse(conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
        .split(",")
        .flatMap { root =>
          try {
            val rootDir = new File(root)
            if (rootDir.exists || rootDir.mkdirs()) {
              val dir = createTempDir(root)
              chmod700(dir)
              Some(dir.getAbsolutePath)
            } else {
              logError(s"Failed to create dir in $root. Ignoring this directory.")
              None
            }
          } catch {
            case e: IOException =>
            logError(s"Failed to create local root dir in $root. Ignoring this directory.")
            None
          }
        }
        .toArray
    }
  }

如果已經緩存了,那么cacheManager.getOrCompute在調用blockManger.get(RDDBlockId)時會返回結果。get會先調用getLocal在本地獲取,如果本地沒有則調用getRemote去遠程尋找,getRemote會call BlockMangerMaster.getLocation得到緩存的地址。

釋放

Spark通過調用rdd.unpersit來釋放緩存,這是通過SparkContext.unpersistRDD來實現的。在unpersistRDD中,rdd會從persistentRdds中移除,並通知BlockManagerMaster去刪除數據緩存。BlockManagerMaster會通過消息機制告訴exectutor去刪除內存或者disk上的緩存數據。

那么問題來了,如果用戶不通過手動來unpersit,那緩存豈不是越積越多,最后爆掉嗎?

是的,你的想法完全合理。因此Spark會自動刪除不在scope內的緩存。“不在scope”指的是在用戶程序中已經沒有了該RDD的引用,RDD的數據是不可讀取的。這里就要用到之前提到的ContextCleaner。ContextCleaner存了CleanupTaskWeakReference弱引用及存放該引用的隊列。當系統發生GC將沒有強引用的rdd對象回收后,這個弱引用會加入到隊列中。ContextCleaner起了單獨的一個線程輪詢該隊列,將隊列中的弱引用取出,根據引用中的rddId觸發sc.unpersistRDD。通過這樣Spark能及時的將已經垃圾回收的RDD對應的cache進行釋放。這里要清楚rdd與數據集的關系,rdd只是一個定義了計算邏輯的對象,對象本身不會包含其所代表的數據,數據要通過rdd.compute計算得到。所以系統回收rdd,只是回收了rdd對象,並沒有回收rdd代表的數據集。

此外,SparkContext中還有一個MetadataCleaner,該cleaner會移除persistentRdds中的過期的rdd。(筆者一直沒清楚這個移除和cache釋放有什么關系??)

 

Reference:

https://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence

http://jerryshao.me/architecture/2013/10/08/spark-storage-module-analysis/

https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/6-CacheAndCheckpoint.md

http://blog.csdn.net/yueqian_zhu/article/details/48177353

http://www.cnblogs.com/jiaan-geng/p/5189177.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM