本文內容和代碼是接着上篇文章來寫的,推薦先看一下哈~
我們上一篇文章是寫了電影推薦的實現,但是推薦內容是否合理呢,這就需要我們對模型進行評估
針對推薦模型,這里根據 均方差 和 K值平均准確率 來對模型進行評估,MLlib也對這幾種評估方法都有提供內置的函數
在真實情況下,是要不斷地對推薦模型的三個關鍵參數 rank、iterations、lambda 分別選取不同的值,然后對不同參數生成的模型進行評估,從而選取出最好的模型。
下面就對兩種推薦模型評估的方法進行說明~
1、均方差(MSE) 和 均方根誤差(RMSE)
定義:各平方誤差的和與總數目的商。其實可以理解為 預測到的評級 與 真實評級的差值 的平方。
均方根誤差的使用也很普遍,其計算只需在MSE上取平方根即可~
評估代碼為:
//格式:(userID,電影)
val userProducts: RDD[(Int, Int)] = ratings.map(rating => (rating.user, rating.product)) //模型推測出的評分信息,格式為:((userID,電影), 推測評分)
val predictions: RDD[((Int, Int), Double)] = model.predict(userProducts).map(rating => ((rating.user, rating.product),rating.rating)) //格式為:((userID,電影), (真實平評分,推測評分))
val ratingsAndPredictions: RDD[((Int, Int), (Double, Double))] = ratings.map(rating => ((rating.user, rating.product), rating.rating)) .join(predictions) //均方差
val MSE = ratingsAndPredictions.map(rap => math.pow(rap._2._1 - rap._2._2, 2)).reduce(_+_) / ratingsAndPredictions.count() println("MSE:" + MSE) //均方根誤差
val RMSE: Double = math.sqrt(MSE)
println("RMSE:" + RMSE)
上面是我們自己算出來的,也可以用MLlib內置的函數來算:
import org.apache.spark.mllib.evaluation.{RegressionMetrics, RankingMetrics} val predictedAndTrue: RDD[(Double, Double)] = ratingsAndPredictions.map{ case((userID, product),(actual, predict)) => (actual, predict)} val regressionMetrics: RegressionMetrics = new RegressionMetrics(predictedAndTrue) println("MSE:" + regressionMetrics.meanSquaredError) println("RMSE:" + regressionMetrics.rootMeanSquaredError)
輸出為:
MSE:0.08231947642632852 RMSE:0.2869137090247319
2、K值平均准確率(MAPK)
K值平均准確率(MAPK)的意思是整個數據集上的K值平均准確率(APK)的均值。APK是信息檢索中常用的一個指標。它用於衡量針對某個查詢所返回的“前K個”文檔的平均相關性。
如果結果中文檔的實際相關性越高且排名也更靠前,那APK分值也就越高。如果在預測結果中得分更高(在推薦列表中排名也更靠前)的物品實際上也與用戶更相關,那自然這個模型就更好。
ok,MAPK評估代碼如下:
package ml import org.apache.spark.mllib.evaluation.RankingMetrics import org.apache.spark.mllib.recommendation.{Rating, ALS} import org.apache.spark.rdd.RDD import org.apache.spark.{SparkContext, SparkConf} import org.jblas.DoubleMatrix import sql.StreamingExamples import scala.collection.Map object MAPKTest{ def main(args: Array[String]) { StreamingExamples.setStreamingLogLevels() val conf = new SparkConf().setAppName("MAPKTest").setMaster("local[*]") val sc = new SparkContext(conf) /*用戶 電影 評分*/ val rawData: RDD[String] = sc.textFile("file:///E:/spark/ml-100k/u.data") //去掉時間的字段,rawRatings:Array
val rawRatings = rawData.map(_.split("\\t").take(3)) //user moive rating
val ratings = rawRatings.map{case Array(user, movie, rating) =>{ Rating(user.toInt, movie.toInt, rating.toDouble) }} /** * 得到訓練的模型 * 注意:50代表我們得到的模型的因子的列的數量,名稱叫 因子維數 */ val model = ALS.train(ratings, 50, 10, 0.01) /*獲取模型中所有商品的 factor,並轉換成矩陣*/ val itemFactors: Array[Array[Double]] = model.productFeatures.map{case (id, factor) => factor}.collect() val itemMatrix: DoubleMatrix = new DoubleMatrix(itemFactors) // println(itemMatrix.rows, itemMatrix.columns)
/*獲得模型中每個用戶對應的每個電影的評分*/ val allRecs = model.userFeatures.map{ case(userId, factor) => { val userVector = new DoubleMatrix(factor) /** * socres是一個DoubleMatrix類型,值為1行N列的 Vector * 為什么可以通過判斷這兩個矩陣的乘積的大小,從而來判斷分數呢? * 這歸根於ALS算法,該算法是將一個 用戶-商品 的矩陣 拆分成 用戶、商品兩個矩陣 * 因此這兩個矩陣的乘積就是實際的 分數 */ val scores = itemMatrix.mmul(userVector)//矩陣和向量的乘積,求出每個用戶的分數 //根據評分倒數排序
val sortedWithId = scores.data.zipWithIndex.sortBy(-_._1) //(score, itemId)
val recommendIds = sortedWithId.map(_._2 + 1).toSeq //返回用戶 和 各個商品評分的倒數的值 的 tuple: (userId,(sorce, itemId))
(userId, recommendIds) }} /*獲取實際中的 每個用戶對應的有評分過的電影的評分*/ val userMoives: RDD[(Int, Iterable[(Int, Int)])] = ratings.map{ case Rating(user, product, rating) => { (user, product) }}.groupBy(_._1) val predictedAndTrueForRanking = allRecs.join(userMoives).map{ case( userId, (predicted, actualWithIds) ) => { //實際的商品編號
val actual = actualWithIds.map(_._2) (actual.toArray, predicted.toArray) }} val rankingMetrics: RankingMetrics[Int] = new RankingMetrics(predictedAndTrueForRanking) println("使用內置的計算MAP:" + rankingMetrics.meanAveragePrecision) }
輸出結果為:
使用內置的計算MAP:0.0630466936422453
3、推薦模型完整代碼
package ml import org.apache.spark.mllib.evaluation.{RegressionMetrics, RankingMetrics} import org.apache.spark.mllib.recommendation.{Rating, ALS} import org.apache.spark.rdd.RDD import org.apache.spark.{SparkContext, SparkConf} import org.jblas.DoubleMatrix import sql.StreamingExamples import scala.collection.Map /** * 基於Spark MLlib 的推薦算法 * ALS:最小二乘法 * * @author lwj * @date 2018/05/04 */
object Recommend{ /** * 用於商品推薦 * 通過傳入兩個向量,返回這兩個向量之間的余弦相似度 * * @param vec1 * @param vec2 * @return */ def cosineSimilarity(vec1: DoubleMatrix, vec2: DoubleMatrix): Double = { vec1.dot(vec2) / (vec1.norm2() * vec2.norm2()) } /** * 模型評估 * K值平均准確率(APK) * * @param actual * @param predicted * @param k * @return */ def avgPrecisionK(actual: Seq[Int], predicted: Seq[Int], k: Int) : Double = { val predK: Seq[Int] = predicted.take(k) var score = 0.0
var numHits = 0.0
for ((p, i) <- predK.zipWithIndex){ if (actual.contains(p)){ numHits += 1.0 score += numHits / (i.toDouble + 1.0) //TODO 為什么除以i.toDouble
} } if (actual.isEmpty){ 1.0 }else{ score / math.min(actual.size, k).toDouble //TODO 為什么是min
} } def main(args: Array[String]) { StreamingExamples.setStreamingLogLevels() val conf = new SparkConf().setAppName("recommandTest").setMaster("local[*]") val sc = new SparkContext(conf) /*用戶 電影 評分*/ val rawData: RDD[String] = sc.textFile("file:///E:/spark/ml-100k/u.data") //去掉時間的字段,rawRatings:Array
val rawRatings = rawData.map(_.split("\\t").take(3)) //user moive rating
val ratings = rawRatings.map{case Array(user, movie, rating) =>{ Rating(user.toInt, movie.toInt, rating.toDouble) }} //電影
val movies: RDD[String] = sc.textFile("file:///E:/spark/ml-100k/u.item") //電影ID 電影名
val titles: Map[Int, String] = movies.map(_.split("\\|").take(2)).map(array => (array(0).toInt, array(1))).collectAsMap() /** * 得到訓練的模型 * 注意:50代表我們得到的模型的因子的列的數量,名稱叫 因子維數 */ val model = ALS.train(ratings, 50, 10, 0.01) /** * 基於用戶進行推薦 */
//用戶因子的數量 // println(mode.userFeatures.count()) //商品因子的數量 // println(mode.productFeatures.count()) //查看某個用戶對某個商品的預測評分,ALS模型的初始化是隨機的,所以產生的結果可能會不同 // println(mode.predict(789, 123)) //為指定的用戶推薦 N 個商品
val userID = 789 val K = 10 val topKRecs: Array[Rating] = model.recommendProducts(userID, 10) // println(topKRecs.mkString("\n")) //獲取指定用戶所評價過的電影
val moviesForUser: Seq[Rating] = ratings.keyBy(_.user).lookup(789) //打印出指定用戶評價最高的10部電影的名稱和評分
println("真實的:") moviesForUser.sortBy(-_.rating).take(10).map(rating => { (titles(rating.product),rating.rating) }).foreach(println) //打印出推薦給用戶的10部電影的名稱和評分,和上面的進行比較
println("推薦的:") topKRecs.map(rating => { (titles(rating.product),rating.rating) }).foreach(println) println("\n-----------------------\n") /** * 基於商品進行推薦 */
/*通過商品ID獲得與該商品相似的商品*/ val itemId = 567 val itemFactor: Array[Double] = model.productFeatures.lookup(itemId).head val itemVector: DoubleMatrix = new DoubleMatrix(itemFactor) //獲得每個商品與給出的商品的余弦相似度
val sims = model.productFeatures.map{case (id, factor) => { val factorVector = new DoubleMatrix(factor) val sim = cosineSimilarity(factorVector, itemVector) (id, sim) }} //打印出前N的商品
val topItem: Array[(Int, Double)] = sims.sortBy(-_._2).take(10 + 1) println("與567商品相似的商品:\n" + topItem.mkString("\n") + "\n") /*校驗商品*/ println("給定的商品名稱為: " + titles(itemId)) println("相似的商品名稱為:") topItem.slice(1, 11).foreach(item => println(titles(item._1))) println("\n-----------------------\n") /*模型評估*/
/** * 均方差評估 * 對model全量數據進行評估 */
// val actualRating: Rating = moviesForUser.take(1)(0) // val predictedRating: Double = model.predict(789, actualRating.product) // println("\n真實分:" + actualRating.rating + " 預測分:" + predictedRating) //格式:(userID,電影)
val userProducts: RDD[(Int, Int)] = ratings.map(rating => (rating.user, rating.product)) //模型推測出的評分信息,格式為:((userID,電影), 推測評分)
val predictions: RDD[((Int, Int), Double)] = model.predict(userProducts).map(rating => ((rating.user, rating.product),rating.rating)) //格式為:((userID,電影), (真實平評分,推測評分))
val ratingsAndPredictions: RDD[((Int, Int), (Double, Double))] = ratings.map(rating => ((rating.user, rating.product), rating.rating)) .join(predictions) //均方差
val MSE = ratingsAndPredictions.map(rap => math.pow(rap._2._1 - rap._2._2, 2)).reduce(_+_) / ratingsAndPredictions.count() println("均方差MSE為: " + MSE) //均方根誤差
val RMSE: Double = math.sqrt(MSE) println("均方根誤差RMSE為: " + RMSE) /** * K值平均准確率評估 * 注意:該評估模型是針對對用戶感興趣和回去接觸的物品的預測能力 * 也是就是說:這時針對基於用戶推薦的 模型的評估 */
/*計算 單個 指定用戶推薦的APK指標*/ val actualMovies: Seq[Int] = moviesForUser.map(_.product) val predictedMovies: Array[Int] = topKRecs.map(_.product) val apk10: Double = avgPrecisionK(actualMovies, predictedMovies, 10) println("789的APK值為:" + apk10) /*獲取模型中所有商品的 factor,並轉換成矩陣*/ val itemFactors: Array[Array[Double]] = model.productFeatures.map{case (id, factor) => factor}.collect() val itemMatrix: DoubleMatrix = new DoubleMatrix(itemFactors) // println(itemMatrix.rows, itemMatrix.columns)
/*獲得模型中每個用戶對應的每個電影的評分*/ val allRecs = model.userFeatures.map{ case(userId, factor) => { val userVector = new DoubleMatrix(factor) /** * socres是一個DoubleMatrix類型,值為1行N列的 Vector * 為什么可以通過判斷這兩個矩陣的乘積的大小,從而來判斷分數呢? * 這歸根於ALS算法,該算法是將一個 用戶-商品 的矩陣 拆分成 用戶、商品兩個矩陣 * 因此這兩個矩陣的乘積就是實際的 分數 */ val scores = itemMatrix.mmul(userVector)//矩陣和向量的乘積,求出每個用戶的分數 //根據評分倒數排序
val sortedWithId = scores.data.zipWithIndex.sortBy(-_._1) //(score, itemId)
val recommendIds = sortedWithId.map(_._2 + 1).toSeq //返回用戶 和 各個商品評分的倒數的值 的 tuple: (userId,(sorce, itemId))
(userId, recommendIds) }} /*獲取實際中的 每個用戶對應的有評分過的電影的評分*/ val userMoives: RDD[(Int, Iterable[(Int, Int)])] = ratings.map{ case Rating(user, product, rating) => { (user, product) }}.groupBy(_._1) val MAPK = allRecs.join(userMoives).map{ case( userId, (predicted, actualWithIds) ) => { //實際的商品編號
val actual = actualWithIds.map(_._2).toSeq avgPrecisionK(actual, predicted, 10) }}.reduce(_ + _) / allRecs.count println("MAPK:" + MAPK) println("\n-----------------------\n") /** * 使用MLlib內置的評估器 */
/*RMSE 和 MSE*/ val predictedAndTrue: RDD[(Double, Double)] = ratingsAndPredictions.map{ case((userID, product),(actual, predict)) => (actual, predict)} val regressionMetrics: RegressionMetrics = new RegressionMetrics(predictedAndTrue) println("使用內置的計算MSE:" + regressionMetrics.meanSquaredError) println("使用內置的計算RMSE:" + regressionMetrics.rootMeanSquaredError) /*MAPK*/ val predictedAndTrueForRanking = allRecs.join(userMoives).map{ case( userId, (predicted, actualWithIds) ) => { //實際的商品編號
val actual = actualWithIds.map(_._2) (actual.toArray, predicted.toArray) }} val rankingMetrics: RankingMetrics[Int] = new RankingMetrics(predictedAndTrueForRanking) println("使用內置的計算MAP:" + rankingMetrics.meanAveragePrecision) } }
