Spark RDD深度解析-RDD計算流程
摘要 RDD(Resilient Distributed Datasets)是Spark的核心數據結構,所有數據計算操作均基於該結構進行,包括Spark sql 、Spark Streaming。理解RDD有助於了解分布式計算引擎的基本架構,更好地使用Spark進行批處理與流計算。本文以Spark2.0源代碼為主,對RDD的生成、計算流程、加載順序等作深入的解析。
RDD印象
直觀上,RDD可理解為下圖所示結構,即RDD包含多個Partition(分區),每個Partition代表一部分數據並位於一個計算節點。
RDD本質上是Spark中的一個抽象類,所有子RDD(HadoopRDD、MapPartitionRDD、JdbcRDD等)都要繼承並實現其中的方法。
abstract class RDD[T: ClassTag](
@transient private var _sc: SparkContext,
@transient private var deps: Seq[Dependency[_]]
) extends Serializable with Logging {
RDD包含以下成員方法或屬性:
1、compute方法
提供在計算過程中Partition元素的獲取與計算方式
2、partition的列表
每一個partition代表一個並行的最小划分單元;
3、dependencies列表
描述RDD依賴哪些父RDD生成,即RDD的血緣關系;
4、partition的位置列表
定義如何最快速的獲取partition的數據,加快計算,這個是可選的,可作為本地化計算的優化選項;
5、partitioner方法
定義如何對數據進行分區。
RDD生成方式
1、scala集合
Partition的默認值:defaultParallelism
defaultParallelism與spark的部署模式相關:
-
- Local 模式:本機 cpu cores 的數量
- Mesos 模式:8
- Yarn:max(2, 所有 executors 的 cpu cores 個數總和)
2、物理數據載入
默認為min(defaultParallelism, 2)
3、其他RDD轉換
根據具體的轉換算子而定
Partition
Partiton不直接持有數據,僅僅代表了分區的位置(index的值)。
trait Partition extends Serializable {
/**
* Get the partition's index within its parent RDD
*/
def index: Int
// A better default implementation of HashCode
override def hashCode(): Int = index
override def equals(other: Any): Boolean = super.equals(other)
}
Dependency
從名字可以猜想,他描述了RDD之間的依賴關系。成員rdd就是父RDD,會在構造RDD時被賦值。
abstract class Dependency[T] extends Serializable {
def rdd: RDD[T]
}
由上述RDD、Dependcy關系可畫出下圖,通過這種方式,子RDD能輕易找到父RDD的位置等信息,從而構建出RDD的轉換路徑,為DAGScheduler的任務划分及任務執行時尋找依賴的數據提供依據。
到此應該能大致明白RDD中涉及的各個概念的含義及其之間的聯系。但是仔細思考,會發現存在很多問題,比如:
既然RDD不攜帶數據,那么數據是何時加載的?怎么加載的?怎么分布到不同計算節點的?
不同類型的RDD是怎么完成轉換的?
RDD計算流程
以下面幾行代碼為例,解答上述問題。
var sc = new SparkContext();
var hdfs_rdd = sc.textFile(hdfs://master:9000/examples/people.txt); // 加載數據
var rdd = hdfs_rdd.map(_.split(“,”)); // 對每行數據按逗號分隔
print(rdd.count()); // 打印數據的條數
RDD的轉換
首先從直觀上了解上述代碼執行過程中RDD的轉換,如下圖,Spark按照HDFS中文件的block將數據加載到內存,成為初始RDD1,經過每一步操作后轉換為相應RDD。
首先分析textFile方法的作用,源碼如下:
def textFile(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
assertNotStopped()
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString).setName(path)
}
着重看紅色語句,textFile方法實際上是先調用了hadoopFile方法,再利用其返回值調用map方法,HadoopFile執行了什么,返回了什么呢?
def hadoopFile[K, V](
path: String,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
assertNotStopped()
// This is a hack to enforce loading hdfs-site.xml.
// See SPARK-11227 for details.
FileSystem.getLocal(hadoopConfiguration)
// A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
new HadoopRDD(
this,
confBroadcast,
Some(setInputPathsFunc),
inputFormatClass,
keyClass,
valueClass,
minPartitions).setName(path)
}
很明顯,hadoopFile實際上是獲取了HADOOP的配置,然后構造並返回了HadoopRDD對象,HadoopRDD是RDD的子類。因此textFile最后調用的是HadoopRDD對象的map方法,其實RDD接口中定義並實現了map方法,所有繼承了RDD的類調用的map方法都來自於此。
觀察RDD的map方法:
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
map方法很簡單,首先包裝一下傳進來的函數,然后返回MapPartitionsRDD對象。至此,textFile結束,他最終只是返回了MapPartitionsRDD,並沒有執行數據讀取、計算操作。
接着看下一語句:var rdd = hdfs_rdd.map(_.split(“,”));
由上面的分析可知hdfs_rdd是一個MapPartitionsRDD對象,於是其map方法內容與上文的一模一樣,也只是返回一個包含用戶函數的MapPartitionsRDD對象。
目前為止每個方法的調用只是返回不同類型的RDD對象,還未真正執行計算。
接着看var cnt = rdd.count();
count是一種action類型的操作,會觸發RDD的計算,為什么說count會觸發RDD的計算呢?需要看看count的實現:
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
可以看到,count方法中調用了sc(sparkContext)的runJob方法,該操作將觸發DagScheduler去分解任務並提交到集群執行。count方法會返回Array[U]類型的結果,數組中每個值代表了當前RDD每個分區中包含的元素個數,因此sum的結果就是RDD中所有元素的個數,本例的結果就是HDFS文件中存在幾行數據。
RDD的計算
下面介紹任務提交后RDD是怎么計算出來的。
任務分解並提交后開始執行,task會在最后一個RDD上執行compute方法。
以上述代碼為例,最后一個RDD的類型是MapPartitionsRDD,看其compute方法:
override def compute(split: Partition, context: TaskContext): Iterator[U] =
f(context, split.index, firstParent[T].iterator(split, context))
其中split是RDD的分區,firstParent是父RDD;最外層的f其實是構造MapPartitionsRDD時傳入的一個參數,改參數是一個函數對象,接收三個參數並返回Iterator
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
var prev: RDD[T],
f: (TaskContext, Int, Iterator[T]) => Iterator[U], // (TaskContext, partition index, iterator)
preservesPartitioning: Boolean = false)
f是何時生成的呢?就看何時生成的MapPartitionsRDD,參考上文可知MapPartitionsRDD是在map方法里構造的,其第二個構造參數就是f的具體實現。
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
綜上可知,MapPartitionsRDD的compute中f的作用就是就是對f的第三個參數iter執行iter.map(cleanF),其中cleanF就是用戶調用map時傳入的函數,而iter又是firstParent[T].iterator(split, context)的返回值。
firstParent[T].iterator(split, context)又是什么呢?他是對父RDD執行iterator方法,該方法是RDD接口的final方法,因此所有子RDD調用的都是該方法。
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
if (storageLevel != StorageLevel.NONE) {
getOrCompute(split, context)
} else {
computeOrReadCheckpoint(split, context)
}
}
通過進一步查看可知,iterator先判斷 RDD 的 storageLevel 是否為 NONE,若不是,則嘗試從緩存中讀取,讀取不到則通過計算來獲取該 Partition 對應的數據的迭代器;若是,嘗試從 checkpoint 中獲取 Partition 對應數據的迭代器,若 checkpoint 不存在則通過計算來獲取。
Iterator方法將返回一個迭代器,通過迭代器可以訪問父RDD的某個分區的每個元素,如果內存中不存在父RDD的數據,則調用父RDD的compute方法進行計算。
RDD真正的計算由RDD的action 操作觸發,對於action 操作之前的所有Transformation 操作,Spark只記錄Transformation的RDD生成軌跡,即各個RDD之間的相互依賴關系。
總結
Spark RDD的計算方式為:spark是從最后一個RDD開始計算(調用compute),計算時尋找父RDD,若父RDD在內存就直接使用,否則調用父RDD的compute計算得出,以此遞歸,過程可抽象為下圖:
從對象產生的順序看,先生成了HadoopRDD,調用兩次map方法后依次產生兩個MapPartitionsRDD;從執行的角度看,先執行最后一個RDD的compute方法,在計算過程中遞歸執行父RDD的compute,以生成對應RDD的數據;從數據加載角度看,第一個構造出來的RDD在執行compute時才會將數據載入內存(本例中為HDFS讀入內存),然后在這些數據上執行用戶傳入的方法,依次生成子RDD的內存數據。