Spark是基於內存的計算模型,但是當compute chain非常長或者某個計算代價非常大時,能將某些計算的結果進行緩存就顯得很方便了。Spark提供了兩種緩存的方法 Cache 和 checkPoint。本章只關注 Cache (基於spark-core_2.10),在后續的章節中會提到 checkPoint.
主要從以下三方面來看
- persist時發生什么
- 執行action時如何去緩存及讀取緩存
- 如何釋放緩存
定義緩存
spark的計算是lazy的,只有在執行action時才真正去計算每個RDD的數據。要使RDD緩存,必須在執行某個action之前定義RDD.persist(),此時也就定義了緩存,但是沒有真正去做緩存。RDD.persist會調用到SparkContext.persistRDD(rdd),同時將RDD注冊到ContextCleaner中(后面會講到這個ContextCleaner)。
def persist(newLevel: StorageLevel): this.type = { // TODO: Handle changes of StorageLevel if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) { throw new UnsupportedOperationException( "Cannot change storage level of an RDD after it was already assigned a level") } sc.persistRDD(this) // Register the RDD with the ContextCleaner for automatic GC-based cleanup sc.cleaner.foreach(_.registerRDDForCleanup(this)) storageLevel = newLevel this }
sc.persistRDD很簡單,將(rdd.id, rdd)加到persistentRdds中。persistentRDDs一個HashMap,key就是rdd.id,value是一個包含時間戳的對rdd的弱引用。persistentRDDs用來跟蹤已經被標記為persist的RDD的引用的。
所以在定義緩存階段,做了兩件事:一是設置了rdd的StorageLevel,而是將rdd加到了persistentRdds中並在ContextCleaner中注冊。
緩存
當執行到某個action時,真正計算才開始,這時會調用DAGScheduler.submitJob去提交job,通過rdd.iterator()來計算partition。
final def iterator(split: Partition, context: TaskContext): Iterator[T] = { if (storageLevel != StorageLevel.NONE) { SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel) } else { computeOrReadCheckpoint(split, context) } }
iterator的邏輯很清楚,如果srorageLevel被標記過了就去CacheManager取,否則自己compute或者從checkPoint讀取。
在cacheManager.getOrCompute中,通過RDDBlockId嘗試去BlockManager中得到緩存的數據。如果緩存得不到(第一次計算),並調用computeOrReadCheckPoint去計算,並將結果cache起來,cache是通過putInBlockManger實現。根據StorageLevel,如果是緩存在內存中,會將結果存在MemoryStore的一個HashMap中,如果是在disk,結果通過DiskStore.put方法存到磁盤的某個文件夾中。這個文件及最終由Utils中的方法確定
private def getOrCreateLocalRootDirsImpl(conf: SparkConf): Array[String] = { if (isRunningInYarnContainer(conf)) { // If we are in yarn mode, systems can have different disk layouts so we must set it // to what Yarn on this system said was available. Note this assumes that Yarn has // created the directories already, and that they are secured so that only the // user has access to them. getYarnLocalDirs(conf).split(",") } else if (conf.getenv("SPARK_EXECUTOR_DIRS") != null) { conf.getenv("SPARK_EXECUTOR_DIRS").split(File.pathSeparator) } else { // In non-Yarn mode (or for the driver in yarn-client mode), we cannot trust the user // configuration to point to a secure directory. So create a subdirectory with restricted // permissions under each listed directory. Option(conf.getenv("SPARK_LOCAL_DIRS")) .getOrElse(conf.get("spark.local.dir", System.getProperty("java.io.tmpdir"))) .split(",") .flatMap { root => try { val rootDir = new File(root) if (rootDir.exists || rootDir.mkdirs()) { val dir = createTempDir(root) chmod700(dir) Some(dir.getAbsolutePath) } else { logError(s"Failed to create dir in $root. Ignoring this directory.") None } } catch { case e: IOException => logError(s"Failed to create local root dir in $root. Ignoring this directory.") None } } .toArray } }
如果已經緩存了,那么cacheManager.getOrCompute在調用blockManger.get(RDDBlockId)時會返回結果。get會先調用getLocal在本地獲取,如果本地沒有則調用getRemote去遠程尋找,getRemote會call BlockMangerMaster.getLocation得到緩存的地址。
釋放
Spark通過調用rdd.unpersit來釋放緩存,這是通過SparkContext.unpersistRDD來實現的。在unpersistRDD中,rdd會從persistentRdds中移除,並通知BlockManagerMaster去刪除數據緩存。BlockManagerMaster會通過消息機制告訴exectutor去刪除內存或者disk上的緩存數據。
那么問題來了,如果用戶不通過手動來unpersit,那緩存豈不是越積越多,最后爆掉嗎?
是的,你的想法完全合理。因此Spark會自動刪除不在scope內的緩存。“不在scope”指的是在用戶程序中已經沒有了該RDD的引用,RDD的數據是不可讀取的。這里就要用到之前提到的ContextCleaner。ContextCleaner存了CleanupTaskWeakReference弱引用及存放該引用的隊列。當系統發生GC將沒有強引用的rdd對象回收后,這個弱引用會加入到隊列中。ContextCleaner起了單獨的一個線程輪詢該隊列,將隊列中的弱引用取出,根據引用中的rddId觸發sc.unpersistRDD。通過這樣Spark能及時的將已經垃圾回收的RDD對應的cache進行釋放。這里要清楚rdd與數據集的關系,rdd只是一個定義了計算邏輯的對象,對象本身不會包含其所代表的數據,數據要通過rdd.compute計算得到。所以系統回收rdd,只是回收了rdd對象,並沒有回收rdd代表的數據集。
此外,SparkContext中還有一個MetadataCleaner,該cleaner會移除persistentRdds中的過期的rdd。(筆者一直沒清楚這個移除和cache釋放有什么關系??)
Reference:
https://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence
http://jerryshao.me/architecture/2013/10/08/spark-storage-module-analysis/
https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/6-CacheAndCheckpoint.md