多次對某個RDD進行transformation或者action,如果沒有做RDD持久化,那么每次都要重新計算一個RDD,會消耗大量時間,降低Spark性能。
Spark非常重要的一個功能特性就是可以將RDD持久化在內存中。當對RDD執行持久化操作時,每個節點都會將自己操作的RDD的partition持久化到內存中,並且在之后對該RDD的反復使用中,直接使用內存緩存的partition。這樣的話,對於針對一個RDD反復執行多個操作的場景,就只要對RDD計算一次即可,后面直接使用該RDD,而不需要反復計算多次該RDD。
巧妙使用RDD持久化,甚至在某些場景下,可以將spark應用程序的性能提升10倍。對於迭代式算法和快速交互式應用來說,RDD持久化,是非常重要的。
要持久化一個RDD,只要調用其cache()或者persist()方法即可。在該RDD第一次被計算出來時,就會直接緩存在每個節點中。而且Spark的持久化機制還是自動容錯的,如果持久化的RDD的任何partition丟失了,那么Spark會自動通過其源RDD,使用transformation操作重新計算該partition。
cache()和persist()的區別在於,cache()是persist()的一種簡化方式,cache()的底層就是調用的persist()的無參版本,同時就是調用persist(MEMORY_ONLY),將數據持久化到內存中。如果需要從內存中清楚緩存,那么可以使用unpersist()方法。
Spark自己也會在shuffle操作時,進行數據的持久化,比如寫入磁盤,主要是為了在節點失敗時,避免需要重新計算整個過程。
RDD持久化是可以手動選擇不同的策略的。比如可以將RDD持久化在內存中、持久化到磁盤上、使用序列化的方式持久化,多持久化的數據進行多路復用。只要在調用persist()時傳入對應的StorageLevel即可。
持久化級別 | 含義 |
MEMORY_ONLY | 以非序列化的Java對象的方式持久化在JVM內存中。如果內存無法完全存儲RDD所有的partition,那么那些沒有持久化的partition就會在下一次需要使用它的時候,重新被計算
|
MEMORY_AND_DISK |
同上,但是當某些partition無法存儲在內存中時,會持久化到磁盤中。下次需要使用這些partition時,需要從磁盤上讀取 |
MEMORY_ONLY_SER | 同MEMORY_ONLY,但是會使用Java序列化方式,將Java對象序列化后進行持久化。可以減少內存開銷,但是需要進行反序列化,因此會加大CPU開銷。 |
MEMORY_AND_DSK_SER | 同MEMORY_AND_DSK。但是使用序列化方式持久化Java對象 |
DISK_ONLY | 使用非序列化Java對象的方式持久化,完全存儲到磁盤上 |
MEMORY_ONLY_2 |
如果是尾部加了2的持久化級別,表示會將持久化數據復用一份,保存到其他節點,從而在數據丟失時,不需要再次計算,只需要使用備份數據即可。 |
如何選擇RDD持久化策略?
Spark提供的多種持久化級別,主要是為了在CPU和內存消耗之間進行取舍。下面是一些通用的持久化級別的選擇建議:
1、優先使用MEMORY_ONLY,如果可以緩存所有數據的話,那么就使用這種策略。因為純內存速度最快,而且沒有序列化,不需要消耗CPU進行反序列化操作。
2、如果MEMORY_ONLY策略,無法存儲的下所有數據的話,那么使用MEMORY_ONLY_SER,將數據進行序列化進行存儲,純內存操作還是非常快,只是要消耗CPU進行反序列化。
3、如果需要進行快速的失敗恢復,那么就選擇帶后綴為_2的策略,進行數據的備份,這樣在失敗時,就不需要重新計算了。
4、能不使用DISK相關的策略,就不用使用,有的時候,從磁盤讀取數據,還不如重新計算一次。
不使用RDD持久化
使用RDD持久化
package cn.spark.study.core; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction; import scala.Tuple2; import java.util.Arrays; /** * @author: yangchun * @description: * @date: Created in 2020-05-07 13:28 */ public class PersistOperation { public static void main(String[] args) { //第一步:創建SparkConf對象,設置Spark應用的配置信息 //setMaster()可以設置Spark應用程序要連接的機器的master機器,設置為local表示本地運行 SparkConf conf = new SparkConf().setAppName("WordCountLocal") .setMaster("local"); //第二步:創建JavaSparkContext對象 /*在Spark,SparkContext是所有Spark所有功能的一個入口,你無論是java,scala,還是python編寫的 都必須有一個SparkContext,它的主要作用,包括初始化Spark應用程序所需的一些核心組件,包括調度器 (DAGSchedule,TaskSchedule),還會去Spark Master節點進行注冊,等等。Spark Context是Spark中 最重要的一個對象。不同類型的Spark應用程序,SparkContext不同 Java的SparkContext,就是JavaSparkContext Spark SQL程序,SQLContext,HiveContext Spark Streaming SparkContext Scala 就是SparkContext */ JavaSparkContext sc = new JavaSparkContext(conf); /* 第三步,針對輸入源創建R初始RDD,輸入源中的數據會打散,分配到RDD的每個partition中,形成一個分布式數據集 SparkContext根據本地文件創建RDD的方法叫做textFile(),Java中,創建的普通RDD,都叫做JavaRDD。RDD中有元素的 概念,如果hdfs和本地文件,創建的RDD每一個元素相當於文件里面的一行 */ JavaRDD<String> lines = sc.textFile("E:\\spark\\spark.txt").cache(); long start = System.currentTimeMillis(); Long num = lines.count(); long end = System.currentTimeMillis(); System.out.println("first "+(end - start)); long start1 = System.currentTimeMillis(); Long num1 = lines.count(); long end1= System.currentTimeMillis(); System.out.println("second "+(end1 - start1)); sc.close(); } }