概要:
隨着電子商務的高速發展和普及應用,個性化推薦的推薦系統已成為一個重要研究領域。
個性化推薦算法是推薦系統中最核心的技術,在很大程度上決定了電子商務推薦系統性能的優劣,決定着是否能夠推薦用戶真正感興趣的信息,而面對用戶的不斷提升的需求,推薦系統不僅需要正確的推薦,還要實時地根據用戶的行為進行分析並推薦最新的 結果。
實時推薦系統的任務就是為每個用戶,不斷地、精准地推送個性化的服務,甚至到達讓用戶體會到推薦系統比他們更了解自己的感覺。
本文主要研究的是基於模型的協同過濾算法—ALS以及實時推薦系統的可行性並詳細講解ALS(交替最小二乘法)的思想
然后在Spark Streaming框架上運用ALS算法進行測試,評估實時推薦中算法的可靠性
最后,在Spark Mllib和Streaming框架上構建了實時推薦引擎,將推薦數據保存在Hbase中,WebApp通過讀取Hbase中的推薦數據來向用戶展示推薦結果
關於其他類別的推薦算法就不細說了,網上有很多的資料查看,推薦幾篇文章:
IBM-探索推薦引擎內部的秘密系列
以及向亮的《推薦系統實踐》
下載地址
下面進入正文
基於矩陣分解的協同過濾算法–ALS:
基於模型的協同過濾推薦就是基於樣本的用戶喜好信息,訓練一個推薦模型,然后根據實時的用戶喜好的信息進行預測,計算推薦。
對於一個users-products-rating的評分數據集,ALS會建立一個user*product的m*n的矩陣(其中,m為users的數量,n為products的數量),如下圖:
這個矩陣的每一行代表一個用戶 (u1,u2,…,u9)、每一列代表一個產品 (v1,v2,…,v9)。用戶隔天產品的打分在 1-9 之間。
但是在這個數據集中,並不是每個用戶都對每個產品進行過評分,所以這個矩陣往往是稀疏的,用戶i對產品j的評分往往是空的
ALS所做的事情就是將這個稀疏矩陣通過一定的規律填滿,這樣就可以從矩陣中得到任意一個user對任意一個product的評分,ALS填充的評分項也稱為用戶i對產品j的預測得分
所以說,ALS算法的核心就是通過什么樣子的規律來填滿(預測)這個稀疏矩陣
它是這么做的:
假設m*n的評分矩陣R,可以被近似分解成U*(V)T
U為m*d的用戶特征向量矩陣
V為n*d的產品特征向量矩陣((V)T代表V的轉置)
d為user/product的特征值的數量
關於d這個值的理解,大概可以是這樣的:
對於每個產品,可以從d個角度進行評價,以電影為例,可以從主演,導演,特效,劇情4個角度來評價一部電影,那么d就等於4
可以認為,每部電影在這4個角度上都有一個固定的基准評分值
例如《末日崩塌》這部電影是一個產品,它的特征向量是由d個特征值組成的
d=4,有4個特征值,分別是主演,導演,特效,劇情
每個特征值的基准評分值分別為(滿分為1.0):
主演:0.9
導演:0.7
特效:0.8
劇情:0.6
矩陣V由n個product*d個特征值組成
對於矩陣U,假設對於任意的用戶A,該用戶對一部電影的綜合評分和電影的特征值存在一定的線性關系,即電影的綜合評分=(a1*d1+a2*d2+a3*d3+a4*d4)
其中a1-4為用戶A的特征值,d1-4為之前所說的電影的特征值
那么對於之前ALS算法的這個假設
m*n的評分矩陣R,可以被近似分解成U*(V)T
就是成立的,某個用戶對某個產品的評分可以通過矩陣U某行和矩陣V(轉置)的某列相乘得到
那么現在的問題是,如何確定用戶和產品的特征值?(之前僅僅是舉例子,實際中這兩個都是未知的變量)
采用的是交替的最小二乘法
在上面的公式中,a表示評分數據集中用戶i對產品j的真實評分,另外一部分表示用戶i的特征向量(轉置)*產品j的特征向量(這里可以得到預測的i對j的評分)
用真實評分減去預測評分然后求平方,對下一個用戶,下一個產品進行相同的計算,將所有結果累加起來(其中,數據集構成的矩陣是存在大量的空打分,並沒有實際的評分,解決的方法是就只看對已知打分的項)
但是這里之前問題還是存在,就是用戶和產品的特征向量都是未知的,這個式子存在兩個未知變量
解決的辦法是交替的最小二乘法
首先對於上面的公式,以下面的形式顯示:
為了防止過度擬合,加上正則化參數
首先用一個小於1的隨機數初始化V
根據公式(4)求U
此時就可以得到初始的UV矩陣了,計算上面說過的差平方和
根據計算得到的U和公式(5),重新計算並覆蓋V,計算差平方和
反復進行以上兩步的計算,直到差平方和小於一個預設的數,或者迭代次數滿足要求則停止
取得最新的UV矩陣
則原本的稀疏矩陣R就可以用R=U(V)T來表示了
ALS算法的核心就是將稀疏評分矩陣分解為用戶特征向量矩陣和產品特征向量矩陣的乘積
交替使用最小二乘法逐步計算用戶/產品特征向量,使得差平方和最小
通過用戶/產品特征向量的矩陣來預測某個用戶對某個產品的評分
算法原理講述完畢,接下來進行算法測試
算法測試:
算法測試分為兩部分:
一、測試最佳的參數,如:隱性因子個數,正則式等
二、測試在Streaming框架上算法的可用性
測試數據集來自MovieLens
測試一:
將整個數據集上傳至HDFS中
在spark程序中讀取ratings.dat文件,並隨機划出80%作為訓練數據集,20%作為測試數據集
設置隱性因子、正則式參數列表(由於物理機配置不好,集群能夠支持的最大迭代次數只有7次,在多就會內存溢出,所以這里直接將迭代次數設置為7)
對參數列表的全排列分別進行模型訓練,並計算MSE、RMSE
結果如下圖:
比較得出最佳的參數組合,以后的模型訓練參數都使用這個參數組合
測試二:
將原本的數據划分為三部分
trainingData-10k
testData-10k
剩下的為streamData,作為流數據實時發送
首先將trainingData、testData上傳到HDFS/data目錄下
在spark程序中讀取,並轉化為RDD[Rating]類型
使用Streaming框架接受流數據,並進行在線模型訓練
每訓練一次就計算一次MSE和RMSE
對比模型的精准性有沒有提高
使用Scala讀取本地的streamData,通過Socket發送到spark程序中
結果如下圖:
隨着數據的不斷增加,模型的精准度在不斷的提高,所以實時的更新推薦模型是可行的
推薦系統整合:
整體流程圖:
首先用程序生成用戶和圖書數據,並隨機模擬用戶行為數據,保存在Hbase中
在Hbase數據庫中包含了用戶表(4000個用戶),圖書表(5060本圖書)以及評分表(用戶對圖書的百萬條數據)
由於對個人來說無法得到真實的商業性數據,故評分數據都是程序 模擬隨機生成的,包括實時發送的流數據,所以這可能會對整個系統的推薦結果帶來影響
另外,除了WebUI部分,其余的程序都是運行在Linux的Spark集群上
原始數據通過一個程序不斷地向Hbase的評分表中寫入數據
模擬用戶在網站上的評分行為
運行截圖:
其中,前300個用戶的行為偏向於前600本圖書(計算機相關)
實時流數據將通過另外一個程序發送Socket數據,模擬用戶當前在網站上的實時評分行為
在最后使用用戶進行觀察測試時,程序將會只模擬這個用戶的評分行為以便觀察推薦系統的實時性
首先推薦引擎會讀取Hbase中的評分數據
並使用算法測試時得到的最佳參數組合來對其進行訓練
得到初始的模型
使用這個模型對Hbase中所有用戶進行圖書推薦(取 top10)
並將推薦結果保存在Hbase中
以上階段為系統初始化階段
運行截圖:
在系統初始化完成之后,開啟實時推薦引擎
接收不斷生成的用戶行為數據,並和Hbase中的原始數據混合,訓練出新的模型,產生推薦結果保存
不斷地進行流數據的讀取、訓練和保存推薦結果,直至系統關閉或者無流數據產生
推薦引擎運行如下圖:
WebUI部分:
WebUI是由ASP.NET開發的一個簡單的B/S應用,通過Thrift和Linux中的Hbase交互
選擇使用一個用戶觀察系統的實時推薦性,此時流數據模擬程序只產生這個用戶的評分行為
不同時刻,在該用戶有行為數據產生的情況下,推薦的內容(細節沒有仔細處理,比如有的圖片找不到路徑等。。。):
當前記錄
新的行為數據產生的記錄
總結:
前前后后大概花了兩個禮拜多一點的時間(畢竟還要顧着上課,基本也就是晚上才有時間)
其中遇到了許多坑,上網找過,請人問過,也上過知乎啥的讓大牛指導過
總之一句話,沒有真正動手做過是不會知道其中的艱苦,當然我早就變態的把它當樂趣來看了
原本的設想是使用聯合聚類+ALS矩陣分解來做的,但是試了一下,聯合聚類貌似不想k-means啥的那么簡單,以自己的水平來說暫時無法實現(還是要怪自己基礎不好咯~),遂放棄之~
之后又有一個美好的想法,通過ItemCF、UserCF、關聯規則、ALS等算法組合起來,形成一個混合的模型,畢竟這種模式才是比較接近商業化的構架,但是在Spark上面調用Mahout算法的時候又出現了各種各樣的問題,有時候甚至編譯都不通過。。。
在推薦算法性能測試的時候,自己實現了召回率,准確率,覆蓋率,多樣性,新穎度等指標的計算方式,但是實際測試時總是飆出莫名其妙的數據。。。
另外,使用ALS進行實時訓練模型的時候,每次都要重新訓練,感覺這是一個優化點,可否修改成接受到新數據之后不重復訓練,只計算新來的數據(水平有限,暫時只是想法)
期末考又臨近了,只好先放下這些不成器的東西以后再研究
最后的最后,無奈之下只能實現了一個最簡單的推薦系統
最后附上所有源代碼和簡要記錄的開發日志
源代碼已打包上傳:
下載地址
(代碼有些凌亂,沒來得及重構,僅僅做了基本的注釋,有需要的童鞋不要介意。。。)
開發日志:
6-9:准備book數據到hbase中。上傳到hdfs中文亂碼(docker中),讀取hdfs數據到hbase中出異常(原因:數據格式不對,內容太多超出一行,仔細看日志;scala輸入hbase異常)
6-10:完成t_users,t_books,t_ratings的數據導入
6-12:scala操作scan hbase表
坑位:
1:resultScaner不能直接for循環
2:spark上操作hbase
第一次簡單測試(按照之前的過程)
offset (0) + length (4) exceed the capacity of the array: 2 使用String
3:Streaming接收socket數據測試
4:Streaming執行內容測試
6-13:實時推薦測試
問題記錄:不能同時運行兩個sparkcontext
解決:使用sparkContext來創建StreamingContext
Streaming的處理方式
socketTextFile無法接受數據—logger缺少換行符
foreachRDD理解
完成實時更新模型
6-14:namenode經常莫名掛掉,重新配置虛擬機
ubuntu下hostname默認為ubuntu所以一直無法正確啟動–修改/etc/hostname 重啟
6-15:SparkStreaming實時讀取更新模型老是拋異常
解決:allData.cache(沒有緩存的話之前的流數據丟失無法找到)
Unable to reconnect to ZooKeeper service, session 0x14df6b4bcdb0009 has expired, closing socket connection/
Socket connection established to localhost/127.0.0.1:2181, initiating sessio
解決:在代碼中設置hbase的zk,配置文件中無效
6-16:解決15鈤的問題
allData.repartition(3).cache
更新模型時連接到zk異常
WARN [sparkDriver-akka.actor.default-dispatcher-46] storage.BlockManagerMasterActor (Logging.scala:logWarning(71)) - Removing BlockManager BlockManagerId(4, cloud1, 56133) with no recent heart beats: 125833ms exceeds 120000ms
原因:由於網絡差或者數據量太大,worker節點在一定的時間內(默認45s)沒有給master信號,master以為它掛了。
解決辦法:修改運行命令或者sprak-env.sh,添加參數 -Dspark.storage.blockManagerHeartBeatMs=6000000(以ms為單位,即6分鍾)。
修改:在此配置中無效,要在代碼中通過SparkConf設置
Spark1.4中直接通過spark.network.timeout一個配置全部
6-17:完成基礎推薦引擎搭建和測試;c#連接hbase環境搭建
6-18:spark批量寫hbase性能優化
myTable.setAutoFlush(false, false)//關鍵點1
myTable.setWriteBufferSize(3*1024*1024)//關鍵點2
myTable.flushCommits()//關鍵點3
關鍵點1_:將自動提交關閉,如果不關閉,每寫一條數據都會進行提交,是導入數據較慢的做主要因素。
關鍵點2:設置緩存大小,當緩存大於設置值時,hbase會自動提交。此處可自己嘗試大小,一般對大數據量,設置為5M即可,本文設置為3M。
關鍵點3:每一個分片結束后都進行flushCommits(),如果不執行,當hbase最后緩存小於上面設定值時,不會進行提交,導致數據丟失。
注:此外如果想提高Spark寫數據如Hbase速度,可以增加Spark可用核數量。
修改:實際測試中,以上優化並沒有起作用,反而會使一下數據丟失,沒有繼續深入測試
完成webapp的基本搭建
6-23:完成算法測試部分,評測指標RMSE,MSE,==》(平均值,取不同的n推薦列表畫曲線)召回率,准確率,覆蓋率,多樣性,新穎度
使用spark1.4 的新api來推薦物品提升效率
在spark-env和default里面的配置無效,在代碼中配置
System.setProperty(“spark.akka.frameSize”, “2000”)
6-24:系統原型完成
6-25:完善系統原型
6-26:論文初稿
參考資料:
Spark 下操作 HBase(1.0.0 新 API)
【C#】通過Thrift操作HBase系列(1)
ALS 在 Spark MLlib 中的實現
基於矩陣分解的協同過濾算法