电影推荐系统-整体总结(五)实时推荐


电影推荐系统-整体总结(五)实时推荐

一、Scala代码实现

1.自定义数据类--Model.scala

package streamingRecommender

/**
  * @Author : ASUS and xinrong
  * @Version : 2020/9/4
  *  数据格式转换类
  *  ---------------电影表------------------------
  *  1
  *  Toy Story (1995)
  *
  *  81 minutes
  *  March 20, 2001
  *  1995
  *  English
  *  Adventure|Animation|Children|Comedy|Fantasy
  *  Tom Hanks|Tim Allen|Don Rickles|Jim Varney|Wallace Shawn|John Ratzenberger|Annie Potts|John Morris|Erik von Detten|Laurie Metcalf|R. Lee Ermey|Sarah Freeman|Penn Jillette|Tom Hanks|Tim Allen|Don Rickles|Jim Varney|Wallace Shawn
  *  John Lasseter
  */
case class Movie(val mid:Int,val name:String,val descri:String,
                 val timelong:String,val cal_issue:String,val shoot:String,
                 val language:String,val genres :String,val actors:String,val directors:String)
/**
  * -----用户对电影的评分数据集--------
  * 1,31,2.5,1260759144
  */
case class MovieRating(val uid:Int,val mid:Int,val score:Double,val timastamp:Int)

/**
  * --------用户对电影的标签数据集--------
  * 15,339,sandra 'boring' bullock,1138537770
  */
case class Tag(val uid:Int,val mid:Int,val tag:String,val timestamp:Int)

/**
  *
  * MongoDB配置对象
  * @param uri
  * @param db
  */
case class MongoConfig(val uri:String,val db:String)

/**
  * ES配置对象
  * @param httpHosts
  * @param transportHosts:保存的是所有ES节点的信息
  * @param clusterName
  */
case class EsConfig(val httpHosts:String,val transportHosts:String,val index:String,val clusterName:String)

/**
  * recs的二次封装数据类
  * @param mid
  * @param res
  */
case class Recommendation(mid:Int,res:Double)

/**
  * Key-Value封装数据类
  * @param genres
  * @param recs
  */
case class GenresRecommendation(genres:String,recs:Seq[Recommendation])
//注:Seq-Sequence是一个特质,可以理解成一个列表;Recommendation是一个实现类

case class UserRecs(uid:Int,recs:Seq[Recommendation])

/**
  * 定义电影相似度
  * @param mid
  * @param recs
  * 注:Seq-Sequence是一个特质,可以理解成一个列表;Recommendation是一个自定义实现类
  */
case class MoviesRecs(mid:Int,recs:Seq[Recommendation])

2.StreamingRecommender类

package streamingRecommender

import com.mongodb.casbah
import com.mongodb.casbah.MongoClient
import com.mongodb.casbah.commons.MongoDBObject
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.kafka.common.serialization.StringDeserializer
import redis.clients.jedis.Jedis

import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
/**
  * @Author : ASUS and xinrong
  * @Version : 2020/9/25
  *         实时推荐部分
  */
object ConnHelper{
  lazy val jedis=new Jedis("192.168.212.21")
  lazy val mongoClient=MongoClient(casbah.MongoClientURI("mongodb://192.168.212.21:27017/recom"))
}
object StreamingRecommender {
  //声明
  val MAX_USER_RATINGS_NUM=20 //从Redis中获取多少个用户的评分
  val MAX_SIM_MOVIES_NUM=20 //相似电影候选表中取多少个电影
  val MONGODB_MOVIE_RECES_COLLECTION="MovieRecs"
  val MONGODB_RATING_COLLECTION="Rating"
  val MONGODB_STREAM_RECS_COLLECTION="StreamRecs" //实时推荐写入哪张表

  def main(args: Array[String]): Unit = {
    //一、声明Spark的环境、Kafka和MongoDB的相关信息--------------------------------------------------------------------
    val config = Map(
      "spark.core" -> "local[3]",
      "kafka.topic" -> "recom",
      "mongo.uri" -> "mongodb://192.168.212.21:27017/recom",
      "mongo.db" -> "recom"
    )

    val sparkConf = new SparkConf().setAppName("StreamingRecommender").setMaster(config("spark.core"))
    val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    val sc = sparkSession.sparkContext
    //使用SparkStreaming将连续的数据转化成不连续的RDD
    //指定采样时间:2秒
    val ssc = new StreamingContext(sc, Seconds(2))

    //定义隐式参数用于连接MongoDB
    implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))
    import sparkSession.implicits._ //制作共享变量--这个变量保存了含有所有电影的相似度矩阵
    val simMovieMatrix = sparkSession
      .read
      .option("uri", mongoConfig.uri)
      .option("collection", MONGODB_MOVIE_RECES_COLLECTION)
      .format("com.mongodb.spark.sql")
      .load() //load()之后,数据由DataFrameReader变成DataFrame
      .as[MoviesRecs]
      .rdd //数据变成RDD[MoviesRecs]
      .map { //改变数据格式
      items =>
        (items.mid, items.recs.map(x => (x.mid, x.res)).toMap)
    }.collectAsMap() //将整体变成Map(映射的形式)--Map[Int,Map[Int,Double]]
    //将共享变量(广播变量)共享出去
    val simMovieMatrixBroadCast = sc.broadcast(simMovieMatrix)
    //val abc=sc.makeRDD(1 to 2)
    // abc.map(x=>simMovieMatrixBroadCast.value.get(1)).count()
    //**************************** Kafka的配置信息 ***********************************
    //存放Kafka的配置信息(基本上不太会改动)
    val kafkaconfig = Map(
      "bootstrap.servers" -> "192.168.212.21:9092", //Kafa的IP
      "key.deserializer" -> classOf[StringDeserializer], //编码解码工具
      "value.deserializer" -> classOf[StringDeserializer], //编码解码工具
      "group.id" -> "recomgroup" // 消费者组--注意:需要在Kafka的配置文件-server.properties里进行配置
    )

    //二、连接Kafka将数据获取进来-------------------------------------------------------------------------------------- //1.连接Kafka(直连的方式)
    //1)LocationStrategies--从...取数据,一般用:PreferConsistent(一般都是固定的)
    //查看其原理啥的参考:https://blog.csdn.net/Dax1n/article/details/61917718
    //2)ConsumerStrategies:和消费有关的
    //Subscribe后面最好加上:[String,String](不加也可)
    val kafkaStream = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](Array(config("kafka.topic")), kafkaconfig))

    //三、接收评分流(Kafka中-实时数据)--UID | MID | Score | TIMESTAMP-------------------------------------------------------------------- //注意:Scala和java 里面符号"|"都需要用转义符进行转义
    //将数据重新组织了一下--用户ID,电影ID,评分,时间戳
    //kafkaStream是DStream
    val ratingStream = kafkaStream.map {
      items =>
        val strings = items.value().split("\\|")
        (strings(0).toInt, strings(1).toInt, strings(2).toDouble, strings(3).toInt)
    }
    //操作每一个RDD
    ratingStream.foreachRDD {
      rdd =>
        rdd.map {
          case (uid, mid, score, timestamp) =>
            //##################################  实时计算逻辑的实现  #################################
            //1)从redis中获取最近这段时间的M次评分(这里M=20)--Redis中-历史数据 //getUserRecentlyRating(评分次数,用户ID,定义的lazy变量)
            val userRecentlyRatings = getUserRecentlyRating(MAX_USER_RATINGS_NUM, uid, ConnHelper.jedis)
            //2)获取最近浏览电影r最相似的M个电影--用共享变量的方式(电影之间的相似度矩阵已经在离线部分求过) //getTopSimMovies(评分次数,电影ID,用户ID,共享变量--这个变量保存了含有所有电影的相似度矩阵)
            //simMovieMatrixBroadCast---BroadCast[Map[Int,Map[Int,Double]]]
            //simMovieMatrixBroadCast.value---Map[Int,Map[Int,Double]]
            val simTopMovies = getTopSimMovies(MAX_SIM_MOVIES_NUM, uid, mid, simMovieMatrixBroadCast.value)

            //3)计算待选电影的推荐优先级(就是实现那个数学分析公式) //computMovieScores(共享变量-保存了含有所有电影的相似度矩阵,
            // 从redis中获取当前最近的M次评分(这里M=20),获取电影P最相似的K个电影)
            val streamRecs = computMovieScores(simMovieMatrixBroadCast.value, userRecentlyRatings, simTopMovies)

            //4)将数据保存到MongoDB中
 saveRecsToMongoDB(uid, streamRecs)
        }.count() //触发计算才有显示,对count()的结果我们不感兴趣,只是用他触发计算
        //运行Spark Streaming 启动流式计算
        ssc.start() //不会停的,等待手动停止
        ssc.awaitTermination()
    }

    /**
      * 从Redis 取数据--当前最近的(新加入的-实时的)num次评分
      * 注:从Kafka中抽取的实时数据里面的评分-score并没有用上,用的只是里面的uid、mid,这里就是根据kafka中最近观看电影的用户名在Redis的数据中进行匹配,从而找到对应的评分记录
      * @param num   评分个数
      * @param uid   谁评的分数
      * @param jedis 连接Redis的工具(客户端)
      * @return
      */
    def getUserRecentlyRating(num: Int, uid: Int, jedis: Jedis) = {
      //Redis中保存的数据格式:
      //lpush uid:1 1129:2.0 1172:4.0 1263:2.0 1287:2.0 1293:2.0 1339:3.5 1343:2.0 1371:2.5
      //.lrange()是因为获取Redis中的数据时,是这样的命令行,EG:192.168.212.21:6379> lrange uid:1 0 5
      //其中,start这里是0,stop这里是5
      //注意:jedis是Java里面的,.map()是Scala里面的东西,如果要将java 里面的东西用到Scala里面,需要引入:
      // import scala.collection.JavaConversions._
      jedis.lrange("uid" + uid.toString, 0, num - 1).map {
        items =>
          val strings = items.split("\\:")
          (strings(0).trim.toInt, strings(1).trim.toDouble)
      }.toArray
    }

    /**
      * 取出和当前电影r相似的前num个的相似电影
      *
      * @param num
      * @param mid
      * @param uid
      * @param simMovies
      * @param mongoConfig
      */
    def getTopSimMovies(num: Int, uid: Int, mid: Int, simMovies: collection.Map[Int, Map[Int, Double]])(implicit mongoConfig: MongoConfig) = {
      //1.从共享变量的电影相似度矩阵中获取和当前电影的所有相似电影
      //.get(mid)之后的allSimMovies的数据类型是:Option[Map[Int,Double]] //.get之后allSimMovies的数据类型变成:Map[Int,Double]
      val allSimMovies = simMovies.get(mid).get.toArray //再.toArray之后allSimMovies的数据类型是:Array[(Int,Double)]

      //2.获取用户已经观看过的电影
      //从Ratings表里面取出
      val usedMovies = ConnHelper.mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION)
        .find(MongoDBObject("uid" -> uid)).toArray.map {
        items =>
          items.get("mid").toString.toInt
      }
      //3.过滤掉已经评分的电影、排序输出(降序排序)
      allSimMovies.filter(x => !usedMovies.contains(x._1)).sortWith(_._2 > _._2).take(num).map(x => x._1)
    }

    /**
      * 计算待选电影的推荐分数
      *
      * @param simMovies           电影的相似度矩阵(历史数据)
      * @param userRecentlyRatings 用户最近对电影的评分-20个
      * @param topSimMovies        当前电影最相近的电影-20个
      */
    def computMovieScores(simMovies: collection.Map[Int, Map[Int, Double]], userRecentlyRatings: Array[(Int, Double)], topSimMovies: Array[Int]) = {
      //保存每一个待选电影和最近评分的每个电影的权重得分
      val score = ArrayBuffer[(Int, Double)]()
      //保存每一个电影的增强因子数 //Key-mid ,Value-有多少个
      val increMap = mutable.HashMap[Int, Int]()
      //保存每一个电影的减弱因子数 //Key-mid ,Value-有多少个
      val decreMap = mutable.HashMap[Int, Int]()
      //用户最近的评分 和 当前电影最相近的电影 要做 双重for循环
      //topSimMovies对应图中的A(B),userRecentlyRatings的对应图中的X Y Z
      for (topSimMovie <- topSimMovies; userRecentlyRating <- userRecentlyRatings) {
        //userRecentlyRating._1 就相当于电影r ; topSimMovie 就相当于电影q
        val simScore = getMoviesSimScore(simMovies, userRecentlyRating._1, topSimMovie)
        if (simScore > 0.6) {
          //score也就是公式中除号上面和号右侧的部分
          score += ((userRecentlyRating._1, simScore * userRecentlyRating._2))
          if (userRecentlyRating._2 >= 3) {
            //增强因子起作用
            increMap(topSimMovie) = increMap.getOrDefault(topSimMovie, 0) + 1
          } else {
            //否则减弱因子起作用
            decreMap(topSimMovie) = increMap.getOrDefault(topSimMovie, 0) + 1
          }
        }
      }
      score.groupBy(_._1).map {
        case (mid, sims) =>
          (mid, sims.map(_._2).sum / sims.length + log(increMap(mid)) - log(decreMap(mid)))
      }.toArray //变成一个数组保存
    }

    /**
      * 自写log()--实现lg()
      *
      * @param m
      * @return
      */
    def log(m: Int) = {
      //loh(2)、log(10) 都是可以的,感觉按照公式的话应该写成log(10)
      math.log(m) / math.log(10)
    }

    /**
      * 获取电影之间的相似度
      *
      * @param simMovies       和之前电影p最相似的电影集合
      * @param userRatingMovie 用户最近给最近电影的评分中的一个
      * @param topSimMovie     和当前电影最相似的电影集合中的一个
      */
    def getMoviesSimScore(simMovies: collection.Map[Int, Map[Int, Double]], userRatingMovie: Int, topSimMovie: Int) = {
      //模式匹配
      //注意写Some,这样最后得出结果的数据类型才会和simMovies里面的相同,都是Double
 simMovies.get(topSimMovie) match {
        case Some(sim) => sim.get(userRatingMovie) match {
          case Some(score) => score case None => 0.0
        }
        case None => 0.0 //None是Some的对立面
      }
    }

    /**
      * 向数据库中写入信息
      *
      * @param uid
      * @param streamRecs
      * @param mongoConfig
      */
    def saveRecsToMongoDB(uid: Int, streamRecs: Array[(Int, Double)])(implicit mongoConfig: MongoConfig) = {
      val streamRecsCollect = ConnHelper.mongoClient(mongoConfig.db)(MONGODB_STREAM_RECS_COLLECTION)
      //先删除
      streamRecsCollect.findAndRemove(MongoDBObject("uid"->uid))
      //再插入
      //将数据从(Int,Double)(Int,Double)(Int,Double)(Int,Double)
      //转换成Int:Double|Int:Double|Int:Double|Int:Double
      streamRecsCollect.insert(MongoDBObject("uid"->uid,"recs"->streamRecs.map(x=>x._1+":"+x._2).mkString("|")))
      println("save to mongodb")
    }
  }
}

 二、图解

1.实时部分的过程图

这里蓝色部分是用API实现的,蓝色的使用Kafka Stream这部分在企业中不常用。

 

 2.实时推荐部分程序实现的思维导图

 三、文字描述(由数学分析公式推导程序思路)

公式中重要组成部分的解释、求法

注:这里蓝色的代表历史数据,橘色的代表实时数据

1) sim(q,r)

求出“和p电影最为相似电影集合中的某个电影-q”和"最近观看的电影-r"之间的相似度

(1)"q"所在的集合S(取前num个)是由方法(二)得出的

     /**
      * 方法(二)取出和当前电影r相似的前num个的相似电影
      *
      * @param num
      * @param mid
      * @param uid
      * @param simMovies
      * @param mongoConfig
      */
    def getTopSimMovies(num: Int, uid: Int, mid: Int, simMovies: collection.Map[Int, Map[Int, Double]])(implicit mongoConfig: MongoConfig) = {
      //1.从共享变量的电影相似度矩阵中获取和当前电影的所有相似电影 //.get(mid)之后的allSimMovies的数据类型是:Option[Map[Int,Double]]
      //.get之后allSimMovies的数据类型变成:Map[Int,Double]
      val allSimMovies = simMovies.get(mid).get.toArray //再.toArray之后allSimMovies的数据类型是:Array[(Int,Double)]

      //2.获取用户已经观看过的电影 //从Ratings表里面取出
      val usedMovies = ConnHelper.mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION)
        .find(MongoDBObject("uid" -> uid)).toArray.map {
        items =>
          items.get("mid").toString.toInt
      }
      //3.过滤掉已经评分的电影、排序输出(降序排序)
      allSimMovies.filter(x => !usedMovies.contains(x._1)).sortWith(_._2 > _._2).take(num).map(x => x._1)
    }

(2)求出电影q和电影r之间的相似度

进行数据匹配--在“和电影p相似的电影集合”中匹配“和 当前浏览的电影r最相似的电影集合中的某个电影-q

                      --方法二求出的那个集合中的电影 (Map[Int, Map[Int, Double]]匹配的就是红色子的部分;

接着再对匹配成功的电影再次进行数据匹配--再匹配当前观看过的电影-r  (Map[Int, Map[Int, Double]]匹配的就是红色子的部分;

最后得出电影q和r之间的相似度。(Map[Int, Map[Int, Double]]就是红色子的部分

     /**
      * 方法(三)获取电影之间的相似度
      *
      * @param simMovies       和之前电影p相似的电影集合
      * @param userRatingMovie 用户最近给最近电影的评分中的一个--取的是mid (r)
      * @param topSimMovie     和当前电影最相似的电影集合中的一个(q) */
    def getMoviesSimScore(simMovies: collection.Map[Int, Map[Int, Double]], userRatingMovie: Int, topSimMovie: Int) = {
      //模式匹配
      //注意写Some,这样最后得出结果的数据类型才会和simMovies里面的相同,都是Double
 simMovies.get(topSimMovie) match {
        case Some(sim) => sim.get(userRatingMovie) match {
          case Some(score) => score
          case None => 0.0
        }
        case None => 0.0 //None是Some的对立面
      }
    }

2)Rr

用户对最近观看电影的评分

它们由方法(一)得出的

是根据Kafka中获取的实时数据(uid),和Redis中的历史数据得到的

     /**
      * 方法(一)从Redis 取数据--当前最近的(新加入的)num次评分
      *
      * @param num   评分个数
      * @param uid   谁评的分数
      * @param jedis 连接Redis的工具(客户端)
      * @return
      */
    def getUserRecentlyRating(num: Int, uid: Int, jedis: Jedis) = {
      //Redis中保存的数据格式:
      //lpush uid:1 1129:2.0 1172:4.0 1263:2.0 1287:2.0 1293:2.0 1339:3.5 1343:2.0 1371:2.5
      //.lrange()是因为获取Redis中的数据时,是这样的命令行,EG:192.168.212.21:6379> lrange uid:1 0 5
      //其中,start这里是0,stop这里是5
      //注意:jedis是Java里面的,.map()是Scala里面的东西,如果要将java 里面的东西用到Scala里面,需要引入:
      // import scala.collection.JavaConversions._
      jedis.lrange("uid" + uid.toString, 0, num - 1).map {
        items =>
          val strings = items.split("\\:")
          (strings(0).trim.toInt, strings(1).trim.toDouble)
      }.toArray
    }

3)RK(r属于RK)

用户u按时间顺序最近的K个评分

在求当前电影r相似的前num个的相似电影(方法二)的时候已经考虑到,其实公式中就是RK在体现“最近... ...个”的条件。

4)sim_sum

qRK中电影的相似度大于最小阈值的个数

这个和后面的5)可放在一起解释(就是公式彻底实现的部分)

5)incount、recount

incount: RK中与电影q相似的、且本身(电影r)评分较高(>=3)的电影个数

recount:RK中与电影q相似的、且本身(电影r评分较低(<3)的电影个数

下面这段程序就在“计算待选电影的推荐分数”的方法里:

 if (simScore > 0.6) {
          //score也就是公式中除号上面和号右侧的部分
          score += ((userRecentlyRating._1, simScore * userRecentlyRating._2))
          if (userRecentlyRating._2 >= 3) {
            //增强因子起作用
            increMap(topSimMovie) = increMap.getOrDefault(topSimMovie, 0) + 1
          } else {
            //否则减弱因子起作用
            decreMap(topSimMovie) = increMap.getOrDefault(topSimMovie, 0) + 1
          }
        }
      }
      score.groupBy(_._1).map {
        case (mid, sims) =>
          (mid, sims.map(_._2).sum / sims.length + log(increMap(mid)) - log(decreMap(mid)))
      }.toArray //变成一个数组保存

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM