Spark機器學習之協同過濾算法
一)、協同過濾
1.1 概念
協同過濾是一種借助"集體計算"的途徑。它利用大量已有的用戶偏好來估計用戶對其未接觸過的物品的喜好程度。其內在思想是相似度的定義
1.2 分類
1.在基於用戶的方法的中,如果兩個用戶表現出相似的偏好(即對相同物品的偏好大體相同),那就認為他們的興趣類似。要對他們中的一個用戶推薦一個未知物品,
便可選取若干與其類似的用戶並根據他們的喜好計算出對各個物品的綜合得分,再以得分來推薦物品。其整體的邏輯是,如果其他用戶也偏好某些物品,那這些物品很可能值得推薦。
2. 同樣也可以借助基於物品的方法來做推薦。這種方法通常根據現有用戶對物品的偏好或是評級情況,來計算物品之間的某種相似度。
這時,相似用戶評級相同的那些物品會被認為更相近。一旦有了物品之間的相似度,便可用用戶接觸過的物品來表示這個用戶,然后找出和這些已知物品相似的那些物品,
並將這些物品推薦給用戶。同樣,與已有物品相似的物品被用來生成一個綜合得分,而該得分用於評估未知物品的相似度。
二)、矩陣分解
Spark推薦模型庫當前只包含基於矩陣分解(matrix factorization)的實現,由此我們也將重點關注這類模型。它們有吸引人的地方。首先,這些模型在協同過濾
中的表現十分出色。而在Netflix Prize等知名比賽中的表現也很拔尖
1,顯式矩陣分解
要找到和“用戶物品”矩陣近似的k維(低階)矩陣,最終要求出如下兩個矩陣:一個用於表示用戶的U × k維矩陣,以及一個表征物品的I × k維矩陣。
這兩個矩陣也稱作因子矩陣。它們的乘積便是原始評級矩陣的一個近似。值得注意的是,原始評級矩陣通常很稀疏,但因子矩陣卻是稠密的。
特點:
因子分解類模型的好處在於,一旦建立了模型,對推薦的求解便相對容易。但也有弊端,即當用戶和物品的數量很多時,其對應的物品或是用戶的因子向量可能達到數以百萬計。
這將在存儲和計算能力上帶來挑戰。另一個好處是,這類模型的表現通常都很出色。
2,隱式矩陣分解(關聯因子分確定,可能隨時會變化)
隱式模型仍然會創建一個用戶因子矩陣和一個物品因子矩陣。但是,模型所求解的是偏好矩陣而非評級矩陣的近似。類似地,此時用戶因子向量和物品因子向量的點積所得到的分數
也不再是一個對評級的估值,而是對某個用戶對某一物品偏好的估值(該值的取值雖並不嚴格地處於0到1之間,但十分趨近於這個區間)
3,最小二乘法(Alternating Least Squares ALS):解決矩陣分解的最優化方法
ALS的實現原理是迭代式求解一系列最小二乘回歸問題。在每一次迭代時,固定用戶因子矩陣或是物品因子矩陣中的一個,然后用固定的這個矩陣以及評級數據來更新另一個矩陣。
之后,被更新的矩陣被固定住,再更新另外一個矩陣。如此迭代,直到模型收斂(或是迭代了預設好的次數)。
三)、Spark下ALS算法的應用
1,數據來源電影集ml-100k
2,代碼實現
基於用戶相似度片段代碼:
val movieFile=sc.textFile(fileName) val RatingDatas=movieFile.map(_.split("\t").take(3)) //轉為Ratings數據 val ratings=RatingDatas.map(x =>Rating(x(0).toInt,x(1).toInt,x(2).toDouble)) //獲取用戶評價模型,設置k因子,和迭代次數,隱藏因子lambda,獲取模型 val model=ALS.train(ratings,50,10,0.01) //基於用戶相似度推薦 println("userNumber:"+model.userFeatures.count()+"\t"+"productNum:"+model.productFeatures.count()) //指定用戶及商品,輸出預測值 println(model.predict(789,123)) //為指定用戶推薦的前N商品 model.recommendProducts(789,11).foreach(println(_)) //為每個人推薦前十個商品 model.recommendProductsForUsers(10).take(1).foreach{ case(x,rating) =>println(rating(0)) }
基於商品相似度代碼:
計算相似度的方法有相似度是通過某種方式比較表示兩個物品的向量而得到的。常見的相似度衡量方法包括皮爾森相關系數(Pearson correlation)、針對實數向量的余弦相
似度(cosine similarity)和針對二元向量的傑卡德相似系數(Jaccard similarity)。
val itemFactory=model.productFeatures.lookup(567).head val itemVector=new DoubleMatrix(itemFactory) //求余弦相似度 val sim=model.productFeatures.map{ case(id,factory)=> val factorVector=new DoubleMatrix(factory) val sim=cosineSimilarity(factorVector,itemVector) (id,sim) } val sortedsim=sim.top(11)(Ordering.by[(Int,Double),Double]{ case(id,sim)=>sim }) println(sortedsim.take(10).mkString("\n")) def cosineSimilarity(vec1:DoubleMatrix,vec2:DoubleMatrix):Double={ vec1.dot(vec2)/(vec1.norm2()*vec2.norm2()) }
均方差評估模型代碼:
//模型評估,通過均誤差 //實際用戶評估值 val actualRatings=ratings.map{ case Rating(user,item,rats) => ((user,item),rats) } val userItems=ratings.map{ case(Rating(user,item,rats)) => (user,item) } //模型的用戶對商品的預測值 val predictRatings=model.predict(userItems).map{ case(Rating(user,item,rats)) =>((user,item),rats) } //聯合獲取rate值 val rates=actualRatings.join(predictRatings).map{ case x =>(x._2._1,x._2._2) } //求均方差 val regressionMetrics=new RegressionMetrics(rates) //越接近0越佳 println(regressionMetrics.meanSquaredError)
全局准確率評估(MAP):使用MLlib的 RankingMetrics 類來計算基於排名的評估指標。類似地,需要向我們之前的平均准確率函數傳入一個鍵值對類型的RDD。
其鍵為給定用戶預測的推薦物品的ID數組,而值則是實際的物品ID數組。
//全局平均准確率(MAP) val itemFactors = model.productFeatures.map { case (id, factor) => factor }.collect() val itemMatrix = new DoubleMatrix(itemFactors) //分布式廣播商品的特征矩陣 val imBroadcast = sc.broadcast(itemMatrix) //計算每一個用戶的推薦,在這個操作里,會對用戶因子矩陣和電影因子矩陣做乘積,其結果為一個表示各個電影預計評級的向量(長度為 //1682,即電影的總數目) val allRecs = model.userFeatures.map{ case (userId, array) => val userVector = new DoubleMatrix(array) val scores = imBroadcast.value.mmul(userVector) val sortedWithId = scores.data.zipWithIndex.sortBy(-_._1) val recommendedIds = sortedWithId.map(_._2 + 1).toSeq //+1,矩陣從0開始 (userId, recommendedIds) } //實際評分 val userMovies = ratings.map{ case Rating(user, product, rating) => (user, product)}.groupBy(_._1) val predictedAndTrueForRanking = allRecs.join(userMovies).map{ case (userId, (predicted, actualWithIds)) => val actual = actualWithIds.map(_._2) (predicted.toArray, actual.toArray) } //求MAP,越大越好吧 val rankingMetrics = new RankingMetrics(predictedAndTrueForRanking) println("Mean Average Precision = " + rankingMetrics.meanAveragePrecision)
詳細代碼:

package com.spark.milb.study import org.apache.log4j.{Level, Logger} import org.apache.spark.mllib.evaluation.{RankingMetrics, RegressionMetrics} import org.apache.spark.mllib.recommendation.{ALS, Rating} import org.apache.spark.{SparkConf, SparkContext} import org.jblas.DoubleMatrix /** * Created by hadoop on 17-5-3. * 協同過濾(處理對象movie,使用算法ALS:最小二乘法(實現用戶推薦) * 余弦相似度實現商品相似度推薦 */ object cfTest { def main(args: Array[String]): Unit = { Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF) val conf=new SparkConf().setMaster("local").setAppName("AlsTest") val sc=new SparkContext(conf) CF(sc,"ml-100k/u.data") } def CF(sc:SparkContext,fileName:String): Unit ={ val movieFile=sc.textFile(fileName) val RatingDatas=movieFile.map(_.split("\t").take(3)) //轉為Ratings數據 val ratings=RatingDatas.map(x =>Rating(x(0).toInt,x(1).toInt,x(2).toDouble)) //獲取用戶評價模型,設置k因子,和迭代次數,隱藏因子lambda,獲取模型 /* * rank :對應ALS模型中的因子個數,也就是在低階近似矩陣中的隱含特征個數。因子個 數一般越多越好。但它也會直接影響模型訓練和保存時所需的內存開銷,尤其是在用戶 和物品很多的時候。因此實踐中該參數常作為訓練效果與系統開銷之間的調節參數。通 常,其合理取值為10到200。 iterations :對應運行時的迭代次數。ALS能確保每次迭代都能降低評級矩陣的重建誤 差,但一般經少數次迭代后ALS模型便已能收斂為一個比較合理的好模型。這樣,大部分 情況下都沒必要迭代太多次(10次左右一般就挺好)。 lambda :該參數控制模型的正則化過程,從而控制模型的過擬合情況。其值越高,正則 化越嚴厲。該參數的賦值與實際數據的大小、特征和稀疏程度有關。和其他的機器學習 模型一樣,正則參數應該通過用非樣本的測試數據進行交叉驗證來調整。 * */ val model=ALS.train(ratings,50,10,0.01) //基於用戶相似度推薦 println("userNumber:"+model.userFeatures.count()+"\t"+"productNum:"+model.productFeatures.count()) //指定用戶及商品,輸出預測值 println(model.predict(789,123)) //為指定用戶推薦的前N商品 model.recommendProducts(789,11).foreach(println(_)) //為每個人推薦前十個商品 model.recommendProductsForUsers(10).take(1).foreach{ case(x,rating) =>println(rating(0)) } //基於商品相似度(使用余弦相似度)進行推薦,獲取某個商品的特征值 val itemFactory=model.productFeatures.lookup(567).head val itemVector=new DoubleMatrix(itemFactory) //求余弦相似度 val sim=model.productFeatures.map{ case(id,factory)=> val factorVector=new DoubleMatrix(factory) val sim=cosineSimilarity(factorVector,itemVector) (id,sim) } val sortedsim=sim.top(11)(Ordering.by[(Int,Double),Double]{ case(id,sim)=>sim }) println(sortedsim.take(10).mkString("\n")) //模型評估,通過均誤差 //實際用戶評估值 val actualRatings=ratings.map{ case Rating(user,item,rats) => ((user,item),rats) } val userItems=ratings.map{ case(Rating(user,item,rats)) => (user,item) } //模型的用戶對商品的預測值 val predictRatings=model.predict(userItems).map{ case(Rating(user,item,rats)) =>((user,item),rats) } //聯合獲取rate值 val rates=actualRatings.join(predictRatings).map{ case x =>(x._2._1,x._2._2) } //求均方差 val regressionMetrics=new RegressionMetrics(rates) //越接近0越佳 println(regressionMetrics.meanSquaredError) //全局平均准確率(MAP) val itemFactors = model.productFeatures.map { case (id, factor) => factor }.collect() val itemMatrix = new DoubleMatrix(itemFactors) //分布式廣播商品的特征矩陣 val imBroadcast = sc.broadcast(itemMatrix) //計算每一個用戶的推薦,在這個操作里,會對用戶因子矩陣和電影因子矩陣做乘積,其結果為一個表示各個電影預計評級的向量(長度為 //1682,即電影的總數目) val allRecs = model.userFeatures.map{ case (userId, array) => val userVector = new DoubleMatrix(array) val scores = imBroadcast.value.mmul(userVector) val sortedWithId = scores.data.zipWithIndex.sortBy(-_._1) val recommendedIds = sortedWithId.map(_._2 + 1).toSeq //+1,矩陣從0開始 (userId, recommendedIds) } //實際評分 val userMovies = ratings.map{ case Rating(user, product, rating) => (user, product)}.groupBy(_._1) val predictedAndTrueForRanking = allRecs.join(userMovies).map{ case (userId, (predicted, actualWithIds)) => val actual = actualWithIds.map(_._2) (predicted.toArray, actual.toArray) } //求MAP,越大越好吧 val rankingMetrics = new RankingMetrics(predictedAndTrueForRanking) println("Mean Average Precision = " + rankingMetrics.meanAveragePrecision) } //余弦相似度計算 def cosineSimilarity(vec1:DoubleMatrix,vec2:DoubleMatrix):Double={ vec1.dot(vec2)/(vec1.norm2()*vec2.norm2()) } }