一、前述
Spark中控制算子也是懶執行的,需要Action算子觸發才能執行,主要是為了對數據進行緩存。
控制算子有三種,cache,persist,checkpoint,以上算子都可以將RDD持久化,持久化的單位是partition。cache和persist都是懶執行的。必須有一個action類算子觸發執行。checkpoint算子不僅能將RDD持久化到磁盤,還能切斷RDD之間的依賴關系。
二、具體算子
1、 cache
默認將RDD的數據持久化到內存中。cache是懶執行。
chche () = persist()=persist(StorageLevel.Memory_Only)
SparkConf conf = new SparkConf(); conf.setMaster("local").setAppName("CacheTest"); JavaSparkContext jsc = new JavaSparkContext(conf); JavaRDD<String> lines = jsc.textFile("./NASA_access_log_Aug95"); lines = lines.cache(); long startTime = System.currentTimeMillis(); long count = lines.count();//count是action算子,到這里才能觸發cache執行,所以這一次coun加載是從磁盤讀數據,然后拉回到drive端。 long endTime = System.currentTimeMillis(); System.out.println("共"+count+ "條數據,"+"初始化時間+cache時間+計算時間="+ (endTime-startTime)); long countStartTime = System.currentTimeMillis(); long countrResult = lines.count();//這一次是從內存種中讀數據 long countEndTime = System.currentTimeMillis(); System.out.println("共"+countrResult+ "條數據,"+"計算時間="+ (countEndTime- countStartTime)); jsc.stop();
2、persist(可以指定持久化的級別)

解釋:
1、MEMORY_AND_DISK 意思是先往內存中放數據,內存不夠再放磁盤
2、最常用的是MEMORY_ONLY和MEMORY_AND_DISK。”_2”表示有副本數。
3、選擇的原則是:首先考慮內存,然后考慮序列化之后再放入內存,最后考慮內存加磁盤。
4、盡量避免使用“_2”和DISK_ONLY級別。
5、deserialized是不序列化的意思。
注意事項:
- 1、cache和persist都是懶執行,必須有一個action類算子觸發執行。
- 2、cache和persist算子的返回值可以賦值給一個變量,在其他job中直接使用這個變量就是使用持久化的數據了。持久化的單位是partition。
- 3、cache和persist算子后不能立即緊跟action算子。
錯誤:
rdd.cache().count() 返回的不是持久化的RDD,而是一個數值了
3、 Checkpoint(對Lineage非常長時使用)
1、概念和特征:
不僅可以將數據持久化到磁盤,還可以切斷RDD之間的依賴關系,checkpoint也是懶執行。
Checkpoin不僅存儲結果,還會存儲邏輯,還可以存儲元數據。
Persisit切斷不了RDD的依賴關系。
2、checkpoint 的執行原理:
2.1.Spark job執行完之后,spark會從finalRDD從后往前回溯。
2.2.當回溯到對某個RDD進行了checkpoint,會對這個RDD標記。
2.3.回溯完成之后,Spark會重新計算標記RDD的結果,然后將結果保存到Checkpint目錄中。
3、優化checekpoint
- 因為最后是要觸發當前application的action算子,所以在觸發之前加一層cache操作,一樣會往前執行cache操作,實現對數據的cache ,所以考慮將cache優化到checkpoin的優化流程里。
- 對RDD執行checkpoint之前,最好對這個RDD先執行cache,這樣新啟動的job(回溯完成之后重新開的job)只需要將內存中的數據(cache緩存好的checkpoint那個點的數據)拷貝到HDFS上就可以。
- 省去了重新計算這一步,不需要重頭開始來走到checkpoint這個點了。
總結:
持久化的最小單位是partition!!!
