項目地址:https://github.com/ChanKamShing/UserCF_Spark.git
推薦系統的作業流程:
召回/match(推薦引擎)-> 物品候選集 -> 過濾 -> 排序 -> 策略(保證結果多樣性) -> 推薦list
協同過濾CF屬於第一階段,我們常常稱之為“推薦引擎”。“推薦引擎”可以有多個基准,包括:基於相似用戶、基於相似物品、基於特征搜索,以及基於熱門等方式。通過不同的方式可以解決不同的問題,譬如冷啟動問題,這里介紹的是基於相似用戶的方式。
在本文中,不會詳細介紹代碼,主要從邏輯上講述。
基本步驟:
1、找出當前用戶的若干個相似用戶,取出每個相似用戶購買過的商品(或打分過的電影)集合;
2、基於當前用戶的購買過的商品(或打分過的電影)集合,對其相似用戶購買過的商品(或打分過的電影)集合進行過濾,得出存在相似用戶,同時不存在當前用戶的商品(或電影)集合;
3、基於當前用戶與相似用戶之間的相似度,以及用戶對商品(或電影)的打分,進行排序取topN,得到物品候選集
具體實現:
u.data數據格式(user_id, item_id, rating, timestamp):
一、創建源數據
這里采用數據源為hive,同樣的,可以創建一張具有(user_id, item_id, rating, timestamp)字段的hive表,這樣子就可以通過Spark的DF、SparkSQL等組建對數據進行處理。
二、計算用戶相似度
區別於上一篇的python實現,使用Spark實現,是使用向量的cosine定理。
cosine=a*b/|a|*|b|
每個用戶相當於一個向量,他們各自購買過的商品是其對應向量的維度,維度值就是商品的分值。
1、計算分母
通過向量的定義,那么在Spark操作hive數據當中,就顯得特別容易,
- 對每一行數據的rating進行平方運算;
- 基於user_id進行聚合;
- 再對平方后的rating求和,再開根號;
經過上述步驟,可以將所有的向量的模都求出來。得到的DF數據結構為:(user_id, rating_sqrt_sum)
val userScoreSum = userDataDF.rdd.map(x=>(x(0).toString,x(2).toString)) .groupByKey() .mapValues(x=>sqrt(x.toArray.map(rating=>pow(rating.toDouble,2)).sum)) .toDF("user_id","rating_sqrt_sum")
2、計算分子
分子部分是兩個向量之間進行點乘,即向量之間的各個維度進行一一相乘,再相加。所以先基於原始DF,重新copy一份,作為相似用戶的DF,然后基於item_id,對兩張表進行聚合,構建了DF的數據結構為:(item_id, user_id, rating, user_v, rating_v)。有了這張表,就可以對rating和rating_v進行相乘,然后基於user_id、user_v做聚合操作,再將剛才rating和rating_v的乘積進行累加,就可以算出分子,計算得到的DF數據結構:(user_id, user_v, rating_dot)
// 倒排表(基於item的笛卡兒積)
val vDataDF = userDataDF.selectExpr("user_id as user_v", "item_id", "rating as rating_v") val u_v_decare = userDataDF.join(vDataDF,"item_id") .filter("case(rating as long)<>case(rating_v as long)") // 計算分子,維度值(rating)點乘,累加求和
val df_product = u_v_decare.selectExpr("item_id","user_id","user_v","case(rating as double)*case(rating_v as double) as prod") val df_sim_group = df_product.groupBy("user_id","user_v") .agg("prod"->"sum") .withColumnRenamed("sum(prod)","rating_dot")
3、計算cosine
構建一個新的DF數據結構,分別基於user_id,user_v,將步驟1、2的DF進行聚合,得到(user_id, user_v, rating_dot, rating_sqrt_sum, rating_sqrt_sum_v),接着對每一行數據直接套用cosine公式,最后選取需要的字段,構成新數據結構:(user_id, user_v)。
//計算整個cosine
val vScoreSum = userScoreSum.selectExpr("user_id as user_v","rating_sqrt_sum as rating_sqrt_sum_v") val df_sim_cosine = df_sim_group .join(userScoreSum,"user_id") .join(vScoreSum,"user_v") .selectExpr("user_id","user_v","rating_dot/(rating_sqrt_sum*rating_sqrt_sum_v) as cosine_sim")
三、過濾商品,並對商品進行打分
1、過濾
過濾商品之前,我們需要做得事情,首先獲取topN個相似用戶,然后取出這topN個相似用戶所對應的物品集合,再進行過濾。
1.1、獲取topN相似用戶
df_sim_cosine的結構里面是(user_id, user_v, cosine_sim),這個結構的數據已經是包括用戶兩兩之間的相似度,換句話說,只要根據user_id做聚合,然后基於cosine做反向排序,slice切片,就可以取到user_id的topN個相似用戶。
//使用slice取得topN個相似用戶
val sim_user_topN = df_sim_cosine.rdd.map(row=>(row(0).toString,(row(1).toString,row(2).toString))) .groupByKey() .mapValues(_.toArray.sortWith((x,y)=>x._2>y._2).slice(0,10)) //列轉行, RDD[(String, Array[(String, String)])]
.flatMapValues(x=>x) //行轉列, RDD[(String, (String, String))]
.toDF("user_id","user_v_sim") .selectExpr("user_id","user_v_sim._1 as user_v","user_v_sim._2 as cosine_sim")//將一個tuple的字段拆分成兩個字段
1.2、獲取user_id和其相似用戶物品列表
經過上一步可以獲取topN個相似用戶的數據,只要分別基於user_id,user_v進行關聯,就可以將商品列表給關聯上,而且關聯后的數據,附帶了商品的打分,便於后面給候選商品列表打分。
val df_user_items = userDataDF.rdd.map(row=>(row(0).toString,row(1).toString+"_"+row(2).toString)) .groupByKey() .mapValues(_.toArray) .toDF("user_id","item_rating_arr") val df_user_items_v = df_user_items.selectExpr("user_id as user_id_v", "item_rating_arr as item_rating_arr_v") //依次基於user_id、user_v聚合
val df_gen_item = sim_user_topN .join(df_user_items,"user_id") .join(df_user_items_v,"user_v")
要知道,userDataDF的數據結構是(user_id, item_id, rating),所以,要獲取“列表”,則必須對user_id進行聚合,這里做了一個格式處理,將item和rating用“_”連接,合並成一個數據處理。
1.3、過濾商品
由於上一步對item和rating的數據結構進行處理,所以這一步需要定義一個UDF來對商品進行過濾。
// 用一個udf從user_v的商品集合中,將與user_id具有相同的商品過濾掉,得到候選集
import org.apache.spark.sql.functions._ val filter_udf = udf{(items:Seq[String],items_v:Seq[String])=> val fMap = items.map{x=> val l = x.split("_") (l(0),l(1)) }.toMap //返回items_v,過濾商品
items_v.filter{x=> val l = x.split("_") fMap.getOrElse(l(0),-1) == -1 } }
items參數是user_id的商品集,items_v是user_v的商品集,使用該UDF后,會得到一個在user_v商品集基礎上過濾掉user_id商品集的、全新的商品集,然后選取需要的列構建新的DF。
//過濾掉user_id商品的DF數據(user_id, consine_sim, item_rating)
val df_filter_item = df_gen_item.withColumn("filtered_item", filter_udf(df_gen_item("item_rating_arr"),df_gen_item("item_rating_arr_v"))) .select("user_id","cosine_sim", "filtered_item")
2、給候選商品進行打分
(物品分數=用戶相似度*相似用戶對電影(物品)的打分)
經過過濾操作,我們得到一個數據結構(user_id, cosine_sim, filtered_item)的DF,現在顯而易見,需要的參數已經有了,剩下的就是直接套用公式。但是不要忘記,filtered_item的數據是一個Array類型,是一個商品的集合,所以可以定義一個UDF,作用是遍歷商品集合,分別乘以對應的cosine_sim。
val simRatingUDF = udf{(sim:Double,items:Seq[String])=> items.map{item_rating=> val l = item_rating.split("_") l(0)+"_"+l(1).toDouble*sim } }
得到的仍然是一個Array類型數據,即topN里的每個相似用戶對應的物品的集合,我們最終要的是topN相似用戶的商品集合組成的總的商品集合,再取topN個商品,所以,必須將Array拆開,可以使用explode。
//DF:(user_id,item_prod)
val itemSimRating = df_filter_item.withColumn("item_prod",simRatingUDF(df_filter_item("cosine_sim"),df_filter_item("filtered_item"))) .select("user_id","item_prod") //行轉列Array[item_prod],並分割item_pro。 // 注意:得出的數據結果,會出現多個相同的user_id->item,因為同一個user_id的不同相似用戶,可能會有同一樣商品,分割后,就出現這情況
val userItemScore = itemSimRating.select(itemSimRating("user_id"),explode(itemSimRating("item_prod"))) .selectExpr("user_id","split('item_prod','_')[0] as item_id","case(split('item_prod','_')[1] as double) as score")//將一個字符串的字段拆分成兩個字段
這里又會出現一個問題,user_id有topN個相似用戶,他們對應得到的商品集合里面,很大可能存在相同的item,那么就需要基於user_id和item_id做一個聚合,然后將相同item_id的打分進行累加,這才候選商品最后的打分。(區別於上一篇的python實現,它是基於傑卡爾德計算用戶相似度,使用的是商品數量,相當於商品的原始權重都為1,並沒有區分出一些具有代表性意義的商品,所以它還要對商品進行log降權處理;但是cosine計算相似度,直接使用rating值,rating值已經是對商品標上不同的權重)
//DF:(user_id,item_prod)
val itemSimRating = df_filter_item.withColumn("item_prod",simRatingUDF(df_filter_item("cosine_sim"),df_filter_item("filtered_item"))) .select("user_id","item_prod") //行轉列Array[item_prod],並分割item_pro。 // 注意:得出的數據結果,會出現多個相同的user_id->item,因為同一個user_id的不同相似用戶,可能會有同一樣商品,分割后,就出現這情況
val userItemScore = itemSimRating.select(itemSimRating("user_id"),explode(itemSimRating("item_prod"))) .selectExpr("user_id","split('item_prod','_')[0] as item_id","case(split('item_prod','_')[1] as double) as score")//將一個字符串的字段拆分成兩個字段 //基於user_id和item_id做聚合
val userItemScoreSum = userItemScore.groupBy("user_id","item_id") .agg("score"->"sum") .withColumnRenamed("sum(score)","last_score")
四、取topN商品
//排序取topN商品
val df_rec = userItemScoreSum.rdd.map(row=>(row(0),(row(1).toString,row(2).toString))) .groupByKey() .mapValues(_.toArray.sortWith((x,y)=>x._2>y._2).slice(0,10)) .flatMapValues(x=>x) .toDF("user_id","item_sim") .selectExpr("user_id","item_sim._1 as item_id", "item_sim._2 as score")