電影推薦系統-整體總結(五)實時推薦
一、Scala代碼實現
1.自定義數據類--Model.scala
package streamingRecommender /** * @Author : ASUS and xinrong * @Version : 2020/9/4 * 數據格式轉換類 * ---------------電影表------------------------ * 1 * Toy Story (1995) * * 81 minutes * March 20, 2001 * 1995 * English * Adventure|Animation|Children|Comedy|Fantasy * Tom Hanks|Tim Allen|Don Rickles|Jim Varney|Wallace Shawn|John Ratzenberger|Annie Potts|John Morris|Erik von Detten|Laurie Metcalf|R. Lee Ermey|Sarah Freeman|Penn Jillette|Tom Hanks|Tim Allen|Don Rickles|Jim Varney|Wallace Shawn * John Lasseter */ case class Movie(val mid:Int,val name:String,val descri:String, val timelong:String,val cal_issue:String,val shoot:String, val language:String,val genres :String,val actors:String,val directors:String) /** * -----用戶對電影的評分數據集-------- * 1,31,2.5,1260759144 */ case class MovieRating(val uid:Int,val mid:Int,val score:Double,val timastamp:Int) /** * --------用戶對電影的標簽數據集-------- * 15,339,sandra 'boring' bullock,1138537770 */ case class Tag(val uid:Int,val mid:Int,val tag:String,val timestamp:Int) /** * * MongoDB配置對象 * @param uri * @param db */ case class MongoConfig(val uri:String,val db:String) /** * ES配置對象 * @param httpHosts * @param transportHosts:保存的是所有ES節點的信息 * @param clusterName */ case class EsConfig(val httpHosts:String,val transportHosts:String,val index:String,val clusterName:String) /** * recs的二次封裝數據類 * @param mid * @param res */ case class Recommendation(mid:Int,res:Double) /** * Key-Value封裝數據類 * @param genres * @param recs */ case class GenresRecommendation(genres:String,recs:Seq[Recommendation]) //注:Seq-Sequence是一個特質,可以理解成一個列表;Recommendation是一個實現類 case class UserRecs(uid:Int,recs:Seq[Recommendation]) /** * 定義電影相似度 * @param mid * @param recs * 注:Seq-Sequence是一個特質,可以理解成一個列表;Recommendation是一個自定義實現類 */ case class MoviesRecs(mid:Int,recs:Seq[Recommendation])
2.StreamingRecommender類
package streamingRecommender import com.mongodb.casbah import com.mongodb.casbah.MongoClient import com.mongodb.casbah.commons.MongoDBObject import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies} import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.kafka.common.serialization.StringDeserializer import redis.clients.jedis.Jedis import scala.collection.JavaConversions._ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer /** * @Author : ASUS and xinrong * @Version : 2020/9/25 * 實時推薦部分 */ object ConnHelper{ lazy val jedis=new Jedis("192.168.212.21") lazy val mongoClient=MongoClient(casbah.MongoClientURI("mongodb://192.168.212.21:27017/recom")) } object StreamingRecommender { //聲明 val MAX_USER_RATINGS_NUM=20 //從Redis中獲取多少個用戶的評分 val MAX_SIM_MOVIES_NUM=20 //相似電影候選表中取多少個電影 val MONGODB_MOVIE_RECES_COLLECTION="MovieRecs" val MONGODB_RATING_COLLECTION="Rating" val MONGODB_STREAM_RECS_COLLECTION="StreamRecs" //實時推薦寫入哪張表 def main(args: Array[String]): Unit = { //一、聲明Spark的環境、Kafka和MongoDB的相關信息-------------------------------------------------------------------- val config = Map( "spark.core" -> "local[3]", "kafka.topic" -> "recom", "mongo.uri" -> "mongodb://192.168.212.21:27017/recom", "mongo.db" -> "recom" ) val sparkConf = new SparkConf().setAppName("StreamingRecommender").setMaster(config("spark.core")) val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate() val sc = sparkSession.sparkContext //使用SparkStreaming將連續的數據轉化成不連續的RDD //指定采樣時間:2秒 val ssc = new StreamingContext(sc, Seconds(2)) //定義隱式參數用於連接MongoDB implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db")) import sparkSession.implicits._ //制作共享變量--這個變量保存了含有所有電影的相似度矩陣 val simMovieMatrix = sparkSession .read .option("uri", mongoConfig.uri) .option("collection", MONGODB_MOVIE_RECES_COLLECTION) .format("com.mongodb.spark.sql") .load() //load()之后,數據由DataFrameReader變成DataFrame .as[MoviesRecs] .rdd //數據變成RDD[MoviesRecs] .map { //改變數據格式 items => (items.mid, items.recs.map(x => (x.mid, x.res)).toMap) }.collectAsMap() //將整體變成Map(映射的形式)--Map[Int,Map[Int,Double]] //將共享變量(廣播變量)共享出去 val simMovieMatrixBroadCast = sc.broadcast(simMovieMatrix) //val abc=sc.makeRDD(1 to 2) // abc.map(x=>simMovieMatrixBroadCast.value.get(1)).count() //**************************** Kafka的配置信息 *********************************** //存放Kafka的配置信息(基本上不太會改動) val kafkaconfig = Map( "bootstrap.servers" -> "192.168.212.21:9092", //Kafa的IP "key.deserializer" -> classOf[StringDeserializer], //編碼解碼工具 "value.deserializer" -> classOf[StringDeserializer], //編碼解碼工具 "group.id" -> "recomgroup" // 消費者組--注意:需要在Kafka的配置文件-server.properties里進行配置 ) //二、連接Kafka將數據獲取進來-------------------------------------------------------------------------------------- //1.連接Kafka(直連的方式) //1)LocationStrategies--從...取數據,一般用:PreferConsistent(一般都是固定的) //查看其原理啥的參考:https://blog.csdn.net/Dax1n/article/details/61917718 //2)ConsumerStrategies:和消費有關的 //Subscribe后面最好加上:[String,String](不加也可) val kafkaStream = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](Array(config("kafka.topic")), kafkaconfig)) //三、接收評分流(Kafka中-實時數據)--UID | MID | Score | TIMESTAMP-------------------------------------------------------------------- //注意:Scala和java 里面符號"|"都需要用轉義符進行轉義 //將數據重新組織了一下--用戶ID,電影ID,評分,時間戳 //kafkaStream是DStream val ratingStream = kafkaStream.map { items => val strings = items.value().split("\\|") (strings(0).toInt, strings(1).toInt, strings(2).toDouble, strings(3).toInt) } //操作每一個RDD ratingStream.foreachRDD { rdd => rdd.map { case (uid, mid, score, timestamp) => //################################## 實時計算邏輯的實現 ################################# //1)從redis中獲取最近這段時間的M次評分(這里M=20)--Redis中-歷史數據 //getUserRecentlyRating(評分次數,用戶ID,定義的lazy變量) val userRecentlyRatings = getUserRecentlyRating(MAX_USER_RATINGS_NUM, uid, ConnHelper.jedis) //2)獲取最近瀏覽電影r最相似的M個電影--用共享變量的方式(電影之間的相似度矩陣已經在離線部分求過) //getTopSimMovies(評分次數,電影ID,用戶ID,共享變量--這個變量保存了含有所有電影的相似度矩陣) //simMovieMatrixBroadCast---BroadCast[Map[Int,Map[Int,Double]]] //simMovieMatrixBroadCast.value---Map[Int,Map[Int,Double]] val simTopMovies = getTopSimMovies(MAX_SIM_MOVIES_NUM, uid, mid, simMovieMatrixBroadCast.value) //3)計算待選電影的推薦優先級(就是實現那個數學分析公式) //computMovieScores(共享變量-保存了含有所有電影的相似度矩陣, // 從redis中獲取當前最近的M次評分(這里M=20),獲取電影P最相似的K個電影) val streamRecs = computMovieScores(simMovieMatrixBroadCast.value, userRecentlyRatings, simTopMovies) //4)將數據保存到MongoDB中 saveRecsToMongoDB(uid, streamRecs) }.count() //觸發計算才有顯示,對count()的結果我們不感興趣,只是用他觸發計算 //運行Spark Streaming 啟動流式計算 ssc.start() //不會停的,等待手動停止 ssc.awaitTermination() } /** * 從Redis 取數據--當前最近的(新加入的-實時的)num次評分 * 注:從Kafka中抽取的實時數據里面的評分-score並沒有用上,用的只是里面的uid、mid,這里就是根據kafka中最近觀看電影的用戶名在Redis的數據中進行匹配,從而找到對應的評分記錄 * @param num 評分個數 * @param uid 誰評的分數 * @param jedis 連接Redis的工具(客戶端) * @return */ def getUserRecentlyRating(num: Int, uid: Int, jedis: Jedis) = { //Redis中保存的數據格式: //lpush uid:1 1129:2.0 1172:4.0 1263:2.0 1287:2.0 1293:2.0 1339:3.5 1343:2.0 1371:2.5 //.lrange()是因為獲取Redis中的數據時,是這樣的命令行,EG:192.168.212.21:6379> lrange uid:1 0 5 //其中,start這里是0,stop這里是5 //注意:jedis是Java里面的,.map()是Scala里面的東西,如果要將java 里面的東西用到Scala里面,需要引入: // import scala.collection.JavaConversions._ jedis.lrange("uid" + uid.toString, 0, num - 1).map { items => val strings = items.split("\\:") (strings(0).trim.toInt, strings(1).trim.toDouble) }.toArray } /** * 取出和當前電影r相似的前num個的相似電影 * * @param num * @param mid * @param uid * @param simMovies * @param mongoConfig */ def getTopSimMovies(num: Int, uid: Int, mid: Int, simMovies: collection.Map[Int, Map[Int, Double]])(implicit mongoConfig: MongoConfig) = { //1.從共享變量的電影相似度矩陣中獲取和當前電影的所有相似電影 //.get(mid)之后的allSimMovies的數據類型是:Option[Map[Int,Double]] //.get之后allSimMovies的數據類型變成:Map[Int,Double] val allSimMovies = simMovies.get(mid).get.toArray //再.toArray之后allSimMovies的數據類型是:Array[(Int,Double)] //2.獲取用戶已經觀看過的電影 //從Ratings表里面取出 val usedMovies = ConnHelper.mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION) .find(MongoDBObject("uid" -> uid)).toArray.map { items => items.get("mid").toString.toInt } //3.過濾掉已經評分的電影、排序輸出(降序排序) allSimMovies.filter(x => !usedMovies.contains(x._1)).sortWith(_._2 > _._2).take(num).map(x => x._1) } /** * 計算待選電影的推薦分數 * * @param simMovies 電影的相似度矩陣(歷史數據) * @param userRecentlyRatings 用戶最近對電影的評分-20個 * @param topSimMovies 當前電影最相近的電影-20個 */ def computMovieScores(simMovies: collection.Map[Int, Map[Int, Double]], userRecentlyRatings: Array[(Int, Double)], topSimMovies: Array[Int]) = { //保存每一個待選電影和最近評分的每個電影的權重得分 val score = ArrayBuffer[(Int, Double)]() //保存每一個電影的增強因子數 //Key-mid ,Value-有多少個 val increMap = mutable.HashMap[Int, Int]() //保存每一個電影的減弱因子數 //Key-mid ,Value-有多少個 val decreMap = mutable.HashMap[Int, Int]() //用戶最近的評分 和 當前電影最相近的電影 要做 雙重for循環 //topSimMovies對應圖中的A(B),userRecentlyRatings的對應圖中的X Y Z for (topSimMovie <- topSimMovies; userRecentlyRating <- userRecentlyRatings) { //userRecentlyRating._1 就相當於電影r ; topSimMovie 就相當於電影q val simScore = getMoviesSimScore(simMovies, userRecentlyRating._1, topSimMovie) if (simScore > 0.6) { //score也就是公式中除號上面和號右側的部分 score += ((userRecentlyRating._1, simScore * userRecentlyRating._2)) if (userRecentlyRating._2 >= 3) { //增強因子起作用 increMap(topSimMovie) = increMap.getOrDefault(topSimMovie, 0) + 1 } else { //否則減弱因子起作用 decreMap(topSimMovie) = increMap.getOrDefault(topSimMovie, 0) + 1 } } } score.groupBy(_._1).map { case (mid, sims) => (mid, sims.map(_._2).sum / sims.length + log(increMap(mid)) - log(decreMap(mid))) }.toArray //變成一個數組保存 } /** * 自寫log()--實現lg() * * @param m * @return */ def log(m: Int) = { //loh(2)、log(10) 都是可以的,感覺按照公式的話應該寫成log(10) math.log(m) / math.log(10) } /** * 獲取電影之間的相似度 * * @param simMovies 和之前電影p最相似的電影集合 * @param userRatingMovie 用戶最近給最近電影的評分中的一個 * @param topSimMovie 和當前電影最相似的電影集合中的一個 */ def getMoviesSimScore(simMovies: collection.Map[Int, Map[Int, Double]], userRatingMovie: Int, topSimMovie: Int) = { //模式匹配 //注意寫Some,這樣最后得出結果的數據類型才會和simMovies里面的相同,都是Double simMovies.get(topSimMovie) match { case Some(sim) => sim.get(userRatingMovie) match { case Some(score) => score case None => 0.0 } case None => 0.0 //None是Some的對立面 } } /** * 向數據庫中寫入信息 * * @param uid * @param streamRecs * @param mongoConfig */ def saveRecsToMongoDB(uid: Int, streamRecs: Array[(Int, Double)])(implicit mongoConfig: MongoConfig) = { val streamRecsCollect = ConnHelper.mongoClient(mongoConfig.db)(MONGODB_STREAM_RECS_COLLECTION) //先刪除 streamRecsCollect.findAndRemove(MongoDBObject("uid"->uid)) //再插入 //將數據從(Int,Double)(Int,Double)(Int,Double)(Int,Double) //轉換成Int:Double|Int:Double|Int:Double|Int:Double streamRecsCollect.insert(MongoDBObject("uid"->uid,"recs"->streamRecs.map(x=>x._1+":"+x._2).mkString("|"))) println("save to mongodb") } } }
二、圖解
1.實時部分的過程圖

這里藍色部分是用API實現的,藍色的使用Kafka Stream這部分在企業中不常用。


2.實時推薦部分程序實現的思維導圖

三、文字描述(由數學分析公式推導程序思路)

公式中重要組成部分的解釋、求法
注:這里藍色的代表歷史數據,橘色的代表實時數據
1) sim(q,r)
求出“和p電影最為相似電影集合中的某個電影-q”和"最近觀看的電影-r"之間的相似度。
(1)"q"所在的集合S(取前num個)是由方法(二)得出的
/** * 方法(二)取出和當前電影r相似的前num個的相似電影 * * @param num * @param mid * @param uid * @param simMovies * @param mongoConfig */ def getTopSimMovies(num: Int, uid: Int, mid: Int, simMovies: collection.Map[Int, Map[Int, Double]])(implicit mongoConfig: MongoConfig) = { //1.從共享變量的電影相似度矩陣中獲取和當前電影的所有相似電影 //.get(mid)之后的allSimMovies的數據類型是:Option[Map[Int,Double]] //.get之后allSimMovies的數據類型變成:Map[Int,Double] val allSimMovies = simMovies.get(mid).get.toArray //再.toArray之后allSimMovies的數據類型是:Array[(Int,Double)] //2.獲取用戶已經觀看過的電影 //從Ratings表里面取出 val usedMovies = ConnHelper.mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION) .find(MongoDBObject("uid" -> uid)).toArray.map { items => items.get("mid").toString.toInt } //3.過濾掉已經評分的電影、排序輸出(降序排序) allSimMovies.filter(x => !usedMovies.contains(x._1)).sortWith(_._2 > _._2).take(num).map(x => x._1) }
(2)求出電影q和電影r之間的相似度
進行數據匹配--在“和電影p相似的電影集合”中匹配“和 當前瀏覽的電影r最相似的電影集合中的某個電影-q”
--方法二求出的那個集合中的電影 (Map[Int, Map[Int, Double]])匹配的就是紅色子的部分;
接着再對匹配成功的電影再次進行數據匹配--再匹配當前觀看過的電影-r (Map[Int, Map[Int, Double]])匹配的就是紅色子的部分;
最后得出電影q和r之間的相似度。(Map[Int, Map[Int, Double]])就是紅色子的部分
/** * 方法(三)獲取電影之間的相似度 * * @param simMovies 和之前電影p相似的電影集合 * @param userRatingMovie 用戶最近給最近電影的評分中的一個--取的是mid (r) * @param topSimMovie 和當前電影最相似的電影集合中的一個(q) */ def getMoviesSimScore(simMovies: collection.Map[Int, Map[Int, Double]], userRatingMovie: Int, topSimMovie: Int) = { //模式匹配 //注意寫Some,這樣最后得出結果的數據類型才會和simMovies里面的相同,都是Double simMovies.get(topSimMovie) match { case Some(sim) => sim.get(userRatingMovie) match { case Some(score) => score case None => 0.0 } case None => 0.0 //None是Some的對立面 } }
2)Rr
用戶對最近觀看電影的評分
它們由方法(一)得出的
是根據Kafka中獲取的實時數據(uid),和Redis中的歷史數據得到的
/** * 方法(一)從Redis 取數據--當前最近的(新加入的)num次評分 * * @param num 評分個數 * @param uid 誰評的分數 * @param jedis 連接Redis的工具(客戶端) * @return */ def getUserRecentlyRating(num: Int, uid: Int, jedis: Jedis) = { //Redis中保存的數據格式: //lpush uid:1 1129:2.0 1172:4.0 1263:2.0 1287:2.0 1293:2.0 1339:3.5 1343:2.0 1371:2.5 //.lrange()是因為獲取Redis中的數據時,是這樣的命令行,EG:192.168.212.21:6379> lrange uid:1 0 5 //其中,start這里是0,stop這里是5 //注意:jedis是Java里面的,.map()是Scala里面的東西,如果要將java 里面的東西用到Scala里面,需要引入: // import scala.collection.JavaConversions._ jedis.lrange("uid" + uid.toString, 0, num - 1).map { items => val strings = items.split("\\:") (strings(0).trim.toInt, strings(1).trim.toDouble) }.toArray }
3)RK(r屬於RK)
用戶u按時間順序最近的K個評分
在求和當前電影r相似的前num個的相似電影(方法二)的時候已經考慮到,其實公式中就是RK在體現“最近... ...個”的條件。
4)sim_sum
q與RK中電影的相似度大於最小閾值的個數
這個和后面的5)可放在一起解釋(就是公式徹底實現的部分)
5)incount、recount
incount: RK中與電影q相似的、且本身(電影r)評分較高(>=3)的電影個數;
recount:RK中與電影q相似的、且本身(電影r)評分較低(<3)的電影個數。
下面這段程序就在“計算待選電影的推薦分數”的方法里:
if (simScore > 0.6) { //score也就是公式中除號上面和號右側的部分 score += ((userRecentlyRating._1, simScore * userRecentlyRating._2)) if (userRecentlyRating._2 >= 3) { //增強因子起作用 increMap(topSimMovie) = increMap.getOrDefault(topSimMovie, 0) + 1 } else { //否則減弱因子起作用 decreMap(topSimMovie) = increMap.getOrDefault(topSimMovie, 0) + 1 } } } score.groupBy(_._1).map { case (mid, sims) => (mid, sims.map(_._2).sum / sims.length + log(increMap(mid)) - log(decreMap(mid))) }.toArray //變成一個數組保存
