使用Spark進行搜狗日志分析實例——map join的使用


map join相對reduce join來說,可以減少在shuff階段的網絡傳輸,從而提高效率,所以大表與小表關聯時,盡量將小表數據先用廣播變量導入內存,后面各個executor都可以直接使用

package sogolog

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}



class RddFile {
  def readFileToRdd(path: String): RDD[String] = {
    val conf = new SparkConf().setMaster("local").setAppName("sougoDemo")
    val sc = new SparkContext(conf);
    //使用這種方法能夠避免中文亂碼
   readFileToRdd(path,sc)
  }

  def readFileToRdd(path: String,sc :SparkContext): RDD[String] = {
    //使用這種方法能夠避免中文亂碼
    sc.hadoopFile(path,classOf[TextInputFormat], classOf[LongWritable], classOf[Text]).map{
      pair =>  new String(pair._2.getBytes, 0, pair._2.getLength, "GBK")}
  }
}

 

package sogolog


import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

import scala.collection.mutable.ArrayBuffer


object MapSideJoin {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("sougoDemo")
    val sc = new SparkContext(conf);
    val userRdd = new RddFile().readFileToRdd("J:\\scala\\workspace\\first-spark-demo\\sougofile\\user",sc)

    //解析用戶信息
    val userMapRDD:RDD[(String,String)] = userRdd.map(line=>(line.split("\t")(0),line.split("\t")(1)))

    //將用戶信息設置為廣播變量,方便各個任務引用
    val userMapBroadCast =sc.broadcast(userMapRDD.collectAsMap())

    val searchLogRdd = new RddFile().readFileToRdd("J:\\scala\\workspace\\first-spark-demo\\sougofile\\SogouQ.reduced",sc)

    val joinResult = searchLogRdd.mapPartitionsWithIndex((index,f)=>{
      val userMap = userMapBroadCast.value
      var result = ArrayBuffer[String]()

      var count = 0

      //搜索日志表join用戶表
      //原來日志列為:時間 用戶ID 關鍵詞 排名 URL
      //新的日志列為:時間 用戶ID 用戶名 關鍵詞 排名 URL
      f.foreach( log=>{
        count=count+1;
        val lineArrs = log.split("\t")
        val uid = lineArrs(1)
        val newLine:StringBuilder = new StringBuilder()
        if(userMap.contains(uid)){
          newLine.append(lineArrs(0)).append("\t")
          newLine.append(lineArrs(1)).append("\t")
          newLine.append(userMap.get(uid).get).append("\t") //從廣播變量中根據用戶ID獲取用戶名
          for (i<- 2 to lineArrs.length-1){
            newLine.append(lineArrs(i)).append("\t")
          }

          result .+= (newLine.toString())
        }
      })
      println("partition_"+index+"處理的行數為:"+count)
      result.iterator

    })

    //打印結果
    joinResult.collect().foreach(println)
  }
}

 

結果展示:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM