Spark緩存策略


當對同一個rdd多次執行action時,如果在磁盤上則每次執行action都會從磁盤將數據加載,如果將其緩存到內存中會提高再次action的讀取速度,Spark緩存主要有cache()和persist()兩種,當緩存一個rdd時,每一個節點上都會存放這個rdd的partition,當要使用rdd的時候可以直接從內存讀出。
cache源碼:
def cache(self): """ Persist this RDD with the default storage level (C{MEMORY_ONLY}). """ self.is_cached = True self.persist(StorageLevel.MEMORY_ONLY) return self

從源碼可以看出,cache底層調用的是persist方法,傳入的參數是:StorageLevel.MEMORY_ONLY,再看persist()方法:

def persist(self, storageLevel=StorageLevel.MEMORY_ONLY): self.is_cached = True javaStorageLevel = self.ctx._getJavaStorageLevel(storageLevel) self._jrdd.persist(javaStorageLevel) return self

persist方法,傳入的參數是StorageLevel,從StorageLevel的源碼可以看出它的值總共有6種,因此persist()相比cache()在緩存形式上更為豐富,不僅支持內存的方式,還支持內存和磁盤、內存副本等方式。

 

StorageLevel.DISK_ONLY = StorageLevel(True, False, False, False) StorageLevel.DISK_ONLY_2 = StorageLevel(True, False, False, False, 2) StorageLevel.MEMORY_ONLY = StorageLevel(False, True, False, False) StorageLevel.MEMORY_ONLY_2 = StorageLevel(False, True, False, False, 2) StorageLevel.MEMORY_AND_DISK = StorageLevel(True, True, False, False) StorageLevel.MEMORY_AND_DISK_2 = StorageLevel(True, True, False, False, 2) StorageLevel.OFF_HEAP = StorageLevel(True, True, True, False, 1)

 

持久化到內存和直接從磁盤讀取時間對比:

import os import time from pyspark import SparkContext, SparkConf conf = SparkConf() sc = SparkContext(conf=conf) current_dir = os.path.dirname(os.path.realpath(__file__)) file_path = "{}/name_age.txt".format(current_dir) def cached(): start_time = time.time() text_rdd = sc.textFile("file://{}".format(file_path)).cache() text_rdd.count() text_rdd.count() end_time = time.time() print("{}:{}".format("first cache", end_time - start_time)) start1_time = time.time() text1_rdd = sc.textFile("file://{}".format(file_path)).cache() text1_rdd.count() text1_rdd.count() end1_time = time.time() print("{}:{}".format("second cache", end1_time - start1_time)) def uncached(): start_time = time.time() text_rdd = sc.textFile("file://{}".format(file_path)) text_rdd.count() text_rdd.count() end_time = time.time() print("{}:{}".format("first uncache", end_time - start_time)) start1_time = time.time() text1_rdd = sc.textFile("file://{}".format(file_path)) text1_rdd.count() text1_rdd.count() end1_time = time.time() print("{}:{}".format("second uncache", end1_time - start1_time)) sc.stop() 執行cached()結果: first cache:1.7104301452636719 second cache:0.2717571258544922 執行uncached()結果: first uncache:1.4453039169311523 second uncache:0.49161386489868164

從執行結果可以看出,當第二次執行rdd.count()時,有cache情況下是0.2717571258544922;無cache情況下是0.49161386489868164,由於我的內存空間不足,所以不太明顯,當數據量大且內存充足的時候,持久化到內存的效率會遠遠高於磁盤。

對pyspark有興趣的小伙伴可以關注我的github,spark for python 持續更新

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM