Spark實戰電影點評系統(一)


一、通過RDD實戰電影點評系統

  日常的數據來源有很多渠道,如網絡爬蟲、網頁埋點、系統日志等。下面的案例中使用的是用戶觀看電影和點評電影的行為數據,數據來源於網絡上的公開數據,共有3個數據文件:uers.dat、ratings.dat和movies.dat。

  其中,uers.dat的格式如下: UserID::Gender::Age::Occupation::Zip-code ,這個文件里共有6040個用戶的信息,每行中用“::”隔開的詳細信息包括ID、性別(F、M分別表示女性、男性)、年齡(使用7個年齡段標記)、職業和郵編。

    

    ratings.dat的格式如下: UserID::MovieID::Rating::Timestamp ,這個文件共有一百萬多條記錄,記錄的是評分信息,即用戶ID、電影ID、評分(滿分是5分)和時間戳。

    

  movies.dat的格式如下: MovieID::Title::Genres ,這個文件記錄的是電影信息,即電影ID、電影名稱和電影類型。

  

  首先初始化Spark,以及讀取文件。創建一個Scala的object類,在main方法中配置SparkConf和SparkContext,這里指定程序在本地運行,並且把程序名字設置為“RDD_Movie_Users_Analyzer”。

    val conf = new SparkConf().setMaster("local[*]").setAppName("RDD_Movie_User_Analyzer")
    /**
     * Spark2.0引入SparkSession封裝了SparkContext和SQLContext,並且會在builder的getOrCreate方法中判斷是否
     * 含有符合要求的SparkSession存在,有則使用,沒有則進行創建
     */
    val spark = SparkSession.builder.config(conf).getOrCreate()
    // 獲取SparkSession的SparkContext
    val sc = spark.sparkContext
    // 把Spark程序運行時的日志設置為warn級別,以方便查看運行結果
    sc.setLogLevel("WARN")
    // 把用到的數據加載進來轉換為RDD,此時使用sc.textFile並不會讀取文件,而是標記了有這個操作,遇到Action級算子時才回真正去讀取文件
    val usersRDD = sc.textFile("./src/test1/users.dat")
    val moviesRDD = sc.textFile("./src/test1/movies.dat")
    val ratingsRDD = sc.textFile("./src/test1/ratings.dat") 

  首先我們來寫一個案例計算,並打印出所有電影中評分最高的前10個電影名和平均評分。

  第一步:從ratingsRDD中取出MovieID和rating,從moviesRDD中取出MovieID和Name,如果后面的代碼重復使用這些數據,則可以把它們緩存起來。首先把使用map算子上面的RDD中的每一個元素(即文件中的每一行)以“::”為分隔符進行拆分,然后再使用map算子從拆分后得到的數組中取出需要用到的元素,並把得到的RDD緩存起來

  第二步:從ratings的數據中使用map算子獲取到形如(movieID,(rating,1))格式的RDD,然后使用reduceByKey把每個電影的總評分以及點評人數算出來。此時得到的RDD格式為(movieID,Sum(ratings),Count(ratings)))。

  第三步:把每個電影的Sum(ratings)和Count(ratings)相除,得到包含了電影ID和平均評分的RDD:

  第四步:把avgRatings與movieInfo通過關鍵字(key)連接到一起,得到形如(movieID, (MovieName,AvgRating))的RDD,然后格式化為(AvgRating,MovieName),並按照key(也就是平均評分)降序排列,最終取出前10個並打印出來。

    println("所有電影中平均得分最高(口碑最好)的電影:")
    val movieInfo = moviesRDD.map(_.split("::")).map(x=>(x(0),x(1))).cache()
    val ratings = ratingsRDD.map(_.split("::")).map(x=>(x(0),x(1),x(2))).cache()
    val moviesAndRatings = ratings.map(x=>(x._2,(x._3.toDouble,1))).reduceByKey((x,y)=>(x._1+y._1,x._2+y._2))
    val avgRatings = moviesAndRatings.map(x=>(x._1,x._2._1.toDouble/x._2._2))
    avgRatings.join(movieInfo).map(item=>(item._2._1,item._2._2))
              .sortByKey(false).take(10)
              .foreach(record=>println(record._2+"評分為:"+record._1))

   

  接下來我們來看另外一個功能的實現:分析最受男性喜愛的電影Top10和最受女性喜愛的電影Top10。

  首先來分析一下:單從ratings中無法計算出最受男性或者女性喜愛的電影Top10,因為該RDD中沒有Gender信息,如果需要使用Gender信息進行Gender的分類,此時一定需要聚合。當然,我們力求聚合使用的是mapjoin(分布式計算的一大痛點是數據傾斜,map端的join一定不會數據傾斜),這里是否可使用mapjoin?不可以,因為map端的join是使用broadcast把相對小得多的變量廣播出去,這樣可以減少一次shuffle,這里,用戶的數據非常多,所以要使用正常的join。 

  使用join連接ratings和users之后,對分別過濾出男性和女性的記錄進行處理:

    println("========================================")
    println("所有電影中最受男性喜愛的電影Top10:")
    val usersGender = usersRDD.map(_.split("::")).map(x=>(x(0),x(1)))
    val genderRatings = ratings.map(x=>(x._1,(x._1,x._2,x._3))).join(usersGender).cache()
    // genderRatings.take(10).foreach(println)
    val maleFilteredRatings = genderRatings.filter(x=>x._2._2.equals("M")).map(x=>x._2._1)
    val femaleFilteredRatings = genderRatings.filter(x=>x._2._2.equals("F")).map(x=>x._2._1)
    maleFilteredRatings.map(x=>(x._2,(x._3.toDouble,1))).reduceByKey((x,y)=>(x._1+y._1,x._2+y._2))
                      .map(x=>(x._1,x._2._1.toDouble/x._2._2))
                      .join(movieInfo)
                      .map(item=>(item._2._1,item._2._2))
                      .sortByKey(false)
                      .take(10)
                      .foreach(record=>println(record._2+"評分為:"+record._1))
      
    println("========================================")
    println("所有電影中最受女性喜愛的電影Top10:")
    femaleFilteredRatings.map(x=>(x._2,(x._3.toDouble,1))).reduceByKey((x,y)=>(x._1+y._1,x._2+y._2))
                      .map(x=>(x._1,x._2._1.toDouble/x._2._2))
                      .join(movieInfo)
                      .map(item=>(item._2._1,item._2._2))
                      .sortByKey(false)
                      .take(10)
                      .foreach(record=>println(record._2+"評分為:"+record._1))

   

  在現實業務場景中,二次排序非常重要,並且經常遇到。下面來模擬一下這些場景,實現對電影評分數據進行二次排序,以Timestamp和Rating兩個維度降序排列,值得一提的是,Java版本的二次排序代碼非常煩瑣,而使用Scala實現就會很簡捷,首先我們需要一個繼承自Ordered和Serializable的類。

class SecondarySortKey(val first:Double,val second:Double) extends Ordered[SecondarySortKey] with Serializable{
  // 在這個類中重寫compare方法
  override def compare(other:SecondarySortKey):Int={
    // 既然是二次排序,那么首先要判斷第一個排序字段是否相等,如果不相等,就直接排序
    if(this.first-other.first!=0){
      (this.first-other.first).toInt
    }else {
      // 如果第一個字段相等,則比較第二個字段,若想實現多次排序,也可以按照這個模式繼續比較下去
      if(this.second-other.second>0){
        Math.ceil(this.second-other.second).toInt
      }else if (this.second-other.second<0) {
        Math.floor(this.second-other.second).toInt
      }else {
        (this.second-other.second).toInt
      }
    }
  }
}

  然后再把RDD的每條記錄里想要排序的字段封裝到上面定義的類中作為key,把該條記錄整體作為value。  

    println("========================================")
    println("對電影評分數據以Timestamp和Rating兩個維度進行二次降序排列:")
    val pairWithSortkey = ratingsRDD.map(line=>{
      val spilted = line.split("::")
      (new SecondarySortKey(spilted(3).toDouble,spilted(2).toDouble),line)
    })
    // 直接調用sortByKey,此時會按照之前實現的compare方法排序
    val sorted = pairWithSortkey.sortByKey(false)
    val sortedResult = sorted.map(sortedline => sortedline._2)
    sortedResult.take(10).foreach(println)

  取出排序后的RDD的value,此時這些記錄已經是按照時間戳和評分排好序的,最終打印出的結果如圖所示,從圖中可以看到已經按照timestamp和評分降序排列了。

   

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM