前言
經過2節對MovieLens數據集的學習,想必讀者對MovieLens數據集認識的不錯了;同時也順帶回顧了些Spark編程技巧,Python數據分析技巧。
本節將是讓人興奮的一節,它將實現一個基於Spark的推薦系統引擎。
PS1:關於推薦算法的理論知識,請讀者先自行學習,本文僅介紹基於ALS矩陣分解算法的Spark推薦引擎實現。
PS2:全文示例將采用Scala語言。
第一步:提取有效特征
1. 首先,啟動spark-shell並分配足夠內存:

2. 載入用戶對影片的評級數據:
1 // 載入評級數據 2 val rawData = sc.textFile("/home/kylin/ml-100k/u.data") 3 // 展示一條記錄 4 rawData.first()
結果為:

3. 切分記錄並返回新的RDD:
1 // 格式化數據集 2 val rawRatings = rawData.map(_.split("\t").take(3)) 3 // 展示一條記錄 4 rawRatings.first()

4. 接下來需要將評分矩陣RDD轉化為Rating格式的RDD:
1 // 導入rating類 2 import org.apache.spark.mllib.recommendation.Rating 3 // 將評分矩陣RDD中每行記錄轉換為Rating類型 4 val ratings = rawRatings.map { case Array(user, movie, rating) => Rating(user.toInt, movie.toInt, rating.toDouble) }
這是因為MLlib的ALS推薦系統算法包只支持Rating格式的數據集。
第二步:訓練推薦模型
接下來可以進行ALS推薦系統模型訓練了。MLlib中的ALS算法接收三個參數:
- rank:對應的是隱因子的個數,這個值設置越高越准,但是也會產生更多的計算量。一般將這個值設置為10-200;
- iterations:對應迭代次數,一般設置個10就夠了;
- lambda:該參數控制正則化過程,其值越高,正則化程度就越深。一般設置為0.01。
1. 首先,執行以下代碼,啟動ALS訓練:
1 // 導入ALS推薦系統算法包 2 import org.apache.spark.mllib.recommendation.ALS 3 // 啟動ALS矩陣分解 4 val model = ALS.train(ratings, 50, 10, 0.01)
這步將會使用ALS矩陣分解算法,對評分矩陣進行分解,且隱特征個數設置為50,迭代10次,正則化參數設為了0.01。
相對其他步驟,訓練耗費的時間最多。運行結果如下:

2. 返回類型為MatrixFactorizationModel對象,它將結果分別保存到兩個(id,factor)RDD里面,分別名為userFeatures和productFeatures。
也就是評分矩陣分解后的兩個子矩陣:

上面展示了id為4的用戶的“隱因子向量”。請注意ALS實現的操作都是延遲性的轉換操作。
第三步:使用ALS推薦模型
1. 預測用戶789對物品123的評分:

2. 為用戶789推薦前10個物品:
1 val userId = 789 2 val K = 10 3 4 // 獲取推薦列表 5 val topKRecs = model.recommendProducts(userId, K) 6 // 打印推薦列表 7 println(topKRecs.mkString("\n"))
結果為:

3. 初步檢驗推薦效果
獲取到各個用戶的推薦列表后,想必大家都想先看看用戶評分最高的電影,和給他推薦的電影是不是有相似。
3.1 創建電影id - 電影名字典:
1 // 導入電影數據集 2 val movies = sc.textFile("/home/kylin/ml-100k/u.item") 3 // 建立電影id - 電影名字典 4 val titles = movies.map(line => line.split("\\|").take(2)).map(array => (array(0).toInt, array(1))).collectAsMap() 5 // 查看id為123的電影名 6 titles(123)
結果為:

這樣后面就可以根據電影的id找到電影名了。
3.2 獲取某用戶的所有觀影記錄並打印:
1 // 建立用戶名-其他RDD,並僅獲取用戶789的記錄 2 val moviesForUser = ratings.keyBy(_.user).lookup(789) 3 // 獲取用戶評分最高的10部電影,並打印電影名和評分值 4 moviesForUser.sortBy(-_.rating).take(10).map(rating => (titles(rating.product), rating.rating)).foreach(println)
結果為:

3.3 獲取某用戶推薦列表並打印:

讀者可以自行對比這兩組列表是否有相似性。
第四步:物品推薦
很多時候還有另一種需求:就是給定一個物品,找到它的所有相似物品。
遺憾的是MLlib里面竟然沒有包含內置的函數,需要自己用jblas庫來實現 = =#。
1. 導入jblas庫矩陣類,並創建一個余弦相似度計量函數:
1 // 導入jblas庫中的矩陣類 2 import org.jblas.DoubleMatrix 3 // 定義相似度函數 4 def cosineSimilarity(vec1: DoubleMatrix, vec2: DoubleMatrix): Double = { 5 vec1.dot(vec2) / (vec1.norm2() * vec2.norm2()) 6 }
2. 接下來獲取物品(本例以物品567為例)的因子特征向量,並將它轉換為jblas的矩陣格式:
1 // 選定id為567的電影 2 val itemId = 567 3 // 獲取該物品的隱因子向量 4 val itemFactor = model.productFeatures.lookup(itemId).head 5 // 將該向量轉換為jblas矩陣類型 6 val itemVector = new DoubleMatrix(itemFactor)
3. 計算物品567和所有其他物品的相似度:
1 // 計算電影567與其他電影的相似度 2 val sims = model.productFeatures.map{ case (id, factor) => 3 val factorVector = new DoubleMatrix(factor) 4 val sim = cosineSimilarity(factorVector, itemVector) 5 (id, sim) 6 } 7 // 獲取與電影567最相似的10部電影 8 val sortedSims = sims.top(K)(Ordering.by[(Int, Double), Double] { case (id, similarity) => similarity }) 9 // 打印結果 10 println(sortedSims.mkString("\n"))
結果為:

其中0.999999當然就是自己跟自己的相似度了。
4. 查看推薦結果:
1 // 打印電影567的影片名 2 println(titles(567)) 3 // 獲取和電影567最相似的11部電影(含567自己) 4 val sortedSims2 = sims.top(K + 1)(Ordering.by[(Int, Double), Double] { case (id, similarity) => similarity }) 5 // 再打印和電影567最相似的10部電影 6 sortedSims2.slice(1, 11).map{ case (id, sim) => (titles(id), sim) }.mkString("\n")
結果為:

看看,這些電影是不是和567相似?
第五步:推薦效果評估
在Spark的ALS推薦系統中,最常用到的兩個推薦指標分別為MSE和MAPK。其中MSE就是均方誤差,是基於評分矩陣的推薦系統的必用指標。那么MAPK又是什么呢?
它稱為K值平均准確率,最多用於TopN推薦中,它表示數據集范圍內K個推薦物品與實際用戶購買物品的吻合度。具體公式請讀者自行參考有關文檔。
本文推薦系統就是一個[基於用戶-物品評分矩陣的TopN推薦系統],下面步驟分別用來獲取本文推薦系統中的這兩個指標。
PS:記得先要導入jblas庫。
1. 首先計算MSE和RMSE:
1 // 創建用戶id-影片id RDD 2 val usersProducts = ratings.map{ case Rating(user, product, rating) => (user, product)} 3 // 創建(用戶id,影片id) - 預測評分RDD 4 val predictions = model.predict(usersProducts).map{ 5 case Rating(user, product, rating) => ((user, product), rating) 6 } 7 // 創建用戶-影片實際評分RDD,並將其與上面創建的預測評分RDD join起來 8 val ratingsAndPredictions = ratings.map{ 9 case Rating(user, product, rating) => ((user, product), rating) 10 }.join(predictions) 11 12 // 導入RegressionMetrics類 13 import org.apache.spark.mllib.evaluation.RegressionMetrics 14 // 創建預測評分-實際評分RDD 15 val predictedAndTrue = ratingsAndPredictions.map { case ((user, product), (actual, predicted)) => (actual, predicted) } 16 // 創建RegressionMetrics對象 17 val regressionMetrics = new RegressionMetrics(predictedAndTrue) 18 19 // 打印MSE和RMSE 20 println("Mean Squared Error = " + regressionMetrics.meanSquaredError) 21 println("Root Mean Squared Error = " + regressionMetrics.rootMeanSquaredError)
基本原理是將實際評分-預測評分扔到RegressionMetrics類里,該類提供了mse和rmse成員,可直接輸出獲取。
結果為:

2. 計算MAPK:
// 創建電影隱因子RDD,並將它廣播出去 val itemFactors = model.productFeatures.map { case (id, factor) => factor }.collect() val itemMatrix = new DoubleMatrix(itemFactors) val imBroadcast = sc.broadcast(itemMatrix) // 創建用戶id - 推薦列表RDD val allRecs = model.userFeatures.map{ case (userId, array) => val userVector = new DoubleMatrix(array) val scores = imBroadcast.value.mmul(userVector) val sortedWithId = scores.data.zipWithIndex.sortBy(-_._1) val recommendedIds = sortedWithId.map(_._2 + 1).toSeq (userId, recommendedIds) } // 創建用戶 - 電影評分ID集RDD val userMovies = ratings.map{ case Rating(user, product, rating) => (user, product) }.groupBy(_._1) // 導入RankingMetrics類 import org.apache.spark.mllib.evaluation.RankingMetrics // 創建實際評分ID集-預測評分ID集 RDD val predictedAndTrueForRanking = allRecs.join(userMovies).map{ case (userId, (predicted, actualWithIds)) => val actual = actualWithIds.map(_._2) (predicted.toArray, actual.toArray) } // 創建RankingMetrics對象 val rankingMetrics = new RankingMetrics(predictedAndTrueForRanking) // 打印MAPK println("Mean Average Precision = " + rankingMetrics.meanAveragePrecision)
結果為:

比較坑的是不能設置K,也就是說,計算的實際是MAP...... 正如屬性名:meanAveragePrecision。
小結
感覺MLlib的推薦系統真的很一般,一方面支持的類型少 - 只支持ALS;另一方面支持的推薦系統算子也少,連輸出個RMSE指標都要寫好幾行代碼,太不方便了。
唯一的好處是因為接近底層,所以可以讓使用者看到些實現的細節,對原理更加清晰。
