Spark-Join優化之Broadcast


適用場景

  • 進行join中至少有一個RDD的數據量比較少(比如幾百M,或者1-2G)
  • 因為,每個Executor的內存中,都會駐留一份廣播變量的全量數據

Broadcast與map進行join代碼示例

創建RDD

val list1 = List((jame,23), (wade,3), (kobe,24))
val list2 = List((jame,cave), (wade,bulls), (kobe,lakers))
val rdd1 = sc.makeRDD(list1)
val rdd2 = sc.makeRDD(list2)

傳統的join

// 傳統的join操作會導致shuffle操作。
// 因為兩個RDD中,相同的key都需要通過網絡拉取到一個節點上,由一個task進行join操作。
val rdd3 = rdd1.join(rdd2)
// 結果如下
scala> rdd1.join(rdd2).collect
res27: Array[(String, (Int, String))] = Array((kobe,(24,lakers)), (wade,(3,bulls)), (jame,(23,cave)))

使用Broadcast+map的join操作

// Broadcast+map的join操作,不會導致shuffle操作。
// 使用Broadcast將一個數據量較小的RDD作為廣播變量
val rdd2Data = rdd2.collect()
val rdd2Bc = sc.broadcast(rdd2Data)

// 在rdd1.map算子中,可以從rdd2DataBroadcast中,獲取rdd2的所有數據。
// 然后進行遍歷,如果發現rdd2中某條數據的key與rdd1的當前數據的key是相同的,那么就判定可以進行join。
def function(tuple: (String,Int)): (String,(Int,String)) ={
    for(value <- rdd2Bc.value){
     if(value._1.equals(tuple._1))
        return (tuple._1,(tuple._2,value._2.toString))
         }
         (tuple._1,(tuple._2,null))
         }

// 在rdd1.map算子中,可以從rdd2DataBroadcast中,獲取rdd2的所有數據。
// 然后進行遍歷,如果發現rdd2中某條數據的key與rdd1的當前數據的key是相同的,那么就判定可以進行join。
// 此時就可以根據自己需要的方式,將rdd1當前數據與rdd2中可以連接的數據,拼接在一起(String或Tuple)。
val rdd3 = rdd1.map(function(_))

//結果如下,達到了與傳統join相同的效果
scala> rdd1.map(function(_)).collect
res31: Array[(String, (Int, String))] = Array((jame,(23,cave)), (wade,(3,bulls)), (kobe,(24,lakers)))

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM