Recommender Systems: A Spark Implementation of ALS (Alternating Least Squares)


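1. ALS workflow:

    Initialize the dataset and the Spark environment ---->

                   Split the data into a training set and a test set ------>

                          Train the ALS model ------------>

                                  Validate the results ----------------->

                                        If the results are good enough, recommend products directly; otherwise, keep training the ALS model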
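The "validate, then either recommend or retrain" loop in this flow can be sketched in plain Scala without Spark. This is a minimal sketch under assumptions: `trainAndEvaluate` is a hypothetical function standing in for "train ALS on the training set and return the RMSE on the test set", and raising the iteration count by a fixed step until a target RMSE is reached is only one possible retraining policy (tuning `rank` or `lambda` is just as common).

```scala
import scala.annotation.tailrec

object AlsWorkflowSketch {
  // Hypothetical retraining policy: start at `iterations`, add `step` after each
  // unsatisfactory round, and give up once `maxIterations` is exceeded.
  @tailrec
  def trainUntilGoodEnough(
      trainAndEvaluate: Int => Double, // hypothetical: iteration count -> test-set RMSE
      targetRmse: Double,
      iterations: Int,
      maxIterations: Int,
      step: Int = 5): Option[(Int, Double)] =
    if (iterations > maxIterations) None
    else {
      val rmse = trainAndEvaluate(iterations)
      if (rmse <= targetRmse) Some((iterations, rmse)) // good enough: go on to recommend
      else trainUntilGoodEnough(trainAndEvaluate, targetRmse, iterations + step, maxIterations, step)
    }
}
```

With a fake evaluator `it => 1.0 / it`, a target of 0.15, and a start of 5 iterations, the first round scores 0.2 and the second (10 iterations) scores 0.1, so `trainUntilGoodEnough(it => 1.0 / it, 0.15, 5, 20)` returns `Some((10, 0.1))`.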

2. What the dataset means

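     Rating is the fixed input format for ALS: a case class holding the same data as a triple of type [Int, Int, Double], namely (user, product, rating). Because the user and product fields are Int, user names and item names must be replaced by numeric IDs when the dataset is built.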

/**
 * A more compact class to represent a rating than Tuple3[Int, Int, Double].
 */
@Since("0.8.0")
case class Rating @Since("0.8.0") (
    @Since("0.8.0") user: Int,
    @Since("0.8.0") product: Int,
    @Since("0.8.0") rating: Double)
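Since `user` and `product` are `Int`, string identifiers have to be mapped to integers before any `Rating` can be built. The sketch below is plain Scala with a local `Rating` case class that mirrors the MLlib one (an assumption made so the example runs without Spark); the object and method names are hypothetical. It assigns dense IDs in first-seen order and returns the lookup tables so recommended IDs can be mapped back to names later. On an RDD, the same idea could be expressed with `distinct()` followed by `zipWithIndex()`, which produces `Long` indices that would need converting to `Int`.

```scala
object IdIndexSketch {
  // Local stand-in for org.apache.spark.mllib.recommendation.Rating
  final case class Rating(user: Int, product: Int, rating: Double)

  // Assign a dense Int ID to each distinct string key, in first-seen order.
  def buildIndex(keys: Seq[String]): Map[String, Int] =
    keys.distinct.zipWithIndex.toMap

  // Convert (userName, itemName, rating) records into numeric Ratings,
  // returning both index maps so results can be translated back to names.
  def toRatings(raw: Seq[(String, String, Double)])
      : (Seq[Rating], Map[String, Int], Map[String, Int]) = {
    val userIdx = buildIndex(raw.map(_._1))
    val itemIdx = buildIndex(raw.map(_._2))
    val ratings = raw.map { case (u, i, r) => Rating(userIdx(u), itemIdx(i), r) }
    (ratings, userIdx, itemIdx)
  }
}
```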

  In the test data file, the first column is the user ID, the second column is the product ID, and the third column is the rating, a Double.
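The parsing in the 3.3 program (`split(' ')` followed by a `match`) throws a `MatchError` on any line that does not split into exactly three fields, and `toInt`/`toDouble` throw on non-numeric text. A more defensive parser, sketched below under the assumption that columns are separated by whitespace, returns `None` for malformed lines so they can be dropped instead of failing the job. `ParseSketch` and its local `Rating` are hypothetical stand-ins; with MLlib's `Rating` swapped in, the parsing step could become `data.flatMap(line => parseLine(line))`.

```scala
import scala.util.Try

object ParseSketch {
  // Local stand-in for org.apache.spark.mllib.recommendation.Rating
  final case class Rating(user: Int, product: Int, rating: Double)

  // Parse one "user product rating" line. Any run of whitespace separates
  // columns; a wrong column count or a non-numeric field yields None.
  def parseLine(line: String): Option[Rating] =
    line.trim.split("\\s+") match {
      case Array(u, p, r) => Try(Rating(u.toInt, p.toInt, r.toDouble)).toOption
      case _              => None
    }
}
```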

          

3. Reading the ALS source code

     3.1 All fields of the ALS class

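@Since("0.8.0")
class ALS private (
    private var numUserBlocks: Int,      // number of blocks the users are partitioned into (parallelism)
    private var numProductBlocks: Int,   // number of blocks the products are partitioned into (parallelism)
    private var rank: Int,               // number of latent factors
    private var iterations: Int,         // number of ALS iterations
    private var lambda: Double,          // regularization parameter
    private var implicitPrefs: Boolean,  // true: implicit-feedback ALS; false: explicit-feedback ALS
    private var alpha: Double,           // implicit feedback only: scales the confidence placed in observed interactions
    private var seed: Long = System.nanoTime()  // random seed for factor initialization
  ) extends Serializable with Logging {
  // ... (rest of the class omitted)
}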
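When `implicitPrefs` is true, the input values are treated as strengths of implicit feedback (views, clicks, purchases) rather than explicit ratings. Spark documents its implicit mode as following Hu, Koren and Volinsky's "Collaborative Filtering for Implicit Feedback Datasets": each value r becomes a binary preference p (1 if r > 0, else 0) and a confidence c = 1 + alpha * r, and the model fits p with squared errors weighted by c. The sketch below computes only those per-entry quantities in plain Scala; the object and method names are hypothetical, and taking |r| in the confidence is an assumption of this sketch. Note that `train` in 3.2 hard-codes `implicitPrefs = false` and `alpha = 1.0`; MLlib exposes the implicit variant through `ALS.trainImplicit(ratings, rank, iterations, lambda, alpha)`.

```scala
object ImplicitFeedbackSketch {
  // Binary preference: 1.0 if the user showed positive interest, else 0.0
  def preference(r: Double): Double = if (r > 0) 1.0 else 0.0

  // Confidence grows linearly with interaction strength; alpha sets the slope.
  // Using |r| here is an assumption of this sketch.
  def confidence(r: Double, alpha: Double): Double = 1.0 + alpha * math.abs(r)

  // Confidence-weighted squared error for one (user, item) entry,
  // given the model's predicted value x_u . y_i
  def weightedError(r: Double, alpha: Double, predicted: Double): Double = {
    val diff = preference(r) - predicted
    confidence(r, alpha) * diff * diff
  }
}
```

An unobserved entry (r = 0) gets p = 0 and the minimum confidence c = 1, so it still pulls predictions toward 0, only weakly; a larger `alpha` makes observed interactions dominate more strongly.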

     3.2 The ALS.train method

  /**
   * Train a matrix factorization model given an RDD of ratings given by users to some products,
   * in the form of (userID, productID, rating) pairs. We approximate the ratings matrix as the
   * product of two lower-rank matrices of a given rank (number of features). To solve for these
   * features, we run a given number of iterations of ALS. This is done using a level of
   * parallelism given by `blocks`.
   *
   * @param ratings    RDD of (userID, productID, rating) pairs
   * @param rank       number of features to use
   * @param iterations number of iterations of ALS (recommended: 10-20)
   * @param lambda     regularization factor (recommended: 0.01)
   * @param blocks     level of parallelism to split computation into
   * @param seed       random seed
   */
  @Since("0.9.1")
  def train(
      ratings: RDD[Rating], // RDD of (user ID, product ID, rating) records
      rank: Int,            // number of latent (hidden) factors in the model
      iterations: Int,      // number of iterations of the algorithm
      lambda: Double,       // ALS regularization parameter
      blocks: Int,          // number of blocks, i.e. the level of parallelism
      seed: Long            // random seed
    ): MatrixFactorizationModel = {
    new ALS(blocks, blocks, rank, iterations, lambda, false, 1.0, seed).run(ratings)
  }
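To make concrete what `run(ratings)` computes, here is a single-machine toy version of explicit-feedback ALS (the `implicitPrefs = false` case that `train` uses) in plain Scala. It is not Spark's implementation: there is no block partitioning, and details such as how Spark scales the regularization term may differ. It alternates two half-steps. With the product vectors y_i fixed, each user vector x_u solves the regularized least-squares system (Σ_i y_i y_iᵀ + λI) x_u = Σ_i r_ui y_i over the products that user rated; then the roles swap and each product vector is solved with the user vectors fixed. `TinyAls`, `updateFactors` and the local `Rating` are hypothetical names.

```scala
import scala.util.Random

object TinyAls {
  final case class Rating(user: Int, product: Int, rating: Double)
  type Vec = Array[Double]

  def dot(a: Vec, b: Vec): Double = a.indices.map(i => a(i) * b(i)).sum

  // Solve A x = b for a small dense system (Gaussian elimination, partial pivoting).
  def solve(a: Array[Array[Double]], b: Vec): Vec = {
    val n = b.length
    val m = a.map(_.clone())
    val y = b.clone()
    for (col <- 0 until n) {
      val pivot = (col until n).maxBy(r => math.abs(m(r)(col)))
      val rowTmp = m(col); m(col) = m(pivot); m(pivot) = rowTmp
      val valTmp = y(col); y(col) = y(pivot); y(pivot) = valTmp
      for (r <- col + 1 until n) {
        val f = m(r)(col) / m(col)(col)
        for (c <- col until n) m(r)(c) -= f * m(col)(c)
        y(r) -= f * y(col)
      }
    }
    // Back substitution on the upper-triangular system
    val x = new Array[Double](n)
    for (r <- n - 1 to 0 by -1)
      x(r) = (y(r) - (r + 1 until n).map(c => m(r)(c) * x(c)).sum) / m(r)(r)
    x
  }

  // Half-step: with `fixed` factors held constant, solve one ridge regression per key.
  def updateFactors(
      observed: Map[Int, Seq[(Int, Double)]], // key -> (id on the fixed side, rating)
      fixed: Map[Int, Vec],
      rank: Int,
      lambda: Double): Map[Int, Vec] =
    observed.map { case (key, obs) =>
      val ata = Array.tabulate(rank, rank)((i, j) => if (i == j) lambda else 0.0) // lambda * I
      val atb = new Array[Double](rank)
      for ((otherId, r) <- obs) {
        val y = fixed(otherId)
        for (i <- 0 until rank) {
          atb(i) += y(i) * r
          for (j <- 0 until rank) ata(i)(j) += y(i) * y(j)
        }
      }
      key -> solve(ata, atb)
    }

  def train(
      ratings: Seq[Rating],
      rank: Int,
      iterations: Int,
      lambda: Double,
      seed: Long): (Map[Int, Vec], Map[Int, Vec]) = {
    val rnd = new Random(seed)
    val byUser = ratings.groupBy(_.user).map { case (u, rs) => u -> rs.map(r => (r.product, r.rating)) }
    val byProduct = ratings.groupBy(_.product).map { case (p, rs) => p -> rs.map(r => (r.user, r.rating)) }
    // Random initial product factors; user factors come from the first half-step
    var productF: Map[Int, Vec] = byProduct.keys.map(p => p -> Array.fill(rank)(rnd.nextDouble())).toMap
    var userF: Map[Int, Vec] = Map.empty
    for (_ <- 1 to iterations) {
      userF = updateFactors(byUser, productF, rank, lambda)
      productF = updateFactors(byProduct, userF, rank, lambda)
    }
    (userF, productF)
  }
}
```

Each half-step is an exact least-squares solve, so the regularized objective cannot increase from one half-step to the next; this is the property that lets ALS get away with a small number of iterations.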

     3.3 Collaborative-filtering recommendation based on ALS

package com.bigdata.demo

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.recommendation.{ALS, Rating}

/**
  * Created by SimonsZhao on 3/30/2017.
  * ALS (alternating least squares)
  */
object CollaborativeFilter {

  def main(args: Array[String]): Unit = {
    // Configure Spark: run locally under this application name
    val conf = new SparkConf().setMaster("local").setAppName("CollaborativeFilter")
    // Create the Spark context
    val sc = new SparkContext(conf)
    // Load the dataset: one space-separated "user product rating" record per line
    val data = sc.textFile("E:/scala/spark/testdata/ALSTest.txt")
    // Convert each line into MLlib's Rating type
    // (a line without exactly three fields throws a MatchError)
    val ratings = data.map(_.split(' ') match {
      case Array(user, item, rate) =>
        Rating(user.toInt, item.toInt, rate.toDouble)
    })
    // Number of latent factors
    val rank = 2
    // Number of ALS iterations
    val numIterations = 2
    // Regularization parameter
    val lambda = 0.01
    // Train the model
    val model = ALS.train(ratings, rank, numIterations, lambda)
    // Recommend one product to user 2
    val rs = model.recommendProducts(2, 1)
    // Print the result
    rs.foreach(println)
    // Release Spark resources
    sc.stop()
  }

}
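The program above skips the "split" and "validate" steps from section 1. In Spark these would typically use `ratings.randomSplit(Array(0.8, 0.2))` for the split and `model.predict(test.map(r => (r.user, r.product)))`, which returns an `RDD[Rating]` that can be joined with the held-out ratings on (user, product). The plain-Scala sketch below mirrors that logic locally so it runs without Spark; `EvaluationSketch` and its local `Rating` are hypothetical stand-ins, and the 80/20 split is an assumption.

```scala
import scala.util.Random

object EvaluationSketch {
  // Local stand-in for org.apache.spark.mllib.recommendation.Rating
  final case class Rating(user: Int, product: Int, rating: Double)

  // Random train/test split: each rating goes to the training side with
  // probability `trainFraction` (a local analogue of RDD.randomSplit).
  def split(ratings: Seq[Rating], trainFraction: Double, seed: Long): (Seq[Rating], Seq[Rating]) = {
    val rnd = new Random(seed)
    val tagged = ratings.map(r => (r, rnd.nextDouble() < trainFraction))
    (tagged.collect { case (r, true) => r }, tagged.collect { case (r, false) => r })
  }

  // RMSE over the (user, product) pairs present in both inputs, mirroring
  // "predict, then join predictions with the held-out ratings".
  def rmse(actual: Seq[Rating], predicted: Seq[Rating]): Double = {
    val pred = predicted.map(r => (r.user, r.product) -> r.rating).toMap
    val sqErrors = actual.flatMap { r =>
      pred.get((r.user, r.product)).map(p => (r.rating - p) * (r.rating - p))
    }
    require(sqErrors.nonEmpty, "no overlapping (user, product) pairs")
    math.sqrt(sqErrors.sum / sqErrors.size)
  }
}
```

A lower test-set RMSE is the usual criterion for the "good enough" check in section 1; what threshold counts as good enough depends on the rating scale of the data.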


4. Testing and analysis

    The output shows that user 2 was recommended product 15, with a predicted score of 3.99.
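The predicted score is the inner product of the user's and the product's learned latent vectors. With `rank = 2`, as in the program in 3.3, the 3.99 for user 2 and product 15 is a sum of two terms. The individual factor values are not printed, so they appear symbolically below. Because the program passes no seed, ALS falls back to the `System.nanoTime()` default seed shown in 3.1, so the exact score can differ from run to run.

```latex
\hat{r}_{ui} = \mathbf{x}_u^{\top}\mathbf{y}_i = \sum_{f=1}^{k} x_{uf}\, y_{if},
\qquad
\hat{r}_{2,15} = x_{2,1}\, y_{15,1} + x_{2,2}\, y_{15,2} \quad (k = \text{rank} = 2)
```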

5. Source code for user-based recommendation (MLlib): recommending products to a user

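What the doc comment says:

  recommendProducts recommends products to a user.

  num is how many products to return; the number actually returned may be smaller.

  It returns [[Rating]] objects, each containing the given user ID, a product ID, and a "score" in the rating field. Each one represents a recommended product, and they are sorted by score in decreasing order, so the first element is the product predicted to be most strongly recommended to the user. The score is an opaque value indicating how strongly the product is recommended.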

  /**
   * Recommends products to a user.
   *
   * @param user the user to recommend products to
   * @param num how many products to return. The number returned may be less than this.
   * @return [[Rating]] objects, each of which contains the given user ID, a product ID, and a
   *  "score" in the rating field. Each represents one recommended product, and they are sorted
   *  by score, decreasing. The first returned is the one predicted to be most strongly
   *  recommended to the user. The score is an opaque value that indicates how strongly
   *  recommended the product is.
   */
  @Since("1.1.0")
  def recommendProducts(user: Int, num: Int): Array[Rating] =
    MatrixFactorizationModel.recommend(userFeatures.lookup(user).head, productFeatures, num)
      .map(t => Rating(user, t._1, t._2))
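Stripped of the RDDs, `recommendProducts` scores every product against the user's factor vector and keeps the `num` highest scores. The plain-Scala sketch below mirrors that under stated assumptions: factors are held in ordinary `Map`s instead of RDDs, and `RecommendSketch` with its local `Rating` is hypothetical. Like the MLlib method, it does not filter out products the user has already rated, it returns fewer than `num` results when there are fewer products, and an unknown user fails (`lookup(user).head` on an empty result in MLlib, `Map.apply` here).

```scala
object RecommendSketch {
  // Local stand-in for org.apache.spark.mllib.recommendation.Rating
  final case class Rating(user: Int, product: Int, rating: Double)

  private def dot(a: Array[Double], b: Array[Double]): Double =
    a.indices.map(i => a(i) * b(i)).sum

  // Score every product by the dot product with the user's vector and keep
  // the `num` best, highest score first.
  def recommendProducts(
      user: Int,
      num: Int,
      userFeatures: Map[Int, Array[Double]],
      productFeatures: Map[Int, Array[Double]]): Array[Rating] = {
    val u = userFeatures(user) // throws NoSuchElementException for an unknown user
    productFeatures.toSeq
      .map { case (p, v) => Rating(user, p, dot(u, v)) }
      .sortWith(_.rating > _.rating)
      .take(num)
      .toArray
  }
}
```

`recommendUsers` in section 6 is the same computation with the roles of the two factor maps swapped.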

6. Source code for item-based recommendation (MLlib): recommending users to a product

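What the doc comment says:

  recommendUsers recommends users to a product, that is, it returns the users most likely to be interested in that product.

  Each returned Rating contains a user ID, the given product ID, and a "score" in the rating field.

  Each one represents a recommended user, and they are sorted by score in decreasing order.

  The first one returned is the user predicted to be most strongly recommended to the product.

  The score is an opaque value indicating how strongly the user is recommended.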

  /**
   * Recommends users to a product. That is, this returns users who are most likely to be
   * interested in a product.
   *
   * @param product the product to recommend users to
   * @param num how many users to return. The number returned may be less than this.
   * @return [[Rating]] objects, each of which contains a user ID, the given product ID, and a
   *  "score" in the rating field. Each represents one recommended user, and they are sorted
   *  by score, decreasing. The first returned is the one predicted to be most strongly
   *  recommended to the product. The score is an opaque value that indicates how strongly
   *  recommended the user is.
   */
  @Since("1.1.0")
  def recommendUsers(product: Int, num: Int): Array[Rating] =
    MatrixFactorizationModel.recommend(productFeatures.lookup(product).head, userFeatures, num)
      .map(t => Rating(t._1, product, t._2))

 

END~

