偏好是無法度量的。
相比其他的機器學習算法,推薦引擎的輸出更直觀,更容易理解。
接下來三章主要講述Spark中主要的機器學習算法。其中一章圍繞推薦引擎展開,主要介紹音樂推薦。在隨后的章節中我們先介紹Spark和MLib的實際應用,接着介紹一些機器學習的基本思想。
3.1 數據集
用戶和藝術家的關系是通過其他行動隱含提現出來的,例如播放歌曲或專輯,而不是通過顯式的評分或者點贊得到的。這被稱為隱式反饋數據。現在的家用電視點播也是這樣,用戶一般不會主動評分。
數據集在http://www-etud.iro.umontreal.ca/~bergstrj/audioscrobbler_data.html,需要自帶梯子,下載地址是http://www.iro.umontreal.ca/~lisa/datasets/profiledata_06-May-2005.tar.gz,這個好像不用梯子。在百度網盤共享地址是http://pan.baidu.com/s/1bQ4Ilg。
3.2 交替最小二乘推薦算法
我們要找的學習算法不需要用戶和藝術家的屬性信息。這類算法通常稱為協同過濾算法。根據兩個用戶的年齡相同來判斷談么可能有相似的偏好,這不叫協同過濾。相反,根據兩個用戶包房過許多相同歌曲來判斷他們可能都喜歡某首歌,這才叫協同過濾。
潛在因素模型通過數量相對少的未被觀察到的底層原因,來解釋大量用戶和產品之間可觀察到的交互。
本實例使用的是一種矩陣分解模型。問題簡化為“用戶-特征矩陣”和“特征-藝術家矩陣”的乘積,該乘積的結果是對整個稠密的“用戶-藝術家互相關心矩陣”的完整估計。
在求解矩陣分解時,使用交替最小二乘(Alternating Least Squares,ALS)算法。需要借助QR分解的方法。
3.3 准備數據
如果數據不是運行在集群上,而是運行在本地,為了保證內存充足,在啟動spark-shell時需要指定參數--driver-memory 6g。
構建模型的第一步是了解數據,對數據進行解析或轉換,以便在Spark中做分析。
Spark MLib的ALS算法實現有一個小缺點:它要求用戶和產品的ID必須是數值型,並且是32位非負整數,這意味着大於Integer.MAX_VALUE(2147483647)的ID是非法的。我們首先看看數據集是否滿足要求:
Scala:
scala> val rawUserArtistData = sc.textFile("D:/Workspace/AnalysisWithSpark/src/main/java/advanced/chapter3/profiledata_06-May-2005/user_artist_data.txt") rawUserArtistData: org.apache.spark.rdd.RDD[String] = D:/Workspace/AnalysisWithSpark/src/main/java/advanced/chapter3/profiledata_06-May-2005/user_artist_data.txt MapPartitionsRDD[1] at textFile at <console>:27 scala> rawUserArtistData.map(_.split(' ')(0).toDouble).stats() res0: org.apache.spark.util.StatCounter = (count: 24296858, mean: 1947573.265353, stdev: 496000.544975, max: 2443548.000000, min: 90.000000) scala> rawUserArtistData.map(_.split(' ')(1).toDouble).stats() res1: org.apache.spark.util.StatCounter = (count: 24296858, mean: 1718704.093757, stdev: 2539389.040171, max: 10794401.000000, min: 1.000000)
Java:
1 //初始化SparkConf 2 SparkConf sc = new SparkConf().setMaster("local").setAppName("RecommendingMusic"); 3 System.setProperty("hadoop.home.dir", "D:/Tools/hadoop-2.6.4"); 4 JavaSparkContext jsc = new JavaSparkContext(sc); 5 6 //讀入用戶-藝術家播放數據 7 JavaRDD<String> rawUserArtistData =jsc.textFile("src/main/java/advanced/chapter3/profiledata_06-May-2005/user_artist_data.txt"); 8 9 //顯示數據統計信息 10 System.out.println(rawUserArtistData.mapToDouble(line -> Double.parseDouble(line.split(" ")[0])).stats()); 11 System.out.println(rawUserArtistData.mapToDouble(line -> Double.parseDouble(line.split(" ")[1])).stats());
最大用戶和藝術家ID為2443548和10794401,沒必要處理這些ID。
接着解析藝術家ID與與藝術家名對應關系。由於文件中少量的行不規范,有些行沒有制表符、有些不小心加入了換行符,所以不能直接使用map處理。這是需要使用flatMap,它將輸入對應的兩個或多個結果組成的集合簡單展開,然后放到一個更大的RDD中。后面的Scala程序沒有運行過,只是粘貼在這里。讀入藝術家ID-藝術家名數據並剔除錯誤數據:
Scala:
val rawArtistData = sc.textFile("hdfs:///user/ds/artist_data.txt") val artistByID = rawArtistData.flatMap { line => val (id, name) = line.span(_ != '\t') if (name.isEmpty) { None } else { try { Some((id.toInt, name.trim)) } catch { case e: NumberFormatException => None } } }
Java:
1 //讀入藝術家ID-藝術家名數據 2 JavaRDD<String> rawArtistData =jsc.textFile("src/main/java/advanced/chapter3/profiledata_06-May-2005/artist_data.txt"); 3 JavaPairRDD<Integer, String> artistByID = rawArtistData.flatMapToPair(line -> { 4 List<Tuple2<Integer, String>> results = new ArrayList<>(); 5 String[] lineSplit = line.split("\\t", 2); 6 if (lineSplit.length == 2) { 7 Integer id; 8 try { 9 id = Integer.parseInt(lineSplit[0]); 10 } catch (NumberFormatException e) { 11 id = null; 12 } 13 if(!lineSplit[1].isEmpty() && id != null){ 14 results.add(new Tuple2<Integer, String>(id, lineSplit[1])); 15 } 16 } 17 return results; 18 });
將拼寫錯誤的藝術家ID或非標准的藝術家ID映射為藝術家的正規名:
Scala:
val rawArtistAlias = sc.textFile("hdfs:///user/ds/artist_alias.txt") val artistAlias = rawArtistAlias.flatMap { line => val tokens = line.split('\t') if (tokens(0).isEmpty) { None } else { Some((tokens(0).toInt, tokens(1).toInt)) } }.collectAsMap()
Java:
1 //將拼寫錯誤的藝術家ID或非標准的藝術家ID映射為藝術家的正規名 2 JavaRDD<String> rawArtistAlias =jsc.textFile("src/main/java/advanced/chapter3/profiledata_06-May-2005/artist_alias.txt"); 3 Map<Integer, Integer> artistAlias = rawArtistAlias.flatMapToPair(line -> { 4 List<Tuple2<Integer, Integer>> results = new ArrayList<>(); 5 String[] lineSplit = line.split("\\t", 2); 6 if((lineSplit.length == 2 && !lineSplit[0].isEmpty())){ 7 results.add(new Tuple2<Integer, Integer>(Integer.parseInt(lineSplit[0]), Integer.parseInt(lineSplit[1]))); 8 } 9 return results; 10 }).collectAsMap();
artist_alias.txt中第一條為:"1092764 1000311",獲取ID為1092764和1000311的藝術家名:
Java:
1 artistByID.lookup(1092764).forEach(System.out::println); 2 artistByID.lookup(1000311).forEach(System.out::println);
輸出為:
Winwood, Steve
Steve Winwood
書中的例子為:
Scala:
artistByID.lookup(6803336).head artistByID.lookup(1000010).head
Java:
1 artistByID.lookup(1000010).forEach(System.out::println); 2 artistByID.lookup(6803336).forEach(System.out::println);
輸出為:
Aerosmith (unplugged)
Aerosmith
3.4 構建第一個模型
我們需要做兩個轉換:第一,將藝術家ID轉為正規ID;第二,把數據轉換成rating對象,它是ALS算法對“用戶-產品-值”的抽象。其中產品指“向人們推薦的物品”。現在完成這兩個工作:
Scala:
import org.apache.spark.mllib.recommendation._ val bArtistAlias = sc.broadcast(artistAlias) val trainData = rawUserArtistData.map { line => val Array(userID, artistID, count) = line.split(' ').map(_.toInt) val finalArtistID = bArtistAlias.value.getOrElse(artistID, artistID) Rating(userID, finalArtistID, count) }.cache()
Java:
1 //數據集轉換 2 Broadcast<Map<Integer, Integer>> bArtistAlias = jsc.broadcast(artistAlias); 3 4 JavaRDD<Rating> trainData = rawUserArtistData.map( line -> { 5 List<Integer> list = Arrays.asList(line.split(" ")).stream().map(x -> Integer.parseInt(x)).collect(Collectors.toList()); 6 bArtistAlias.getValue().getOrDefault(list.get(1), list.get(1)); 7 return new Rating(list.get(0), list.get(1), list.get(2)); 8 }).cache();
這里使用了廣播變量,它能夠在每個executor上將數據緩存為原始的Java對象,這樣就不用為每個任務執行反序列化,可以在多個作業和階段之間緩存數據。
構建模型:
Scala:
val model = ALS.trainImplicit(trainData, 10, 5, 0.01, 1.0)
Java:
1 MatrixFactorizationModel model = org.apache.spark.mllib.recommendation.ALS.train(JavaRDD.toRDD(trainData), 10, 5 ,0.01, 1);
構建要花費很長的時間。我使用的是i5的筆記本,估計得三四天才能算完。所以實際計算時,我只使用了前98個用戶的數據,一共是14903行。所以在3.5中打印用戶播放過藝術家作品時,ID使用數據集中有的ID。
查看特征變量:
Scala:
model.userFeatures.mapValues(_.mkString(", ")).first()
Java:
1 model.userFeatures().toJavaRDD().foreach(f -> System.out.println(f._1.toString() + f._2[0] + f._2.toString()));
3.5 逐個檢查推薦結果
獲取用戶ID對應的藝術家:
Scala:
val rawArtistsForUser = rawUserArtistData.map(_.split(' ')). filter { case Array(user,_,_) => user.toInt == 2093760 } val existingProducts = rawArtistsForUser.map { case Array(_,artist,_) => artist.toInt }. collect().toSet artistByID.filter { case (id, name) => existingProducts.contains(id) }.values.collect().foreach(println)
Java:
1 JavaRDD<String[]> rawArtistsForUser = rawUserArtistData.map(x -> x.split(" ")).filter(f -> Integer.parseInt(f[0]) == 1000029 ); 2 List<Integer> existingProducts = rawArtistsForUser.map(f -> Integer.parseInt(f[1])).collect(); 3 artistByID.filter(f -> existingProducts.contains(f._1)).values().collect().forEach(System.out::println);
我們可以對此用戶做出5個推薦:
Scala:
val recommendations = model.recommendProducts(2093760, 5) recommendations.foreach(println)
Java:
Rating[] recommendations = model.recommendProducts(1000029, 5);
Arrays.asList(recommendations).stream().forEach(System.out::println);
結果如下:
Rating(1000029,1001365,506.30319635520425) Rating(1000029,4531,453.6082026572616) Rating(1000029,4468,137.14313260781685) Rating(1000029,599,130.16330043654924) Rating(1000029,1003352,128.75804355555215)
書上說每行最后的數值是0到1之間的模糊值,值越大,推薦質量越好。但是我運行返回的結果不是這樣的。
Spark 1.6.2的Java API是這么說的:
Rating objects, each of which contains the given user ID, a product ID, and a "score" in the rating field. Each represents one recommended product, and they are sorted by score, decreasing. The first returned is the one predicted to be most strongly recommended to the user. The score is an opaque value that indicates how strongly recommended the product is.
應該最后一個數組是評分吧。
得到所推薦藝術家的ID后,就可以用類似的方法查到藝術家的名字:
Scala:
val recommendedProductIDs = recommendations.map(_.product).toSet artistByID.filter { case (id, name) => recommendedProductIDs.contains(id) }.values.collect().foreach(println)
Java:
1 List<Integer> recommendedProductIDs = Arrays.asList(recommendations).stream().map(y -> y.product()).collect(Collectors.toList()); 2 artistByID.filter(f -> recommendedProductIDs.contains(f._1)).values().collect().forEach(System.out::println);
輸出結果:
Barenaked Ladies Da Vinci's Notebook Rage They Might Be Giants "Weird Al" Yankovic
書上說好像推薦的結果不怎么樣。
3.8 選擇超參數
計算AUC這部分代碼沒有試。AUC(Area Under ROC Curve)是ROC(Receiver Operating Characteristic,受試者工作特征)線,它源於二戰中用於敵機檢測的雷達信號分析技術。在非均等代價下,ROC曲線不能直接反映出學習器的期望總體代價,而“代價曲線”則可達到該目的。
機器學習常涉及兩類參數:一類是算法的參數,亦稱“超參數”,數目常在10以內;另一類是模型的參數,數目可能很多。前者通常是由人工設定多個參數候選值后產生模型,后者則是通過學習來產生多個候選模型。
ALS.trainImplicit()的參數包括以下幾個:
rank
模型的潛在因素的個數,即“用戶-特征”和“產品-特征”矩陣的列數;一般來說,它也是矩陣的階。
iterations
矩陣分解迭代的次數;迭代的次數越多,花費的時間越長,但分解的結果可能會更好。
lambda
標准的過擬合參數;值越大越不容易產生過擬合,但值太大會降低分解的准確度。lambda取較大的值看起來結果要稍微好一些。
alpha
控制矩陣分解時,被觀察到的“用戶-產品”交互相對沒被觀察到的交互的權重。40是最初ALS論文的默認值,這說明了模型在強調用戶聽過什么時的表現要比強調用戶沒聽過什么時要好。
3.9 產生推薦
這個模型可以對所有用戶產生推薦。它可以用於批處理,批處理每隔一個小時或更短的時間為所有用戶重算模型和推薦結果,具體時間間隔取決於數據規模和集群速度。
但是目前Spark Mlib的ALS實現並不支持向所有用戶給出推薦。該實現可以每次對一個用戶進行推薦,這樣每次都會啟動一個短的幾秒鍾的分布式作業。這適合對小用戶群體快速重算推薦。下面對數據中的多個用戶進行推薦並打印結果:
Scala:
val someUsers = allData.map(_.user).distinct().take(100) val someRecommendations = someUsers.map(userID => model.recommendProducts(userID, 5)) someRecommendations.map( recs => recs.head.user + " -> " + recs.map(_.product).mkString(", ") ).foreach(println)
Java:
1 //對id為1000029的用戶做5個推薦 2 Rating[] recommendations = model.recommendProducts(1000029, 5); 3 Arrays.asList(recommendations).stream().forEach(System.out::println);
整個流程也可用於向藝術家推薦用戶:
Scala:
rawUserArtistData.map { line => ... val userID = tokens(1).toInt val artistID = tokens(0).toInt ... }
Java:
在數據集轉換時,"return new Rating(list.get(0), list.get(1), list.get(2));"變為"return new Rating(list.get(1), list.get(0), list.get(2));"
3.10 小結
對於非隱含數據,MLib也支持一種ALS的變體,它的用法和ALS是一樣的,不同之處在於模型使用方法ALS.train()構建。它適用於給出評分數據而不是次數數據。比如,如果數據集是用戶對藝術家的打分,值從1到5,那么用這種變體就很合適。不同推薦方法返回的Rating對象結果,其中rating字段是估計的打分。
如果能需要按需計算出推薦結果,可以使用Oryx 2(https://github.com/OryxProject/oryx),其底層使用MLib之類的庫,但用高效的方式訪問內存中的模型數據。