Spark ML協同過濾推薦算法


一.簡介

  協同過濾算法【Collaborative Filtering Recommendation】算法是最經典、最常用的推薦算法。該算法通過分析用戶興趣,在用戶群中找到指定用戶的相似用戶,綜合這些相似用戶對某一信息的評價,形成系統關於該指定用戶對此信息的喜好程度預測。

二.步驟

  1.收集用戶偏好。

  2.找到相似的用戶或物品。

  3.計算推薦。

三.用戶評分

  從用戶的行為和偏好中發現規律,並基於此進行推薦,所以收集用戶的偏好信息成為系統推薦效果最基礎的決定因素。

  數據預處理:

    1.減噪

      因為用戶數據在使用過程中可能存在大量噪聲和誤操作,所以需要過濾掉這些噪聲。

    2.歸一化

      不同行為數據的差別比較大,通過歸一化,數據歸於大致均衡,計算時才能減少異常數據產生的影響。

  組合不同用戶行為方式:

    1.將不同的行為分組

    2.對不同行為進行加權

四.相似度計算

  對用戶的行為分析得到用戶的偏好后,可以根據用戶的偏好計算相似用戶和物品,然后可以基於相似用戶或相似物品進行推薦。我們可以將用戶對所有物品的偏好作為一個矩陣來計算用戶之間的相似度,或者將所有用戶對物品的偏好作為一個矩陣來計算物品之間的相似度。

  1.同現相似度

    指在喜愛物品A的前提下,喜愛物品B的概率。當物品B喜愛率較高時可以使用(A交B)/sqrt(A或B)。

  2.歐式距離

    1/(1+d(x,y))

    備注:d(x,y) 歐式距離

  3.皮爾遜相關系數

    皮爾遜相關系數一般用於計算兩個定距變量間聯系的緊密程度,它的取值為【-1~1】之間。

   4.Cosine相似度【余弦相似度】

    Cosine相似度廣泛應用於計算文檔數據的相似度。

  5.Tanimoto系數

    Tanimoto系數也被稱為Jaccard系數,是Cosine相似度的擴展,也多用於計算文檔數據的相似度。

五.代碼實現

  1 package big.data.analyse.ml
  2 
  3 import _root_.breeze.numerics.sqrt
  4 import org.apache.log4j.{Level, Logger}
  5 import org.apache.spark.{SparkContext, SparkConf}
  6 import org.apache.spark.rdd.RDD
  7 
  8 /**
  9   * 用戶評分
 10   * @param userid 用戶
 11   * @param itemid 物品
 12   * @param pref 評分
 13   */
 14 case class ItemPref(val userid : String,val itemid : String, val pref : Double) extends Serializable
 15 
 16 /**
 17   * 相似度
 18   * @param itemid_1 物品
 19   * @param itemid_2 物品
 20   * @param similar 相似度
 21   */
 22 case class ItemSimilar(val itemid_1 : String, val itemid_2 : String, val similar : Double) extends Serializable
 23 
 24 /**
 25   * 給用戶推薦物品
 26   * @param userid 用戶
 27   * @param itemid 物品
 28   * @param pref 推薦系數
 29   */
 30 case class UserRecommend(val userid : String, val itemid : String, val pref : Double) extends Serializable
 31 
 32 /**
 33   * 相似度計算
 34   */
 35 class ItemSimilarity extends Serializable{
 36   def Similarity(user : RDD[ItemPref], stype : String) : RDD[ItemSimilar] = {
 37     val similar = stype match{
 38       case "cooccurrence" => ItemSimilarity.CooccurenceSimilarity(user) // 同現相似度
 39       //case "cosine" => // 余弦相似度
 40       //case "euclidean" => // 歐式距離相似度
 41       case _ => ItemSimilarity.CooccurenceSimilarity(user)
 42     }
 43     similar
 44   }
 45 }
 46 
 47 object ItemSimilarity{
 48   def CooccurenceSimilarity(user : RDD[ItemPref]) : (RDD[ItemSimilar]) = {
 49     val user_1 = user.map(r => (r.userid, r.itemid, r.pref)).map(r => (r._1, r._2))
 50     /**
 51       * 內連接,默認根據第一個相同字段為連接條件,物品與物品的組合
 52       */
 53     val user_2 = user_1.join(user_1)
 54 
 55     /**
 56       * 統計
 57       */
 58     val user_3 = user_2.map(r => (r._2, 1)).reduceByKey(_+_)
 59 
 60     /**
 61       * 對角矩陣
 62       */
 63     val user_4 = user_3.filter(r => r._1._1 == r._1._2)
 64 
 65     /**
 66       * 非對角矩陣
 67       */
 68     val user_5 = user_3.filter(r => r._1._1 != r._1._2)
 69 
 70     /**
 71       * 計算相似度
 72       */
 73     val user_6 = user_5.map(r => (r._1._1, (r._1._1,r._1._2,r._2)))
 74       .join(user_4.map(r => (r._1._1, r._2)))
 75 
 76     val user_7 = user_6.map(r => (r._2._1._2, (r._2._1._1, r._2._1._2, r._2._1._3, r._2._2)))
 77       .join(user_4.map(r => (r._1._1, r._2)))
 78 
 79     val user_8 = user_7.map(r => (r._2._1._1, r._2._1._2, r._2._1._3, r._2._1._4, r._2._2))
 80       .map(r => (r._1, r._2, (r._3 / sqrt(r._4 * r._5))))
 81 
 82     user_8.map(r => ItemSimilar(r._1, r._2, r._3))
 83   }
 84 }
 85 
 86 class RecommendItem{
 87   def Recommend(items : RDD[ItemSimilar], users : RDD[ItemPref], number : Int) : RDD[UserRecommend] = {
 88     val items_1 = items.map(r => (r.itemid_1, r.itemid_2, r.similar))
 89     val users_1 = users.map(r => (r.userid, r.itemid, r.pref))
 90 
 91     /**
 92       * i行與j列join
 93       */
 94     val items_2 = items_1.map(r => (r._1, (r._2, r._3))).join(users_1.map(r => (r._2, (r._1, r._3))))
 95 
 96     /**
 97       * i行與j列相乘
 98       */
 99     val items_3 = items_2.map(r => ((r._2._2._1, r._2._1._1), r._2._2._2 * r._2._1._2))
100 
101     /**
102       * 累加求和
103       */
104     val items_4 = items_3.reduceByKey(_+_)
105 
106     /**
107       * 過濾已存在的物品
108       */
109     val items_5 = items_4.leftOuterJoin(users_1.map(r => ((r._1, r._2), 1))).filter(r => r._2._2.isEmpty)
110     .map(r => (r._1._1, (r._1._2, r._2._1)))
111 
112     /**
113       * 分組
114       */
115     val items_6 = items_5.groupByKey()
116 
117     val items_7 = items_6.map(r => {
118       val i_2 = r._2.toBuffer
119       val i_2_2 = i_2.sortBy(_._2)
120       if(i_2_2.length > number){
121         i_2_2.remove(0, (i_2_2.length - number))
122       }
123       (r._1, i_2_2.toIterable)
124     })
125 
126     val items_8 = items_7.flatMap(r => {
127       val i_2 = r._2
128       for(v <- i_2) yield (r._1, v._1, v._2)
129     })
130 
131     items_8.map(r => UserRecommend(r._1, r._2, r._3))
132   }
133 }
134 
135 /**
136   * Created by zhen on 2019/8/9.
137   */
138 object ItemCF {
139   def main(args: Array[String]) {
140     val conf = new SparkConf()
141     conf.setAppName("ItemCF")
142     conf.setMaster("local[2]")
143 
144     val sc = new SparkContext(conf)
145 
146     /**
147       * 設置日志級別
148       */
149     Logger.getRootLogger.setLevel(Level.WARN)
150 
151     val array = Array("1,1,0", "1,2,1", "1,4,1", "2,1,0", "2,3,1", "2,4,0", "3,1,0", "3,2,1", "4,1,0", "4,3,1")
152     val cf = sc.parallelize(array)
153 
154     val user_data = cf.map(_.split(",")).map(r => (ItemPref(r(0), r(1), r(2).toDouble)))
155 
156     /**
157       * 建立模型
158       */
159     val mySimilarity = new ItemSimilarity()
160     val similarity = mySimilarity.Similarity(user_data, "cooccurrence")
161 
162     val recommend = new RecommendItem()
163     val recommend_rdd = recommend.Recommend(similarity, user_data, 30)
164 
165     /**
166       * 打印結果
167       */
168     println("物品相似度矩陣:" + similarity.count())
169     similarity.collect().foreach(record => {
170       println(record.itemid_1 +","+ record.itemid_2 +","+ record.similar)
171     })
172 
173     println("用戶推薦列表:" + recommend_rdd.count())
174     recommend_rdd.collect().foreach(record => {
175       println(record.userid +","+ record.itemid +","+ record.pref)
176     })
177   }
178 }

六.結果

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM