Spark(八)【利用廣播小表實現join避免Shuffle】


使用場景

大表join小表 只能廣播小表

普通的join是會走shuffle過程的,而一旦shuffle,就相當於會將相同key的數據拉取到一個shuffle read task中再進行join,此時就是reduce join。但是如果一個RDD是比較小的,則可以采用廣播小RDD全量數據+map算子來實現與join同樣的效果,也就是map join,此時就不會發生shuffle操作,也就不會發生數據傾斜。

注意:RDD是並不能進行廣播的,只能將RDD內部的數據通過collect拉取到Driver內存然后再進行廣播

核心思路

將較小RDD中的數據直接通過collect算子拉取到Driver端的內存中來,然后對其創建一個Broadcast變量;接着對另外一個RDD執行map類算子,在算子函數內,從Broadcast變量中獲取較小RDD的全量數據,與當前RDD的每一條數據按照連接key進行比對,如果連接key相同的話,那么就將兩個RDD的數據用你需要的方式連接起來。

代碼演示

正常join

    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("MapJoin")
    val sc: SparkContext = new SparkContext(conf)
    val rdd1: RDD[(String, Int)] = sc.makeRDD(List("key1" -> 2, "key1" -> 10, "key2" -> 20, "key3" -> 30))
    val rdd2: RDD[(String, Int)] = sc.makeRDD(List("key1" -> 5, "key1" -> 20, "key2" -> 40, "key4" -> 30))
	 //join
    rdd1.join(rdd2).collect().foreach(println)

控制台

(key1,(2,5))
(key1,(2,20))
(key1,(10,5))
(key1,(10,20))
(key2,(20,40))

正常left join

//left join
rdd1.leftOuterJoin(rdd2).collect().foreach(println)
(k1,(10,Some(-10)))
(k1,(10,Some(-100)))
(k2,(20,Some(-20)))
(k1,(100,Some(-10)))
(k1,(100,Some(-100)))
(k3,(30,None))

廣播:join

    //廣播rdd2
    val bd: Broadcast[Array[(String, Int)]] = sc.broadcast(rdd2.collect())
    val result = rdd1.flatMap {
      case (key1, value1) => {
        bd.value
          .filter(key1 == _._1)
          .map {
            case (key2, value2) =>
              (key1, (value1, value2))
          }
      }
    }
    result.collect().foreach(println)

廣播:left join

    //廣播rdd2
    val bd: Broadcast[Array[(String, Int)]] = sc.broadcast(rdd2.collect())
    val result: RDD[(String, (Int, Option[Int]))] = rdd1.flatMap {
      case (key1, value1) =>
        val arr = bd.value
        val keys = arr.map(_._1)
        if (keys.contains(key1)) {
          bd.value.filter(key1 == _._1).map {
            case (key2, value2) =>
              (key1, (value1, Some(value2)))
          }
        } else {
          Array(key1 -> (value1, None))
        }
    }
    result.collect.foreach(println)

不適用場景

由於Spark的廣播變量是在每個Executor中保存一個副本,如果兩個RDD數據量都比較大,那么如果將一個數據量比較大的 RDD做成廣播變量,那么很有可能會造成內存溢出


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM