030 RDD Join中寬依賴與窄依賴的判斷


1.規律

   如果JoinAPI之前被調用的RDD API是寬依賴(存在shuffle), 而且兩個join的RDD的分區數量一致,join結果的rdd分區數量也一樣,這個時候join api是窄依賴
  除此之外的,rdd 的join api是寬依賴

 

2.Join的理解

  

  

3.舉例

 1 A表數據:
 2   1 a
 3   2 b
 4   3 c
 5 B表數據:
 6   1 aa1
 7   1 aa2
 8   2 bb1
 9   2 bb2
10   2 bb3
11   4 dd1
12 
13 A inner join B:
14   1    a 1 aa1
15   1    a 1 aa2
16   2   b 2 bb1
17   2   b 2 bb2
18   2   b 2 bb3
19 
20 A left outer join B:
21   1    a 1 aa1
22   1    a 1 aa2
23   2   b 2 bb1
24   2   b 2 bb2
25   2   b 2 bb3
26   3   c null null
27 
28 A right outer join B:
29   1    a 1 aa1
30   1    a 1 aa2
31   2   b 2 bb1
32   2   b 2 bb2
33   2   b 2 bb3
34   null null 4 dd1
35 
36 A full outer join B:
37   1    a 1 aa1
38   1    a 1 aa2
39   2   b 2 bb1
40   2   b 2 bb2
41   2   b 2 bb3
42   3   c null null
43   null null 4 dd1
44 
45 A left semi join B:
46   1 a
47   2 b

  

4.API

  必須是Key/value鍵值對

  

 

5.測試程序

 1 import org.apache.spark.{SparkConf, SparkContext}
 2 
 3 /**
 4   * RDD數據Join相關API講解
 5   * Created by ibf on 02/09.
 6   */
 7 object RDDJoin {
 8   def main(args: Array[String]): Unit = {
 9     val conf = new SparkConf()
10       .setMaster("local[*]")
11       .setAppName("RDD-Join")
12     val sc = SparkContext.getOrCreate(conf)
13 
14     // ==================具體代碼======================
15     // 模擬數據產生
16     val rdd1 = sc.parallelize(Array(
17       (1, "張三1"),
18       (1, "張三2"),
19       (2, "李四"),
20       (3, "王五"),
21       (4, "Tom"),
22       (5, "Gerry"),
23       (6, "莉莉")
24     ), 1)
25 
26     val rdd2 = sc.parallelize(Array(
27       (1, "上海"),
28       (2, "北京1"),
29       (2, "北京2"),
30       (3, "南京"),
31       (4, "紐約"),
32       (6, "深圳"),
33       (7, "香港")
34     ), 1)
35 
36     // 調用RDD API實現內連接
37     val joinResultRDD = rdd1.join(rdd2).map {
38       case (id, (name, address)) => {
39         (id, name, address)
40       }
41     }
42     println("----------------")
43     joinResultRDD.foreachPartition(iter => {
44       iter.foreach(println)
45     })
46     // 調用RDD API實現左外連接
47     val leftJoinResultRDd = rdd1.leftOuterJoin(rdd2).map {
48       case (id, (name, addressOption)) => {
49         (id, name, addressOption.getOrElse("NULL"))
50       }
51     }
52     println("----------------")
53     leftJoinResultRDd.foreachPartition(iter => {
54       iter.foreach(println)
55     })
56     // 左外連接稍微變化一下:需要左表出現,右表不出現的數據(not in)
57     println("----------------")
58     rdd1.leftOuterJoin(rdd2).filter(_._2._2.isEmpty).map {
59       case (id, (name, _)) => (id, name)
60     }.foreachPartition(iter => {
61       iter.foreach(println)
62     })
63 
64     // 右外連接
65     println("----------------")
66     rdd1
67       .rightOuterJoin(rdd2)
68       .map {
69         case (id, (nameOption, address)) => {
70           (id, nameOption.getOrElse("NULL"), address)
71         }
72       }
73       .foreachPartition(iter => iter.foreach(println))
74 
75     // 全外連接
76     println("----------------")
77     rdd1
78       .fullOuterJoin(rdd2)
79       .map {
80         case (id, (nameOption, addressOption)) => {
81           (id, nameOption.getOrElse("NULL"), addressOption.getOrElse("NULL"))
82         }
83       }
84       .foreachPartition(iter => iter.foreach(println))
85 
86     // 休眠為了看4040頁面
87         Thread.sleep(1000000)
88   }
89 }

 

 

6.說明 

 RDD join API:
  def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
    返回值是RDD,RDD中的類型是一個二元組(a),a第一個元素是KEY類型的值(join的key), a第二個元素又是二元組(b), b的第一個元素是來自調用join函數的RDD的value,
    b的第二個元素是來自參數other這個RDD的value

  def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
    對於右邊的數據返回的是Option類型是數據,所以如果右表數據不存在,返回的是None;否則是一個Some的具體數據

  def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
    對於左邊的數據返回的是Option類型是數據,所以如果左表數據不存在,返回的是None;否則是一個Some的具體數據

  def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]
    返回的value類型是Option封裝后的數據,如果數據不存在, 返回的是None,存在返回的是Some具體數據

 

7.缺點

  

 

8.優化程序

  沒有使用API,根據原理寫一個。

  減少shufflw算子的使用。

  1 import org.apache.spark.{SparkConf, SparkContext}
  2 
  3 /**
  4   * RDD數據Join相關API講解
  5   * Created by ibf on 02/09.
  6   */
  7 object RDDJoin {
  8   def main(args: Array[String]): Unit = {
  9     val conf = new SparkConf()
 10       .setMaster("local[*]")
 11       .setAppName("RDD-Join")
 12     val sc = SparkContext.getOrCreate(conf)
 13 
 14     // ==================具體代碼======================
 15     // 模擬數據產生
 16     val rdd1 = sc.parallelize(Array(
 17       (1, "張三1"),
 18       (1, "張三2"),
 19       (2, "李四"),
 20       (3, "王五"),
 21       (4, "Tom"),
 22       (5, "Gerry"),
 23       (6, "莉莉")
 24     ), 1)
 25 
 26     val rdd2 = sc.parallelize(Array(
 27       (1, "上海"),
 28       (2, "北京1"),
 29       (2, "北京2"),
 30       (3, "南京"),
 31       (4, "紐約"),
 32       (6, "深圳"),
 33       (7, "香港")
 34     ), 1)
 35 
 36     // 假設rdd2的數據比較少,將rdd2的數據廣播出去
 37     val leastRDDCollection = rdd2.collect()
 38     val broadcastRDDCollection = sc.broadcast(leastRDDCollection)
 39 
 40     println("++++++++++++++++++")
 41     // 類似Inner Join的操作,Inner Join的功能:將兩個表都出現的數據合並
 42     println("-------------------")
 43     rdd1
 44       // 過濾rdd1中的數據,只要在rdd1中出現的數據,沒有出現的數據過濾掉
 45       .filter(tuple => broadcastRDDCollection.value.map(_._1).contains(tuple._1))
 46       // 數據合並,由於一條rdd1的數據可能在rdd2中存在多條對應數據,所以使用flatMap
 47       .flatMap {
 48       case (id, name) => {
 49         broadcastRDDCollection.value.filter(_._1 == id).map {
 50           case (_, address) => {
 51             (id, name, address)
 52           }
 53         }
 54       }
 55     }
 56       .foreachPartition(iter => iter.foreach(println))
 57 
 58     // 左外連接
 59     println("---------------------")
 60     rdd1
 61       .flatMap {
 62         case (id, name) => {
 63           // 從右表所屬的廣播變量中獲取對應id的集合列表
 64           val list = broadcastRDDCollection.value.filter(_._1 == id)
 65           // 對應id的集合可能為空,也可能數據有多個
 66           if (list.nonEmpty) {
 67             // 存在多個
 68             list.map(tuple => (id, name, tuple._2))
 69           } else {
 70             // id在右表中不存在,填默認值
 71             (id, name, "NULL") :: Nil
 72           }
 73         }
 74       }
 75       .foreachPartition(iter => iter.foreach(println))
 76 
 77     // 右外連接
 78     /**
 79       * rdd2中所有數據出現,由於rdd2中的數據在driver中可以存儲,可以認為rdd1和rdd2通過right join之后的數據也可以在driver中保存下
 80       **/
 81     println("---------------------")
 82     // 將rdd1中符合條件的數據過濾出來保存到driver中
 83     val stage1 = rdd1
 84       .filter(tuple => broadcastRDDCollection.value.map(_._1).contains(tuple._1))
 85       .collect()
 86     // 將driver中兩個集合進行right join
 87     val stage2 = leastRDDCollection.flatMap {
 88       case (id, address) => {
 89         val list = stage1.filter(_._1 == id)
 90         if (list.nonEmpty) {
 91           list.map(tuple => (id, tuple._2, address))
 92         } else {
 93           Iterator.single((id, "NULL", address))
 94         }
 95       }
 96     }
 97     stage2.foreach(println)
 98 
 99     // TODO: 全外連接,不寫代碼,因為代碼比較復雜
100 
101     //====================================
102     // 左半連接:只出現左表數據(要求數據必須在右表中也出現過),如果左表的數據在右表中出現多次,最終結果只出現一次
103     println("+++++++++++++++++")
104     println("-----------------------")
105     rdd1
106       .join(rdd2)
107       .map {
108         case (id, (name, _)) => (id, name)
109       }
110       .distinct()
111       .foreachPartition(iter => iter.foreach(println))
112     println("------------------------")
113     rdd1
114       .filter(tuple => broadcastRDDCollection.value.map(_._1).contains(tuple._1))
115       .foreachPartition(iter => iter.foreach(println))
116 
117     // 休眠為了看4040頁面
118         Thread.sleep(1000000)
119   }
120 }

 

9.Join的窄依賴程序

  使用reduceByKey,里面的程序會給一個分區 

 1 package com.ibeifeng.senior.join
 2 
 3 import org.apache.spark.{SparkConf, SparkContext}
 4 
 5 /**
 6   * RDD數據Join相關API講解
 7   * Created by ibf on 02/09.
 8   */
 9 object RDDJoin2 {
10   def main(args: Array[String]): Unit = {
11     val conf = new SparkConf()
12       .setMaster("local[*]")
13       .setAppName("RDD-Join")
14     val sc = SparkContext.getOrCreate(conf)
15 
16     // ==================具體代碼======================
17     // 模擬數據產生, 添加map、reduceByKey、mapPartitions等api的主要功能是給rdd1和rdd2中添加一個分區器(表示當前rdd是存在shuffle過程的)
18     val rdd1 = sc.parallelize(Array(
19       (1, "張三1"),
20       (1, "張三2"),
21       (2, "李四"),
22       (3, "王五"),
23       (4, "Tom"),
24       (5, "Gerry"),
25       (6, "莉莉")
26     ), 1).map(x => (x, null)).reduceByKey((x,y) => x, 1).mapPartitions(
27       iter => iter.map(tuple => tuple._1),
28       true // 使用上一個RDD的分區器,false表示不使用, 設置為None
29     )
30 
31     val rdd2 = sc.parallelize(Array(
32       (1, "上海"),
33       (2, "北京1"),
34       (2, "北京2"),
35       (3, "南京"),
36       (4, "紐約"),
37       (6, "深圳"),
38       (7, "香港")
39     ), 1).map(x => (x, null)).reduceByKey((x,y) => x, 1).mapPartitions(
40       iter => iter.map(tuple => tuple._1),
41       true // 使用上一個RDD的分區器,false表示不使用, 設置為None
42     )
43 
44     // 調用RDD API實現內連接
45     val joinResultRDD = rdd1.join(rdd2).map {
46       case (id, (name, address)) => {
47         (id, name, address)
48       }
49     }
50     println("----------------")
51     joinResultRDD.foreachPartition(iter => {
52       iter.foreach(println)
53     })
54 
55     // 休眠為了看4040頁面
56         Thread.sleep(1000000)
57   }
58 }

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM