电影推荐系统-整体总结(五)实时推荐
一、Scala代码实现
1.自定义数据类--Model.scala
package streamingRecommender /** * @Author : ASUS and xinrong * @Version : 2020/9/4 * 数据格式转换类 * ---------------电影表------------------------ * 1 * Toy Story (1995) * * 81 minutes * March 20, 2001 * 1995 * English * Adventure|Animation|Children|Comedy|Fantasy * Tom Hanks|Tim Allen|Don Rickles|Jim Varney|Wallace Shawn|John Ratzenberger|Annie Potts|John Morris|Erik von Detten|Laurie Metcalf|R. Lee Ermey|Sarah Freeman|Penn Jillette|Tom Hanks|Tim Allen|Don Rickles|Jim Varney|Wallace Shawn * John Lasseter */ case class Movie(val mid:Int,val name:String,val descri:String, val timelong:String,val cal_issue:String,val shoot:String, val language:String,val genres :String,val actors:String,val directors:String) /** * -----用户对电影的评分数据集-------- * 1,31,2.5,1260759144 */ case class MovieRating(val uid:Int,val mid:Int,val score:Double,val timastamp:Int) /** * --------用户对电影的标签数据集-------- * 15,339,sandra 'boring' bullock,1138537770 */ case class Tag(val uid:Int,val mid:Int,val tag:String,val timestamp:Int) /** * * MongoDB配置对象 * @param uri * @param db */ case class MongoConfig(val uri:String,val db:String) /** * ES配置对象 * @param httpHosts * @param transportHosts:保存的是所有ES节点的信息 * @param clusterName */ case class EsConfig(val httpHosts:String,val transportHosts:String,val index:String,val clusterName:String) /** * recs的二次封装数据类 * @param mid * @param res */ case class Recommendation(mid:Int,res:Double) /** * Key-Value封装数据类 * @param genres * @param recs */ case class GenresRecommendation(genres:String,recs:Seq[Recommendation]) //注:Seq-Sequence是一个特质,可以理解成一个列表;Recommendation是一个实现类 case class UserRecs(uid:Int,recs:Seq[Recommendation]) /** * 定义电影相似度 * @param mid * @param recs * 注:Seq-Sequence是一个特质,可以理解成一个列表;Recommendation是一个自定义实现类 */ case class MoviesRecs(mid:Int,recs:Seq[Recommendation])
2.StreamingRecommender类
package streamingRecommender import com.mongodb.casbah import com.mongodb.casbah.MongoClient import com.mongodb.casbah.commons.MongoDBObject import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies} import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.kafka.common.serialization.StringDeserializer import redis.clients.jedis.Jedis import scala.collection.JavaConversions._ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer /** * @Author : ASUS and xinrong * @Version : 2020/9/25 * 实时推荐部分 */ object ConnHelper{ lazy val jedis=new Jedis("192.168.212.21") lazy val mongoClient=MongoClient(casbah.MongoClientURI("mongodb://192.168.212.21:27017/recom")) } object StreamingRecommender { //声明 val MAX_USER_RATINGS_NUM=20 //从Redis中获取多少个用户的评分 val MAX_SIM_MOVIES_NUM=20 //相似电影候选表中取多少个电影 val MONGODB_MOVIE_RECES_COLLECTION="MovieRecs" val MONGODB_RATING_COLLECTION="Rating" val MONGODB_STREAM_RECS_COLLECTION="StreamRecs" //实时推荐写入哪张表 def main(args: Array[String]): Unit = { //一、声明Spark的环境、Kafka和MongoDB的相关信息-------------------------------------------------------------------- val config = Map( "spark.core" -> "local[3]", "kafka.topic" -> "recom", "mongo.uri" -> "mongodb://192.168.212.21:27017/recom", "mongo.db" -> "recom" ) val sparkConf = new SparkConf().setAppName("StreamingRecommender").setMaster(config("spark.core")) val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate() val sc = sparkSession.sparkContext //使用SparkStreaming将连续的数据转化成不连续的RDD //指定采样时间:2秒 val ssc = new StreamingContext(sc, Seconds(2)) //定义隐式参数用于连接MongoDB implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db")) import sparkSession.implicits._ //制作共享变量--这个变量保存了含有所有电影的相似度矩阵 val simMovieMatrix = sparkSession .read .option("uri", mongoConfig.uri) .option("collection", MONGODB_MOVIE_RECES_COLLECTION) .format("com.mongodb.spark.sql") .load() //load()之后,数据由DataFrameReader变成DataFrame .as[MoviesRecs] .rdd //数据变成RDD[MoviesRecs] .map { //改变数据格式 items => (items.mid, items.recs.map(x => (x.mid, x.res)).toMap) }.collectAsMap() //将整体变成Map(映射的形式)--Map[Int,Map[Int,Double]] //将共享变量(广播变量)共享出去 val simMovieMatrixBroadCast = sc.broadcast(simMovieMatrix) //val abc=sc.makeRDD(1 to 2) // abc.map(x=>simMovieMatrixBroadCast.value.get(1)).count() //**************************** Kafka的配置信息 *********************************** //存放Kafka的配置信息(基本上不太会改动) val kafkaconfig = Map( "bootstrap.servers" -> "192.168.212.21:9092", //Kafa的IP "key.deserializer" -> classOf[StringDeserializer], //编码解码工具 "value.deserializer" -> classOf[StringDeserializer], //编码解码工具 "group.id" -> "recomgroup" // 消费者组--注意:需要在Kafka的配置文件-server.properties里进行配置 ) //二、连接Kafka将数据获取进来-------------------------------------------------------------------------------------- //1.连接Kafka(直连的方式) //1)LocationStrategies--从...取数据,一般用:PreferConsistent(一般都是固定的) //查看其原理啥的参考:https://blog.csdn.net/Dax1n/article/details/61917718 //2)ConsumerStrategies:和消费有关的 //Subscribe后面最好加上:[String,String](不加也可) val kafkaStream = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](Array(config("kafka.topic")), kafkaconfig)) //三、接收评分流(Kafka中-实时数据)--UID | MID | Score | TIMESTAMP-------------------------------------------------------------------- //注意:Scala和java 里面符号"|"都需要用转义符进行转义 //将数据重新组织了一下--用户ID,电影ID,评分,时间戳 //kafkaStream是DStream val ratingStream = kafkaStream.map { items => val strings = items.value().split("\\|") (strings(0).toInt, strings(1).toInt, strings(2).toDouble, strings(3).toInt) } //操作每一个RDD ratingStream.foreachRDD { rdd => rdd.map { case (uid, mid, score, timestamp) => //################################## 实时计算逻辑的实现 ################################# //1)从redis中获取最近这段时间的M次评分(这里M=20)--Redis中-历史数据 //getUserRecentlyRating(评分次数,用户ID,定义的lazy变量) val userRecentlyRatings = getUserRecentlyRating(MAX_USER_RATINGS_NUM, uid, ConnHelper.jedis) //2)获取最近浏览电影r最相似的M个电影--用共享变量的方式(电影之间的相似度矩阵已经在离线部分求过) //getTopSimMovies(评分次数,电影ID,用户ID,共享变量--这个变量保存了含有所有电影的相似度矩阵) //simMovieMatrixBroadCast---BroadCast[Map[Int,Map[Int,Double]]] //simMovieMatrixBroadCast.value---Map[Int,Map[Int,Double]] val simTopMovies = getTopSimMovies(MAX_SIM_MOVIES_NUM, uid, mid, simMovieMatrixBroadCast.value) //3)计算待选电影的推荐优先级(就是实现那个数学分析公式) //computMovieScores(共享变量-保存了含有所有电影的相似度矩阵, // 从redis中获取当前最近的M次评分(这里M=20),获取电影P最相似的K个电影) val streamRecs = computMovieScores(simMovieMatrixBroadCast.value, userRecentlyRatings, simTopMovies) //4)将数据保存到MongoDB中 saveRecsToMongoDB(uid, streamRecs) }.count() //触发计算才有显示,对count()的结果我们不感兴趣,只是用他触发计算 //运行Spark Streaming 启动流式计算 ssc.start() //不会停的,等待手动停止 ssc.awaitTermination() } /** * 从Redis 取数据--当前最近的(新加入的-实时的)num次评分 * 注:从Kafka中抽取的实时数据里面的评分-score并没有用上,用的只是里面的uid、mid,这里就是根据kafka中最近观看电影的用户名在Redis的数据中进行匹配,从而找到对应的评分记录 * @param num 评分个数 * @param uid 谁评的分数 * @param jedis 连接Redis的工具(客户端) * @return */ def getUserRecentlyRating(num: Int, uid: Int, jedis: Jedis) = { //Redis中保存的数据格式: //lpush uid:1 1129:2.0 1172:4.0 1263:2.0 1287:2.0 1293:2.0 1339:3.5 1343:2.0 1371:2.5 //.lrange()是因为获取Redis中的数据时,是这样的命令行,EG:192.168.212.21:6379> lrange uid:1 0 5 //其中,start这里是0,stop这里是5 //注意:jedis是Java里面的,.map()是Scala里面的东西,如果要将java 里面的东西用到Scala里面,需要引入: // import scala.collection.JavaConversions._ jedis.lrange("uid" + uid.toString, 0, num - 1).map { items => val strings = items.split("\\:") (strings(0).trim.toInt, strings(1).trim.toDouble) }.toArray } /** * 取出和当前电影r相似的前num个的相似电影 * * @param num * @param mid * @param uid * @param simMovies * @param mongoConfig */ def getTopSimMovies(num: Int, uid: Int, mid: Int, simMovies: collection.Map[Int, Map[Int, Double]])(implicit mongoConfig: MongoConfig) = { //1.从共享变量的电影相似度矩阵中获取和当前电影的所有相似电影 //.get(mid)之后的allSimMovies的数据类型是:Option[Map[Int,Double]] //.get之后allSimMovies的数据类型变成:Map[Int,Double] val allSimMovies = simMovies.get(mid).get.toArray //再.toArray之后allSimMovies的数据类型是:Array[(Int,Double)] //2.获取用户已经观看过的电影 //从Ratings表里面取出 val usedMovies = ConnHelper.mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION) .find(MongoDBObject("uid" -> uid)).toArray.map { items => items.get("mid").toString.toInt } //3.过滤掉已经评分的电影、排序输出(降序排序) allSimMovies.filter(x => !usedMovies.contains(x._1)).sortWith(_._2 > _._2).take(num).map(x => x._1) } /** * 计算待选电影的推荐分数 * * @param simMovies 电影的相似度矩阵(历史数据) * @param userRecentlyRatings 用户最近对电影的评分-20个 * @param topSimMovies 当前电影最相近的电影-20个 */ def computMovieScores(simMovies: collection.Map[Int, Map[Int, Double]], userRecentlyRatings: Array[(Int, Double)], topSimMovies: Array[Int]) = { //保存每一个待选电影和最近评分的每个电影的权重得分 val score = ArrayBuffer[(Int, Double)]() //保存每一个电影的增强因子数 //Key-mid ,Value-有多少个 val increMap = mutable.HashMap[Int, Int]() //保存每一个电影的减弱因子数 //Key-mid ,Value-有多少个 val decreMap = mutable.HashMap[Int, Int]() //用户最近的评分 和 当前电影最相近的电影 要做 双重for循环 //topSimMovies对应图中的A(B),userRecentlyRatings的对应图中的X Y Z for (topSimMovie <- topSimMovies; userRecentlyRating <- userRecentlyRatings) { //userRecentlyRating._1 就相当于电影r ; topSimMovie 就相当于电影q val simScore = getMoviesSimScore(simMovies, userRecentlyRating._1, topSimMovie) if (simScore > 0.6) { //score也就是公式中除号上面和号右侧的部分 score += ((userRecentlyRating._1, simScore * userRecentlyRating._2)) if (userRecentlyRating._2 >= 3) { //增强因子起作用 increMap(topSimMovie) = increMap.getOrDefault(topSimMovie, 0) + 1 } else { //否则减弱因子起作用 decreMap(topSimMovie) = increMap.getOrDefault(topSimMovie, 0) + 1 } } } score.groupBy(_._1).map { case (mid, sims) => (mid, sims.map(_._2).sum / sims.length + log(increMap(mid)) - log(decreMap(mid))) }.toArray //变成一个数组保存 } /** * 自写log()--实现lg() * * @param m * @return */ def log(m: Int) = { //loh(2)、log(10) 都是可以的,感觉按照公式的话应该写成log(10) math.log(m) / math.log(10) } /** * 获取电影之间的相似度 * * @param simMovies 和之前电影p最相似的电影集合 * @param userRatingMovie 用户最近给最近电影的评分中的一个 * @param topSimMovie 和当前电影最相似的电影集合中的一个 */ def getMoviesSimScore(simMovies: collection.Map[Int, Map[Int, Double]], userRatingMovie: Int, topSimMovie: Int) = { //模式匹配 //注意写Some,这样最后得出结果的数据类型才会和simMovies里面的相同,都是Double simMovies.get(topSimMovie) match { case Some(sim) => sim.get(userRatingMovie) match { case Some(score) => score case None => 0.0 } case None => 0.0 //None是Some的对立面 } } /** * 向数据库中写入信息 * * @param uid * @param streamRecs * @param mongoConfig */ def saveRecsToMongoDB(uid: Int, streamRecs: Array[(Int, Double)])(implicit mongoConfig: MongoConfig) = { val streamRecsCollect = ConnHelper.mongoClient(mongoConfig.db)(MONGODB_STREAM_RECS_COLLECTION) //先删除 streamRecsCollect.findAndRemove(MongoDBObject("uid"->uid)) //再插入 //将数据从(Int,Double)(Int,Double)(Int,Double)(Int,Double) //转换成Int:Double|Int:Double|Int:Double|Int:Double streamRecsCollect.insert(MongoDBObject("uid"->uid,"recs"->streamRecs.map(x=>x._1+":"+x._2).mkString("|"))) println("save to mongodb") } } }
二、图解
1.实时部分的过程图
这里蓝色部分是用API实现的,蓝色的使用Kafka Stream这部分在企业中不常用。
2.实时推荐部分程序实现的思维导图
三、文字描述(由数学分析公式推导程序思路)
公式中重要组成部分的解释、求法
注:这里蓝色的代表历史数据,橘色的代表实时数据
1) sim(q,r)
求出“和p电影最为相似电影集合中的某个电影-q”和"最近观看的电影-r"之间的相似度。
(1)"q"所在的集合S(取前num个)是由方法(二)得出的
/** * 方法(二)取出和当前电影r相似的前num个的相似电影 * * @param num * @param mid * @param uid * @param simMovies * @param mongoConfig */ def getTopSimMovies(num: Int, uid: Int, mid: Int, simMovies: collection.Map[Int, Map[Int, Double]])(implicit mongoConfig: MongoConfig) = { //1.从共享变量的电影相似度矩阵中获取和当前电影的所有相似电影 //.get(mid)之后的allSimMovies的数据类型是:Option[Map[Int,Double]] //.get之后allSimMovies的数据类型变成:Map[Int,Double] val allSimMovies = simMovies.get(mid).get.toArray //再.toArray之后allSimMovies的数据类型是:Array[(Int,Double)] //2.获取用户已经观看过的电影 //从Ratings表里面取出 val usedMovies = ConnHelper.mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION) .find(MongoDBObject("uid" -> uid)).toArray.map { items => items.get("mid").toString.toInt } //3.过滤掉已经评分的电影、排序输出(降序排序) allSimMovies.filter(x => !usedMovies.contains(x._1)).sortWith(_._2 > _._2).take(num).map(x => x._1) }
(2)求出电影q和电影r之间的相似度
进行数据匹配--在“和电影p相似的电影集合”中匹配“和 当前浏览的电影r最相似的电影集合中的某个电影-q”
--方法二求出的那个集合中的电影 (Map[Int, Map[Int, Double]])匹配的就是红色子的部分;
接着再对匹配成功的电影再次进行数据匹配--再匹配当前观看过的电影-r (Map[Int, Map[Int, Double]])匹配的就是红色子的部分;
最后得出电影q和r之间的相似度。(Map[Int, Map[Int, Double]])就是红色子的部分
/** * 方法(三)获取电影之间的相似度 * * @param simMovies 和之前电影p相似的电影集合 * @param userRatingMovie 用户最近给最近电影的评分中的一个--取的是mid (r) * @param topSimMovie 和当前电影最相似的电影集合中的一个(q) */ def getMoviesSimScore(simMovies: collection.Map[Int, Map[Int, Double]], userRatingMovie: Int, topSimMovie: Int) = { //模式匹配 //注意写Some,这样最后得出结果的数据类型才会和simMovies里面的相同,都是Double simMovies.get(topSimMovie) match { case Some(sim) => sim.get(userRatingMovie) match { case Some(score) => score case None => 0.0 } case None => 0.0 //None是Some的对立面 } }
2)Rr
用户对最近观看电影的评分
它们由方法(一)得出的
是根据Kafka中获取的实时数据(uid),和Redis中的历史数据得到的
/** * 方法(一)从Redis 取数据--当前最近的(新加入的)num次评分 * * @param num 评分个数 * @param uid 谁评的分数 * @param jedis 连接Redis的工具(客户端) * @return */ def getUserRecentlyRating(num: Int, uid: Int, jedis: Jedis) = { //Redis中保存的数据格式: //lpush uid:1 1129:2.0 1172:4.0 1263:2.0 1287:2.0 1293:2.0 1339:3.5 1343:2.0 1371:2.5 //.lrange()是因为获取Redis中的数据时,是这样的命令行,EG:192.168.212.21:6379> lrange uid:1 0 5 //其中,start这里是0,stop这里是5 //注意:jedis是Java里面的,.map()是Scala里面的东西,如果要将java 里面的东西用到Scala里面,需要引入: // import scala.collection.JavaConversions._ jedis.lrange("uid" + uid.toString, 0, num - 1).map { items => val strings = items.split("\\:") (strings(0).trim.toInt, strings(1).trim.toDouble) }.toArray }
3)RK(r属于RK)
用户u按时间顺序最近的K个评分
在求和当前电影r相似的前num个的相似电影(方法二)的时候已经考虑到,其实公式中就是RK在体现“最近... ...个”的条件。
4)sim_sum
q与RK中电影的相似度大于最小阈值的个数
这个和后面的5)可放在一起解释(就是公式彻底实现的部分)
5)incount、recount
incount: RK中与电影q相似的、且本身(电影r)评分较高(>=3)的电影个数;
recount:RK中与电影q相似的、且本身(电影r)评分较低(<3)的电影个数。
下面这段程序就在“计算待选电影的推荐分数”的方法里:
if (simScore > 0.6) { //score也就是公式中除号上面和号右侧的部分 score += ((userRecentlyRating._1, simScore * userRecentlyRating._2)) if (userRecentlyRating._2 >= 3) { //增强因子起作用 increMap(topSimMovie) = increMap.getOrDefault(topSimMovie, 0) + 1 } else { //否则减弱因子起作用 decreMap(topSimMovie) = increMap.getOrDefault(topSimMovie, 0) + 1 } } } score.groupBy(_._1).map { case (mid, sims) => (mid, sims.map(_._2).sum / sims.length + log(increMap(mid)) - log(decreMap(mid))) }.toArray //变成一个数组保存