音樂推薦與Audioscrobbler數據集


1. Audioscrobbler數據集

數據下載地址:

http://www.iro.umontreal.ca/~lisa/datasets/profiledata_06-May-2005.tar.gz

Audioscrobbler 數據集只記錄了播放數據,如“Bob 播放了一首Prince 的歌曲”。播放記錄所包含的信息比評分要少。僅憑Bob 播放過某一首歌這一信息並不能說明他真的喜歡這首歌。

雖然人們經常聽音樂,但卻很少給音樂評分。因此Audiocrobbler 數據集要大得多。它覆蓋了更多的用戶和藝術家,也包含了更多的總體信息,雖然單條記錄的信息比較少,這種類型的數據通常被稱為隱式反饋數據,因為用戶和藝術家的關系是通過其他行動隱含體現出來的,而不是通過顯示的評分或點贊得到的

 

2. 交替最小二乘推薦算法

現在我們要給這個隱式反饋數據選擇一個合適的推薦算法。這個數據及只記錄了用戶和歌曲之間的交互情況。

除了藝術家的名字外,數據集沒有包含用戶的信息,也沒有提供歌手的其他任何信息。我們要找的學習算法不需要用戶和藝術家的屬性信息。這類算法通常稱為協同過濾算法。

舉個例子:根據兩個用戶的年齡相同來判斷他們可能有相似的偏好,這不叫協同過濾。相反,根據兩個用戶播放過許多相同歌曲來判斷他們可能都喜歡某首歌,這才叫協同過濾

 

3. 矩陣分解模型

現在我們的用戶和產品數據可以視為一個大矩陣A,矩陣第i 行和第j 列上的元素都有值,代表用戶i 播放過藝術家 j 的音樂。

可以預料到,矩陣A 是稀疏的,里面大多數元素為0.因為相對於所有可能的 用戶 – 藝術家 組合,只有很少一部分組合會出現在數據中。

 

這里我們使用的是一種矩陣分解模型( Non-negative matrix factorization)。算法將A 分解為兩個小矩陣 X 和 Y 的乘積:

 

 

如圖所示,假設A 為 m x n矩陣,X 為 m x k矩陣,Y為 n x k 矩陣。此時X與Y 均是行多列少。行分別為用戶數與藝術家數,列均為 k(k為一個較小值)。此時 k 稱為潛在因素。

 

4. 潛在因素

潛在因素(factor analysis)模型:

潛在因素模型試圖通過數量相對較少的未被觀察到的底層原因,來解釋大量用戶和產品之間可觀察到的交互。

例如:有幾千個專輯可選,為什么數百萬人偏偏只買其中某些專輯?可以用對類別(可能只有數十種)的偏好來解釋用戶和專輯的關系,其中偏好信息並不能直接觀察到,而數據也沒有給出這些信息。

這里的偏好,其實就是我們上述要求的 k 里的值。這個k就是潛在因素,用於解釋數據中的交互關系。由於k 很小,矩陣分解算法只能是某種近似。所以上述圖里用的約等於號。

 

5. 矩陣補全算法

矩陣分解算法有時也稱為矩陣補全(matrix completion)算法。因為原始矩陣A 可能非常稀疏,但是乘積        是稠密的。

因為在我們計算出所有列里的系數(最優解)后,在使用 XYT 還原A時,大部分數據會被補全。

兩個矩陣分別有一行對應每個用戶和每個藝術家。每行的值很少,只有 k 個。每個值代表了對應模型的一個隱含特征。

因此行表示了用戶和藝術家怎樣關聯到這些隱含特征,而隱含特征可能就對應偏好或類別。

於是問題就簡化為 用戶 – 特征矩陣 和 特征 – 藝術家矩陣 的乘積,該乘積的結果是對整個稠密的 用戶 – 藝術家 相互關系矩陣的完整估計。

 

不幸的是,A = XYT  通常根本沒有解。原因就是X 和 Y 通常不夠大(嚴格來講,就是矩陣的階太小),無法完美表示 A。

這其實也是好事。A 只是所有可能出現的交互關系的一個微小樣本。在某種程度上我們認為 A 是對基本事實的一次觀察,它太稀疏,因此很難解釋這個基本事實。但用少數幾個因素(k 個)就能很好地解釋這個基本事實。

XYT 應該盡可能逼近 A,畢竟這是所有后續工作的基礎,但它不能也不應該完全復制A。然而同樣不幸的是,想直接得到 X 和 Y 的最優解是不可能的。好消息是,如果 Y 已知,求 X 的最優解是非常容易的,反之亦然。但 X 和 Y 事先都是未知的

當然,我們仍有辦法求解X 和 Y,其中之一便是交替最小二乘(Alternating Least Squares, ALS)法。Spark Mllib 的ALS 就是此算法的實現。

 

雖然 Y 是未知的,但我們可以把它初始化為隨機行向量矩陣。接着運用簡單的線性代數,就能在給定 A 和 Y 的條件下求出 X 的最優解。

實際上,X 的第 i 行是 A 的第 i 行和 Y 的函數,因此可以很容易分開計算 X 的每一行。因為 X 的每一行可以分開計算,所以我們可以將其並行化,而並行化是大規模計算的一大優點。

  AiY(YTY)-1 = Xi

想要兩邊精確相等是不可能的,因此實際的目標是最小化  | AiY(YTY)-1 - Xi | ,或者最小化它們的平方誤差(為了計算優化方便)。這也就是著名的最小二乘法。

這里給出的方程式只是為了說明行向量的計算方法,但實踐中從來不會對矩陣求逆,我們會借助於QR 分解之類的方法,這種方法速度更快且更直接。

 

同理,我們可以由X 計算  Yj 。然后又可以由 Y 計算 X,這樣反復下去。這就是算法中“交替”的由來。

這里 Y 是隨機的,X是由最優化計算出來的,如此迭代下去,X 和 Y 最終會收斂得到一個合適的結果。

將 ALS 算法用於隱形數據矩陣分解時,ALS 矩陣分解稍微復雜一點。它不是直接分解輸入矩陣 A,而是分解由 0 和 1 組成的矩陣 P,當 A 中元素為正時,P 中對應元素為 1,否則為 0。A 中的具體值會在后面以權重的形式反映出來。

ALS 算法也可以利用輸入數據是稀疏的這一特點。稀疏的輸入數據、可以用簡單的線性代數運算求最優解,以及數據本身可並行化,這三點使得算法在大規模數據上速度非常快。

 

6. 准備數據

1. 先把數據上傳到 HDFS

 

2. Spark Mllib 的 ALS 算法實現有一個小缺點:它要求用戶和產品的 ID 必須是數值型,並且是 32 位非負整數。這意味着大於 Integer.MAX_VALUE(即2147483647)的ID 都是非法的。我們的數據是否滿足了這個要求?

 

3. 導入數據:

val rawUserArtistData = sc.textFile(“hdfs:///user/hadoop/profiledata/user_artist_data.txt“, 24)

默認情況下,RDD 為每個HDFS 塊生成一個分區,將HDFS 塊大小設為典型的128M 或 64M。

由於此文件大小為 400M 左右,所以文件被拆為 3 個或 6 個分區。這通常沒什么問題,但由於相比簡單文本處理,ALS 這類機器學習算法要消耗更多的計算資源,因此減小數據塊大小以增加分區個數會更好。

減小數據塊能使Spark 處理任務的同時使用的處理器核數更多。可以為textFile方法設置第二個參數,用這個參數指定一個不同於默認值的分區數,這樣就可以將分區數設置的更大些。比如,可以考慮將這個參數設為集群處理器總核數。

 

首先我們查看一下數據樣例:

scala> val first = rawUserArtistData.first

first: String = 1000002 1 55 (分別為用戶id,藝術家id,和播放次數)

 

然后使用stats統計數據查看用戶與藝術家的id 是否超過ALS 限制:

scala> rawUserArtistData.map(_.split(' ')(0).toDouble).stats()

[Stage 0:>                                                         (0 + 4) / 24]  (這里會用24個container)

res2: org.apache.spark.util.StatCounter = (max: 2443548.000000, …)

scala> rawUserArtistData.map(_.split(' ')(1).toDouble).stats()

res1: org.apache.spark.util.StatCounter = (max: 10794401.000000, …)

通過stats() 得到的統計數據,可以發現用戶 id 最高為2443548,藝術家 id 最高為 10794401,都遠小於 ALS 的數值型限制。

 

另一文件 artist_data.txt 里存放了藝術家名字以及數值ID。我們可以看一下數據樣例:

val rawArtistData = sc.textFile("hdfs:///user/hadoop/profiledata/artist_data.txt", 24)

val first = rawArtistData.first

String = 1134999  06Crazy Life

每行數據將 id 與 藝術家名字 以 tab 分隔。這里我們可以使用 span() 方法來分割字符串,並返回一對元組:

scala> first.span(_ != '\t')

res9: (String, String) = (1134999,"     06Crazy Life")

 

將span() 方法應用到集群數據:

scala> val artistByID = rawArtistData.map( line => {

     | val (id, name) = line.span(_ != '\t')

     | (id.toInt, name.trim)

     | })

但是在應用到集群數據時,會遇到報錯:

scala> artistByID.count

Caused by: java.lang.NumberFormatException: For input string: "3AW“

說明有些數據是非法的,並不完全遵循數據格式。

 

flatMap 與 map 均是對每個元素應用一個函數操作的函數,它們的區別為:

val books = List("Hadoop", "Hive", "HDFS")

scala> books.map(_.toList)

res24: List[List[Char]] = List(List(H, a, d, o, o, p), List(H, i, v, e), List(H, D, F, S))

scala> books.flatMap(_.toList)

res25: List[Char] = List(H, a, d, o, o, p, H, i, v, e, H, D, F, S)

map 會返回每個函數作用后的結果。flatMap 在返回結果后,再將結果集合展開。

 

現在我們使用flatMap 和 Option 類一起用,來對artist_data 進行處理:

val result = head.flatMap( line => {

     val (id, name) = line.span(_ != '\t')

     try{

     Some((id.toInt, name.trim))

     } catch {

     case e: NumberFormatException => None}

     })

 

為了更好的理解map、flatMap 以及 Option 類的 Some,我們看一下下面這個例子:

假設我們有以下數據:

scala> head

res25: Array[String] = Array(1134999    06Crazy Life, AVW13, 10113088   Terfel, Bartoli- Mozart: Don, 10151459  The Flaming Sidebur,…)

其中 head(1) 里的數據“AVW13” 為異常數據。此時我們先用map命令,得到的結果為:

Array[Option[(Int, String)]] = Array(Some((1134999,06Crazy Life)), None, Some((10113088,Terfel, Bartoli- Mozart: Don)), …)

可以看到里面的元素為Option,且None 元素也被保留。

 

然后我們使用flatMap:

Array[(Int, String)] = Array((1134999,06Crazy Life), (10113088,Terfel, Bartoli- Mozart: Don), (10151459,The Flaming Sidebur),…)

可以看到 None 數據被過濾,且數據類型不再是 Option,而是元組。

由此說明,flatMap 在展開后,過濾掉了None,並且只展開一層(即Option[(Int, String)] )

Option 代表一個值可以不存在,有點像只有 1 或 0 的一個簡單集合,1 對應子類 Some,0 對應子類 None。在上述案例中,返回的要么是 Some((id.toInt, name.trim)) 要么是 None

 

artist_alias.txt 將拼寫錯誤的藝術家 ID 或非標准的藝術家 ID 映射為藝術家的正規名字。其中每行有兩個ID,用制表符分隔。

這里我們有必要把它轉成 Map 集合的形式,將“非標准的藝術家ID” 映射為“標准的藝術家ID”:

scala> val artistAlias = rawArtistAlias.flatMap{ line =>

     | val tokens = line.split('\t')

     | Some((tokens(0).toInt, tokens(1).toInt))

     | }

但是發現以上代碼報錯:java.lang.NumberFormatException: For input string: ""

說明有數據異常,它為空字符串

 

因此我們再修改一下代碼,若字符串為 “”,則返回None:

scala> val artistAlias = rawArtistAlias.flatMap{ line =>

      val tokens = line.split('\t')

      if (tokens(0).isEmpty) {

      None

      } else {

      Some((tokens(0).toInt, tokens(1).toInt))

     }}

artistAlias: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[11] at flatMap at <console>:25

scala> artistAlias.first

res1: (Int, Int) = (1092764,1000311)

 

在獲取到ID 對應的元組后,我們可以使用 collectAsMap 方法將元組轉為Map:val artistAlias = rawArtistAlias.flatMap{ line =>

val tokens = line.split('\t')

if (tokens(0).isEmpty) {

None

} else {

Some((tokens(0).toInt, tokens(1).toInt))

}}.collectAsMap()

artistAlias: scala.collection.Map[Int,Int] = Map(6803336 -> 1000010, 6663187 -> 1992, 2124273 -> 2814, 10412283 -> 1010353,…

 

從第一條記錄我們可以看到:

6803336 映射為 1000010

我們在artistByID里查詢這兩條記錄:

scala> artistByID.lookup(6803336).head

res9: String = Aerosmith (unplugged)

scala> artistByID.lookup(1000010).head

res10: String = Aerosmith

顯然,這條記錄將 Aerosmith (unplugged) 映射為 Aerosmith

 

7. 構建第一個模型

現在的數據集的形式完全符合Spark Mllib 的ALS 算法實現的要求,但我們還需額外做兩個轉換:

  1. 如果藝術家ID 存在一個不同的正規 ID,我們要用別名數據集對象將所有的藝術家ID 轉換成正規 ID
  2. 需要把數據轉成 Rating 對象,Rating 對象是ALS 算法實現對“用戶 – 產品 – 值” 的抽象

 

scala> import org.apache.spark.mllib.recommendation._

scala> val bArtistAlias = sc.broadcast(artistAlias)

bArtistAlias: org.apache.spark.broadcast.Broadcast[scala.collection.Map[Int,Int]] = Broadcast(6)

scala> val trainData = rawUserArtistData.map { line =>

     | val Array(userID, artistID, count) = line.split(' ').map(_.toInt)

     | val finalArtistID =

     | bArtistAlias.value.getOrElse(artistID, artistID)

     | Rating(userID, finalArtistID, count)

     | }.cache()

 

8. 廣播變量

val bArtistAlias = sc.broadcast(artistAlias)

這里用了一個廣播變量,將artistAlias 變量作為一個廣播變量。廣播變量的作用如下:

Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks.

Explicitly creating broadcast variables is only useful when tasks across multiple stages need the same data or when caching the data in deserialized form is important.

廣播變量主要用於在迭代中一直需要被訪問的只讀變量。它將此變量緩存在每個executor 里,以減少集群網絡傳輸消耗

Spark 執行一個階段(stage)時,會為待執行函數建立閉包,也就是該階段所有任務所需信息的二進制形式。這個閉包包括驅動程序里函數引用的所有數據結構。Spark 把這個閉包發送到集群的每個executor 上。

當許多任務需要訪問同一個(不可變的)數據結構時,我們應該使用廣播變量。它對任務閉包的常規處理進行擴展,是我們能夠:

  1. 在每個 executor 上將數據緩存為原始的 Java 對象,這樣就不用為每個人物執行反序列化
  2. 在多個作業和階段之間緩存數據

 

在函數最后,我們調用了 cache() 以指示 Spark 在 RDD 計算好后將其暫時存儲在集群的內存里。這樣是有益的,因為 ALS 算法是 迭代的,通常情況下至少要訪問該數據 10 次以上。如果不調用 cache(),那么每次要用到 RDD 時都需要從原始數據中重新計算。

 

9. 構建模型

現在我們已有了訓練數據 Rating,格式如下:

 Rating(1000002,1,55.0)

分別對應用戶id,藝術家id,聽的次數

然后我們構建模型:

scala> val model = ALS.trainImplicit(trainData, 10, 5, 0.01, 1.0)

這里除了訓練數據外,其他全是模型參數。具體參數我們之后再介紹,這里可以先了解這個 10 是指 10 個特征向量,也就是 k 的值。

在這個例子中,對於每個用戶和產品,模型都包含一個有 10 個值的特征向量。根據前面的介紹,我們將矩陣A 分為了兩個矩陣 X 與 Y 的乘積。所以最終模型用兩個不同的 RDD,它們分別表示“用戶 - 特征” 和 “產品 – 特偵” 這兩個大型矩陣。

 

我們再看看生產的模型里支持的操作:

scala> model.

predict   productFeatures   rank   recommendProducts   recommendProductsForUsers   recommendUsers   recommendUsersForProducts   save   userFeatures

從字面上我們可以基本判斷模型支持的操作。其中 productFeatures 和 userFeatures 便是對藝術家和用戶生成的特征。每條是一個包含10個元素的數組,也稱為一個特征向量。

scala> model.userFeatures.first

res13: (Int, Array[Double]) = (120,Array(-0.16259685158729553, 0.058804575353860855, -0.08195385336875916, 0.14299602806568146, -0.24055717885494232, 0.07681675255298615, -0.1902841031551361, -0.17557889223098755, 0.10770561546087265, 0.15148405730724335))

上面即為用戶120的特征向量

 

10. 檢查推薦結果

現在我們看看模型給出的推薦從直觀上來看是否合理:

1. 首先我們先獲取某個用戶聽過的藝術家:

val rawArtistsForUser = rawUserArtistData.map(_.split(' ')).

filter {   case Array(user,_,_) => user.toInt == 2093760}

2. 獲取藝術家id並去重

scala> val existingProducts = rawArtistsForUser.map{ case Array(_,artist,_) => artist.toInt}.collect().toSet

existingProducts: scala.collection.immutable.Set[Int] = Set(1255340, 942, 1180, 813, 378)

 

打印出藝術家名字:

scala> artistByID.filter { case (id, name) => existingProducts.contains(id)}.values.collect().foreach(println)

David Gray

Blackalicious

Jurassic 5

The Saw Doctors

Xzibit

 

然后我們使用模型給此用戶做5個推薦:

scala> val recommendations = model.recommendProducts(2093760, 5)

recommendations: Array[org.apache.spark.mllib.recommendation.Rating] = Array(

Rating(2093760,2814,0.031054236194155326), Rating(2093760,1001819,0.03031702049169676), Rating(2093760,930,0.029521346550942212), Rating(2093760,1300642,0.029004849813751586), Rating(2093760,4605,0.029003239498464842)

)

每個Rating 包括用戶id,藝術家id,以及一個評分值。值越大,推薦質量越好。

 

scala> val recommendedProductIDs = recommendations.map(_.product).toSet

scala> artistByID.filter { case(id, name) =>

     | recommendedProductIDs.contains(id)

     | }.values.collect.foreach(println)

50 Cent

Snoop Dogg

2Pac

Eminem

The Game

看得出這些推薦並不咋樣,雖然這些藝術家都比較受歡迎,而且是hippop之類的歌曲。但好像並沒有針對用戶的收聽習慣進行個性化

 

11. 評價推薦質量

再考慮如何評價推薦質量:

  1. 我們假定用戶會傾向於播放受人歡迎的藝術家的歌曲,而不會播放不受歡迎的藝術家的歌曲,這個假設是合理的。因此,用戶的播放數據在一定程度上表示了“優秀的”和“糟糕的”藝術家推薦
  2. 推薦引擎這類的評分系統評價的指標之一就是:好藝術家在推薦列表中應該靠前。問題是如何評價“好的藝術家”?
  3. 推薦引擎的作用在於向用戶推薦他從來沒有聽過的藝術家

計算AUC 來衡量模型,以及使用training data 和 test data

 

12. 選擇超參數

超參數並不是通過算法學習到的參數,而是由調用者指定的指定。

在 ALS.trainImplicit() 的參數包括以下幾個:

1. rank = 10

模型的潛在因素的個數,即“用戶 – 特征” 和 “產品 – 特征” 矩陣的列數;一般來說,它也是矩陣的階

2. Iteration = 5

矩陣分解迭代的次數;迭代的次數越多,花費的時間越長,但分解的結果可能會更好。

3. lambda = 0.01

標准的過擬合參數;值越大越不容易產生過擬合,但值太大會降低分解的准確度

4. alpha = 1.0

控制矩陣分解時,被觀察到的“用戶 – 產品” 交互相對沒被觀察到的交互的權重

 

第一次使用的超參數不一定是最優的。如何選擇好的超參數在機器學習中是個普遍性的問題。最基本的方法是嘗試不同值的組合並對每個組合評估某個指標,然后挑選指標值最好的組合。

如:

val evaluation =

for ( rank <- Array(10, 50);

              lambda <- Array(1.0, 0.0001)

              alpha <- Array(1.0, 40.0))

     yield {

              val model = ALS.trainImplicit(trainData, rank, 10, lambda, alpha)

              val auc = …

              ((rank, lambda, alpha), auc) }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM