Spark-Based Movie Recommendation System (Recommender System Series, Part 4)


Part 4 - Recommender System - Model Training

  • This module trains models on the training and test sets produced by the data-preparation work in Part 3, yielding a series of models that are then used for prediction.
  • We train multiple models and keep the best one, i.e. the model with the smallest RMSE (root mean squared error).
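Since the whole selection criterion is "smallest RMSE", it helps to be clear about what that number is. As an illustration only (the project itself computes this with Spark RDDs, not plain Python), RMSE over a set of (actual, predicted) rating pairs can be sketched as:

```python
import math

def rmse(pairs):
    """Root mean squared error over (actual, predicted) rating pairs."""
    squared_errors = [(actual - predicted) ** 2 for actual, predicted in pairs]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

# Three hypothetical (actual, predicted) rating pairs
ratings = [(4.0, 3.5), (3.0, 3.0), (5.0, 4.0)]
print(round(rmse(ratings), 4))
```

A lower RMSE means the predicted ratings sit closer, on average, to the actual ratings, which is why the training loop below keeps the model with the smallest value.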

A few notes

1. You do not need to implement the ALS algorithm yourself; Spark MLlib already provides it, and its source code is worth studying. Put in the time: read the source, write code by hand, work through the paper, and write up what you learn.
2. Latest docs: http://spark.apache.org/docs/latest/ml-guide.html
3. This project uses Spark 1.6.3. As the docs put it:
spark.mllib contains the original API built on top of RDDs.
spark.ml provides higher-level API built on top of DataFrames for constructing ML pipelines.

==> We use spark.mllib, i.e. we build on top of RDDs.

Reference: http://spark.apache.org/docs/1.6.3/mllib-collaborative-filtering.html#collaborative-filtering
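Before reading the API code, it helps to know what ALS produces: it factorizes the sparse user-item rating matrix into low-rank user-factor and item-factor matrices, and a predicted rating is simply the dot product of a user's factor vector with an item's factor vector. A minimal plain-Python illustration (the factor values below are made up for the example; this is not Spark's API):

```python
def predict_rating(user_factors, item_factors):
    """Predicted rating = dot product of the two latent factor vectors."""
    return sum(u * v for u, v in zip(user_factors, item_factors))

# Hypothetical rank-3 factor vectors, as ALS might learn them
user_vec = [1.2, 0.8, 0.5]
item_vec = [1.0, 1.5, 0.4]
print(predict_rating(user_vec, item_vec))
```

The `rank` parameter in the code below is exactly the length of these factor vectors.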

import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
import org.apache.spark.mllib.recommendation.Rating

// Load and parse the data
val data = sc.textFile("data/mllib/als/test.data")
val ratings = data.map(_.split(',') match { case Array(user, item, rate) =>
  Rating(user.toInt, item.toInt, rate.toDouble)
})

// Build the recommendation model using ALS
val rank = 10
val numIterations = 10
val model = ALS.train(ratings, rank, numIterations, 0.01)

// Evaluate the model on rating data
val usersProducts = ratings.map { case Rating(user, product, rate) =>
  (user, product)
}
val predictions =
  model.predict(usersProducts).map { case Rating(user, product, rate) =>
    ((user, product), rate)
  }
val ratesAndPreds = ratings.map { case Rating(user, product, rate) =>
  ((user, product), rate)
}.join(predictions)
val MSE = ratesAndPreds.map { case ((user, product), (r1, r2)) =>
  val err = (r1 - r2)
  err * err
}.mean()
println("Mean Squared Error = " + MSE)

// Save and load model
model.save(sc, "target/tmp/myCollaborativeFilter")
val sameModel = MatrixFactorizationModel.load(sc, "target/tmp/myCollaborativeFilter")

Look at how the official examples write this code and use them as a reference.

Start coding the project

Step 1: Continuing in the existing project, create a new ml package, then a new ModelTraining object

package com.csylh.recommend.ml

import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

/**
  * Description:
  * Train multiple models and keep the best one, i.e. the model with the smallest RMSE (root mean squared error)
  *
  * @Author: 留歌36
  * @Date: 2019-07-17 16:56
  */
object ModelTraining {
  def main(args: Array[String]): Unit = {
    // Program against SparkSession
    val spark = SparkSession.builder()
      .enableHiveSupport() // enable Hive access; hive-site.xml etc. must be in Spark's conf directory
      .getOrCreate()

    val sc = spark.sparkContext

    // In production, be sure to tune spark.sql.shuffle.partitions (default 200), i.e. the number of shuffle partitions
    val shuffleMinPartitions = "8"
    spark.sqlContext.setConf("spark.sql.shuffle.partitions", shuffleMinPartitions)

    // Training set: 70% of the full dataset
    val trainingData = spark.sql("select * from trainingData")
    // Test set: 30% of the full dataset
    val testData = spark.sql("select * from testData")


    //--------------------------
    // Training set, converted to Rating format
    val ratingRDD = trainingData.rdd.map(x => Rating(x.getInt(0), x.getInt(1), x.getDouble(2)))
    // (userid, movieid) keys of the training set: Rating(userid, movieid, rating) ==> tuple (userid, movieid)
    val training2: RDD[(Int, Int)] = ratingRDD.map { case Rating(userid, movieid, rating) => (userid, movieid) }

    // Test set, converted to Rating format
    val testRDD = testData.rdd.map(x => Rating(x.getInt(0), x.getInt(1), x.getDouble(2)))
    val test2: RDD[((Int, Int), Double)] = testRDD.map { case Rating(userid, movieid, rating) => ((userid, movieid), rating) }
    //--------------------------


    // Number of latent factors
    val rank = 1
    // Regularization parameter
    // val lambda = List(0.001, 0.005, 0.01, 0.015)
    val lambda = List(0.001, 0.005, 0.01)
    // Numbers of iterations to try
    val iteration = List(10, 15, 18)
    var bestRMSE = Double.MaxValue

    var bestIteration = 0
    var bestLambda = 0.0

    // persist() can take a storage level appropriate to the situation
    ratingRDD.persist() // cache in memory; any RDD reused across iterations is worth persisting
    training2.persist()

    test2.persist()

    for (l <- lambda; i <- iteration) {
      // Train a model for this (lambda, iterations) combination.
      // lambda guards against overfitting: the larger the value, the less the model overfits, but the lower its accuracy
      val model = ALS.train(ratingRDD, rank, i, l)

      //--------- Prediction -----------------
      // Predict a rating for each (userid, movieid) key of the test set
      // (predicting on the test keys is what makes the join with test2 below line up)
      val predict = model.predict(test2.map(_._1)).map {
        case Rating(userid, movieid, rating) => ((userid, movieid), rating)
      }
      //------- Predicted vs. actual -------------------

      // Keyed by (userid, movieid), join the predicted ratings with the actual ratings
      val predictAndFact = predict.join(test2)

      // Compute the MSE, then take its square root to get the RMSE
      val MSE = predictAndFact.map {
        case ((user, product), (r1, r2)) =>
          val err = r1 - r2
          err * err
      }.mean() // mean of the squared errors

      val RMSE = math.sqrt(MSE) // root mean squared error

      // The smaller the RMSE, the more accurate the model
      if (RMSE < bestRMSE) {
        // Save this model; its RMSE becomes the directory name
        model.save(sc, s"/tmp/BestModel/$RMSE")
        bestRMSE = RMSE
        bestIteration = i
        bestLambda = l
      }

      println(s"Best model so far is located in /tmp/BestModel/$bestRMSE")
      println(s"Best RMSE is $bestRMSE")
      println(s"Best Iteration is $bestIteration")
      println(s"Best Lambda is $bestLambda")
    }
  }
}
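The training loop above is a simple grid search: for every (lambda, iterations) pair, train a model, compute its RMSE on the test pairs, and remember the configuration with the smallest RMSE so far. The control flow can be sketched in plain Python, with a stub standing in for `ALS.train` plus the RMSE evaluation (the score values below are made up for illustration):

```python
def grid_search(lambdas, iterations, train_and_evaluate):
    """Return (best_rmse, best_lambda, best_iteration) over the grid."""
    best_rmse = float("inf")
    best_lambda, best_iteration = None, None
    for l in lambdas:
        for i in iterations:
            rmse = train_and_evaluate(l, i)
            if rmse < best_rmse:  # smaller RMSE = better model
                best_rmse, best_lambda, best_iteration = rmse, l, i
    return best_rmse, best_lambda, best_iteration

# Stub for ALS.train + RMSE evaluation, with hypothetical scores
scores = {(0.001, 10): 0.91, (0.001, 15): 0.87, (0.01, 10): 0.86, (0.01, 15): 0.89}
best = grid_search([0.001, 0.01], [10, 15], lambda l, i: scores[(l, i)])
print(best)
```

The Scala code does the same thing with `for (l <- lambda; i <- iteration)`, except that it also saves each new best model to HDFS as a side effect.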


Step 2: Package the project and upload the jar to the server
mvn clean package -Dmaven.test.skip=true

Step 3: Write a shell script to submit the job

[root@hadoop001 ml]# vim model.sh 
export HADOOP_CONF_DIR=/root/app/hadoop-2.6.0-cdh5.7.0/etc/hadoop

$SPARK_HOME/bin/spark-submit \
--class com.csylh.recommend.ml.ModelTraining \
--master spark://hadoop001:7077 \
--name ModelTraining \
--driver-memory 10g \
--executor-memory 5g \
/root/data/ml/movie-recommend-1.0.jar

Step 4: Run sh model.sh

Before sh model.sh:

[root@hadoop001 ~]# hadoop fs -ls /tmp
19/10/20 20:53:59 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 10 items
drwx------   - root supergroup          0 2019-04-01 16:27 /tmp/hadoop-yarn
drwx-wx-wx   - root supergroup          0 2019-04-02 09:33 /tmp/hive
drwxr-xr-x   - root supergroup          0 2019-10-20 19:42 /tmp/links
drwxr-xr-x   - root supergroup          0 2019-10-20 19:42 /tmp/movies
drwxr-xr-x   - root supergroup          0 2019-10-20 19:43 /tmp/ratings
drwxr-xr-x   - root supergroup          0 2019-10-20 19:43 /tmp/tags
drwxr-xr-x   - root supergroup          0 2019-10-20 20:19 /tmp/testData
drwxr-xr-x   - root supergroup          0 2019-10-20 20:19 /tmp/trainingData
drwxr-xr-x   - root supergroup          0 2019-10-20 20:18 /tmp/trainingDataAsc
drwxr-xr-x   - root supergroup          0 2019-10-20 20:19 /tmp/trainingDataDesc
[root@hadoop001 ~]#

After sh model.sh:
This stage runs for a long time, and an OOM is quite possible; be patient. While you wait, keep an eye on the job in the Spark UI; the shuffle stages in particular are worth watching.

[root@hadoop001 ~]# hadoop fs -ls /tmp/BestModel
19/10/20 21:26:36 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 2 items
drwxr-xr-x   - root supergroup          0 2019-10-20 21:00 /tmp/BestModel/0.8521581387523667
drwxr-xr-x   - root supergroup          0 2019-10-20 20:56 /tmp/BestModel/0.853805599360297
[root@hadoop001 ~]#

The resulting model is /tmp/BestModel/0.8521581387523667, which is not great. With more resources, you could increase the number of iterations, and the model would likely improve. The goal here is to demonstrate the end-to-end flow, so a mediocre model is acceptable; what matters is understanding the approach.
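Since each saved model directory is named after its RMSE, the best model can be recovered later by parsing the directory names and taking the smallest value. A small sketch of that selection logic in plain Python (in the project you would get the names by listing /tmp/BestModel on HDFS; the two paths below are the ones from the listing above):

```python
def best_model_dir(dirs):
    """Pick the directory whose name (an RMSE value) is smallest."""
    return min(dirs, key=lambda d: float(d.rsplit("/", 1)[-1]))

dirs = ["/tmp/BestModel/0.8521581387523667", "/tmp/BestModel/0.853805599360297"]
print(best_model_dir(dirs))
```

The chosen path can then be passed to `MatrixFactorizationModel.load(sc, path)` (as in the official example earlier) when making recommendations.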

If you have any questions, feel free to leave a comment.
More articles in this series, Spark-based movie recommendation system: https://blog.csdn.net/liuge36/column/info/29285

