前言
Spark中RDD的高效與DAG圖有着莫大的關系,在DAG調度中需要對計算過程划分stage,暴力的理解就是stage的划分是按照有沒有涉及到shuffle來划分的,沒涉及的shuffle的都划分在一個stage里面,這種划分依據就是RDD之間的依賴關系。針對不同的轉換函數,RDD之間的依賴關系分類窄依賴(narrow dependency)和寬依賴(wide dependency, 也稱 shuffle dependency).
定義
- 窄依賴是指父RDD的每個分區只被子RDD的一個分區所使用,子RDD分區通常對應常數個父RDD分區(O(1),與數據規模無關)
- 相應的,寬依賴是指父RDD的每個分區都可能被多個子RDD分區所使用,子RDD分區通常對應所有的父RDD分區(O(n),與數據規模有關)
寬依賴和窄依賴關系圖:
為什么要有寬窄依賴?
1.前面已經說過了stage划分的一個很重要的原因就是有沒有涉及到shuffle,如果沒涉及到的被划分到一個stage里面。
2.沒有涉及shuffle的任務直接運行就可以,這個也就是長提到的pipeline。這種面向的就是窄依賴。
√ 每個分區里的數據都被加載到機器的內存里,我們逐一的調用 map, filter, map 函數到這些分區里,Job 就很好的完成。 √ 更重要的是,由於數據沒有轉移到別的機器,我們避免了 Network IO 或者 Disk IO. 唯一的任務就是把 map / filter 的運行環境搬到這些機器上運行,這對現代計算機來說,overhead 幾乎可以忽略不計。 √ 這種把多個操作合並到一起,在數據上一口氣運行的方法在 Spark 里叫 pipeline (其實 pipeline 被廣泛應用的很多領域,比如 CPU)。這時候不同就出現了:只有 narrow transformation 才可以進行 pipleline 操作。對於 wide transformation, RDD 轉換需要很多分區運算,包括數據在機器間搬動,所以失去了 pipeline 的前提。 √ 總結起來一句話:數據和算是否在一起,計算的性能是不一樣的,為了區分,就有了寬依賴和窄依賴。
3.一提到shuffle如果之前對mapreduce有過了解的人都知道,這個對分布式影響巨大,spark也是一步步演變過來的,現在可以說spark2.x以上的shuffle可以認為和經典的mapreduce的shuffle一樣了,到現在可以說spark完全比mp有優勢了。這之前,在一些場景下spark還是比不過mp的。(這個地方會寫一個專門的文章來闡述變化過程)。
4.寬窄依賴如何優化?----得想想
窄依賴對優化的幫助
1.寬依賴往往對應着shuffle操作,需要在運行過程中將同一個父RDD的分區傳入到不同的子RDD分區中,中間可能涉及到多個節點之間的數據傳輸;而窄依賴的每個父RDD的分區只會傳入到一個子RDD分區中,通常可以在一個節點內就可以完成了。
2.當RDD分區丟失時(某個節點故障),spark會對數據進行重算。
- 對於窄依賴,由於父RDD的一個分區只對應一個子RDD分區,這樣只需要重算和子RDD分區對應的父RDD分區即可,所以這個重算對數據的利用率是100%的;
- 對於寬依賴,重算的父RDD分區對應多個子RDD分區,這樣實際上父RDD 中只有一部分的數據是被用於恢復這個丟失的子RDD分區的,另一部分對應子RDD的其它未丟失分區,這就造成了多余的計算;更一般的,寬依賴中子RDD分區通常來自多個父RDD分區,極端情況下,所有的父RDD分區都要進行重新計算。
3.如下圖所示,b1分區丟失,則需要重新計算a1,a2和a3,這就產生了冗余計算(a1,a2,a3中對應b2的數據)。
區分這兩種依賴很有用。首先,窄依賴允許在一個集群節點上以流水線的方式(pipeline)計算所有父分區。例如,逐個元素地執行map、然后filter操作;而寬依賴則需要首先計算好所有父分區數據,然后在節點之間進行Shuffle,這與MapReduce類似。第二,窄依賴能夠更有效地進行失效節點的恢復,即只需重新計算丟失RDD分區的父分區,而且不同節點之間可以並行計算;而對於一個寬依賴關系的Lineage圖,單個節點失效可能導致這個RDD的所有祖先丟失部分分區,因而需要整體重新計算。
窄依賴中每個子RDD可能對應多個父RDD,當子RDD丟失時會導致多個父RDD進行重新計算,所以窄依賴不如寬依賴有優勢。
而實際上應該深入到分區級別去看待這個問題,而且重算的效用也不在於算的多少,而在於有多少是冗余的計算。窄依賴中需要重算的都是必須的,所以重算不冗余
窄依賴的函數有:map, filter, union, join(父RDD是hash-partitioned ), mapPartitions, mapValues
寬依賴的函數有:groupByKey, join(父RDD不是hash-partitioned ), partitionBy