1.實時推薦系統與相關工作
1.1 原因
實時計算能夠及時捕獲用戶短時興趣,同時能夠快速反饋分發當前系統的用戶興趣內容。大量實踐以及發表的文章都顯示了推薦系統實時化,對推薦精准度的提升的有效性和必要性。
1.2 騰訊架構與實現
實時推薦相關工作非常多,騰訊和北大合作的兩篇SIGMOD文章是比較實際和詳細的實現,采用的計算框架能夠支持大規模數據的實時推薦,以下將會分開簡述以下兩篇文章。
2015年
Huang發表了基於Storm和KV存儲的大規模實時推薦系統 (TencentRec: Real-time Stream Recommendation in Practice)
- 實現了一系列經典推薦算法的實時版本
- 實現了數種實時算法提高推薦精度
- 廣泛應用於業務有效提高
騰訊采用使用storm原因,支持實時數據流式計算,良好的可擴展性、可容錯性,采用簡單編程模型。
文章核心包括實時增量計算的ItemCF,以及用戶隱式反饋計算、實時剪枝算法、基於用戶畫像的數據稀疏性策略。應用在多個業務上都有不同程度的提升,最明顯的是騰訊視頻的全局表現提升高達30%。
全文核心應該是下圖六道公式,闡述騰訊如何具體實現的增量itemcf。
文章中的co-rating,其實就是我們常說的user bias. 公式3和4解決了用戶隱式反饋問題,細節的計算可以參考2016的文章,實際是一個log函數融合了用戶的瀏覽、點擊、分享、購買等行為,轉化成rating.

請注意公式4,由於他們定義了corating,實際是將相似度的增量計算從L2范數的計算轉化成了L1范數計算.(當Rup取x的時候,y=1/x)。
可擴展的增量計算


2016年
騰訊視頻的推薦應用(Real-time Video Recommendation Exploration)
- 實時處理、大規模數據下的准確率和可擴展性。
- 開發了一個基於矩陣分解的大規模在線協同過濾算法,以及一系列的自適應更新策略。
- 通過增加包括視頻類別、時間因素影響、用戶畫像剪枝以及訓練等方法,提高實時TopN推薦的精度。
在我們看來,全文核心在於實時計算的數據流轉,如下圖所示:

基於storm的實時計算topology圖:

2. 糖豆的設計與實現
2.1 架構
糖豆整體推薦框架,從離線,近線,在線三套計算流程組合而成。在線流程基於Spark Streaming框架實現,部署在近線集群。 在線推薦框架實時根據用戶行為,生成實時推薦列表,從而滿足用戶瞬時興趣,提高推薦系統的推薦新鮮度。簡單架構圖如下:

2.2 基於Spark Streaming的實現
2.2.1. 計算流程
實時計算流程如下圖所示:

分解步驟:
- Spark Streaming 讀取Kafka,原始日志ETL
- 提取用戶隱式反饋,生成候選集tuple (uid,vid)
- 每天凌晨會將離線計算好的ItemCF模型結果集導入Redis。itemcf數據結構是一個similarity vid list。
- 實時維護看過視頻set,對看過視頻的處理候選集tuple過濾該用戶看過的視頻
- 實時更新推薦過視頻set,候選集tuple過濾當天已經被推薦過的視頻
- 候選集寫入Redis推薦list
python實現:
if __name__ == "__main__": print sys.argv reload(sys) sys.setdefaultencoding('utf-8') sc = SparkContext(appName="real_time_etl") #20秒 ssc = StreamingContext(sc, 15) brokers = "kafka-servers:9092" topic = "logstash" #讀取kafka kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers}) #解析日志、過濾無關數據、讀取相似視頻 lines = kvs.map(lambda x : readJson(x[1])).filter(lambda x: x is not None).map(lambda x: getTopkfromRedis(x)) #lines.pprint() #寫入推薦結果 lines.foreachRDD(lambda rdd: list2Redis(rdd)) ssc.start() ssc.awaitTermination()
2.2.2 監控
部署在集群Master節點的監控腳本會每30s掃描一次實時計算代碼進程,如果發現進程被failed,會自動拉起實時計算Spark Steaming進程。如果進程拉起失敗會觸發郵件、短信報警
#! /bin/sh MOBILE="your phone numbers" RT_HOME=/home/realtime/recommend.py DIR=/data/rtdamon PID_FILE=$DIR/.run/rt-litetl-damon.pid LOG_FILE=$DIR/.log/rt-litetl-damon.log t=$(date -d "today" +"%Y-%m-%d %H:%M:%S") source /etc/profile echo $PID_FILE $LOG_FILE if [ -e "$PID_FILE" ]; then pid=`cat $PID_FILE` echo $pid damon_process_exists=`ps v -p $pid | grep "rt-litetl-damon.sh" | grep -v grep|grep -v \<defunct\> ` echo "damon process exists : $process_exists" if [ -n "$damon_process_exists" ] then echo "Process rt-litetl-damon.sh is running! $t" >> $LOG_FILE exit fi fi pid=$$ echo "$pid" > $PID_FILE while : do process_exists=`ps -ef|grep "$RT_HOME"|grep "spark"|grep -v grep|wc -l` echo "process exists : $process_exists" >>$LOG_FILE if [ "$process_exists" == "0" ]; then /hadoop/spark/bin/spark-submit --master yarn --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0 --py-files /hadoop/user/rt/redis.zip --num-executors 10 --executor-cores 7 --executor-memory 6g /home/realtime/recommend.py>>/data/rtlog/rtrecommed.log 2>&1 & /usr/bin/php -f /data/rtdamon/yunsms.class.php "$MOBILE" "recommend.py" echo "realtime recommendation process already restarted at $t" >> $LOG_FILE fi #sleep `expr 3600 \* 3` sleep `expr 60 \* 1` done
2.3 收益
根據我們的AB測試數據來看,整體CTR提升25%。用推薦系統的A版對比無推薦的B版,用戶觀看時長提升47%。

3. 問題與改進
- 較多代碼邏輯集中在Redis。目前Redis無災備措施,同時IO和負載也會出現Peak。
- Spark Streaming 目前實時級別在分鍾級。需要升級成storm的秒、毫秒級別。
- 需要用戶點擊等行為才會生產數據,容易召回不足。