算法說明
協同過濾(Collaborative Filtering,簡稱CF,WIKI上的定義是:簡單來說是利用某個興趣相投、擁有共同經驗之群體的喜好來推薦感興趣的資訊給使用者,個人透過合作的機制給予資訊相當程度的回應(如評分)並記錄下來以達到過濾的目的,進而幫助別人篩選資訊,回應不一定局限於特別感興趣的,特別不感興趣資訊的紀錄也相當重要。
協同過濾常被應用於推薦系統。這些技術旨在補充用戶—商品關聯矩陣中所缺失的部分。
MLlib 當前支持基於模型的協同過濾,其中用戶和商品通過一小組隱性因子進行表達,並且這些因子也用於預測缺失的元素。MLLib 使用交替最小二乘法(ALS) 來學習這些隱性因子。
用戶對物品或者信息的偏好,根據應用本身的不同,可能包括用戶對物品的評分、用戶查看物品的記錄、用戶的購買記錄等。其實這些用戶的偏好信息可以分為兩類:
- 顯式的用戶反饋:這類是用戶在網站上自然瀏覽或者使用網站以外,顯式地提供反饋信息,例如用戶對物品的評分或者對物品的評論。
- 隱式的用戶反饋:這類是用戶在使用網站是產生的數據,隱式地反映了用戶對物品的喜好,例如用戶購買了某物品,用戶查看了某物品的信息,等等。
顯式的用戶反饋能准確地反映用戶對物品的真實喜好,但需要用戶付出額外的代價;而隱式的用戶行為,通過一些分析和處理,也能反映用戶的喜好,只是數據不是很精確,有些行為的分析存在較大的噪音。但只要選擇正確的行為特征,隱式的用戶反饋也能得到很好的效果,只是行為特征的選擇可能在不同的應用中有很大的不同,例如在電子商務的網站上,購買行為其實就是一個能很好表現用戶喜好的隱式反饋。
推薦引擎根據不同的推薦機制可能用到數據源中的一部分,然后根據這些數據,分析出一定的規則或者直接對用戶對其他物品的喜好進行預測計算。這樣推薦引擎可以在用戶進入時給他推薦他可能感興趣的物品。
MLlib目前支持基於協同過濾的模型,在這個模型里,用戶和產品被一組可以用來預測缺失項目的潛在因子來描述。特別是我們實現交替最小二乘(ALS)算法來學習這些潛在的因子,在 MLlib 中的實現有如下參數:
- numBlocks是用於並行化計算的分塊個數(設置為-1時 為自動配置);
- rank是模型中隱性因子的個數;
- iterations是迭代的次數;
- lambda是ALS 的正則化參數;
- implicitPrefs決定了是用顯性反饋ALS 的版本還是用隱性反饋數據集的版本;
- alpha是一個針對於隱性反饋 ALS 版本的參數,這個參數決定了偏好行為強度的基准。
實例介紹
在本實例中將使用協同過濾算法對GroupLens Research(http://grouplens.org/datasets/movielens/)提供的數據進行分析,該數據為一組從20世紀90年末到21世紀初由MovieLens用戶提供的電影評分數據,這些數據中包括電影評分、電影元數據(風格類型和年代)以及關於用戶的人口統計學數據(年齡、郵編、性別和職業等)。根據不同需求該組織提供了不同大小的樣本數據,不同樣本信息中包含三種數據:評分、用戶信息和電影信息。
對這些數據分析進行如下步驟:
1. 裝載如下兩種數據:
a)裝載樣本評分數據,其中最后一列時間戳除10的余數作為key,Rating為值;
b)裝載電影目錄對照表(電影ID->電影標題)
2.將樣本評分表以key值切分成3個部分,分別用於訓練 (60%,並加入用戶評分), 校驗 (20%), and 測試 (20%)
3.訓練不同參數下的模型,並再校驗集中驗證,獲取最佳參數下的模型
4.用最佳模型預測測試集的評分,計算和實際評分之間的均方根誤差
5.根據用戶評分的數據,推薦前十部最感興趣的電影(注意要剔除用戶已經評分的電影)
測試數據說明
在MovieLens提供的電影評分數據分為三個表:評分、用戶信息和電影信息,在該系列提供的附屬數據提供大概6000位讀者和100萬個評分數據,具體位置為/data/class8/movielens/data目錄下,對三個表數據說明可以參考該目錄下README文檔。
1.評分數據說明(ratings.data)
該評分數據總共四個字段,格式為UserID::MovieID::Rating::Timestamp,分為為用戶編號::電影編號::評分::評分時間戳,其中各個字段說明如下:
- 用戶編號范圍1~6040
- 電影編號1~3952
- 電影評分為五星評分,范圍0~5
- 評分時間戳單位秒
- 每個用戶至少有20個電影評分
使用的ratings.dat的數據樣本如下所示:
1::1193::5::978300760
1::661::3::978302109
1::914::3::978301968
1::3408::4::978300275
1::2355::5::978824291
1::1197::3::978302268
1::1287::5::978302039
1::2804::5::978300719
2.用戶信息(users.dat)
用戶信息五個字段,格式為UserID::Gender::Age::Occupation::Zip-code,分為為用戶編號::性別::年齡::職業::郵編,其中各個字段說明如下:
- 用戶編號范圍1~6040
- 性別,其中M為男性,F為女性
- 不同的數字代表不同的年齡范圍,如:25代表25~34歲范圍
- 職業信息,在測試數據中提供了21中職業分類
- 地區郵編
使用的users.dat的數據樣本如下所示:
1::F::1::10::48067
2::M::56::16::70072
3::M::25::15::55117
4::M::45::7::02460
5::M::25::20::55455
6::F::50::9::55117
7::M::35::1::06810
8::M::25::12::11413
3.電影信息(movies.dat)
電影數據分為三個字段,格式為MovieID::Title::Genres,分為為電影編號::電影名::電影類別,其中各個字段說明如下:
- 電影編號1~3952
- 由IMDB提供電影名稱,其中包括電影上映年份
- 電影分類,這里使用實際分類名非編號,如:Action、Crime等
使用的movies.dat的數據樣本如下所示:
1::Toy Story (1995)::Animation|Children's|Comedy 2::Jumanji (1995)::Adventure|Children's|Fantasy 3::Grumpier Old Men (1995)::Comedy|Romance 4::Waiting to Exhale (1995)::Comedy|Drama 5::Father of the Bride Part II (1995)::Comedy 6::Heat (1995)::Action|Crime|Thriller 7::Sabrina (1995)::Comedy|Romance 8::Tom and Huck (1995)::Adventure|Children's
程序代碼
import java.io.File import scala.io.Source import org.apache.log4j.{Level, Logger} import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.rdd._ import org.apache.spark.mllib.recommendation.{ALS, Rating, MatrixFactorizationModel} object MovieLensALS { def main(args: Array[String]) {
// 屏蔽不必要的日志顯示在終端上 Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF) if (args.length != 2) { println("Usage: /path/to/spark/bin/spark-submit --driver-memory 2g --class week7.MovieLensALS " + "week7.jar movieLensHomeDir personalRatingsFile") sys.exit(1) } // 設置運行環境 val conf = new SparkConf().setAppName("MovieLensALS").setMaster("local[4]") val sc = new SparkContext(conf) // 裝載用戶評分,該評分由評分器生成 val myRatings = loadRatings(args(1)) val myRatingsRDD = sc.parallelize(myRatings, 1) // 樣本數據目錄 val movieLensHomeDir = args(0) // 裝載樣本評分數據,其中最后一列Timestamp取除10的余數作為key,Rating為值,即(Int,Rating) val ratings = sc.textFile(new File(movieLensHomeDir, "ratings.dat").toString).map { line => val fields = line.split("::") (fields(3).toLong % 10, Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble)) } // 裝載電影目錄對照表(電影ID->電影標題) val movies = sc.textFile(new File(movieLensHomeDir, "movies.dat").toString).map { line => val fields = line.split("::") (fields(0).toInt, fields(1)) }.collect().toMap val numRatings = ratings.count() val numUsers = ratings.map(_._2.user).distinct().count() val numMovies = ratings.map(_._2.product).distinct().count() println("Got " + numRatings + " ratings from " + numUsers + " users on " + numMovies + " movies.") // 將樣本評分表以key值切分成3個部分,分別用於訓練 (60%,並加入用戶評分), 校驗 (20%), and 測試 (20%) // 該數據在計算過程中要多次應用到,所以cache到內存 val numPartitions = 4 val training = ratings.filter(x => x._1 < 6) .values .union(myRatingsRDD) //注意ratings是(Int,Rating),取value即可 .repartition(numPartitions) .cache() val validation = ratings.filter(x => x._1 >= 6 && x._1 < 8) .values .repartition(numPartitions) .cache() val test = ratings.filter(x => x._1 >= 8).values.cache() val numTraining = training.count() val numValidation = validation.count() val numTest = test.count() println("Training: " + numTraining + ", validation: " + numValidation + ", test: " + numTest)
// 訓練不同參數下的模型,並在校驗集中驗證,獲取最佳參數下的模型 val ranks = List(8, 12) val lambdas = List(0.1, 10.0) val numIters = List(10, 20) var bestModel: Option[MatrixFactorizationModel] = None var bestValidationRmse = Double.MaxValue var bestRank = 0 var bestLambda = -1.0 var bestNumIter = -1 for (rank <- ranks; lambda <- lambdas; numIter <- numIters) { val model = ALS.train(training, rank, numIter, lambda) val validationRmse = computeRmse(model, validation, numValidation) println("RMSE (validation) = " + validationRmse + " for the model trained with rank = " + rank + ", lambda = " + lambda + ", and numIter = " + numIter + ".") if (validationRmse < bestValidationRmse) { bestModel = Some(model) bestValidationRmse = validationRmse bestRank = rank bestLambda = lambda bestNumIter = numIter } } // 用最佳模型預測測試集的評分,並計算和實際評分之間的均方根誤差 val testRmse = computeRmse(bestModel.get, test, numTest) println("The best model was trained with rank = " + bestRank + " and lambda = " + bestLambda + ", and numIter = " + bestNumIter + ", and its RMSE on the test set is " + testRmse + ".") // create a naive baseline and compare it with the best model val meanRating = training.union(validation).map(_.rating).mean val baselineRmse = math.sqrt(test.map(x => (meanRating - x.rating) * (meanRating - x.rating)).mean) val improvement = (baselineRmse - testRmse) / baselineRmse * 100 println("The best model improves the baseline by " + "%1.2f".format(improvement) + "%.") // 推薦前十部最感興趣的電影,注意要剔除用戶已經評分的電影 val myRatedMovieIds = myRatings.map(_.product).toSet val candidates = sc.parallelize(movies.keys.filter(!myRatedMovieIds.contains(_)).toSeq) val recommendations = bestModel.get .predict(candidates.map((0, _))) .collect() .sortBy(-_.rating) .take(10) var i = 1 println("Movies recommended for you:") recommendations.foreach { r => println("%2d".format(i) + ": " + movies(r.product)) i += 1 } sc.stop() } /** 校驗集預測數據和實際數據之間的均方根誤差 **/ def computeRmse(model: MatrixFactorizationModel, data: RDD[Rating], n: Long): Double = { val predictions: RDD[Rating] = model.predict(data.map(x => (x.user, x.product))) val predictionsAndRatings = predictions.map(x => ((x.user, x.product), x.rating)) .join(data.map(x => ((x.user, x.product), x.rating))) .values math.sqrt(predictionsAndRatings.map(x => (x._1 - x._2) * (x._1 - x._2)).reduce(_ + _) / n) } /** 裝載用戶評分文件 **/ def loadRatings(path: String): Seq[Rating] = { val lines = Source.fromFile(path).getLines() val ratings = lines.map { line => val fields = line.split("::") Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble) }.filter(_.rating > 0.0) if (ratings.isEmpty) { sys.error("No ratings provided.") } else { ratings.toSeq } } }
IDEA執行情況
第一步 使用如下命令啟動Spark集群
$cd /app/hadoop/spark-1.1.0 $sbin/start-all.sh
第二步 進行用戶評分,生成用戶樣本數據
由於該程序中最終推薦給用戶十部電影,這需要用戶提供對樣本電影數據的評分,然后根據生成的最佳模型獲取當前用戶推薦電影。用戶可以使用/home/hadoop/upload/class8/movielens/bin/rateMovies程序進行評分,最終生成personalRatings.txt文件:
第三步 在IDEA中設置運行環境
在IDEA運行配置中設置MovieLensALS運行配置,需要設置輸入數據所在文件夾和用戶的評分文件路徑:
- 輸入數據所在目錄:輸入數據文件目錄,在該目錄中包含了評分信息、用戶信息和電影信息,這里設置為/home/hadoop/upload/class8/movielens/data/
- 用戶的評分文件路徑:前一步驟中用戶對十部電影評分結果文件路徑,在這里設置為/home/hadoop/upload/class8/movielens/personalRatings.txt
第四步 執行並觀察輸出
- 輸出Got 1000209 ratings from 6040 users on 3706 movies,表示本算法中計算數據包括大概100萬評分數據、6000多用戶和3706部電影;
- 輸出Training: 602252, validation: 198919, test: 199049,表示對評分數據進行拆分為訓練數據、校驗數據和測試數據,大致占比為6:2:2;
- 在計算過程中選擇8種不同模型對數據進行訓練,然后從中選擇最佳模型,其中最佳模型比基准模型提供22.30%
RMSE (validation) = 0.8680885498009973 for the model trained with rank = 8, lambda = 0.1, and numIter = 10.
RMSE (validation) = 0.868882967482595 for the model trained with rank = 8, lambda = 0.1, and numIter = 20.
RMSE (validation) = 3.7558695311242833 for the model trained with rank = 8, lambda = 10.0, and numIter = 10.
RMSE (validation) = 3.7558695311242833 for the model trained with rank = 8, lambda = 10.0, and numIter = 20.
RMSE (validation) = 0.8663942501841964 for the model trained with rank = 12, lambda = 0.1, and numIter = 10.
RMSE (validation) = 0.8674684744165418 for the model trained with rank = 12, lambda = 0.1, and numIter = 20.
RMSE (validation) = 3.7558695311242833 for the model trained with rank = 12, lambda = 10.0, and numIter = 10.
RMSE (validation) = 3.7558695311242833 for the model trained with rank = 12, lambda = 10.0, and numIter = 20.
The best model was trained with rank = 12 and lambda = 0.1, and numIter = 10, and its RMSE on the test set is 0.8652326018300565.
The best model improves the baseline by 22.30%.
- 利用前面獲取的最佳模型,結合用戶提供的樣本數據,最終推薦給用戶如下影片:
Movies recommended for you:
1: Bewegte Mann, Der (1994)
2: Chushingura (1962)
3: Love Serenade (1996)
4: For All Mankind (1989)
5: Vie est belle, La (Life is Rosey) (1987)
6: Bandits (1997)
7: King of Masks, The (Bian Lian) (1996)
8: I'm the One That I Want (2000)
9: Big Trees, The (1952)
10: First Love, Last Rites (1997)