[Spark][pyspark] Notes on using cache, persist and checkpoint with RDDs and DataFrames


Conclusions

  1. cache is implemented by calling persist. By default it persists data to memory for an RDD and to memory plus disk for a DataFrame. It is fast, but carries risks such as memory overflow.
  2. persist lets you tune where and how the data is stored through its parameters: memory, disk, off-heap memory, whether to serialize, and the number of replicas. The files it writes are temporary and are deleted automatically once the job finishes.
  3. checkpoint persists data to disk and cuts the lineage. It involves disk I/O and is therefore slower, and its data files are not deleted automatically when the job finishes.

Notes:

  • When we call checkpoint on an RDD, the RDD is only marked as needing a checkpoint. Only after a later action, once runJob has computed the RDD, does doCheckpoint run and actually persist the data; the RDD therefore has to be produced a second time, either by recomputation or, if it was persisted, by reading it back from storage.
  • When checkpoint is called on a DataFrame, the default parameter eager=True triggers an immediate count action, which materialises the DataFrame's data; a new DataFrame is returned, the upstream dependencies are cleared, and the DAG and physical plan become simpler. (A minimal sketch of the three calls follows these notes.)
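
A minimal sketch of how the three calls look side by side in PySpark; the session settings, checkpoint directory and column expressions below are illustrative placeholders, not part of the original notes.

from pyspark import StorageLevel
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").appName("persistSketch").getOrCreate()
spark.sparkContext.setCheckpointDir("/tmp/spark-ckpt")  # must be set before any checkpoint()

df = spark.range(10)

cached = df.cache()                                   # DataFrame default level: MEMORY_AND_DISK
disk_only = df.selectExpr("id * 2 AS id2") \
              .persist(StorageLevel.DISK_ONLY)        # explicit storage level via persist()
ckpt = df.selectExpr("id + 1 AS id1").checkpoint()    # eager=True by default: runs a count job
                                                      # now and returns a NEW DataFrame with a
                                                      # truncated lineage; files stay on disk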

Why persist data: lazy evaluation

Spark is lazily evaluated ("lazy evaluation"): computation is deferred until it is absolutely needed. When the user expresses operations on data, Spark does not modify the data immediately; it builds a plan of transformations to apply to the original data. Spark first compiles this plan into a pipelined physical plan that can run efficiently on the cluster, and then waits until the last possible moment to execute it. (A small illustration follows.)
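
As a small illustration (assuming an existing SparkSession named spark; the print is just a visible marker):

def tag(x):
    # This print only appears once a job actually runs (in local mode it shows in the console).
    print("computing", x)
    return x * 2

rdd = spark.sparkContext.parallelize([1, 2, 3])
doubled = rdd.map(tag)   # transformation: nothing is computed or printed, only the plan is recorded
doubled.collect()        # action: the pipeline executes now and the prints appear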

What lazy evaluation means and what it implies:

  • Nothing is computed until an action runs, which avoids a lot of pointless work: no useless intermediate results, and therefore no useless disk I/O or network transfer.
  • More fundamentally, the more operations Spark can see before executing, the more opportunities it has to optimise.
  • Because only the chain of operations is kept rather than the data, the object can be reused but its data cannot, which is why some scenarios need persistence.

Common use cases

1. Recursive joins, e.g. [Sample, feature1, feature2, ..., featureN] → Table

Join is a transformation, not an action, so under lazy evaluation none of the joins actually run; each one only records an execution plan, and everything executes at the final table.show(). Repeated joins therefore build an extremely complicated dependency chain, and while the engine computes it, resources are requested and released over and over, which can eventually make the job fail. Adding a persistence operator such as DF.persist() introduces an InMemoryTableScan dependency, so the intermediate data is persisted and later steps no longer have to recompute everything from the start. (A sketch follows.)
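
A hedged sketch of that pattern, where sample and feature_dfs are hypothetical DataFrames standing in for the real tables:

# Join one feature table after another onto the sample table, persisting every few rounds
# so the plan does not keep growing from the very first join.
table = sample
for i, feat in enumerate(feature_dfs):
    table = table.join(feat, ["sample_id"], "left")
    if (i + 1) % 5 == 0:
        table = table.persist()       # inserts an InMemoryTableScan node; later joins read from here
        # table = table.checkpoint()  # alternative: truly truncates the plan (writes to the
        #                             # checkpoint dir and returns a new DataFrame)
table.show()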

2. Data reuse, e.g. df → operation A → result A; df → operation B → result B

Because of lazy evaluation the object can be reused but its data cannot. If you apply different operations to the same RDD/DataFrame to get different results without persisting it, the data is computed from scratch every time. Persisting it avoids recomputing the same RDD over and over and improves job efficiency.

3. Data safety matters, e.g. the data may be lost but performance must stay high

When data is lost, Spark recomputes it from its lineage. To keep performance predictable, checkpoint the key datasets to HDFS or local disk so that the data itself is safe. (A sketch follows.)
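
A sketch of that idea; expensive_pipeline and raw_df are placeholders for whatever produces the key dataset:

spark.sparkContext.setCheckpointDir("hdfs:///tmp/ckpt")   # illustrative HDFS path

key_df = expensive_pipeline(raw_df)     # placeholder for a costly computation
key_df = key_df.persist()               # keep it cached so checkpointing does not recompute it
key_df = key_df.checkpoint()            # eager write to the checkpoint dir; the lineage is cut,
                                        # so lost partitions are re-read from HDFS, not recomputed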

RDD persistence

Persistence in practice

RDD reuse demo
import findspark # findspark must be configured before pyspark is imported
spark_home = r'D:\Programs\spark-2.4.5-bin-hadoop2.7'
python_home = r'D:\Programs\anaconda3\python'
findspark.init(spark_home,python_home)
import pyspark
 
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('testPersist')\
    .config("master","local[4]")\
    .enableHiveSupport()\
    .getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("Error")
sc.setCheckpointDir('D:\\code')
 
l = ["hello python","hello spark"]
rdd = sc.parallelize(l)
print(rdd.collect())
 
flatmaprdd = rdd.flatMap(lambda x:x.split(' '))
def fun(x):
    print("fun is call")
    return (x,1)
maprdd = flatmaprdd.map(fun)
 
reducerdd = maprdd.reduceByKey(lambda x,y:x+y)
 
reducerdd.collect() # action operator: triggers execution
 
grouprdd = maprdd.groupByKey()
 
grouprdd.collect()
 
 
 
#############################
##output (stage progress lines such as "[Stage 1:> (0 + 8) / 8]" omitted)
fun is call
fun is call
fun is call
fun is call
fun is call
fun is call
fun is call
fun is call
 
#Without persistence, both reducerdd.collect() and grouprdd.collect() run from scratch, so the map is executed 8 times
 
#############################
import findspark
spark_home = r'D:\Programs\spark-2.4.5-bin-hadoop2.7'
python_home = r'D:\Programs\anaconda3\python'
findspark.init(spark_home,python_home)
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('testPersist')\
    .config("master","local[4]")\
    .enableHiveSupport()\
    .getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("Error")
sc.setCheckpointDir('D:\\code')
 
l = ["hello python","hello spark"]
rdd = sc.parallelize(l)
print(rdd.collect())
 
flatmaprdd = rdd.flatMap(lambda x:x.split(' '))
def fun(x):
    print("fun is call")
    return (x,1)
maprdd = flatmaprdd.map(fun)
maprdd.cache() # persist the RDD
reducerdd = maprdd.reduceByKey(lambda x,y:x+y)
 
reducerdd.collect() # action operator: triggers execution
 
grouprdd = maprdd.groupByKey()
 
grouprdd.collect()
 
##############################
##output
fun is call
fun is call
fun is call
fun is call
 
#With rdd.cache() the data is reused and the map runs only 4 times
 
##############################

Differences between the three RDD persistence mechanisms

cache (PySpark docs / source / demo)

Persist this RDD with the default storage level (MEMORY_ONLY).

RDD cache
def cache(self):
    """
    Persist this RDD with the default storage level (`MEMORY_ONLY`).
    """
    self.is_cached = True
    self.persist(StorageLevel.MEMORY_ONLY)
    return self
1. cache is implemented on top of persist and persists to memory by default, so it is fast, but it breaks when memory fills up.
  1. cache is itself lazy: the data is only materialised in memory after an action runs. It also annotates the RDD's lineage, as the toDebugString() output below shows: right after rdd.cache() the lineage structure is unchanged (only the storage-level annotation appears), and after the action an extra line -CachedPartitions: 8; MemorySize: 311.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B shows that the data now sits in memory and will be read from there first.
rdd.toDebugString()
import findspark
spark_home = r'D:\Programs\spark-2.4.5-bin-hadoop2.7'
python_home = r'D:\Programs\anaconda3\python'
findspark.init(spark_home,python_home)
import pyspark
 
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('testPersist')\
    .config("master","local[4]")\
    .enableHiveSupport()\
    .getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("Error")
sc.setCheckpointDir('D:\\code')
l = ["hello python","hello spark"]
rdd = sc.parallelize(l)
print(rdd.collect())
 
flatmaprdd = rdd.flatMap(lambda x:x.split(' '))
def fun(x):
    print("fun is call")
    return (x,1)
maprdd = flatmaprdd.map(fun)
print(str(maprdd.toDebugString(),'utf-8'))
maprdd.cache()
# maprdd.checkpoint()
print(str(maprdd.toDebugString(),'utf-8'))
reducerdd = maprdd.reduceByKey(lambda x,y:x+y)
 
reducerdd.collect() # action operator: triggers execution
 
print(str(maprdd.toDebugString(),'utf-8'))
grouprdd = maprdd.groupByKey()
 
 
 
###########################
##output
(8) PythonRDD[1] at RDD at PythonRDD.scala:53 []
 |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195 []
(8) PythonRDD[1] at RDD at PythonRDD.scala:53 [Memory Serialized 1x Replicated]
 |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195 [Memory Serialized 1x Replicated]
fun is call
fun is call
fun is call
fun is call
(8) PythonRDD[1] at RDD at PythonRDD.scala:53 [Memory Serialized 1x Replicated]
 |       CachedPartitions: 8; MemorySize: 311.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
 |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195 [Memory Serialized 1x Replicated]

persist (PySpark docs / source)

Set this RDD’s storage level to persist its values across operations after the first time it is computed. This can only be used to assign a new storage level if the RDD does not have a storage level set yet. If no storage level is specified defaults to (MEMORY_ONLY).

RDD persist
def persist(self, storageLevel=StorageLevel.MEMORY_ONLY):
    """
    Set this RDD's storage level to persist its values across operations
    after the first time it is computed. This can only be used to assign
    a new storage level if the RDD does not have a storage level set yet.
    If no storage level is specified defaults to (`MEMORY_ONLY`).
 
    Examples
    --------
    >>> rdd = sc.parallelize(["b", "a", "c"])
    >>> rdd.persist().is_cached
    True
    """
    self.is_cached = True
    javaStorageLevel = self.ctx._getJavaStorageLevel(storageLevel)
    self._jrdd.persist(javaStorageLevel)
    return self
1. persist differs from cache in that cache is a special case of persist: cache() is equivalent to persist(StorageLevel.MEMORY_ONLY). persist lets you choose where the data goes through the StorageLevel parameter.

StorageLevel(_useDisk: Boolean, _useMemory: Boolean, _useOffHeap: Boolean, _deserialized: Boolean, _replication: Int = 1)

i.e. whether to store on disk, whether to store in memory, whether to use off-heap memory, whether to keep the data deserialized, and the number of replicas (example below).
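
For example (standard PySpark StorageLevel constants; rdd is assumed to be an existing RDD, and only one level can be assigned per RDD, hence the commented alternatives):

from pyspark import StorageLevel

rdd.persist(StorageLevel.MEMORY_ONLY)          # equivalent to rdd.cache()
# rdd.persist(StorageLevel.MEMORY_AND_DISK)    # spill to disk when memory is full
# rdd.persist(StorageLevel.DISK_ONLY_2)        # disk only, kept in 2 replicas

# A custom level built from the five constructor arguments
# (useDisk, useMemory, useOffHeap, deserialized, replication):
memory_and_disk_2 = StorageLevel(True, True, False, False, 2)
# rdd.persist(memory_and_disk_2)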

checkpoint (PySpark docs / source / demo)

Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint directory set with SparkContext.setCheckpointDir() and all references to its parent RDDs will be removed. This function must be called before any job has been executed on this RDD. It is strongly recommended that this RDD is persisted in memory, otherwise saving it on a file will require recomputation.

RDD checkpoint

def checkpoint(self):
    """
    Mark this RDD for checkpointing. It will be saved to a file inside the
    checkpoint directory set with :meth:`SparkContext.setCheckpointDir` and
    all references to its parent RDDs will be removed. This function must
    be called before any job has been executed on this RDD. It is strongly
    recommended that this RDD is persisted in memory, otherwise saving it
    on a file will require recomputation.
    """
    self.is_checkpointed = True
    self._jrdd.rdd().checkpoint()
toDebugString demo

import findspark
spark_home = r'D:\Programs\spark-2.4.5-bin-hadoop2.7'
python_home = r'D:\Programs\anaconda3\python'
findspark.init(spark_home,python_home)
import pyspark
 
from pyspark.sql import SparkSession
 
 
 
spark = SparkSession.builder.appName('testPersist')\
    .config("master","local[4]")\
    .enableHiveSupport()\
    .getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("Error")
sc.setCheckpointDir('D:\\code')
list0 = ["hello python","hello spark"]
rdd0 = sc.parallelize(list0)
print(rdd0.collect())
 
flatmaprdd = rdd0.flatMap(lambda x:x.split(' '))
def fun(x):
    print("fun is call")
    return (x,1)
maprdd = flatmaprdd.map(fun)
print(str(maprdd.toDebugString(),'utf-8'))
# maprdd.persist()
maprdd.checkpoint()
print(str(maprdd.toDebugString(),'utf-8'))
print('-----------------------------------')
reducerdd = maprdd.reduceByKey(lambda x,y:x+y)
 
reducerdd.collect() # action operator: triggers execution
 
print(str(maprdd.toDebugString(),'utf-8'))
grouprdd = maprdd.groupByKey()
 
grouprdd.collect()
 
##############################
##output
(8) PythonRDD[1] at RDD at PythonRDD.scala:53 []
 |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195 []
(8) PythonRDD[1] at RDD at PythonRDD.scala:53 []
 |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195 []
-----------------------------------
fun is call
fun is call
fun is call
fun is call
fun is call
fun is call
fun is call
fun is call
(8) PythonRDD[1] at RDD at PythonRDD.scala:53 []
 |  ReliableCheckpointRDD[7] at collect at D:/code/Test_CheckPoint.py:74 []
1. checkpoint cuts the RDD's lineage: it becomes simply ReliableCheckpointRDD[7] at collect at D:/code/Test_CheckPoint.py:74 []

2. To guarantee data safety, checkpoint() recomputes the data when it writes it, which is why fun is call appears 8 times. It is also lazy: the checkpoint only actually happens once .collect() runs.

If you do not want checkpoint() to recompute the data, combine it with cache()/persist():

checkpoint&persist
list0 = ["hello python","hello spark"]
rdd0 = sc.parallelize(list0)
print(rdd0.collect())
 
flatmaprdd = rdd0.flatMap(lambda x:x.split(' '))
def fun(x):
    print("fun is call")
    return (x,1)
maprdd = flatmaprdd.map(fun)
print(str(maprdd.toDebugString(),'utf-8'))
maprdd.persist()
maprdd.checkpoint()
print(str(maprdd.toDebugString(),'utf-8'))
print('-----------------------------------')
reducerdd = maprdd.reduceByKey(lambda x,y:x+y)
 
print(reducerdd.collect()) # action operator: triggers execution
#maprdd.unpersist()
print(str(maprdd.toDebugString(),'utf-8'))
grouprdd = maprdd.groupByKey()
 
##############################
##output
(8) PythonRDD[1] at RDD at PythonRDD.scala:53 []
 |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195 []
(8) PythonRDD[1] at RDD at PythonRDD.scala:53 [Memory Serialized 1x Replicated]
 |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195 [Memory Serialized 1x Replicated]
-----------------------------------
fun is call
fun is call
fun is call
fun is call
[('python', 1), ('hello', 2), ('spark', 1)]
(8) PythonRDD[1] at RDD at PythonRDD.scala:53 [Memory Serialized 1x Replicated]
 |       CachedPartitions: 8; MemorySize: 311.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
 |  ReliableCheckpointRDD[7] at collect at D:/code/Test_CheckPoint.py:74 [Memory Serialized 1x Replicated]
This time fun is call appears only 4 times, and the lineage gains two extra entries. To release the data that persist kept in memory, call unpersist(), but only after the action, once the checkpoint has been written; otherwise everything is recomputed again (and 8 fun is call lines are printed). The safe ordering is sketched below.
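
In other words, the safe ordering looks roughly like this (a sketch reusing the names from the demo above):

maprdd.persist()
maprdd.checkpoint()

reducerdd = maprdd.reduceByKey(lambda x, y: x + y)
reducerdd.collect()      # action: fills the cache and writes the checkpoint files

maprdd.unpersist()       # safe only now that the checkpoint is on disk
grouprdd = maprdd.groupByKey()
grouprdd.collect()       # served from the checkpoint; fun() is not executed again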

DataFrame persistence

Persistence demo

df persistence
import findspark
spark_home = r'D:\Programs\spark-2.4.5-bin-hadoop2.7'
python_home = r'D:\Programs\anaconda3\python'
findspark.init(spark_home,python_home)
import pyspark
 
from pyspark.sql import SparkSession
 
 
spark = SparkSession.builder.appName('testPersist')\
    .config("master","local[4]")\
    .enableHiveSupport()\
    .getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("Error")
sc.setCheckpointDir('D:\\code')
rdd1 = sc.parallelize([('Bob',13,65),('Alice',9,35),('Elf',45,24)])
rdd2 = sc.parallelize([('Bob',131,651),('Alice',91,351),('Elf',451,241)])
rdd3 = sc.parallelize([('Bob',132,652),('Alice',92,352),('Elf',452,242)])
rdd4 = sc.parallelize([('Bob',133,653),('Alice',93,353),('Elf',453,243)])
 
df1 = rdd1.toDF(['name','a','b'])
# df1.checkpoint()
df2 = rdd2.toDF(['name','aa','bb'])
# df2.checkpoint()
df3 = rdd3.toDF(['name','aaa','bbb'])
# df3.checkpoint()
df4 = rdd4.toDF(['name','aaaa','bbbb'])
# df4.checkpoint()
tmp1 = df1.join(df2,['name'],'inner')
# tmp1.persist()
# tmp1.checkpoint()
tmp1.show()
tmp2 = df3.join(df4,['name'],'inner')
# tmp2.persist()
# tmp2.checkpoint()
tmp2.show()
final = tmp1.join(tmp2,['name'],'inner')
# final.checkpoint()
final.explain()
final.show()
 
 
 
 
 
 
##############################
== Physical Plan ==
*(11) Project [name#0, a#1L, b#2L, aa#7L, bb#8L, aaa#13L, bbb#14L, aaaa#19L, bbbb#20L]
+- *(11) SortMergeJoin [name#0], [name#12], Inner
   :- *(5) Project [name#0, a#1L, b#2L, aa#7L, bb#8L]
   :  +- *(5) SortMergeJoin [name#0], [name#6], Inner
   :     :- *(2) Sort [name#0 ASC NULLS FIRST], false, 0
   :     :  +- Exchange hashpartitioning(name#0, 200)
   :     :     +- *(1) Filter isnotnull(name#0)
   :     :        +- Scan ExistingRDD[name#0,a#1L,b#2L]
   :     +- *(4) Sort [name#6 ASC NULLS FIRST], false, 0
   :        +- Exchange hashpartitioning(name#6, 200)
   :           +- *(3) Filter isnotnull(name#6)
   :              +- Scan ExistingRDD[name#6,aa#7L,bb#8L]
   +- *(10) Project [name#12, aaa#13L, bbb#14L, aaaa#19L, bbbb#20L]
      +- *(10) SortMergeJoin [name#12], [name#18], Inner
         :- *(7) Sort [name#12 ASC NULLS FIRST], false, 0
         :  +- Exchange hashpartitioning(name#12, 200)
         :     +- *(6) Filter isnotnull(name#12)
         :        +- Scan ExistingRDD[name#12,aaa#13L,bbb#14L]
         +- *(9) Sort [name#18 ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(name#18, 200)
               +- *(8) Filter isnotnull(name#18)
                  +- Scan ExistingRDD[name#18,aaaa#19L,bbbb#20L]
##############################
import findspark
spark_home = r'D:\Programs\spark-2.4.5-bin-hadoop2.7'
python_home = r'D:\Programs\anaconda3\python'
findspark.init(spark_home,python_home)
import pyspark
 
from pyspark.sql import SparkSession
 
 
spark = SparkSession.builder.appName('testPersist')\
    .config("master","local[4]")\
    .enableHiveSupport()\
    .getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("Error")
sc.setCheckpointDir('D:\\code')
rdd1 = sc.parallelize([('Bob',13,65),('Alice',9,35),('Elf',45,24)])
rdd2 = sc.parallelize([('Bob',131,651),('Alice',91,351),('Elf',451,241)])
rdd3 = sc.parallelize([('Bob',132,652),('Alice',92,352),('Elf',452,242)])
rdd4 = sc.parallelize([('Bob',133,653),('Alice',93,353),('Elf',453,243)])
 
df1 = rdd1.toDF(['name','a','b'])
# df1.checkpoint()
df2 = rdd2.toDF(['name','aa','bb'])
# df2.checkpoint()
df3 = rdd3.toDF(['name','aaa','bbb'])
# df3.checkpoint()
df4 = rdd4.toDF(['name','aaaa','bbbb'])
# df4.checkpoint()
tmp1 = df1.join(df2,['name'],'inner')
tmp1.persist()
tmp1 = tmp1.checkpoint()
tmp1.show()
tmp2 = df3.join(df4,['name'],'inner')
tmp2.persist()
tmp2 = tmp2.checkpoint()
tmp2.show()
final = tmp1.join(tmp2,['name'],'inner')
# final.checkpoint()
final.explain()
final.show()
 
 
## physical plan after persist()
== Physical Plan ==
*(2) Project [name#0, a#1L, b#2L, aa#7L, bb#8L, aaa#13L, bbb#14L, aaaa#19L, bbbb#20L]
+- *(2) BroadcastHashJoin [name#0], [name#12], Inner, BuildRight
   :- *(2) Filter isnotnull(name#0)
   :  +- InMemoryTableScan [name#0, a#1L, b#2L, aa#7L, bb#8L], [isnotnull(name#0)]
   :        +- InMemoryRelation [name#0, a#1L, b#2L, aa#7L, bb#8L], StorageLevel(disk, memory, 1 replicas)
   :              +- *(5) Project [name#0, a#1L, b#2L, aa#7L, bb#8L]
   :                 +- *(5) SortMergeJoin [name#0], [name#6], Inner
   :                    :- *(2) Sort [name#0 ASC NULLS FIRST], false, 0
   :                    :  +- Exchange hashpartitioning(name#0, 200)
   :                    :     +- *(1) Filter isnotnull(name#0)
   :                    :        +- Scan ExistingRDD[name#0,a#1L,b#2L]
   :                    +- *(4) Sort [name#6 ASC NULLS FIRST], false, 0
   :                       +- Exchange hashpartitioning(name#6, 200)
   :                          +- *(3) Filter isnotnull(name#6)
   :                             +- Scan ExistingRDD[name#6,aa#7L,bb#8L]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
      +- *(1) Filter isnotnull(name#12)
         +- InMemoryTableScan [name#12, aaa#13L, bbb#14L, aaaa#19L, bbbb#20L], [isnotnull(name#12)]
               +- InMemoryRelation [name#12, aaa#13L, bbb#14L, aaaa#19L, bbbb#20L], StorageLevel(disk, memory, 1 replicas)
                     +- *(5) Project [name#12, aaa#13L, bbb#14L, aaaa#19L, bbbb#20L]
                        +- *(5) SortMergeJoin [name#12], [name#18], Inner
                           :- *(2) Sort [name#12 ASC NULLS FIRST], false, 0
                           :  +- Exchange hashpartitioning(name#12, 200)
                           :     +- *(1) Filter isnotnull(name#12)
                           :        +- Scan ExistingRDD[name#12,aaa#13L,bbb#14L]
                           +- *(4) Sort [name#18 ASC NULLS FIRST], false, 0
                              +- Exchange hashpartitioning(name#18, 200)
                                 +- *(3) Filter isnotnull(name#18)
                                    +- Scan ExistingRDD[name#18,aaaa#19L,bbbb#20L]
#############################
## execution plan after checkpoint()
== Parsed Logical Plan ==
'Join UsingJoin(Inner,Buffer(name))
:- LogicalRDD [name#0, a#1L, b#2L, aa#7L, bb#8L], false
+- LogicalRDD [name#12, aaa#13L, bbb#14L, aaaa#19L, bbbb#20L], false
 
== Analyzed Logical Plan ==
name: string, a: bigint, b: bigint, aa: bigint, bb: bigint, aaa: bigint, bbb: bigint, aaaa: bigint, bbbb: bigint
Project [name#0, a#1L, b#2L, aa#7L, bb#8L, aaa#13L, bbb#14L, aaaa#19L, bbbb#20L]
+- Join Inner, (name#0 = name#12)
   :- LogicalRDD [name#0, a#1L, b#2L, aa#7L, bb#8L], false
   +- LogicalRDD [name#12, aaa#13L, bbb#14L, aaaa#19L, bbbb#20L], false
 
== Optimized Logical Plan ==
Project [name#0, a#1L, b#2L, aa#7L, bb#8L, aaa#13L, bbb#14L, aaaa#19L, bbbb#20L]
+- Join Inner, (name#0 = name#12)
   :- Filter isnotnull(name#0)
   :  +- LogicalRDD [name#0, a#1L, b#2L, aa#7L, bb#8L], false
   +- Filter isnotnull(name#12)
      +- LogicalRDD [name#12, aaa#13L, bbb#14L, aaaa#19L, bbbb#20L], false
 
== Physical Plan ==
*(3) Project [name#0, a#1L, b#2L, aa#7L, bb#8L, aaa#13L, bbb#14L, aaaa#19L, bbbb#20L]
+- *(3) SortMergeJoin [name#0], [name#12], Inner
   :- *(1) Filter isnotnull(name#0)
   :  +- Scan ExistingRDD[name#0,a#1L,b#2L,aa#7L,bb#8L]
   +- *(2) Filter isnotnull(name#12)
      +- Scan ExistingRDD[name#12,aaa#13L,bbb#14L,aaaa#19L,bbbb#20L]
  1. Since a DataFrame has no toDebugString(), we use explain() to inspect the execution plan instead.

  2. After persist(), the physical plan gains InMemoryTableScan entries.

  3. After checkpoint(), the plan no longer shows how tmp1 and tmp2 were built; they simply appear as +- Scan ExistingRDD[name#12,aaa#13L,bbb#14L,aaaa#19L,bbbb#20L]

DAG screenshots

DAG without checkpoint

DAG with checkpoint

DAG of the checkpoint job with and without persist()

Checkpoint with persist

Checkpoint without persist
Important note:
The DataFrame checkpoint is implemented differently from the RDD one:
DataFrame's checkpoint() returns a new object.

df&rdd checkpoint
rdd = spark.sparkContext.parallelize([[1,2,3]])
persisted_rdd = rdd.persist()
rdd == persisted_rdd
out: True

df = rdd.toDF(['A','B','C'])
spark.sparkContext.setCheckpointDir('.')
checkpointed_df = df.checkpoint()
checkpointed_df == df
out:False

type(checkpointed_df)
out:DataFrame

checkpointed_rdd = rdd.checkpoint()
checkpointed_rdd == rdd
out:False

type(checkpointed_rdd)
out:NoneType
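
The practical consequence of this difference (a short sketch): keep the object returned by the DataFrame call, but keep using the original RDD.

# DataFrame: checkpoint() returns a NEW DataFrame with a truncated plan; reassign it,
# otherwise you keep working with the un-checkpointed original.
df = df.checkpoint()

# RDD: checkpoint() returns None and only marks the existing RDD; keep using rdd itself,
# and remember the files are written only after the next action.
rdd.checkpoint()
rdd.count()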

Differences between the three DataFrame persistence mechanisms

cache (PySpark docs / source)

Persists the DataFrame with the default storage level (MEMORY_AND_DISK).

df cache

def cache(self):
    """Persists the :class:`DataFrame` with the default storage level (`MEMORY_AND_DISK`).
 
    .. versionadded:: 1.3.0
 
    Notes
    -----
    The default storage level has changed to `MEMORY_AND_DISK` to match Scala in 2.0.
    """
    self.is_cached = True
    self._jdf.cache()
    return self

persist (PySpark docs / source)

Sets the storage level to persist the contents of the DataFrame across operations after the first time it is computed. This can only be used to assign a new storage level if the DataFrame does not have a storage level set yet. If no storage level is specified defaults to (MEMORY_AND_DISK_DESER)

df persist
def persist(self, storageLevel=StorageLevel.MEMORY_AND_DISK_DESER):
    """Sets the storage level to persist the contents of the :class:`DataFrame` across
    operations after the first time it is computed. This can only be used to assign
    a new storage level if the :class:`DataFrame` does not have a storage level set yet.
    If no storage level is specified defaults to (`MEMORY_AND_DISK_DESER`)
 
    .. versionadded:: 1.3.0
 
    Notes
    -----
    The default storage level has changed to `MEMORY_AND_DISK_DESER` to match Scala in 3.0.
    """
    self.is_cached = True
    javaStorageLevel = self._sc._getJavaStorageLevel(storageLevel)
    self._jdf.persist(javaStorageLevel)
    return self

checkpoint (PySpark docs / source)

Returns a checkpointed version of this Dataset. Checkpointing can be used to truncate the logical plan of this DataFrame, which is especially useful in iterative algorithms where the plan may grow exponentially. It will be saved to files inside the checkpoint directory set with SparkContext.setCheckpointDir().

df checkpoint

def checkpoint(self, eager=True):
    """Returns a checkpointed version of this Dataset. Checkpointing can be used to truncate the
    logical plan of this :class:`DataFrame`, which is especially useful in iterative algorithms
    where the plan may grow exponentially. It will be saved to files inside the checkpoint
    directory set with :meth:`SparkContext.setCheckpointDir`.
 
    .. versionadded:: 2.1.0
 
    Parameters
    ----------
    eager : bool, optional
        Whether to checkpoint this :class:`DataFrame` immediately
 
    Notes
    -----
    This API is experimental.
    """
    jdf = self._jdf.checkpoint(eager)
    return DataFrame(jdf, self.sql_ctx)
 
##############################################################
/**
* Returns a checkpointed version of this Dataset.
*
* @param eager Whether to checkpoint this dataframe immediately
* @param reliableCheckpoint Whether to create a reliable checkpoint saved to files inside the
*                           checkpoint directory. If false creates a local checkpoint using
*                           the caching subsystem
*/
private def checkpoint(eager: Boolean, reliableCheckpoint: Boolean): Dataset[T] = {
val actionName = if (reliableCheckpoint) "checkpoint" else "localCheckpoint"
withAction(actionName, queryExecution) { physicalPlan =>
  val internalRdd = physicalPlan.execute().map(_.copy())
  if (reliableCheckpoint) {
    internalRdd.checkpoint()
  } else {
    internalRdd.localCheckpoint()
  }
 
  if (eager) {
    internalRdd.count()
  }
  // ... (remainder of the method omitted)
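
From this excerpt, eager=True is what triggers the extra count() job, and the same private method also backs localCheckpoint(). A hedged sketch of the two variants in PySpark:

# Lazy checkpoint: no job runs here; the checkpoint files are written by the next action on ckpt_df.
ckpt_df = df.checkpoint(eager=False)

# Local checkpoint: truncates the plan like checkpoint(), but stores the data through the caching
# subsystem on the executors instead of the reliable checkpoint directory, so it is faster but
# does not survive executor loss.
local_df = df.localCheckpoint()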

References

https://blog.csdn.net/liuwei063608/article/details/79805901 (an exploration of checkpoint)
https://www.cnblogs.com/renyang/p/12597350.html (comparison of the three persistence methods)
https://blog.csdn.net/don_chiang709/article/details/84065510 (analysis of the Storage submodule)
http://spark.apache.org/docs/latest/api/python/search.html?q=checkpoint# (PySpark source)
https://github.com/apache/spark/blob/v3.1.2/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala (Spark source)

