原創:協同過濾之ALS


推薦系統的算法,在上個世紀90年代成型,最早應用於UserCF,基於用戶的協同過濾算法,標志着推薦系統的形成。首先,要明白以下幾個理論:①長尾理論②評判推薦系統的指標。之所以需要推薦系統,是要挖掘冷門物品,增加利潤,這是根本目的。一般的,評判一個推薦系統的好壞,需要以下幾個指標:

下面簡單列舉幾種常用的推薦系統評測指標:
1、准確率與召回率(Precision & Recall)
准確率和召回率是廣泛用於信息檢索和統計學分類領域的兩個度量值,用來評價結果的質量。其中精度是檢索出相關文檔數與檢索出的文檔總數的比率,衡量的是檢索系統的查准率;召回率是指檢索出的相關文檔數和文檔庫中所有的相關文檔數的比率,衡量的是檢索系統的查全率。
一般來說,Precision就是檢索出來的條目(比如:文檔、網頁等)有多少是准確的,Recall就是所有准確的條目有多少被檢索出來了。
正確率、召回率和 F 值是在魚龍混雜的環境中,選出目標的重要評價指標。不妨看看這些指標的定義先:
    1. 正確率 = 提取出的正確信息條數 /  提取出的信息條數     
    2. 召回率 = 提取出的正確信息條數 /  樣本中的信息條數    
兩者取值在0和1之間,數值越接近1,查准率或查全率就越高。   
    3. F值  = 正確率 * 召回率 * 2 / (正確率 + 召回率) (F 值即為正確率和召回率的調和平均值)
不妨舉這樣一個例子:某池塘有1400條鯉魚,300只蝦,300只鱉。現在以捕鯉魚為目的。撒一大網,逮着了700條鯉魚,200只蝦,100只鱉。那么,這些指標分別如下:
正確率 = 700 / (700 + 200 + 100) = 70%
召回率 = 700 / 1400 = 50%
F值 = 70% * 50% * 2 / (70% + 50%) = 58.3%
不妨看看如果把池子里的所有的鯉魚、蝦和鱉都一網打盡,這些指標又有何變化:
正確率 = 1400 / (1400 + 300 + 300) = 70%
召回率 = 1400 / 1400 = 100%
F值 = 70% * 100% * 2 / (70% + 100%) = 82.35%        
由此可見,正確率是評估捕獲的成果中目標成果所占得比例;召回率,顧名思義,就是從關注領域中,召回目標類別的比例;而F值,則是綜合這二者指標的評估指標,用於綜合反映整體的指標。
當然希望檢索結果Precision越高越好,同時Recall也越高越好,但事實上這兩者在某些情況下有矛盾的。比如極端情況下,我們只搜索出了一個結果,且是准確的,那么Precision就是100%,但是Recall就很低;而如果我們把所有結果都返回,那么比如Recall是100%,但是Precision就會很低。因此在不同的場合中需要自己判斷希望Precision比較高或是Recall比較高。如果是做實驗研究,可以繪制Precision-Recall曲線來幫助分析。
2、綜合評價指標(F-Measure)
P和R指標有時候會出現的矛盾的情況,這樣就需要綜合考慮他們,最常見的方法就是F-Measure(又稱為F-Score)。
F-Measure是Precision和Recall加權調和平均:
當參數α=1時,就是最常見的F1,也即
可知F1綜合了P和R的結果,當F1較高時則能說明試驗方法比較有效。
3、E值
E值表示查准率P和查全率R的加權平均值,當其中一個為0時,E值為1,其計算公式:
b越大,表示查准率的權重越大。
4、平均正確率(Average Precision, AP)
平均正確率表示不同查全率的點上的正確率的平均。
 

搜索推薦,主要有以下幾種形式:一、根據人口統計學推薦:此推薦方式需要建立用戶模型,並且需要獲取用戶的具體信息,然后根據矩陣運算,計算相似度,此方式最大缺陷是獲取用戶的隱私,應用不多;

               二、基於內容的推薦:根據產品的屬性,推薦出相似的產品。缺點是需要建立item model,比較費時。

               三、基於協同過濾,是目前搜索推薦中應用最廣泛的,不需要建立item model,省事,效果比較好。協同過濾的本質,可以概括為"物以類聚,人以群分",分別指基於物品的協同過濾和基於user的協同過濾。還有基於機器學習的協同過濾,總共這三種形式。第一種的優點是沒有冷啟動問題,基於用戶歷史行為的推薦,有冷啟動問題。

亞馬遜是搜索推薦的鼻祖,把搜索推薦運用到了極致。主要有以下形式:一,基於內容的推薦,主要有:①每日新產品的推薦;②熱門物品;二,基於協同過濾的推薦:①計算用戶相似度,推薦其他用戶群喜歡的產品(去重)②根據FP-Growth model進行相關度挖掘,捆綁銷售③基於ALS算法,推薦產品。關於ALS算法,有一篇經典的博文,是spark MLlib的源碼貢獻者之一寫的,很深入。地址:http://www.csdn.net/article/2015-05-07/2824641 認真研究幾遍,會有很多收獲。本文主要講ALS應用,所以會比較簡單。在講ALS應用前,有必要詳細論述以下UserCF和Item CF。先講長尾分布,因為推薦系統的目的,就是挖掘長尾物品,同時消除熱門物品的影響(需要加入懲罰因子)。以下內容,全部來自《推薦系統實戰》一書。

 

 

基於用戶的協同過濾算法,需要找出和自己興趣形似的群體,然后推薦別人喜歡的物品給自己,可以用准確率,召回率,覆蓋率和流行度來衡量。影響user CF的一個重要的因素是K,即找出多少個與自己興趣相似的用戶。K值越大,准確率和召回率越高,但是,覆蓋率不一定高。因為K值越大,推薦的物品,越傾向於熱門物品,效果反而不好,不能很好地挖掘長尾物品,所以,在用余弦相似度公式時,需要引入懲罰因子,來消除熱門物品的影響。ItemCF的原理,和spark中的FP-Growth樹的相關度挖掘,是一致的,即根據用戶的歷史興趣,把兩個物品關聯起來(物以類聚).

 

所以說,在購物網站或者電子商務領域,當用戶剛剛登陸系統時,可以利用ALS矩陣分解,對用戶進行推薦(排除冷啟動的情況下),比如,猜你喜歡:xxxx。當用戶購買之后,可以在列表下方,使用ItemCF對用戶推薦,並且說明推薦理由:比如,我買了一本《推薦系統實戰》之后,可以在下方,幫我推薦《web數據挖掘》這本書,說明:購買這本書的用戶還購買了《web數據挖掘》一書,或者說瀏覽這本書的用戶,還購買了xxx……。下面,舉一個例子,來使用ALS:

 

package com.txq.spark.test

import java.io.File

import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

import scala.io.Source

/**
  * ALS矩陣分解搜索推薦:
  * 三個數據源:①rating.dat ----->userid  itemid  rate
  *            ②items.dat----itemid->item(Map)
  *            ③users.dat---預測用戶列表
  * 思路:1.實現數據的partitions,數據格式為(key,value)形式,key為時間戳,對10求余數,根據余數分區,提高計算的
    *    並行度;
    * 2.取上述數據的values部分,訓練出一個最優的model出來,使用"三折交叉驗證",評判標准為RMSE;
  * 3.根據最優model對特定用戶推薦產品,注意去除該用戶已經評分過的產品.
  */
object MovieLensALS {
  System.setProperty("hadoop.home.dir", "D://hadoop-2.6.2")
  def main(args: Array[String]): Unit = {
    //屏蔽不必要的日志顯示在終端上
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    if(args.length != 2){
      println("Usage:/path/to/spark/bin/spark-submit --driver-memory 2g --class " +
        "week6.MovieLensALS" + "target/scala-*/movielens-als-ssembly-*.jar movieLensHomeDir personalRatingsFile")
      sys.exit(1)
    }
    //設置運行環境
    val conf = new SparkConf().setAppName("MovieLensALS")
    val sc = new SparkContext(conf)

    //裝載用戶評分,該評分由評分器生成
    val myRatings = loadRatings(args(1))
    val myRatingsRDD = sc.parallelize(myRatings,1)

    //樣本數據目錄
    val movieLensHomeDir = args(0)
    //裝載樣本評分數據,其中最后一列Timestamp取除10的余數作為key,Rating為值,即(Int,Rating)
    val ratings = sc.textFile(new File(movieLensHomeDir,"ratings.dat").toString).map{ line =>
      val fields = line.split("::")
      (fields(3).toLong % 10,Rating(fields(0).toInt,fields(1).toInt,fields(2).toDouble))
    }
    //裝載電影目錄對照表(電影ID->電影標題)
    val movies = sc.textFile(new File(movieLensHomeDir,"movies.dat").toString).map{line =>
      val fields = line.split("::")
      (fields(0).toInt,fields(1))
    }

    val numRatings = ratings.count()
    val numUsers = ratings.map(_._2.user).distinct().count()
    val numMovies = ratings.map(_._2.product).distinct().count()

    println("Got " + numRatings + " ratings from " + numUsers + " users on" + numMovies + " movies.")
    //將樣本評分表以key值切分成3個部分,分別用於訓練(60%,並加入用戶評分),校驗(20%),測試(20%)
    //該數據在計算過程中要多次應用到,所以cache到內存
    val numPartitions = 4
    val training = ratings.filter(_._1 < 6).values.union(myRatingsRDD).repartition(numPartitions).cache()
    val test = ratings.filter(_._1 >= 8).values.repartition(numPartitions).cache()
    val validation = ratings.filter(x => x._1>= 6 && x._1 < 8).values.repartition(numPartitions).cache()

    val numTraining = training.count()
    val numValidation = validation.count()
    val numTest = test.count()

    println("Training: "+ numTraining + ",validation: " + numValidation + ",test: " + numTest)
    //訓練不同參數下的模型,並在校驗集中校驗,獲取最佳參數下的模型
    val ranks = List(8,9)
    val lambdas = List(0.1,10.0)
    val numIters = List(10,20)
    var bestModel:Option[MatrixFactorizationModel] = None
    var bestValidationRmse = Double.MaxValue
    var bestRank = 0
    var bestLambda = -1.0
    var bestNumIter = -1
    for(rank <- ranks;lambda <- lambdas;numIter <- numIters){
      val model = ALS.train(training,rank,numIter,lambda)
      val validationRmse = computeRmse(model,validation)
      println("RMSE(validation) = " + validationRmse + " for the model trained with rank = " + rank + " with lambda = " + lambda + " with numIterations = " + numIter)
      if(validationRmse < bestValidationRmse){
        bestModel = Some(model)
        bestValidationRmse = validationRmse
        bestRank = rank
        bestLambda = lambda
        bestNumIter = numIter
      }
    }
    //用最佳模型預測測試集的評分,並計算和實際評分之間的均方根誤差
    val testRmse = computeRmse(bestModel.get,test)
    println("The best model was trained with rank = " + bestRank + " with lambda = " + bestLambda + " with best NumIterations = " + bestNumIter)
    //用基准偏差衡量最佳模型在測試數據上的預測精度(產生的RMSE越接近基准偏差,精度越高)
    val meanRating = training.union(validation).map(_.rating).mean()
    val baselineRmse = math.sqrt(test.map(x => math.pow(meanRating - x.rating,2)).mean())
    val improvement =(testRmse-baselineRmse) / baselineRmse * 100
    println("The best model improves the base line by " + "%1.2f".format(improvement + "%."))

    //推薦前十部最感興趣的電影,注意要剔除用戶已經評分的電影
    val myRatedMovieIds = myRatings.map(_.product).toSet
    val userId:Int = myRatings.map(_.user).distinct(0)//被推薦用戶的id
    val candidates = movies.map(_._1).filter(!myRatedMovieIds.contains(_))
    val recommendations = bestModel.get.predict(candidates.map((userId,_))).sortBy(-_.rating).take(10)//按降序排列

    var i = 1
    val products = movies.collect().toMap
    println("Movies recommended for you: ")
    recommendations.foreach{ r =>
      println("%2d".format(i) + ": " + products.get(r.product))
      i += 1
    }
    sc.stop()
  }
  /**裝載用戶評分文件 **/
  def loadRatings(path:String):Seq[Rating] = {
    val lines = Source.fromFile(path).getLines()
    val ratings = lines.map{line =>
      val fields = line.split("::")
      Rating(fields(0).toInt,fields(1).toInt,fields(2).toDouble)
    }.filter(_.rating > 0.0)
    if(ratings.isEmpty){
      sys.error("No ratings provided.")
    } else {
      ratings.toSeq
    }
  }

  /**
    * 計算RMSE,應該盡量減少shuffle操作,提高效率
    * @param model ALS模型
    * @param data validation數據
    * @return
    */
  def computeRmse(model:MatrixFactorizationModel,data:RDD[Rating]):Double = {
      val precisionAndReal = data.map{ x =>
        val precision = model.predict(x.user,x.product)
        (x.rating,precision)
      }
    math.sqrt(precisionAndReal.map(x => math.pow(x._1 - x._2,2)).mean())
  }
}


 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM