Compared with a reduce-side join, a map-side join avoids the network transfer of the shuffle stage and is therefore more efficient. When joining a large table with a small table, load the small table into memory as a broadcast variable first; every executor can then use it directly.
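For contrast, a reduce-side join keys both tables by user ID and calls join(), which shuffles the large search-log RDD across the network. Below is a minimal sketch of that alternative; the object name ReduceSideJoinSketch is hypothetical, it reuses the file paths from the example that follows, and for brevity it skips the GBK decoding handled by RddFile below.

package sogolog

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object ReduceSideJoinSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("reduceJoinSketch")
    val sc = new SparkContext(conf)

    // user table: userID <TAB> userName
    val userPairs: RDD[(String, String)] =
      sc.textFile("J:\\scala\\workspace\\first-spark-demo\\sougofile\\user")
        .map(line => (line.split("\t")(0), line.split("\t")(1)))

    // search log: time <TAB> userID <TAB> keyword <TAB> rank <TAB> URL
    val logPairs: RDD[(String, String)] =
      sc.textFile("J:\\scala\\workspace\\first-spark-demo\\sougofile\\SogouQ.reduced")
        .map(line => (line.split("\t")(1), line))

    // join() repartitions both RDDs by key, i.e. the whole log is shuffled over the network
    val joined: RDD[(String, (String, String))] = logPairs.join(userPairs)
    joined.take(10).foreach(println)
  }
}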
package sogolog

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

class RddFile {
  def readFileToRdd(path: String): RDD[String] = {
    val conf = new SparkConf().setMaster("local").setAppName("sougoDemo")
    val sc = new SparkContext(conf)
    readFileToRdd(path, sc)
  }

  def readFileToRdd(path: String, sc: SparkContext): RDD[String] = {
    // Reading via hadoopFile and decoding as GBK avoids garbled Chinese characters
    sc.hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
      .map { pair => new String(pair._2.getBytes, 0, pair._2.getLength, "GBK") }
  }
}
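The two-argument overload takes an existing SparkContext so the driver program below can read both files with a single context. The explicit GBK decode is what prevents mojibake: sc.textFile decodes each line as UTF-8, so reading the GBK-encoded Sogou files that way would garble the Chinese text.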
package sogolog

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer

object MapSideJoin {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("sougoDemo")
    val sc = new SparkContext(conf)

    val userRdd = new RddFile().readFileToRdd("J:\\scala\\workspace\\first-spark-demo\\sougofile\\user", sc)
    // Parse the user table: each line is "userID <TAB> userName"
    val userMapRDD: RDD[(String, String)] = userRdd.map(line => (line.split("\t")(0), line.split("\t")(1)))
    // Broadcast the user map so every task can reference it directly
    val userMapBroadCast = sc.broadcast(userMapRDD.collectAsMap())

    val searchLogRdd = new RddFile().readFileToRdd("J:\\scala\\workspace\\first-spark-demo\\sougofile\\SogouQ.reduced", sc)
    val joinResult = searchLogRdd.mapPartitionsWithIndex((index, iter) => {
      val userMap = userMapBroadCast.value
      val result = ArrayBuffer[String]()
      var count = 0
      // Join the search-log table with the user table
      // Original log columns: time  userID  keyword  rank  URL
      // New log columns:      time  userID  userName  keyword  rank  URL
      iter.foreach(log => {
        count = count + 1
        val lineArrs = log.split("\t")
        val uid = lineArrs(1)
        val newLine: StringBuilder = new StringBuilder()
        if (userMap.contains(uid)) {
          newLine.append(lineArrs(0)).append("\t")
          newLine.append(lineArrs(1)).append("\t")
          newLine.append(userMap(uid)).append("\t") // look up the user name by userID in the broadcast variable
          for (i <- 2 to lineArrs.length - 1) {
            newLine.append(lineArrs(i)).append("\t")
          }
          result += newLine.toString()
        }
      })
      println("partition_" + index + " processed " + count + " lines")
      result.iterator
    })
    // Print the join result
    joinResult.collect().foreach(println)
  }
}
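Note that collectAsMap() pulls the entire user table to the driver before broadcasting, so this pattern is only safe when the small table fits comfortably in memory. The large search log is never shuffled; it is only scanned partition by partition inside mapPartitionsWithIndex, with each partition reading the broadcast map locally.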
Sample output: