cache和persist都是用於將一個RDD進行緩存的,這樣在之后使用的過程中就不需要重新計算了,可以大大節省程序運行時間。
cache和persist的區別
基於Spark 1.6.1 的源碼,可以看到
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */ def cache(): this.type = persist()
說明是cache()調用了persist(), 想要知道二者的不同還需要看一下persist函數:
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */ def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
可以看到persist()內部調用了persist(StorageLevel.MEMORY_ONLY),繼續深入:
/** * Set this RDD's storage level to persist its values across operations after the first time * it is computed. This can only be used to assign a new storage level if the RDD does not * have a storage level set yet.. */ def persist(newLevel: StorageLevel): this.type = { // TODO: Handle changes of StorageLevel if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) { throw new UnsupportedOperationException( "Cannot change storage level of an RDD after it was already assigned a level") } sc.persistRDD(this) // Register the RDD with the ContextCleaner for automatic GC-based cleanup sc.cleaner.foreach(_.registerRDDForCleanup(this)) storageLevel = newLevel this }
可以看出來persist有一個 StorageLevel 類型的參數,這個表示的是RDD的緩存級別。
至此便可得出cache和persist的區別了:cache只有一個默認的緩存級別MEMORY_ONLY ,而persist可以根據情況設置其它的緩存級別。
RDD的緩存級別
順便看一下RDD都有哪些緩存級別,查看 StorageLevel 類的源碼:
object StorageLevel { val NONE = new StorageLevel(false, false, false, false) val DISK_ONLY = new StorageLevel(true, false, false, false) val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2) val MEMORY_ONLY = new StorageLevel(false, true, false, true) val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2) val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false) val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2) val MEMORY_AND_DISK = new StorageLevel(true, true, false, true) val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2) val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false) val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) val OFF_HEAP = new StorageLevel(false, false, true, false) ...... }
可以看到這里列出了12種緩存級別,但這些有什么區別呢?可以看到每個緩存級別后面都跟了一個StorageLevel的構造函數,里面包含了4個或5個參數,如下
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
查看其構造函數
class StorageLevel private( private var _useDisk: Boolean, private var _useMemory: Boolean, private var _useOffHeap: Boolean, private var _deserialized: Boolean, private var _replication: Int = 1) extends Externalizable { ...... def useDisk: Boolean = _useDisk def useMemory: Boolean = _useMemory def useOffHeap: Boolean = _useOffHeap def deserialized: Boolean = _deserialized def replication: Int = _replication ...... }
可以看到StorageLevel類的主構造器包含了5個參數:
- useDisk:使用硬盤(外存)
- useMemory:使用內存
- useOffHeap:使用堆外內存,這是Java虛擬機里面的概念,堆外內存意味着把內存對象分配在Java虛擬機的堆以外的內存,這些內存直接受操作系統管理(而不是虛擬機)。這樣做的結果就是能保持一個較小的堆,以減少垃圾收集對應用的影響。
- deserialized:反序列化,其逆過程序列化(Serialization)是java提供的一種機制,將對象表示成一連串的字節;而反序列化就表示將字節恢復為對象的過程。序列化是對象永久化的一種機制,可以將對象及其屬性保存起來,並能在反序列化后直接恢復這個對象
- replication:備份數(在多個節點上備份)
理解了這5個參數,StorageLevel 的12種緩存級別就不難理解了。
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) 就表示使用這種緩存級別的RDD將存儲在硬盤以及內存中,使用序列化(在硬盤中),並且在多個節點上備份2份(正常的RDD只有一份)
另外還注意到有一種特殊的緩存級別
val OFF_HEAP = new StorageLevel(false, false, true, false)
使用了堆外內存,StorageLevel 類的源碼中有一段代碼可以看出這個的特殊性,它不能和其它幾個參數共存。
if (useOffHeap) { require(!useDisk, "Off-heap storage level does not support using disk") require(!useMemory, "Off-heap storage level does not support using heap memory") require(!deserialized, "Off-heap storage level does not support deserialized storage") require(replication == 1, "Off-heap storage level does not support multiple replication") }