Conclusions
- cache is implemented by calling persist. By default it persists data to memory only for an RDD and to memory plus disk for a DataFrame. It is efficient, but carries risks such as running out of memory.
- persist lets you tune where and how data is persisted through its parameters: memory, disk, off-heap memory, whether to serialize, and the number of replicas. The persisted files are temporary and are deleted automatically when the job finishes.
- checkpoint persists data to disk and cuts the lineage. It involves disk I/O and is therefore slower, and its data files are not deleted automatically when the job finishes.
Notes:
- Calling checkpoint() on an RDD only marks it as needing a checkpoint. Only after a later action, once runJob has finished computing the RDD, does doCheckpoint run and actually persist the data. At that point the RDD has to be produced again: it is either computed a second time, or read back from storage if it was persisted beforehand.
- When a DataFrame is checkpointed, the default eager=True immediately runs a count action. This materializes the DataFrame's data and returns a new DataFrame, which discards the upstream dependencies and reduces the complexity of the DAG and the physical plan (see the two-line sketch below).
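A two-line sketch of the point above, assuming an existing DataFrame df and a checkpoint directory already set via sc.setCheckpointDir():
ck_df = df.checkpoint()                # eager=True by default: a count job runs right away and the data is written out
lazy_ck = df.checkpoint(eager=False)   # nothing is written until an action later runs on the returned DataFrame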
Why persist data: lazy evaluation
Spark is lazily evaluated ("lazy evaluation", also called lazy execution): computation is deferred until it is absolutely needed. When you express operations on data, Spark does not modify the data immediately; it builds a plan of transformations to apply to the source data. Spark compiles this plan into a pipelined physical execution plan that can run efficiently on the cluster, and then waits until the last possible moment to start executing.
What lazy evaluation means and implies:
- No computation is triggered until an action runs, which avoids large amounts of pointless work: no useless intermediate results, and therefore no useless disk I/O or network transfer.
- More deeply, the more of the preceding computation Spark can see at execution time, the more opportunities it has to optimize.
- Because only the operation plan is kept rather than the data, the RDD/DataFrame object can be reused but its data cannot, which is why persistence is needed in some scenarios (a minimal sketch of lazy evaluation follows).
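A minimal sketch of lazy evaluation, assuming an existing SparkSession named spark (as created in the demos below): the transformations only record a plan, and nothing runs until the action.
rdd = spark.sparkContext.parallelize(range(10))
doubled = rdd.map(lambda x: x * 2)          # transformation: nothing runs yet
filtered = doubled.filter(lambda x: x > 5)  # still only building the plan
print(filtered.count())                     # action: the whole pipeline executes here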
Common use cases
1. Recursive/iterative joins, e.g. [Sample, feature1, feature2, ..., featureN] → Table
Join is a transformation, not an action, so under lazy evaluation none of the joins run when they are written; only the execution plan is recorded, and everything executes at the final table.show(). Repeated joins therefore build an extremely complex dependency chain, and with such a complex lineage the engine keeps requesting and releasing resources during computation, which can eventually make the job fail. Adding a persistence operator such as DF.persist() introduces an InMemoryTableScan dependency, so the data is persisted and does not have to be recomputed from scratch every time (see the sketch below).
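A hedged sketch of this pattern (sample_df, feature_dfs and the join key sample_id are illustrative names, not taken from the original code): persisting the accumulating DataFrame every few rounds keeps the plan from growing without bound.
result = sample_df                                # hypothetical base table keyed by 'sample_id'
for i, feat_df in enumerate(feature_dfs):         # feature_dfs: a list of feature DataFrames
    result = result.join(feat_df, ['sample_id'], 'left')
    if (i + 1) % 5 == 0:                          # every few joins, cut off the growing plan
        result = result.persist()                 # or result = result.checkpoint() to also truncate lineage
        result.count()                            # materialize so later joins reuse the persisted data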
2. Data reuse, e.g. df -operation A-> result A; df -operation B-> result B
Because of lazy evaluation the object can be reused but its data cannot. If the same RDD/DataFrame feeds several different operations to produce different results and is not persisted, each result is computed from the very beginning. Persisting the data avoids recomputing the same RDD over and over and improves job efficiency (see the sketch below).
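A small DataFrame-flavoured sketch of the same idea (the RDD demo later in this section shows it in more detail), assuming an existing SparkSession spark:
df = spark.range(0, 1000)                        # one source DataFrame with column 'id'
df.persist()                                     # without this, each action below recomputes df from scratch
result_a = df.filter('id % 2 = 0').count()       # operation A
result_b = df.selectExpr('sum(id)').collect()    # operation B
df.unpersist()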
3. Data safety, e.g. data may be lost but high performance must still be guaranteed
When data is lost, Spark recomputes it from its lineage. To keep things efficient in that case, key datasets should be checkpointed and written to HDFS or local disk so that the data itself is safe (see the sketch below).
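A brief sketch with illustrative paths (not from the original code), assuming an existing SparkContext sc:
sc.setCheckpointDir('hdfs:///tmp/spark-checkpoints')   # reliable storage; a local directory also works when testing
key_rdd = sc.textFile('hdfs:///data/input.txt').map(lambda line: line.split(','))
key_rdd.cache()        # recommended, so the checkpoint write does not recompute the RDD
key_rdd.checkpoint()   # only marks the RDD; the files are written after the next action
key_rdd.count()        # action: triggers the computation and the checkpoint write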
Persisting RDDs
Persistence usage notes
RDD reuse demo
import findspark  # if findspark is used for configuration, it must come before import pyspark
spark_home = r'D:\Programs\spark-2.4.5-bin-hadoop2.7'
python_home = r'D:\Programs\anaconda3\python'
findspark.init(spark_home,python_home)
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('testPersist')\
    .master("local[4]")\
    .enableHiveSupport()\
    .getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("Error")
sc.setCheckpointDir('D:\\code')
l = ["hello python","hello spark"]
rdd = sc.parallelize(l)
print(rdd.collect())
flatmaprdd = rdd.flatMap(lambda x:x.split(' '))
def fun(x):
    print("fun is call")
    return (x,1)
maprdd = flatmaprdd.map(fun)
reducerdd = maprdd.reduceByKey(lambda x,y:x+y)
reducerdd.collect() #action operator, triggers execution
grouprdd = maprdd.groupByKey()
grouprdd.collect()
#############################
##output -- stage progress lines such as [Stage 1:> (0 + 8) / 8] omitted
fun is call
fun is call
fun is call
fun is call
fun is call
fun is call
fun is call
fun is call
#Without persistence, both reducerdd.collect() and grouprdd.collect() run from scratch, so map is executed 8 times
#############################
import findspark
spark_home = r'D:\Programs\spark-2.4.5-bin-hadoop2.7'
python_home = r'D:\Programs\anaconda3\python'
findspark.init(spark_home,python_home)
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('testPersist')\
    .master("local[4]")\
    .enableHiveSupport()\
    .getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("Error")
sc.setCheckpointDir('D:\\code')
l = ["hello python","hello spark"]
rdd = sc.parallelize(l)
print(rdd.collect())
flatmaprdd = rdd.flatMap(lambda x:x.split(' '))
def fun(x):
    print("fun is call")
    return (x,1)
maprdd = flatmaprdd.map(fun)
maprdd.cache() #persist the RDD here
reducerdd = maprdd.reduceByKey(lambda x,y:x+y)
reducerdd.collect() #action operator, triggers execution
grouprdd = maprdd.groupByKey()
grouprdd.collect()
##############################
##output
fun is call
fun is call
fun is call
fun is call
#With rdd.cache(), the data is reused and map is executed only 4 times
##############################
The three persistence methods on RDDs and how they differ
cache — PySpark docs / source / demo
Persist this RDD with the default storage level (MEMORY_ONLY).
RDD cache
def cache(self):
    """
    Persist this RDD with the default storage level (`MEMORY_ONLY`).
    """
    self.is_cached = True
    self.persist(StorageLevel.MEMORY_ONLY)
    return self
- cache is a lazy operator: the data is only persisted in memory after an action runs. Its effect shows up in the RDD's lineage, as the toDebugString() output below illustrates. After rdd.cache() alone the dependency chain does not change (only the storage-level annotation appears), while after an action an extra line appears: -CachedPartitions: 8; MemorySize: 311.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B, meaning the data is now held in memory and the program will read it from there first.
rdd.toDebugString()
import findspark
spark_home = r'D:\Programs\spark-2.4.5-bin-hadoop2.7'
python_home = r'D:\Programs\anaconda3\python'
findspark.init(spark_home,python_home)
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('testPersist')\
    .master("local[4]")\
    .enableHiveSupport()\
    .getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("Error")
sc.setCheckpointDir('D:\\code')
l = ["hello python","hello spark"]
rdd = sc.parallelize(l)
print(rdd.collect())
flatmaprdd = rdd.flatMap(lambda x:x.split(' '))
def fun(x):
    print("fun is call")
    return (x,1)
maprdd = flatmaprdd.map(fun)
print(str(maprdd.toDebugString(),'utf-8'))
maprdd.cache()
# maprdd.checkpoint()
print(str(maprdd.toDebugString(),'utf-8'))
reducerdd = maprdd.reduceByKey(lambda x,y:x+y)
reducerdd.collect() #action operator, triggers execution
print(str(maprdd.toDebugString(),'utf-8'))
grouprdd = maprdd.groupByKey()
###########################
##output
(8) PythonRDD[1] at RDD at PythonRDD.scala:53 []
| ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195 []
(8) PythonRDD[1] at RDD at PythonRDD.scala:53 [Memory Serialized 1x Replicated]
| ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195 [Memory Serialized 1x Replicated]
fun is call
fun is call
fun is call
fun is call
(8) PythonRDD[1] at RDD at PythonRDD.scala:53 [Memory Serialized 1x Replicated]
| CachedPartitions: 8; MemorySize: 311.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
| ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195 [Memory Serialized 1x Replicated]
persist — PySpark docs / source
Set this RDD’s storage level to persist its values across operations after the first time it is computed. This can only be used to assign a new storage level if the RDD does not have a storage level set yet. If no storage level is specified defaults to (MEMORY_ONLY).
RDD persist
def persist(self, storageLevel=StorageLevel.MEMORY_ONLY):
    """
    Set this RDD's storage level to persist its values across operations
    after the first time it is computed. This can only be used to assign
    a new storage level if the RDD does not have a storage level set yet.
    If no storage level is specified defaults to (`MEMORY_ONLY`).

    Examples
    --------
    >>> rdd = sc.parallelize(["b", "a", "c"])
    >>> rdd.persist().is_cached
    True
    """
    self.is_cached = True
    javaStorageLevel = self.ctx._getJavaStorageLevel(storageLevel)
    self._jrdd.persist(javaStorageLevel)
    return self
StorageLevel(_useDisk: Boolean, _useMemory: Boolean, _useOffHeap: Boolean, _deserialized: Boolean, _replication: Int = 1)
i.e. whether to store on disk, whether to store in memory, whether to use off-heap memory, whether to keep the data deserialized (not serialized), and the number of replicas.
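For example (a small sketch; rdd stands for any RDD that does not yet have a storage level set):
from pyspark import StorageLevel

# Memory first, spill to disk, keep 2 replicas of each partition:
rdd.persist(StorageLevel.MEMORY_AND_DISK_2)
# In PySpark this predefined level corresponds to the constructor arguments listed above:
# StorageLevel(True, True, False, False, 2)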
checkpoint — PySpark docs / source / demo
Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint directory set with SparkContext.setCheckpointDir() and all references to its parent RDDs will be removed. This function must be called before any job has been executed on this RDD. It is strongly recommended that this RDD is persisted in memory, otherwise saving it on a file will require recomputation.
RDD checkpoint
def checkpoint(self):
    """
    Mark this RDD for checkpointing. It will be saved to a file inside the
    checkpoint directory set with :meth:`SparkContext.setCheckpointDir` and
    all references to its parent RDDs will be removed. This function must
    be called before any job has been executed on this RDD. It is strongly
    recommended that this RDD is persisted in memory, otherwise saving it
    on a file will require recomputation.
    """
    self.is_checkpointed = True
    self._jrdd.rdd().checkpoint()
toDebugString
import findspark
spark_home = r'D:\Programs\spark-2.4.5-bin-hadoop2.7'
python_home = r'D:\Programs\anaconda3\python'
findspark.init(spark_home,python_home)
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('testPersist')\
    .master("local[4]")\
    .enableHiveSupport()\
    .getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("Error")
sc.setCheckpointDir('D:\\code')
list0 = ["hello python","hello spark"]
rdd0 = sc.parallelize(list0)
print(rdd0.collect())
flatmaprdd = rdd0.flatMap(lambda x:x.split(' '))
def fun(x):
    print("fun is call")
    return (x,1)
maprdd = flatmaprdd.map(fun)
print(str(maprdd.toDebugString(),'utf-8'))
# maprdd.persist()
maprdd.checkpoint()
print(str(maprdd.toDebugString(),'utf-8'))
print('-----------------------------------')
reducerdd = maprdd.reduceByKey(lambda x,y:x+y)
reducerdd.collect() #action operator, triggers execution
print(str(maprdd.toDebugString(),'utf-8'))
grouprdd = maprdd.groupByKey()
grouprdd.collect()
##############################
##output
(8) PythonRDD[1] at RDD at PythonRDD.scala:53 []
| ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195 []
(8) PythonRDD[1] at RDD at PythonRDD.scala:53 []
| ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195 []
-----------------------------------
fun is call
fun is call
fun is call
fun is call
fun is call
fun is call
fun is call
fun is call
(8) PythonRDD[1] at RDD at PythonRDD.scala:53 []
| ReliableCheckpointRDD[7] at collect at D:/code/Test_CheckPoint.py:74 []
- To guarantee data safety, checkpoint() re-computes the data when writing it out, which is why there are 8 "fun is call" lines. It is also lazy: the checkpoint is only triggered when .collect() runs.
- If you do not want checkpoint() to recompute the data, combine it with cache()/persist().
checkpoint&persist
list0 = ["hello python","hello spark"]
rdd0 = sc.parallelize(list0)
print(rdd0.collect())
flatmaprdd = rdd0.flatMap(lambda x:x.split(' '))
def fun(x):
    print("fun is call")
    return (x,1)
maprdd = flatmaprdd.map(fun)
print(str(maprdd.toDebugString(),'utf-8'))
maprdd.persist()
maprdd.checkpoint()
print(str(maprdd.toDebugString(),'utf-8'))
print('-----------------------------------')
reducerdd = maprdd.reduceByKey(lambda x,y:x+y)
print(reducerdd.collect()) #action operator, triggers execution
#maprdd.unpersist()
print(str(maprdd.toDebugString(),'utf-8'))
grouprdd = maprdd.groupByKey()
##############################
##output
(8) PythonRDD[1] at RDD at PythonRDD.scala:53 []
| ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195 []
(8) PythonRDD[1] at RDD at PythonRDD.scala:53 [Memory Serialized 1x Replicated]
| ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195 [Memory Serialized 1x Replicated]
-----------------------------------
fun is call
fun is call
fun is call
fun is call
[('python', 1), ('hello', 2), ('spark', 1)]
(8) PythonRDD[1] at RDD at PythonRDD.scala:53 [Memory Serialized 1x Replicated]
| CachedPartitions: 8; MemorySize: 311.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
| ReliableCheckpointRDD[7] at collect at D:/code/Test_CheckPoint.py:74 [Memory Serialized 1x Replicated]
Persisting DataFrames
Persistence demo
DataFrame persistence
import findspark
spark_home = r'D:\Programs\spark-2.4.5-bin-hadoop2.7'
python_home = r'D:\Programs\anaconda3\python'
findspark.init(spark_home,python_home)
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('testPersist')\
    .master("local[4]")\
    .enableHiveSupport()\
    .getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("Error")
sc.setCheckpointDir('D:\\code')
rdd1 = sc.parallelize([('Bob',13,65),('Alice',9,35),('Elf',45,24)])
rdd2 = sc.parallelize([('Bob',131,651),('Alice',91,351),('Elf',451,241)])
rdd3 = sc.parallelize([('Bob',132,652),('Alice',92,352),('Elf',452,242)])
rdd4 = sc.parallelize([('Bob',133,653),('Alice',93,353),('Elf',453,243)])
df1 = rdd1.toDF(['name','a','b'])
# df1.checkpoint()
df2 = rdd2.toDF(['name','aa','bb'])
# df2.checkpoint()
df3 = rdd3.toDF(['name','aaa','bbb'])
# df3.checkpoint()
df4 = rdd4.toDF(['name','aaaa','bbbb'])
# df4.checkpoint()
tmp1 = df1.join(df2,['name'],'inner')
# tmp1.persist()
# tmp1.checkpoint()
tmp1.show()
tmp2 = df3.join(df4,['name'],'inner')
# tmp2.persist()
# tmp2.checkpoint()
tmp2.show()
final = tmp1.join(tmp2,['name'],'inner')
# final.checkpoint()
final.explain()
final.show()
##############################
== Physical Plan ==
*(11) Project [name#0, a#1L, b#2L, aa#7L, bb#8L, aaa#13L, bbb#14L, aaaa#19L, bbbb#20L]
+- *(11) SortMergeJoin [name#0], [name#12], Inner
:- *(5) Project [name#0, a#1L, b#2L, aa#7L, bb#8L]
: +- *(5) SortMergeJoin [name#0], [name#6], Inner
: :- *(2) Sort [name#0 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(name#0, 200)
: : +- *(1) Filter isnotnull(name#0)
: : +- Scan ExistingRDD[name#0,a#1L,b#2L]
: +- *(4) Sort [name#6 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(name#6, 200)
: +- *(3) Filter isnotnull(name#6)
: +- Scan ExistingRDD[name#6,aa#7L,bb#8L]
+- *(10) Project [name#12, aaa#13L, bbb#14L, aaaa#19L, bbbb#20L]
+- *(10) SortMergeJoin [name#12], [name#18], Inner
:- *(7) Sort [name#12 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(name#12, 200)
: +- *(6) Filter isnotnull(name#12)
: +- Scan ExistingRDD[name#12,aaa#13L,bbb#14L]
+- *(9) Sort [name#18 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(name#18, 200)
+- *(8) Filter isnotnull(name#18)
+- Scan ExistingRDD[name#18,aaaa#19L,bbbb#20L]
##############################
import findspark
spark_home = r'D:\Programs\spark-2.4.5-bin-hadoop2.7'
python_home = r'D:\Programs\anaconda3\python'
findspark.init(spark_home,python_home)
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('testPersist')\
    .master("local[4]")\
    .enableHiveSupport()\
    .getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("Error")
sc.setCheckpointDir('D:\\code')
rdd1 = sc.parallelize([('Bob',13,65),('Alice',9,35),('Elf',45,24)])
rdd2 = sc.parallelize([('Bob',131,651),('Alice',91,351),('Elf',451,241)])
rdd3 = sc.parallelize([('Bob',132,652),('Alice',92,352),('Elf',452,242)])
rdd4 = sc.parallelize([('Bob',133,653),('Alice',93,353),('Elf',453,243)])
df1 = rdd1.toDF(['name','a','b'])
# df1.checkpoint()
df2 = rdd2.toDF(['name','aa','bb'])
# df2.checkpoint()
df3 = rdd3.toDF(['name','aaa','bbb'])
# df3.checkpoint()
df4 = rdd4.toDF(['name','aaaa','bbbb'])
# df4.checkpoint()
tmp1 = df1.join(df2,['name'],'inner')
tmp1.persist()
tmp1 = tmp1.checkpoint()
tmp1.show()
tmp2 = df3.join(df4,['name'],'inner')
tmp2.persist()
tmp2 = tmp2.checkpoint()
tmp2.show()
final = tmp1.join(tmp2,['name'],'inner')
# final.checkpoint()
final.explain()
final.show()
## Execution plan after persist()
== Physical Plan ==
*(2) Project [name#0, a#1L, b#2L, aa#7L, bb#8L, aaa#13L, bbb#14L, aaaa#19L, bbbb#20L]
+- *(2) BroadcastHashJoin [name#0], [name#12], Inner, BuildRight
:- *(2) Filter isnotnull(name#0)
: +- InMemoryTableScan [name#0, a#1L, b#2L, aa#7L, bb#8L], [isnotnull(name#0)]
: +- InMemoryRelation [name#0, a#1L, b#2L, aa#7L, bb#8L], StorageLevel(disk, memory, 1 replicas)
: +- *(5) Project [name#0, a#1L, b#2L, aa#7L, bb#8L]
: +- *(5) SortMergeJoin [name#0], [name#6], Inner
: :- *(2) Sort [name#0 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(name#0, 200)
: : +- *(1) Filter isnotnull(name#0)
: : +- Scan ExistingRDD[name#0,a#1L,b#2L]
: +- *(4) Sort [name#6 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(name#6, 200)
: +- *(3) Filter isnotnull(name#6)
: +- Scan ExistingRDD[name#6,aa#7L,bb#8L]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
+- *(1) Filter isnotnull(name#12)
+- InMemoryTableScan [name#12, aaa#13L, bbb#14L, aaaa#19L, bbbb#20L], [isnotnull(name#12)]
+- InMemoryRelation [name#12, aaa#13L, bbb#14L, aaaa#19L, bbbb#20L], StorageLevel(disk, memory, 1 replicas)
+- *(5) Project [name#12, aaa#13L, bbb#14L, aaaa#19L, bbbb#20L]
+- *(5) SortMergeJoin [name#12], [name#18], Inner
:- *(2) Sort [name#12 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(name#12, 200)
: +- *(1) Filter isnotnull(name#12)
: +- Scan ExistingRDD[name#12,aaa#13L,bbb#14L]
+- *(4) Sort [name#18 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(name#18, 200)
+- *(3) Filter isnotnull(name#18)
+- Scan ExistingRDD[name#18,aaaa#19L,bbbb#20L]
#############################
## Execution plan after checkpoint()
== Parsed Logical Plan ==
'Join UsingJoin(Inner,Buffer(name))
:- LogicalRDD [name#0, a#1L, b#2L, aa#7L, bb#8L], false
+- LogicalRDD [name#12, aaa#13L, bbb#14L, aaaa#19L, bbbb#20L], false
== Analyzed Logical Plan ==
name: string, a: bigint, b: bigint, aa: bigint, bb: bigint, aaa: bigint, bbb: bigint, aaaa: bigint, bbbb: bigint
Project [name#0, a#1L, b#2L, aa#7L, bb#8L, aaa#13L, bbb#14L, aaaa#19L, bbbb#20L]
+- Join Inner, (name#0 = name#12)
:- LogicalRDD [name#0, a#1L, b#2L, aa#7L, bb#8L], false
+- LogicalRDD [name#12, aaa#13L, bbb#14L, aaaa#19L, bbbb#20L], false
== Optimized Logical Plan ==
Project [name#0, a#1L, b#2L, aa#7L, bb#8L, aaa#13L, bbb#14L, aaaa#19L, bbbb#20L]
+- Join Inner, (name#0 = name#12)
:- Filter isnotnull(name#0)
: +- LogicalRDD [name#0, a#1L, b#2L, aa#7L, bb#8L], false
+- Filter isnotnull(name#12)
+- LogicalRDD [name#12, aaa#13L, bbb#14L, aaaa#19L, bbbb#20L], false
== Physical Plan ==
*(3) Project [name#0, a#1L, b#2L, aa#7L, bb#8L, aaa#13L, bbb#14L, aaaa#19L, bbbb#20L]
+- *(3) SortMergeJoin [name#0], [name#12], Inner
:- *(1) Filter isnotnull(name#0)
: +- Scan ExistingRDD[name#0,a#1L,b#2L,aa#7L,bb#8L]
+- *(2) Filter isnotnull(name#12)
+- Scan ExistingRDD[name#12,aaa#13L,bbb#14L,aaaa#19L,bbbb#20L]
- Since DataFrame has no toDebugString(), explain() is used instead to inspect the execution plan.
- After persist(), extra InMemoryTableScan entries appear in the execution plan.
- After checkpoint(), the plans that produced tmp1 and tmp2 are truncated and replaced by +- Scan ExistingRDD[name#12,aaa#13L,bbb#14L,aaaa#19L,bbbb#20L].
DAG diagrams
DAG without checkpoint
DAG with checkpoint
DAG of the job with and without checkpoint()/persist()
Important note:
DataFrame's checkpoint is implemented differently from RDD's.
The DataFrame version returns a new object (compare below).
df&rdd checkpoint
rdd = spark.sparkContext.parallelize([[1,2,3]])
persisted_rdd = rdd.persist()
rdd == persisted_rdd
out: True
df = rdd.toDF(['A','B','C'])
spark.sparkContext.setCheckpointDir('.')
checkpointed_df = df.checkpoint()
checkpointed_df == df
out:False
type(checkpointed_df)
out:DataFrame
checkpointed_rdd = rdd.checkpoint()
checkpointed_rdd == rdd
out:False
type(checkpointed_rdd)
out:NoneType
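In practice this means the two APIs are used differently; a brief sketch:
# RDD: checkpoint() marks the existing object, so keep using rdd itself.
rdd.cache()
rdd.checkpoint()
rdd.count()          # triggers the computation and the checkpoint write

# DataFrame: checkpoint() returns a NEW DataFrame with the truncated plan;
# the returned object must be captured, otherwise the checkpoint is effectively unused.
df = df.checkpoint()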
The three persistence methods on DataFrames and how they differ
cache — PySpark docs / source
Persists the DataFrame with the default storage level (MEMORY_AND_DISK).
df cache
def cache(self):
    """Persists the :class:`DataFrame` with the default storage level (`MEMORY_AND_DISK`).

    .. versionadded:: 1.3.0

    Notes
    -----
    The default storage level has changed to `MEMORY_AND_DISK` to match Scala in 2.0.
    """
    self.is_cached = True
    self._jdf.cache()
    return self
persist — PySpark docs / source
Sets the storage level to persist the contents of the DataFrame across operations after the first time it is computed. This can only be used to assign a new storage level if the DataFrame does not have a storage level set yet. If no storage level is specified defaults to (MEMORY_AND_DISK_DESER)
df persist
def persist(self, storageLevel=StorageLevel.MEMORY_AND_DISK_DESER):
    """Sets the storage level to persist the contents of the :class:`DataFrame` across
    operations after the first time it is computed. This can only be used to assign
    a new storage level if the :class:`DataFrame` does not have a storage level set yet.
    If no storage level is specified defaults to (`MEMORY_AND_DISK_DESER`)

    .. versionadded:: 1.3.0

    Notes
    -----
    The default storage level has changed to `MEMORY_AND_DISK_DESER` to match Scala in 3.0.
    """
    self.is_cached = True
    javaStorageLevel = self._sc._getJavaStorageLevel(storageLevel)
    self._jdf.persist(javaStorageLevel)
    return self
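A small usage sketch of the above, assuming an existing SparkSession spark (the exact default level depends on the Spark version, as the docstring notes):
from pyspark import StorageLevel

df = spark.range(100)
df.persist(StorageLevel.MEMORY_ONLY)   # override the MEMORY_AND_DISK / MEMORY_AND_DISK_DESER default
print(df.storageLevel)                 # shows the level actually applied to the DataFrame
df.unpersist()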
checkpoint — PySpark docs / source
Returns a checkpointed version of this Dataset. Checkpointing can be used to truncate the logical plan of this DataFrame, which is especially useful in iterative algorithms where the plan may grow exponentially. It will be saved to files inside the checkpoint directory set with SparkContext.setCheckpointDir().
df checkpoint
def checkpoint(self, eager=True):
    """Returns a checkpointed version of this Dataset. Checkpointing can be used to truncate the
    logical plan of this :class:`DataFrame`, which is especially useful in iterative algorithms
    where the plan may grow exponentially. It will be saved to files inside the checkpoint
    directory set with :meth:`SparkContext.setCheckpointDir`.

    .. versionadded:: 2.1.0

    Parameters
    ----------
    eager : bool, optional
        Whether to checkpoint this :class:`DataFrame` immediately

    Notes
    -----
    This API is experimental.
    """
    jdf = self._jdf.checkpoint(eager)
    return DataFrame(jdf, self.sql_ctx)
##############################################################
/**
 * Returns a checkpointed version of this Dataset.
 *
 * @param eager Whether to checkpoint this dataframe immediately
 * @param reliableCheckpoint Whether to create a reliable checkpoint saved to files inside the
 *                           checkpoint directory. If false creates a local checkpoint using
 *                           the caching subsystem
 */
private def checkpoint(eager: Boolean, reliableCheckpoint: Boolean): Dataset[T] = {
  val actionName = if (reliableCheckpoint) "checkpoint" else "localCheckpoint"
  withAction(actionName, queryExecution) { physicalPlan =>
    val internalRdd = physicalPlan.execute().map(_.copy())
    if (reliableCheckpoint) {
      internalRdd.checkpoint()
    } else {
      internalRdd.localCheckpoint()
    }
    if (eager) {
      internalRdd.count()
    }
    // ... (remainder omitted: the method wraps the checkpointed RDD in a new Dataset[T] and returns it)
  }
}
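As the Scala source shows, the same code path also backs localCheckpoint(); a brief sketch of the difference on the PySpark side, assuming an existing DataFrame df:
ck_df = df.checkpoint(eager=True)             # reliable: written under SparkContext.setCheckpointDir, survives executor loss
local_ck_df = df.localCheckpoint(eager=True)  # local: stored via the caching subsystem on executors, faster but not fault-tolerant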
References
https://blog.csdn.net/liuwei063608/article/details/79805901 — an exploration of checkpoint
https://www.cnblogs.com/renyang/p/12597350.html — comparison of the three persistence methods
https://blog.csdn.net/don_chiang709/article/details/84065510 — analysis of the Storage submodule
http://spark.apache.org/docs/latest/api/python/search.html?q=checkpoint# — PySpark API docs / source search
https://github.com/apache/spark/blob/v3.1.2/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala — Spark source (Dataset.scala)