第三篇:一個Spark推薦系統引擎的實現


前言

       經過2節對MovieLens數據集的學習,想必讀者對MovieLens數據集認識的不錯了;同時也順帶回顧了些Spark編程技巧,Python數據分析技巧。

       本節將是讓人興奮的一節,它將實現一個基於Spark的推薦系統引擎。

       PS1:關於推薦算法的理論知識,請讀者先自行學習,本文僅介紹基於ALS矩陣分解算法的Spark推薦引擎實現

       PS2:全文示例將采用Scala語言。

第一步:提取有效特征

       1. 首先,啟動spark-shell並分配足夠內存:

       

       2. 載入用戶對影片的評級數據:

1 // 載入評級數據
2 val rawData = sc.textFile("/home/kylin/ml-100k/u.data")
3 // 展示一條記錄
4 rawData.first()

       結果為:

       

       3. 切分記錄並返回新的RDD:

1 // 格式化數據集
2 val rawRatings = rawData.map(_.split("\t").take(3))
3 // 展示一條記錄
4 rawRatings.first()

       

       4. 接下來需要將評分矩陣RDD轉化為Rating格式的RDD:

1 // 導入rating類
2 import org.apache.spark.mllib.recommendation.Rating
3 // 將評分矩陣RDD中每行記錄轉換為Rating類型
4 val ratings = rawRatings.map { case Array(user, movie, rating) => Rating(user.toInt, movie.toInt, rating.toDouble) }

       這是因為MLlib的ALS推薦系統算法包只支持Rating格式的數據集。

第二步:訓練推薦模型

       接下來可以進行ALS推薦系統模型訓練了。MLlib中的ALS算法接收三個參數:

- rank:對應的是隱因子的個數,這個值設置越高越准,但是也會產生更多的計算量。一般將這個值設置為10-200;
- iterations:對應迭代次數,一般設置個10就夠了;
- lambda:該參數控制正則化過程,其值越高,正則化程度就越深。一般設置為0.01。

       1. 首先,執行以下代碼,啟動ALS訓練:

1 // 導入ALS推薦系統算法包
2 import org.apache.spark.mllib.recommendation.ALS
3 // 啟動ALS矩陣分解
4 val model = ALS.train(ratings, 50, 10, 0.01)

       這步將會使用ALS矩陣分解算法,對評分矩陣進行分解,且隱特征個數設置為50,迭代10次,正則化參數設為了0.01。

       相對其他步驟,訓練耗費的時間最多。運行結果如下:

       

       2. 返回類型為MatrixFactorizationModel對象,它將結果分別保存到兩個(id,factor)RDD里面,分別名為userFeatures和productFeatures。

       也就是評分矩陣分解后的兩個子矩陣:

       

       上面展示了id為4的用戶的“隱因子向量”。請注意ALS實現的操作都是延遲性的轉換操作。

第三步:使用ALS推薦模型

       1. 預測用戶789對物品123的評分:

       

       2. 為用戶789推薦前10個物品:

1 val userId = 789
2 val K = 10
3  
4 // 獲取推薦列表
5 val topKRecs = model.recommendProducts(userId, K)
6 // 打印推薦列表
7 println(topKRecs.mkString("\n"))

       結果為:

       

       3. 初步檢驗推薦效果

       獲取到各個用戶的推薦列表后,想必大家都想先看看用戶評分最高的電影,和給他推薦的電影是不是有相似。

       3.1 創建電影id - 電影名字典:

1 // 導入電影數據集
2 val movies = sc.textFile("/home/kylin/ml-100k/u.item")
3 // 建立電影id - 電影名字典
4 val titles = movies.map(line => line.split("\\|").take(2)).map(array => (array(0).toInt, array(1))).collectAsMap()
5 // 查看id為123的電影名
6 titles(123)

       結果為:

       

       這樣后面就可以根據電影的id找到電影名了。

       3.2 獲取某用戶的所有觀影記錄並打印:

1 // 建立用戶名-其他RDD,並僅獲取用戶789的記錄
2 val moviesForUser = ratings.keyBy(_.user).lookup(789)
3 // 獲取用戶評分最高的10部電影,並打印電影名和評分值
4 moviesForUser.sortBy(-_.rating).take(10).map(rating => (titles(rating.product), rating.rating)).foreach(println)

       結果為:

       

       3.3 獲取某用戶推薦列表並打印:

       

       讀者可以自行對比這兩組列表是否有相似性。

第四步:物品推薦

       很多時候還有另一種需求:就是給定一個物品,找到它的所有相似物品。

       遺憾的是MLlib里面竟然沒有包含內置的函數,需要自己用jblas庫來實現 = =#。

       1. 導入jblas庫矩陣類,並創建一個余弦相似度計量函數:

1 // 導入jblas庫中的矩陣類
2 import org.jblas.DoubleMatrix
3 // 定義相似度函數
4 def cosineSimilarity(vec1: DoubleMatrix, vec2: DoubleMatrix): Double = {
5     vec1.dot(vec2) / (vec1.norm2() * vec2.norm2())
6 }

       2. 接下來獲取物品(本例以物品567為例)的因子特征向量,並將它轉換為jblas的矩陣格式:

1 // 選定id為567的電影
2 val itemId = 567
3 // 獲取該物品的隱因子向量
4 val itemFactor = model.productFeatures.lookup(itemId).head
5 // 將該向量轉換為jblas矩陣類型
6 val itemVector = new DoubleMatrix(itemFactor)

       3. 計算物品567和所有其他物品的相似度:

 1 // 計算電影567與其他電影的相似度
 2 val sims = model.productFeatures.map{ case (id, factor) => 
 3     val factorVector = new DoubleMatrix(factor)
 4     val sim = cosineSimilarity(factorVector, itemVector)
 5     (id, sim)
 6 }
 7 // 獲取與電影567最相似的10部電影
 8 val sortedSims = sims.top(K)(Ordering.by[(Int, Double), Double] { case (id, similarity) => similarity })
 9 // 打印結果
10 println(sortedSims.mkString("\n"))

       結果為:

       

       其中0.999999當然就是自己跟自己的相似度了。

       4. 查看推薦結果:

1 // 打印電影567的影片名
2 println(titles(567))
3 // 獲取和電影567最相似的11部電影(含567自己)
4 val sortedSims2 = sims.top(K + 1)(Ordering.by[(Int, Double), Double] { case (id, similarity) => similarity })
5 // 再打印和電影567最相似的10部電影
6 sortedSims2.slice(1, 11).map{ case (id, sim) => (titles(id), sim) }.mkString("\n")

       結果為:

       

       看看,這些電影是不是和567相似?

第五步:推薦效果評估

       在Spark的ALS推薦系統中,最常用到的兩個推薦指標分別為MSEMAPK。其中MSE就是均方誤差,是基於評分矩陣的推薦系統的必用指標。那么MAPK又是什么呢?

       它稱為K值平均准確率,最多用於TopN推薦中,它表示數據集范圍內K個推薦物品與實際用戶購買物品的吻合度。具體公式請讀者自行參考有關文檔。

       本文推薦系統就是一個[基於用戶-物品評分矩陣的TopN推薦系統],下面步驟分別用來獲取本文推薦系統中的這兩個指標。

       PS:記得先要導入jblas庫。

       1. 首先計算MSE和RMSE:

 1 // 創建用戶id-影片id RDD
 2 val usersProducts = ratings.map{ case Rating(user, product, rating)  => (user, product)}
 3 // 創建(用戶id,影片id) - 預測評分RDD
 4 val predictions = model.predict(usersProducts).map{
 5     case Rating(user, product, rating) => ((user, product), rating)
 6 }
 7 // 創建用戶-影片實際評分RDD,並將其與上面創建的預測評分RDD join起來
 8 val ratingsAndPredictions = ratings.map{
 9     case Rating(user, product, rating) => ((user, product), rating)
10 }.join(predictions)
11  
12 // 導入RegressionMetrics類
13 import org.apache.spark.mllib.evaluation.RegressionMetrics
14 // 創建預測評分-實際評分RDD
15 val predictedAndTrue = ratingsAndPredictions.map { case ((user, product), (actual, predicted)) => (actual, predicted) }
16 // 創建RegressionMetrics對象
17 val regressionMetrics = new RegressionMetrics(predictedAndTrue)
18  
19 // 打印MSE和RMSE
20 println("Mean Squared Error = " + regressionMetrics.meanSquaredError)
21 println("Root Mean Squared Error = " + regressionMetrics.rootMeanSquaredError)

       基本原理是將實際評分-預測評分扔到RegressionMetrics類里,該類提供了mse和rmse成員,可直接輸出獲取。

       結果為:

       

       2. 計算MAPK:

// 創建電影隱因子RDD,並將它廣播出去
val itemFactors = model.productFeatures.map { case (id, factor) => factor }.collect()
val itemMatrix = new DoubleMatrix(itemFactors)
val imBroadcast = sc.broadcast(itemMatrix)
 
// 創建用戶id - 推薦列表RDD
val allRecs = model.userFeatures.map{ case (userId, array) => 
  val userVector = new DoubleMatrix(array)
  val scores = imBroadcast.value.mmul(userVector)
  val sortedWithId = scores.data.zipWithIndex.sortBy(-_._1)
  val recommendedIds = sortedWithId.map(_._2 + 1).toSeq
  (userId, recommendedIds)
}
 
// 創建用戶 - 電影評分ID集RDD
val userMovies = ratings.map{ case Rating(user, product, rating) => (user, product) }.groupBy(_._1)
 
// 導入RankingMetrics類
import org.apache.spark.mllib.evaluation.RankingMetrics
// 創建實際評分ID集-預測評分ID集 RDD
val predictedAndTrueForRanking = allRecs.join(userMovies).map{ case (userId, (predicted, actualWithIds)) => 
  val actual = actualWithIds.map(_._2)
  (predicted.toArray, actual.toArray)
}
// 創建RankingMetrics對象
val rankingMetrics = new RankingMetrics(predictedAndTrueForRanking)
// 打印MAPK
println("Mean Average Precision = " + rankingMetrics.meanAveragePrecision)

       結果為:

       

       比較坑的是不能設置K,也就是說,計算的實際是MAP...... 正如屬性名:meanAveragePrecision。

小結

       感覺MLlib的推薦系統真的很一般,一方面支持的類型少 - 只支持ALS;另一方面支持的推薦系統算子也少,連輸出個RMSE指標都要寫好幾行代碼,太不方便了。

       唯一的好處是因為接近底層,所以可以讓使用者看到些實現的細節,對原理更加清晰。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2026 CODEPRJ.COM