這個項目的整體業務邏輯是通過Spring進行搭建,並部署在Tomcat上的。業務產生的數據一部分被存儲到mongoDB並用於spark sql和ml的離線計算。另一部分被傳送到Flume,經kafka到達spark streaming進行實時計算。還有一部分數據存儲到redis,同樣運用到spark streaming上。本文主要關注spark相關的部分。項目的原始實現主要基於RDD,而且有不少低效的代碼實現。本文在此基礎上對80%的spark相關代碼進行了重寫,使新的實現在運行效率上提高了兩倍以上、內存使用減少了幾倍、代碼量也減少近一半。
項目架構
綜合業務:Spring、Tomcat
數據存儲:業務數據MongoDB、搜索服務數據ES、緩存數據Redis
離線和實施推薦:Spark DF、ML、Streaming
消息服務:Kafka
-
【數據加載】
數據加載服務,主要用於項目的數據初始化,用於將三個數據集(Movies【電影的數據集】、Rating【用戶對於電影的評分】、Tags【用戶對於電影的標簽】)初始化到Mongodb數據庫以及ElasticSearch里面。
-
【離線推薦】
- 通過Azkaban周期性的調度【離線統計服務】和【離線推薦服務】。
- 【離線統計服務】
- 最熱電影統計算法 => RateMoreMovies表中
- 當前最熱電影統計算法 => RateMoreRecentlyMovies 表中
- 電影的平均評分算法 => AvgMovies 表中
- 電影類別TOP10推薦 => GenresTopMovies表中。
- 【離線推薦服務】
- 通過Spark ALS算法計算模型的訓練
- 基於Model產生電影相似度矩陣 => MovieRecs表中
- 基於Model產生用戶推薦矩陣 => UserRecs表中
-
【實時推薦】
- 當一個用戶完成了對電影的評分之后,觸發實時推薦,后台服務將評分數據實時寫入到LOG日志中。
- Kafka會通過kafkaStream程序將log隊列中的數據進行格式化,然后將數據推送到recommender的隊列中
- Spark Streaming實時推薦程序讀取kafka中推送過來的數據,配合讀取Redis緩存中的數據,運行實時推薦算法【基於模型的推薦】,把結果數據寫入到MongoDB的StraemRecs表中
-
【推薦結果的聚合】
- 綜合業務服務會根據一定的比例聚合 【離線推薦服務(ALS的協同過濾)】、【基於內容的推薦(基於ElasticSearch More like this功能)】、【實時推薦服務(基於模型的推薦)】這些結果。
- 聚合完成之后,將結果返回到用戶端。
前期工作:數據加載
利用Spark SQL將數據分別導入到 MongoDB 和 ElasticSearch
目標
-
MongoDB
- 將Movie【電影數據集】/ Rating【用戶對電影的評分數據集】/ Tag【用戶對電影的標簽數據集】數據集分別加載到MongoDB數據庫中的Movie / Rating / Tag表中
-
ElasticSearch
- 需要將Movie【電影數據集】加載到ElasticSearch名叫Movie的Index中。需要將Tag數據和Movie數據融合。
步驟
- 先新建一個Maven項目,將依賴添加好。
- 分析數據集Movie、Rating、Tag
- Spark DF加載數據集
- 將DF加載到MongoDB中:
- 將原來的Collection全部刪除
- 通過DF的write方法將數據寫入
- 創建數據庫索引
- 關閉MongoDB連接
- 將DF加載到ElasticSearch中:
- 將存在的Index刪除掉,然后創建新的Index
- 通過DF的write方法將數據寫入
- 關閉Spark集群
數據格式說明
// MONGO_MOVIE_COLLECTION,包含電影ID、描述、時長、發行日期、拍攝日期、語言、類型、演員、導演
case class Movie(mid: Int, name: String, descri: String, timelong: String, issue: String, shoot: String, language: String, genres: String, actors: String, directors: String)
// MONGO_RATING_COLLECTION,包含用戶ID、電影ID、用戶對電影的評分、用戶評分時間
case class MovieRating(uid: Int, mid: Int, score: Double, timestamp: Int)
// MONGO_TAG_COLLECTION,包含用戶ID、電影ID、標簽的具體內容、用戶打標簽時間
case class Tag(uid: Int, mid: Int, tag: String, timestamp: Int)
加載數據到MongoDB
// 由於上面三種數據的加載處理步驟一樣,這里就只展示其中之一的MovieRating。
// 先構建schema,盡管spark有推斷功能,但是為了穩定,還是自己定義schema比較好。
val ratingSchema = StructType(Array(
StructField("uid", IntegerType, true),
StructField("mid", IntegerType, true),
StructField("score", DoubleType, true),
StructField("timestamp", IntegerType, true)
))
// 讀取數據
val ratingDF = spark.read
.format("csv")
.schema(ratingSchema)
.load(RATING_DATA_PATH)
// 新建一個到MongoDB的連接
val mongoClient = MongoClient(MongoClientURI(mongodbConfig.uri))
// 如果MongoDB中有對應的數據庫,那么應該刪除
mongoClient(mongodbConfig.db)(MONGO_RATING_COLLECTION).dropCollection()
// 寫入數據
ratingDF
.write
.option("uri", mongodbConfig.uri)
.option("collection", MONGO_RATING_COLLECTION)
.mode("overwrite")
.format(MONGO_DRIVER_CLASS)
.save()
// 對數據表建索引
mongoClient(mongodbConfig.db)(MONGO_RATING_COLLECTION).createIndex(MongoDBObject("uid" -> 1))
// 關閉MongoDB連接
mongoClient.close()
將數據寫入ES
這里先講Tag數據進行轉換,並與movie表合並,最后將這個表加載到ES。
// 對tag數據進行處理,使之變為下面格式
/**
* |MID | Tags |
* |1 | tag1|tag2|tag3|tag4....|
*/
val newTag = tagDF.groupBy($"mid")
.agg(concat_ws("|", collect_set($"tag")).as("tags"))
.select("mid", "tags")
// 需要將處理后的Tag數據,和Moive數據融合,產生新的Movie數據
val joinExpression = movieDF.col("mid") === newTag.col("mid")
val movieWithTagsDF = movieDF.join(newTag, joinExpression, "left")
// 需要將新的Movie數據保存到ES中
//新建一個配置
val settings = Settings.builder()
.put("cluster.name", eSConfig.clustername)
.build()
//新建一個ES的客戶端
val esClient = new PreBuiltTransportClient(settings)
//需要將 TransportHosts 添加到 esClient 中
val REGEX_HOST_PORT = "(.+):(\\d+)".r
eSConfig.transportHosts.split(",")
.foreach {
// 通過正則表達提取
case REGEX_HOST_PORT(host: String, port: String) =>
// 將所有的 es節點 的地址都加入到 esClient 中
esClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), port.toInt))
}
//需要清除掉ES中遺留的數據
if (esClient.admin().indices().exists(
new IndicesExistsRequest(eSConfig.index)).actionGet().isExists) {
esClient.admin().indices().delete(new DeleteIndexRequest(eSConfig.index))
}
esClient.admin().indices().create(new CreateIndexRequest(eSConfig.index))
//將數據寫入到ES中
movieDF
.write
.option("es.nodes", eSConfig.httpHosts)
.option("es.http.timeout", "100m")
.option("es.mapping.id", "mid") // es地址中取得的id值會被賦值到 _id ,這和 mid 不一樣,所以用 mapping 做轉換
.mode("overwrite")
.format(ES_DRIVER_CLASS)
.save(eSConfig.index + "/" + ES_TYPE)
離線推薦
統計推薦
目標
-
優質電影
獲取所有歷史數據中評分最多的電影的集合,統計每個電影的評分數 => RateMoreMovies
-
熱門電影
認為按照月來統計,這個月中評分數量最多的電影我們認為是熱門電影,統計每個月中的每個電影的評分數量 => RateMoreRecentlyMovie
-
統計電影的平均評分
將Rating數據集中所有的用戶評分數據進行平均,計算每一個電影的平均評分 => AverageMovies
-
統計每種類別電影的TOP10電影
將每種類別的電影中評分最高的10個電影分別計算出來, => GenresTopMovies
步驟
-
新建一個項目StaticticsRecommender
-
統計所有歷史數據中電影的評分個數
通過Rating數據集,用mid進行groupby操作,count計算總數
-
統計以月為單位的電影的評分個數
- 利用from_unixtime對timestamp進行轉換,結果格式為yyyyMM
- 通過groupby 年月,mid 來完成統計
-
統計每個電影評分得分
通過Rating數據集,用戶mid進行group by操作,avg計算評分 -
統計每種類別中評分最高的10個電影
- 需要通過JOIN操作將電影的平均評分數據和Movie數據集進行合並,產生MovieWithScore數據集
- 利用split和explode將MovieWithScore拆開為以一種Genre為開頭的MovieWithScore
- 此處有兩種方案(這里展示第一種,但最好的是建立Aggregator,在ALS離線推薦中介紹):
- 通過Genre作為Key,進行groupByKey操作,將相同電影類別的電影進行聚集並按評分取TOP10。(TopN利用PriorityQueue實現)
- 利用repartitionAndSortWithinPartitions、mapPartition和groupSorted(輪子)實現分Genre的TOP10。
- 上面數據轉化為DF后輸出到MongoDB中
離線計算由原來的RDD改為基於Spark DataFrame的實現,而DF本身比RDD具有更多的默認優化,比如Tungsten運用了堆外內存,減少GC開銷、序列化比RDD的Java默認序列化更加高效、全代碼生成等優勢。上述是效率提升是理論上的,下面是實際上的。
首先,DF相比於RDD能這大大地減少了代碼量。下面的功能就是簡單地統計每部電影的評分數、以月為單位統計每部電影的評分數、每部電影平均分、不同類型電影的評分top10(即分組topn)。這里需要連接電影數據集和評分集,前者包含了電影類型,后者包含平均分。原代碼使用RDD的groupByKey,這是一種低效的操作,因為它不會進行map-side聚合,而且分組后使用map,直接調用sortWith對每個分組進行排序,再用slice進行取top10。這種實現並不好,在Scala中運用sortWith雖然方便,但是效率很低,它比toArray.sort慢不少。slice也是,用take更高效。但實際上這里RDD的實現也不需要用全排序再取前10。利用aggregateByKey加優先隊列,即堆排的實現,效率更高。其用時由原來的20多秒變為6秒左右。當然,在DF中可用windowfunction來實現topn,但是windowfunction的底層實現是先shuffle然后sort,最后根據所定義的窗口進行相應的聚合。這一過程同樣沒有map-side聚合,所以盡管數據量少時速度更快,但在數據量大時或許不是一個好的topn實現。其實我覺得最好的topn實現應該是自定義一個aggregator,這個在離線推薦部分會提到。
// 統計所有歷史數據中電影的評分個數
val rateMoreMoviesDF = ratingDF
.groupBy($"mid")
.agg(count($"mid"))
// 統計以月為單位的電影評分個數
val dateFormat = "yyyyMM"
val rateMoreRecentLyMovies = ratingDF
.withColumn("yearmouth", from_unixtime($"timestamp", dateFormat))
.groupBy($"yearmouth", $"mid")
.agg(count($"mid") as "count")
.select($"mid", $"count", $"yearmouth")
// 統計每個電影的平均得分
val averageMoviesDF = ratingDF
.select($"mid", $"score")
.groupBy($"mid")
.agg(avg($"score").as("avg"))
// 計算不同類別電影的Top10
val joinExpression = movieDF.col("mid") === averageMoviesDF.col("mid")
// 用 inner join 將沒有評分的電影會除去
val movieWithScoreRDD = movieDF
.join(averageMoviesDF, joinExpression, "inner")
.withColumn("splitted_genres", split($"genres", "\\|"))
.withColumn("single_genre", explode($"splitted_genres"))
.map(row => {
(row.getAs[String]("single_genre"), (row.getAs[Int]("mid"), row.getAs[Double]("avg")))
})
.rdd
val ord = new Ordering[(Int, Double)] {
override def compare(x: (Int, Double), y: (Int, Double)): Int = {
val compare2 = x._2.compareTo(y._2)
if (compare2 != 0) -compare2 else 0
}
}
// 利用aggregateByKey和PriorityQueue實現map-side aggregation
val res = movieWithScoreRDD
.aggregateByKey(new mutable.PriorityQueue[(Int, Double)]()(ord))((acc, v) => {
if (acc.length < 10) {
acc.enqueue(v)
acc
} else {
acc += ord.min(acc.dequeue(), v)
}
},
(acc1, acc2) => {
val acc0 = acc1 ++ acc2
while (acc0.length > 10) {
acc0.dequeue()
}
acc0
}).toDF()
ALS離線推薦
目標
- 訓練ALS推薦模型並計算出屬於用戶各自的Top20電影推薦
- 計算電影相似度矩陣
步驟
-
訓練ALS推薦模型
- 實例化ALS模型,按常規的模型訓練流程訓練模型后調用recommendForAllUsers完成推薦。
-
計算電影相似度矩陣
- 獲取電影的特征矩陣,轉換成DoubleMatrix
- 電影的特征矩陣之間做笛卡爾積,通過余弦相似度計算兩個電影的相似度
- 將數據通過GroupBy處理后,輸出
-
ALS模型的參數選擇
- 通過計算ALS的均方根誤差來判斷參數的優劣程度
ALS模型需要三列,用戶id、電影id和評分,並從中計算出用戶矩陣和電影矩陣。項目需要給每個用戶推薦20部電影,此處我直接使用了2.2提供的recommendForAllUsers函數便直接得到了結果。但后續項目還要求求出電影的相似度矩陣,用於后續實時推薦計算。這個算法spark沒有現成的實現,但是思路根recommendForAllUsers是一樣的。就是先求電影矩陣自身的cross join,然后求每部電影對除其自身外的其他所有電影的余弦相似度。公式為兩個向量的點積除以兩個向量長度的乘積。然后取出每部電影中,余弦相似度前100的電影,這樣子構建出電影的相似度矩陣。我根據spark的recommendForAll的源碼,重新實現了這一功能,這比原代碼的實現快上1倍,而且cross join產生的中間數據的內存也減少了幾倍,由幾G到了幾十M。實際上這里也得出了一個cross join的優化方式.....然后就是取每部電影的前100相似的電影。這里同樣是分組TopN,但源碼使用了私有的TopNAggregator。同樣我也實現了一個,這是我覺得在數據量大時最好的TopN實現。首先它基於DS,也享受了DF的部分優化(比如Catalyst、序列化等)這是優於rdd的aggregateByKey的部分,然后,它包含了map-side聚合,減少了shuffle期間需要傳輸的數據,這是優於windowfunc的部分。
// 訓練模型的一般步驟
val alsModel = new ALS()
val paramGrid = new ParamGridBuilder()
.addGrid(alsModel.rank, Seq(5))
.addGrid(alsModel.regParam, Seq(1.0))
.build()
val regEval = new RegressionEvaluator()
.setLabelCol("score")
.setPredictionCol("prediction")
.setMetricName("rmse")
val tvs = new TrainValidationSplit()
.setTrainRatio(0.5)
.setEstimatorParamMaps(paramGrid)
.setEstimator(alsModel)
.setEvaluator(regEval)
val trainedModel = tvs.fit(trainData)
val bestModel = trainedModel.bestModel.asInstanceOf[ALS]
val fittedALS = bestModel.fit(trainData)
// 2.2.0 新增方法,直接可以對訓練集中的 user 或 item 進行推薦
val perUserPredictions = fittedALS.recommendForAllUsers(20)
// 計算電影相似度矩陣,Spark2.2只有根據用戶推薦電影,或者反過來,並沒有更具電影推薦電影的,但是思路其實是一樣的,這里根據spark源碼中recommendForAllUsers的原理實現電影相似度的計算。
// 提取item因子
val movieFeatures = fittedALS.itemFactors
// cross join優化的預處理
val ready2Crossjoin = movieFeatures.as[(Int, Array[Float])]
.mapPartitions(_.grouped(4096))
val ratings = ready2Crossjoin
.crossJoin(ready2Crossjoin)
.as[(Seq[(Int, Array[Float])], Seq[(Int, Array[Float])])]
.flatMap {
case (mf1Iter, mf2Iter) =>
val m1 = mf1Iter.size
val m2 = math.min(mf2Iter.size, 100)
var i = 0
val output = new Array[(Int, Int, Float)](m1 * m2)
val pq = mutable.PriorityQueue[(Int, Float)]()(Ordering.by(-_._2))
val vectorOp = new F2jBLAS
mf1Iter.foreach { case (m1Id, mf1Factor) =>
mf2Iter.foreach { case (m2Id, mf2Factor) =>
if (m1Id == m2Id) {
// do nothing
} else {
val simScore = consinSim(ALSRank, vectorOp, mf1Factor, mf2Factor)
if (pq.length < m2) {
pq.enqueue((m2Id, simScore))
} else {
val temp = pq.dequeue()
pq += (if (temp._2 > simScore) temp else (m2Id, simScore))
}
}
}
pq.foreach { case (mf2Id, score) =>
output(i) = (m1Id, mf2Id, score)
i += 1
}
pq.clear()
}
output.toSeq
}
// TopNAggregator在下面進行介紹。
val topNAggregator = new TopNAggregator[Int, Int, Float](10, Ordering.by(-_._2))
val rowRes = ratings.as[(Int, Int, Float)]
.groupByKey(_._1)
.agg(topNAggregator.toColumn)
.toDF("moviceID", "similar_movieID")
// 用於轉換數據格式,實際上是將“_2”中的FloatType轉換為DoubleType
val arrayType = ArrayType(
new StructType()
.add("_1", IntegerType)
.add("_2", DoubleType)
)
// 得出結果
val res = rowRes.select($"moviceID", $"similar_movieID".cast(arrayType))
TopNAggregator的實現
class TopNAggregator[K1: TypeTag, K2: TypeTag, V: TypeTag](num: Int, ord: Ordering[(K2, V)])
extends Aggregator[(K1, K2, V), mutable.PriorityQueue[(K2, V)], Array[(K2, V)]] {
override def zero: mutable.PriorityQueue[(K2, V)] = new mutable.PriorityQueue[(K2, V)]()(ord)
override def reduce(q: mutable.PriorityQueue[(K2, V)],
a: (K1, K2, V)): mutable.PriorityQueue[(K2, V)] = {
if (q.size < num) {
q += ((a._2, a._3))
} else {
q += ord.min((a._2, a._3), q.dequeue)
}
}
override def merge(q1: mutable.PriorityQueue[(K2, V)],
q2: mutable.PriorityQueue[(K2, V)]): mutable.PriorityQueue[(K2, V)] = {
q1 ++= q2
while (q1.length > num) {
q1.dequeue()
}
q1
}
override def finish(r: mutable.PriorityQueue[(K2, V)]): Array[(K2, V)] = {
r.toArray.sorted(ord.reverse)
}
override def bufferEncoder: Encoder[mutable.PriorityQueue[(K2, V)]] = {
Encoders.kryo[mutable.PriorityQueue[(K2, V)]]
}
override def outputEncoder: Encoder[Array[(K2, V)]] = ExpressionEncoder[Array[(K2, V)]]()
}
實時推薦
日志預處理:構建kafka流,從大量日志數據中過濾出特定前綴的用戶評分log,並傳到recommender topic讓下面的spark streaming接收。
- 構建LogProcessor實現Processor接口,實現對於數據的處理
- 構建StreamsConfig配置數據
- 構建TopologyBuilder來設置數據處理拓撲關系,就是輸入、處理器和輸出的關系。
- 構建KafkaStreams來啟動整個處理
前提
- 在Redis集群中存儲了每一個用戶最近對電影的K次評分。實時算法可以快速獲取。
- 離線推薦算法已經將電影相似度矩陣提前計算到了MongoDB中。
- Kafka已經獲取到了用戶實時的評分數據。
讀取MonogDB中的相似度矩陣然后廣播。讀取kafka stream,提取里面的uid、mid、score評分、timestamp,即某個用戶對某部電影的進行評分從而產生的日志,然后foreachRDD,對每條日志執行下面三個計算。從被廣播的相似度矩陣中獲取這次被評分的電影P最相似的K個電影(當然,實現還會從評分集中獲取用戶已看過的電影,並將這些電影過濾掉)這些作為推薦的候選電影。然后獲取用戶最近N次電影評分,實際是從redis中讀取,這些數據用於調整候選電影的優先級別。后面更多就是利用Scala實現算法的思想了,和spark等技術相關性不大。首先新建一個數組存儲候選電影的評分、兩個map存儲候選電影的增強和減弱因子。然后對每個候選電影進行遍歷,遍歷中再遍歷用戶最近評分的電影。針對每個候選電影,看用戶最近評分的電影中有沒有與它相似的,有的話返回其在相似度表中的分數,沒有就返回0.0。然后如果相似度大於0.6,就將這個相似度作為權重,乘以用戶對其相似電影的評分。得出候選電影的初步評分。然后根據用戶對該相似電影的評分,如果大於3,則候選電影的增強因子+1,否則在減弱因子中-1。最后計算所有部候選電影的初步評分的均值,並加上其增強因子的對數,減去其減弱因子的對數,得出最終得分,並把這個得分表插入到MongoDB用於用戶評分后的推薦。
// 獲取當前最近的M次電影評分。實際是從redis中讀取,結果為Array[(Int, Double)],int是mid,double是評分
val userRecentlyRatings = getUserRecentRatings(ConnHelper.jedis, uid, MAX_USER_RATINGS_COUNT)
// 獲取這次被評分的電影P最相似的K個電影。結果為Array[Int]。思路是先從廣播中的map獲取相似電影Array[(Int, Double)],然后從MONGO_RATING_COLLECTION中獲取用戶已看過的電影,得到Array[Int],最后從相似的電影中filter用戶沒看過的,且只返回mid。
// simMovies實際上就是待推薦電影(僅僅根據相似度表得出的結果)。從電影相似度表中找出與用戶本次評分的電影相似的電影,並去掉用戶已經看過的。
val simMovies = getTopSimMovies(MAX_SIM_MOVIES_NUM, mid, uid, simMoviesBV.value)
// 計算待選電影的推薦優先級別
val streamRecs = computeMovieScores(simMoviesBV.value, userRecentlyRatings, simMovies)
// 具體實現如下
def computeMovieScores(simMovies: Map[Int, Array[(Int, Double)]], userRecentlyRatings: Array[(Int, Double)],
topSimMovie: Array[Int]): Array[(Int, Double)] ={
// 用於保存每個待選電影和最近評分的每一個電影的權重得分
val score = ArrayBuffer[(Int, Double)]()
// 用於保存每個電影的增強因子
val increMap = mutable.HashMap[Int, Int]()
// 用於保存每個電影的減弱因子
val decreMap = mutable.HashMap[Int, Int]()
for (topSimMovie <- topSimMovie; userRecentlyRating <- userRecentlyRatings) {
// 針對每個topSimMovie,看與它相似的電影中有沒有用戶最近評分的電影,有的話返回其在相似度表中的分數,沒有就返回0.0。具體實現看下面1
val simScore = getMoviesSimScore(simMovies, userRecentlyRating._1, topSimMovie)
if (simScore > 0.6) {
// topSimMovie的相似電影中如果有被用戶最近評過分的電影,且在相似度表的分數大於0.6,就將這個分數 * 用戶對topSimMovie相似電影中的某部電影的分數,並存儲到score
score += ((topSimMovie, simScore * userRecentlyRating._2))
// 根據用戶最近的評分對每一個topSimMovie的評分進行調整。
if (userRecentlyRating._2 > 3) {
increMap(topSimMovie) = increMap.getOrElse(topSimMovie, 0) + 1
} else {
decreMap(topSimMovie) = decreMap.getOrElse(topSimMovie, 0) + 1
}
}
}
// 最后分組聚合score,同時利用increMap和decreMap對分數進行調整
score.groupBy(_._1)
.map{case (mid, sims) => (mid, sims.map(_._2).sum / sims.length + log(increMap(mid)) - log(decreMap(mid)))}
.toArray
}
// 1
def getMoviesSimScore(simMovies: Map[Int, Array[(Int, Double)]], userRatingMovie: Int, topSimMovie: Int): Double ={
simMovies.get(topSimMovie) match {
case Some(sim) => // sim:與某部電影相似的一列電影,包含mid和score
var res = 0.0
sim.foreach(smid =>
res = if (smid._1 == userRatingMovie) smid._2 else 0.0
)
res
case None => 0.0
}
}
// 最后將結果存儲到MongoDB