一:RDD的依賴關系
1.在代碼中觀察
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
val resultRDD = distData.flatMap(v => (1 to v)).map(v => (v%2,1)).reduceByKey(_+_)
resultRDD.toDebugString ## 查看RDD的依賴情況
2.解釋
+—處表示,這是兩個不同的stage
同時可以知道shuffledRDD依賴於MapPartitionRDD,MapPartitionRDD依賴於MapPartitionRDD,MapPartitionRDD依賴於ParalleCollectionRDD
[2]表示有兩個分區
3.RDD依賴
lineage: 生命線
依賴於RDD之間的依賴,后續的RDD數據是從之前的RDD中獲取
由於存在RDD的依賴,當一個后續的RDD執行失敗的情況下(某個Task執行失敗,eg:數據丟失),可以從父RDD中重新執行
RDD依賴父RDD,依賴的父RDD可以有多個;
特例:第一個RDD是沒有父RDD的
RDD的內部是由多個Partiiton構成的,所以RDD的依賴實質上就是RDD中Partition的依賴關系
4.依賴的情況
當前RDD中的每個分區的數據到下一個RDD都對應一個分區
即:一個分區的數據輸出到下一個RDD的時候還是在同一個分區,也就是一對一
當前RDD中的多個分區的數據到下一個RDD的時候輸出到同一個分區,當前RDD的中一個分區的數據到下一個RDD的時候輸出到多個分區,也就是多對多
5.依賴分類
窄依賴:
子RDD中的每個分區的數據都來自於常數個父RDD的分區,而且父RDD每個分區的數據到子RDD的時候一定在一個分區中
不存在shuffle過程,所有操作在一起進行
寬依賴:
子RDD中的每個分區的數據都依賴所有父RDD的所有的分區數據,而且父RDD的每個分區的數據到子RDD的時候不一定在一個分區中
存在shuffle過程,需要等待上一個RDD的所有Task執行完成
注意點:
join有時候是寬依賴,有時候是窄依賴,這個要看分區數量會不會改變。
6.算子與依賴之間的關系
原本以為Transformation的算子是窄依賴,Action算子是寬依賴。
現在理解更深了一下,發現他們是兩個概念,不要混淆。
二:stage的划分
1.Spark Application Job的Stage划分規則
RDD在調用transformation類型的函數時候形成DAG執行圖(RDD的依賴)
RDD在調用action類型函數的時候會觸發job的執行
在Driver中使用DAGScheduler對DAG圖進行Stage的划分
從DAG圖的最后一步(結果輸出的那一步)往前推,如果發現API是寬依賴(ShuffledRDD), 就結束推斷,將此時構成的DAG圖稱為一個Stage,然后繼續往前推斷,直到第一個RDD
====> Stage與Stage之間的分割是寬依賴
三:兩種RDD依賴的復習
1.說明
主要是添加一個知識點。
什么情況下父RDD需要執行。
2.不是不執行