1. ALS algorithm workflow:
Initialize the dataset and the Spark environment ->
Split the data into training and test sets ->
Train the ALS model ->
Validate the results ->
If the validation is satisfactory, recommend products directly; otherwise, continue training the ALS model.
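The loop above can be sketched end to end in a few lines. This is a minimal sketch only: the file path, the 0.8/0.2 split, the parameter values, and the MSE threshold are illustrative assumptions, not the author's settings.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.recommendation.{ALS, Rating}

object ALSWorkflowSketch {
  def main(args: Array[String]): Unit = {
    // Initialize the Spark environment
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("ALSWorkflowSketch"))

    // Load "user item rating" lines and convert them to Rating objects
    val ratings = sc.textFile("E:/scala/spark/testdata/ALSTest.txt").map { line =>
      val Array(user, item, rate) = line.split(' ')
      Rating(user.toInt, item.toInt, rate.toDouble)
    }

    // Split the data into training and test sets
    val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))

    // Train the ALS model
    val model = ALS.train(training, 10, 10, 0.01)

    // Validate: compare predicted and actual ratings on the test set
    val predictions = model.predict(test.map(r => (r.user, r.product)))
      .map(r => ((r.user, r.product), r.rating))
    val ratesAndPreds = test.map(r => ((r.user, r.product), r.rating)).join(predictions)
    val mse = ratesAndPreds.map { case (_, (actual, predicted)) => math.pow(actual - predicted, 2) }.mean()
    println(s"Test MSE = $mse")

    // If the error is acceptable, recommend; otherwise adjust rank/iterations/lambda and retrain
    if (mse < 1.0) model.recommendProducts(2, 1).foreach(println)

    sc.stop()
  }
}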
2. Meaning of the dataset
Rating is the fixed input format for ALS. It expects tuple-like records whose fields are [Int, Int, Double]; when building the dataset, user names and item names must be replaced with numeric IDs.
/**
 * A more compact class to represent a rating than Tuple3[Int, Int, Double].
 */
@Since("0.8.0")
case class Rating @Since("0.8.0") (
    @Since("0.8.0") user: Int,
    @Since("0.8.0") product: Int,
    @Since("0.8.0") rating: Double)
In the data file, the first column is the user ID, the second column is the product ID, and the third column is the rating, a Double.
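For illustration only, here are a few made-up lines in that "user item rating" layout and their conversion to Rating (the concrete IDs and scores are invented, not taken from the author's test file):

import org.apache.spark.mllib.recommendation.Rating

// Hypothetical input lines: user ID, product ID, rating
val sampleLines = Seq("1 11 2.0", "1 15 4.5", "2 13 3.0")

// Split each line on spaces and build a Rating(user, product, rating)
val sampleRatings = sampleLines.map { line =>
  val Array(user, item, rate) = line.split(' ')
  Rating(user.toInt, item.toInt, rate.toDouble)
}
sampleRatings.foreach(println)  // e.g. Rating(1,11,2.0)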
3. Walkthrough of the ALS source code
3.1 Fields of the ALS class
@Since("0.8.0") class ALS private ( private var numUserBlocks: Int, private var numProductBlocks: Int, private var rank: Int, private var iterations: Int, private var lambda: Double, private var implicitPrefs: Boolean, 使用顯式反饋ALS變量或隱式反饋 private var alpha: Double, ALS隱式反饋變化率用於控制每次擬合修正的幅度 private var seed: Long = System.nanoTime() ) extends Serializable with Logging {
3.2 The ALS.train method
/**
 * Train a matrix factorization model given an RDD of ratings given by users to some products,
 * in the form of (userID, productID, rating) pairs. We approximate the ratings matrix as the
 * product of two lower-rank matrices of a given rank (number of features). To solve for these
 * features, we run a given number of iterations of ALS. This is done using a level of
 * parallelism given by `blocks`.
 *
 * @param ratings    RDD of (userID, productID, rating) pairs
 * @param rank       number of features to use
 * @param iterations number of iterations of ALS (recommended: 10-20)
 * @param lambda     regularization factor (recommended: 0.01)
 * @param blocks     level of parallelism to split computation into
 * @param seed       random seed
 */
@Since("0.9.1")
def train(
    ratings: RDD[Rating],  // RDD of user ID, product ID and rating
    rank: Int,             // number of latent factors in the model
    iterations: Int,       // number of ALS iterations
    lambda: Double,        // ALS regularization parameter
    blocks: Int,           // level of parallelism (number of blocks)
    seed: Long             // random seed
  ): MatrixFactorizationModel = {
  new ALS(blocks, blocks, rank, iterations, lambda, false, 1.0, seed).run(ratings)
}
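Besides this full signature, the ALS object also offers shorter overloads that fall back to a default level of parallelism and seed, plus an implicit-feedback variant. A small runnable sketch with a made-up three-row dataset (the values are purely illustrative):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.recommendation.{ALS, Rating}

object TrainOverloadsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("TrainOverloadsSketch"))
    // Tiny made-up dataset, just enough to call the overloads
    val ratings = sc.parallelize(Seq(Rating(1, 11, 4.0), Rating(1, 12, 2.0), Rating(2, 11, 5.0)))

    // Full signature: explicit parallelism (blocks) and random seed
    val m1 = ALS.train(ratings, 2, 5, 0.01, -1, 42L)
    // Shorter overload: blocks and seed take defaults (the form used in the example below)
    val m2 = ALS.train(ratings, 2, 5, 0.01)
    // Implicit-feedback variant, with the alpha confidence parameter
    val m3 = ALS.trainImplicit(ratings, 2, 5, 0.01, 1.0)

    println(m2.predict(2, 12))
    sc.stop()
  }
}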
3.3 Collaborative filtering recommendation based on the ALS algorithm
package com.bigdata.demo

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.Rating

/**
 * Created by SimonsZhao on 3/30/2017.
 * ALS (alternating least squares)
 */
object CollaborativeFilter {

  def main(args: Array[String]) {
    // Configure the Spark environment
    val conf = new SparkConf().setMaster("local").setAppName("CollaborativeFilter")
    // Instantiate the Spark context
    val sc = new SparkContext(conf)
    // Load the dataset
    val data = sc.textFile("E:/scala/spark/testdata/ALSTest.txt")
    // Parse the data
    val ratings = data.map(_.split(' ') match {
      // Convert each record
      case Array(user, item, rate) =>
        // into the dedicated Rating type
        Rating(user.toInt, item.toInt, rate.toDouble)
    })
    // Number of latent factors
    val rank = 2
    // Number of iterations
    val numIterations = 2
    // Train the model
    val model = ALS.train(ratings, rank, numIterations, 0.01)
    // Recommend one product for user 2
    val rs = model.recommendProducts(2, 1)
    // Print the result
    rs.foreach(println)
  }

}
4. Testing and analysis
From the output, product 15 was recommended to user 2, with a predicted rating of 3.99.
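To see where that score comes from, the recommendation can be cross-checked against a point prediction for the same (user, product) pair. This snippet continues from the model in section 3.3 and is not standalone; the exact score depends on the data and the random initialization, so it will not necessarily be 3.99:

// Inspect the top recommendation and the underlying point prediction
val rs = model.recommendProducts(2, 1)  // e.g. Rating(2,15,3.99...)
rs.foreach { r =>
  val score = model.predict(r.user, r.product)  // the value the recommendation is ranked by
  println(s"user=${r.user} product=${r.product} recommended=${r.rating} predicted=$score")
}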
5. Source code for recommending products to a user (mllib)
Translation of the doc comment:
recommendProducts recommends products to a user. num is how many products to return; the number actually returned may be less. The result is an array of Rating objects, each containing the given user ID, a product ID, and a "score" in the rating field. Each represents one recommended product, and they are sorted by decreasing score, so the first element is the product predicted to be most strongly recommended to the user. The score is an opaque value that only indicates how strongly the product is recommended.
/**
 * Recommends products to a user.
 *
 * @param user the user to recommend products to
 * @param num how many products to return. The number returned may be less than this.
 * @return [[Rating]] objects, each of which contains the given user ID, a product ID, and a
 *  "score" in the rating field. Each represents one recommended product, and they are sorted
 *  by score, decreasing. The first returned is the one predicted to be most strongly
 *  recommended to the user. The score is an opaque value that indicates how strongly
 *  recommended the product is.
 */
@Since("1.1.0")
def recommendProducts(user: Int, num: Int): Array[Rating] =
  MatrixFactorizationModel.recommend(userFeatures.lookup(user).head, productFeatures, num)
    .map(t => Rating(user, t._1, t._2))
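A short usage sketch of this method, assuming the model trained in section 3.3 (the user ID and the top-N count are illustrative):

// Top-3 products for user 2, ordered by decreasing score
val top3 = model.recommendProducts(2, 3)
top3.zipWithIndex.foreach { case (Rating(user, product, score), i) =>
  println(s"#${i + 1}: product $product for user $user (score $score)")
}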
6. Source code for recommending users for a product (mllib)
Translation of the doc comment:
recommendUsers recommends users for a product, i.e. it returns the users most likely to be interested in that product. Each returned Rating contains a user ID, the given product ID, and a "score" in the rating field. Each represents one recommended user, and they are sorted by decreasing score, so the first element is the user predicted to be most strongly recommended for the product. The score is an opaque value that only indicates how strongly the user is recommended.
/**
 * Recommends users to a product. That is, this returns users who are most likely to be
 * interested in a product.
 *
 * @param product the product to recommend users to
 * @param num how many users to return. The number returned may be less than this.
 * @return [[Rating]] objects, each of which contains a user ID, the given product ID, and a
 *  "score" in the rating field. Each represents one recommended user, and they are sorted
 *  by score, decreasing. The first returned is the one predicted to be most strongly
 *  recommended to the product. The score is an opaque value that indicates how strongly
 *  recommended the user is.
 */
@Since("1.1.0")
def recommendUsers(product: Int, num: Int): Array[Rating] =
  MatrixFactorizationModel.recommend(productFeatures.lookup(product).head, userFeatures, num)
    .map(t => Rating(t._1, product, t._2))
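And the mirror-image usage, again assuming the model from section 3.3 (product ID 15 and the top-N count are illustrative):

// Top-3 users for product 15, ordered by decreasing score
val interestedUsers = model.recommendUsers(15, 3)
interestedUsers.foreach { case Rating(user, product, score) =>
  println(s"user $user is predicted to be interested in product $product (score $score)")
}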
END~