【Spark篇】---Spark中控制算子


一、前述

 

Spark中控制算子也是懶執行的,需要Action算子觸發才能執行,主要是為了對數據進行緩存。

控制算子有三種,cache,persist,checkpoint以上算子都可以將RDD持久化持久化的單位是partitioncache和persist都是懶執行的。必須有一個action類算子觸發執行。checkpoint算子不僅能將RDD持久化到磁盤,還能切斷RDD之間的依賴關系。

 

二、具體算子

 

1、 cache

 

默認將RDD的數據持久化到內存cache是執行

chche () = persist()=persist(StorageLevel.Memory_Only)

 

 SparkConf conf = new SparkConf();
 conf.setMaster("local").setAppName("CacheTest");
 JavaSparkContext jsc = new JavaSparkContext(conf);
 JavaRDD<String> lines = jsc.textFile("./NASA_access_log_Aug95");

 lines = lines.cache();
 long startTime = System.currentTimeMillis();
 long count = lines.count();//count是action算子,到這里才能觸發cache執行,所以這一次coun加載是從磁盤讀數據,然后拉回到drive端。 long endTime = System.currentTimeMillis();
 System.out.println("共"+count+ "條數據,"+"初始化時間+cache時間+計算時間="+ 
          (endTime-startTime));
        
 long countStartTime = System.currentTimeMillis();
 long countrResult = lines.count();//這一次是從內存種中讀數據 long countEndTime = System.currentTimeMillis();
 System.out.println("共"+countrResult+ "條數據,"+"計算時間="+ (countEndTime-
           countStartTime));
        
 jsc.stop();

 

2、persist(可以指定持久化的級別

 

解釋:

 

1、MEMORY_AND_DISK 意思是先往內存中放數據,內存不夠再放磁盤

2、最常用的是MEMORY_ONLYMEMORY_AND_DISK”_2”表示有副本數。

3、選擇的原則是:首先考慮內存,然后考慮序列化之后再放入內存,最后考慮內存加磁盤。

4、盡量避免使用“_2”和DISK_ONLY級別。

5、deserialized是不序列化的意思。

 

注意事項:

 

  1. 1、cache和persist都是懶執行,必須有一個action類算子觸發執行。
  2. 2、cache和persist算子的返回值可以賦值給一個變量,在其他job中直接使用這個變量就是使用持久化的數據了。持久化的單位是partition。
  3. 3、cache和persist算子后不能立即緊跟action算子。

錯誤:

rdd.cache().count() 返回的不是持久化的RDD,而是一個數值了

 

3、 Checkpoint(對Lineage非常長時使用

 

     1、概念和特征:

 

           不僅可以將數據持久化到磁盤,還可以切斷RDD之間的依賴關系,checkpoint也是懶執行。

            Checkpoin不僅存儲結果,還會存儲邏輯,還可以存儲元數據。

            Persisit切斷不了RDD的依賴關系。

 

     2、checkpoint 的執行原理:

 

           2.1.Spark job執行完之后,spark會從finalRDD從后往前回溯。

           2.2.當回溯到對某個RDD進行了checkpoint,會對這個RDD標記。

           2.3.回溯完成之后,Spark會重新計算標記RDD的結果,然后將結果保存到Checkpint目錄中。

 

   3、優化checekpoint

 

  • 因為最后是要觸發當前application的action算子,所以在觸發之前加一層cache操作,一樣會往前執行cache操作,實現對數據的cache ,所以考慮將cache優化到checkpoin的優化流程里。
  • RDD執行checkpoint之前,最好對這個RDD先執行cache,這樣新啟動的job(回溯完成之后重新開的job)只需要將內存中的數據(cache緩存好的checkpoint那個點的數據)拷貝到HDFS上就可以。
  • 省去了重新計算這一步,不需要重頭開始來走到checkpoint這個點了。

 

總結:

 

持久化的最小單位是partition!!!

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM