推薦系統-05-Spark電影推薦、評估與部署


一、新建scala項目

二、構造程序


代碼如下

package xyz.pl8

import java.io.File

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.mllib.evaluation.RegressionMetrics
import org.apache.spark.mllib.recommendation.{MatrixFactorizationModel, Rating, ALS}
import org.apache.spark.rdd.RDD

import scala.util.Random

object MovieLensALS {

  //1. Define a rating elicitation function
  // Seq[Rating]
  def elicitateRating(movies: Seq[(Int, String)])={
    val prompt="Please rate the following movie(1-5(best) or 0 if not seen: )"
    println(prompt)
    val ratings= movies.flatMap{x=>	
      var rating: Option[Rating] = None  //  Rating(user: Int, product: Int, rating: Double) 
      var valid = false
      while(!valid){
        println(x._2+" :")
        try{
          val r = Console.readInt()
          if (r>5 || r<0){
            println(prompt)
          } else {
            valid = true
            if (r>0){
              rating = Some(Rating(0, x._1, r))
            }
          }
        } catch{
          case e:Exception => println(prompt)
        }
      }
      rating match {
        case Some(r) => Iterator(r)  // FlatMap將結構解散成元素, 這里是Rating
        case None => Iterator.empty
      }
    }
    if (ratings.isEmpty){
      error("No ratings provided!")
    } else {
      ratings
    }
  }

  //2. Define a RMSE computation function
  def computeRmse(model: MatrixFactorizationModel, data: RDD[Rating]) = {
    val prediction = model.predict(data.map(x=>(x.user, x.product)))
    val predDataJoined = prediction.map(x=> ((x.user,x.product),x.rating)).join(data.map(x=> ((x.user,x.product),x.rating))).values
    new RegressionMetrics(predDataJoined).rootMeanSquaredError
  }

  //3. Main
  def main(args: Array[String]) {
  //3.1 Setup env
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

    if (args.length !=1){
      print("Usage: movieLensHomeDir")
      sys.exit(1)
    }

    val conf = new SparkConf().setAppName("MovieLensALS")
    .set("spark.executor.memory","500m")
    val sc = new SparkContext(conf)

  //3.2 Load ratings data and know your data
  // ratings.dat 的格式 UserID::MovieID::Rating::Timestamp
    val movieLensHomeDir=args(0)
	// RDD[long, Rating]
    val ratings = sc.textFile(new File(movieLensHomeDir, "ratings.dat").toString).map {line =>
       val fields = line.split("::")
      //timestamp, user, product, rating
	  // 取模成分成10組
      (fields(3).toLong%10, Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble))
    }
	// movies.dat格式 MovieID::Title::Genres
	// Map[Int,String]
    val movies = sc.textFile(new File(movieLensHomeDir, "movies.dat").toString).map {line =>
      val fields = line.split("::")
      //movieId, movieName
      (fields(0).toInt, fields(1))
    }.collectAsMap()

    val numRatings = ratings.count()
    val numUser = ratings.map(x=>x._2.user).distinct().count()
    val numMovie = ratings.map(_._2.product).distinct().count()

    println("Got "+numRatings+" ratings from "+numUser+" users on "+numMovie+" movies.")

  //3.3 Elicitate personal rating
    // = RDD[(long,Rating) -> Array[int] -> Map[Int, long] -> Seq[(Int, long)] -> Seq[(Int,long)] ->  Seq[Int]
    val topMovies = ratings.map(_._2.product).countByValue().toSeq.sortBy(-_._2).take(50).map(_._1)
    val random = new Random(0)
	// Seq[(Int, String)]
    val selectMovies = topMovies.filter(x=>random.nextDouble() < 0.2).map(x=>(x, movies(x)))

    val myRatings = elicitateRating(selectMovies)
    val myRatingsRDD = sc.parallelize(myRatings, 1)

  //3.4 Split data into train(60%), validation(20%) and test(20%)
    val numPartitions = 10
		// 6組(即60%),並上手工輸入評價
	    val trainSet = ratings.filter(x=>x._1<6).map(_._2).union(myRatingsRDD).repartition(numPartitions).persist()
    val validationSet = ratings.filter(x=>x._1>=6 && x._1<8).map(_._2).persist()
    val testSet = ratings.filter(x=>x._1>=8).map(_._2).persist()

    val numTrain = trainSet.count()
    val numValidation = validationSet.count()
    val numTest = testSet.count()

    println("Training data: "+numTrain+" Validation data: "+numValidation+" Test data: "+numTest)

  //3.5 Train model and optimize model with validation set
    val numRanks = List(8, 12)
    val numIters = List(10, 20)
    val numLambdas = List(0.1, 10.0)
    var bestRmse = Double.MaxValue
    var bestModel: Option[MatrixFactorizationModel] = None
    var bestRanks = -1
    var bestIters = 0
    var bestLambdas = -1.0
	// 尋找優化參數的模型
    for(rank <- numRanks; iter <- numIters; lambda <- numLambdas){
      val model = ALS.train(trainSet, rank, iter, lambda)
      val validationRmse = computeRmse(model, validationSet)
      println("RMSE(validation) = "+validationRmse+" with ranks="+rank+", iter="+iter+", Lambda="+lambda)

      if (validationRmse < bestRmse) {
        bestModel = Some(model)
        bestRmse = validationRmse
        bestIters = iter
        bestLambdas = lambda
        bestRanks = rank
      }
    }

    //3.6 Evaluate model on test set
	// 用測試集來評估模型
	// 測試集均方根差
    val testRmse = computeRmse(bestModel.get, testSet)
    println("The best model was trained with rank="+bestRanks+", Iter="+bestIters+", Lambda="+bestLambdas+
      " and compute RMSE on test is "+testRmse)

    //3.7 Create a baseline and compare it with best model
	// 創建基線 並與模型進行比較
    val meanRating = trainSet.union(validationSet).map(_.rating).mean() // 訓練集與驗證集和的均數
	// 最佳根均方錯誤線(基線)
    val bestlineRmse = new RegressionMetrics(testSet.map(x=>(x.rating, meanRating))).rootMeanSquaredError  // 測試集與均數的均方根差
	// testRmse(這個數應該更優,值更小)
    val improvement = (bestlineRmse - testRmse)/bestlineRmse*100
    println("The best model improves the baseline by "+"%1.2f".format(improvement)+"%.")

    //3.8 Make a personal recommendation
	// 進行個人推薦, 排除自己已經評分內容
    val moviesId = myRatings.map(_.product)
    val candidates = sc.parallelize(movies.keys.filter(!moviesId.contains(_)).toSeq)
    val recommendations = bestModel.get
    .predict(candidates.map(x=>(0, x)))
    .sortBy(-_.rating)
    .take(50)

    var i = 0
    println("Movies recommended for you:")
    recommendations.foreach{ line=>
      println("%2d".format(i)+" :"+movies(line.product))
      i += 1
    }
  sc.stop()
  }
}

導入引用庫

三、打包部署

程序運行時,需要指定輸入數據路徑,數據包含了ratings.dat和movies.dat,數據都包含在了一個數據包。點擊下載, 然后解壓。
配置運行參數

  • 點擊edit configuration,在左側點擊該項目。在右側在右側VM options中輸入“-Dspark.master=local”,指示本程序本地單線程運行

  • 在Program argguemnts指定,上面解壓的路徑。
    然后,在IDEA上選擇MovieLensALS右鍵選擇運行,即可運行了。
    按照引導,輸入自己的評價后,最后輸出形式如下:

    The best model was trained with rank=12, Iter=20, Lambda=0.1 and compute RMSE on test is 0.868464888081759
    The best model improves the baseline by 22.01%.
    Movies recommended for you:
    0 :Julien Donkey-Boy (1999)
    1 :Love Serenade (1996)
    2 :Catwalk (1995)

四、HADOOP集群部署

導出jar包設置


選main類對后,點擊OK確定, 這個時候配置已經完成了, 我們就可以進行編譯 jar文件了, 選擇菜單Build->Build Artifacts..., 生成的文件路徑為/out/artifacts/MovieLensALS_jar/MovieLensALS.jar

准備HADOOP環境

假設我們的HADOOP環境已經搭建成功。 接下來我們要把需要計算的數據文件上傳到hadoop; 首先,在hadoop上面創建文件夾,命令如下:

hdfs dfs -mkdir -p /recommendation/data

上傳數據文件命令如下:

hdfs dfs -put *.dat /recommendation/data

這時時候我們可以通過命令查看,上傳是否成功

hdfs dfs -cat /recommendation/data/users.dat

運行


在上面紅框中,指定了生成的jar文件名, 所在路徑, 以及MainClass。這面就是通過spark執行:

/usr/local/spark/spark-2.3.0-bin-hadoop2.7/bin/spark-submit --master local  --class "xyz.pl8.MovieLensALS" /home/hartifacts/movielensals_jar/movielensals.jar /recommendation/data


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM