原文:http://blog.selfup.cn/1001.html
什么是協同過濾
協同過濾(Collaborative Filtering, 簡稱CF),wiki上的定義是:簡單來說是利用某興趣相投、擁有共同經驗之群體的喜好來推薦使用者感興趣的資訊,個人透過合作的機制給予資訊相當程度的回應(如評分)並記錄下來以達到過濾的目的進而幫助別人篩選資訊,回應不一定局限於特別感興趣的,特別不感興趣資訊的紀錄也相當重要。
以上定義太拗口,舉個簡單的例子:我現在多年不看日本anime的新番了,最近突然又想看幾部新番,但又不知道這么多新番中看哪些比較好,於是我就找幾個同樣喜歡日本動漫的朋友來咨詢。我第一個想咨詢的朋友是和我口味最像的,我們都特別喜歡看《蟲師》、《黑之契約者》、《寒蟬》這樣的小眾動畫;我問的第二個朋友和我口味差不多,他特別喜歡看《鋼煉》《無頭騎士異聞錄》這樣的動畫,我雖然喜歡,但不像他那么喜歡;由於身邊喜歡日本動畫的朋友不多,剩下第三個可以咨詢的是一個宅女,平常經常看腐、宅、基的動漫,完全跟我不是一路人,於是問了下她推薦的片子,並將這些片子打上的黑名單的標簽。然后我就開始看第一個朋友推薦的片子了,要是時間特別多又很無聊我可能會看第二個朋友推薦的,但打死我也不會看第三個朋友推薦的。這就是協同過濾的一個簡化、小眾版。
如何進行相似度度量
接着上面的例子,我可以通過我和其它朋友共同喜歡某個或某類動漫來確定我們的口味是否一樣,那么如何以數學或者機器的形式來表示這個“口味一樣”呢?通常,是通過“距離”來表示,例如:歐幾里得距離、皮爾遜相關度、曼哈頓距離、Jaccard系數等等。
歐幾里得距離
歐幾里德距離(Euclidean Distance),最初用於計算歐幾里得空間中兩個點的距離,在二維空間中,就是我們熟悉的兩點間的距離,x、y表示兩點,維度為n:

相似度:

皮爾遜相關度
皮爾遜相關度(Pearson Correlation Coefficient),用於判斷兩組數據與某一直線擬合程度的一種度量,取值在[-1,1]之間。當數據不是很規范的時候(如偏差較大),皮爾遜相關度會給出較好的結果。

曼哈頓距離
曼哈頓距離(Manhattan distance),就是在歐幾里得空間的固定直角坐標系上兩點所形成的線段對軸產生的投影的距離總和。

Jaccard系數
Jaccard系數,也稱為Tanimoto系數,是Cosine相似度的擴展,也多用於計算文檔數據的相似度。通常應用於x為布爾向量,即各分量只取0或1的時候。此時,表示的是x,y的公共特征的占x,y所占有的特征的比例:

計算推薦
根據上述“距離”的算法,我們可以找出與自己“口味一樣”的人了,但這並不是目的,目的是找出推薦的物品。一種常用的做法是選出與你興趣相同的N個人,然后根據這N個人的記錄來進行加權推薦。具體如下,假設已經計算出歐幾里得相似度:
| 朋友 | 相似度 | 銀魂 | S.x銀魂 | 食靈零 | S.x食靈零 | 雨月 | S.x雨月 |
|---|---|---|---|---|---|---|---|
| A | 0.95 | 10.0 | 9.5 | 9.0 | 8.55 | - | - |
| B | 0.80 | 8.5 | 6.8 | 7.5 | 6 | 5.0 | 4 |
| C | 0.25 | - | - | 6.5 | 1.625 | 9.0 | 2.25 |
| 總計 | 16.3 | 16.175 | 6.25 | ||||
| Sim.Sum | 1.75 | 2 | 1.05 | ||||
| 總計/Sim.Sum | 9.31 | 8.09 | 5.95 |
其中,S.x開頭的表示相似度與評分的乘積,Sim.Sum表示打過分的朋友的相似度之和。可以看出根據三位友人的推薦,我從這三部動漫中應該選擇銀魂來看。
Item CF與User CF
基於用戶的協同過濾(User CF),其基本思想相當簡單,基於用戶對物品的偏好找到相鄰鄰居用戶,然后將鄰居用戶喜歡的推薦給當前用戶。上述過程就屬於User CF。
基於物品的CF(Item CF)的原理和基於用戶的CF類似,只是在計算鄰居時采用物品本身,而不是從用戶的角度,即基於用戶對物品的偏好找到相似的物品,然后根據用戶的歷史偏好,推薦相似的物品給他。
兩者的計算復雜度和適用場景皆不同,詳細可參見參考資料。
Spark協同過濾實現
訓練數據
196 242 3 881250949 186 302 3 891717742 22 377 1 878887116 244 51 2 880606923 166 346 1 886397596 298 474 4 884182806 ...
其中第一列為userid,第二列為movieid,第三列為評分,第四列為timestamp(未使用)。
完整版下載:MovieLens(解壓后的u.data文件)。http://files.grouplens.org/datasets/movielens/ml-100k.zip
代碼實現
public class UserSideCF implements Serializable {
private static final Pattern TAB = Pattern.compile("\t");
public MatrixFactorizationModel buildModel(RDD<Rating> rdd) { //訓練模型
int rank = 10;
int numIterations = 20;
MatrixFactorizationModel model = ALS.train(rdd, rank, numIterations, 0.01);
return model;
}
public RDD<Rating>[] splitData() { //分割數據,一部分用於訓練,一部分用於測試
SparkConf sparkConf = new SparkConf().setAppName("JavaALS").setMaster("local[2]");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
JavaRDD<String> lines = sc.textFile("/home/nodin/ml-100k/u.data");
JavaRDD<Rating> ratings = lines.map(line -> {
String[] tok = TAB.split(line);
int x = Integer.parseInt(tok[0]);
int y = Integer.parseInt(tok[1]);
double rating = Double.parseDouble(tok[2]);
return new Rating(x, y, rating);
});
RDD<Rating>[] splits = ratings.rdd().randomSplit(new double[]{0.6,0.4}, 11L);
return splits;
}
public static void main(String[] args) {
UserSideCF cf = new UserSideCF();
RDD<Rating>[] splits = cf.splitData();
MatrixFactorizationModel model = cf.buildModel(splits[0]);
Double MSE = cf.getMSE(splits[0].toJavaRDD(), model);
System.out.println("Mean Squared Error = " + MSE); //訓練數據的MSE
Double MSE1 = cf.getMSE(splits[1].toJavaRDD(), model);
System.out.println("Mean Squared Error1 = " + MSE1); //測試數據的MSE
}
public Double getMSE(JavaRDD<Rating> ratings, MatrixFactorizationModel model) { //計算MSE
JavaPairRDD usersProducts = ratings.mapToPair(rating -> new Tuple2<>(rating.user(), rating.product()));
JavaPairRDD<Tuple2<Integer, Integer>, Double> predictions = model.predict(usersProducts.rdd())
.toJavaRDD()
.mapToPair(new PairFunction<Rating, Tuple2<Integer, Integer>, Double>() {
@Override
public Tuple2<Tuple2<Integer, Integer>, Double> call(Rating rating) throws Exception {
return new Tuple2<>(new Tuple2<>(rating.user(), rating.product()), rating.rating());
}
});
JavaPairRDD<Tuple2<Integer, Integer>, Double> ratesAndPreds = ratings
.mapToPair(new PairFunction<Rating, Tuple2<Integer, Integer>, Double>() {
@Override
public Tuple2<Tuple2<Integer, Integer>, Double> call(Rating rating) throws Exception {
return new Tuple2<>(new Tuple2<>(rating.user(), rating.product()), rating.rating());
}
});
JavaPairRDD joins = ratesAndPreds.join(predictions);
return joins.mapToDouble(new DoubleFunction<Tuple2<Tuple2<Integer, Integer>, Tuple2<Double, Double>>>() {
@Override
public double call(Tuple2<Tuple2<Integer, Integer>, Tuple2<Double, Double>> o) throws Exception {
double err = o._2()._1() - o._2()._2();
return err * err;
}
}).mean();
}
}
運行結果
0.6,0.4(訓練數據,測試數據) - 0.3706799281981904, 2.4569381099423957(訓練數據,測試數據) 0.7,0.3 - 0.40358067085112936, 2.4469113734667935 0.8,0.2 - 0.4335027003381571, 2.0930908173274476 0.9,0.1 - 0.4587619714761296, 1.7213014771993198
