電影推薦系統-整體總結(五)實時推薦


電影推薦系統-整體總結(五)實時推薦

一、Scala代碼實現

1.自定義數據類--Model.scala

package streamingRecommender

/**
  * @Author : ASUS and xinrong
  * @Version : 2020/9/4
  *  數據格式轉換類
  *  ---------------電影表------------------------
  *  1
  *  Toy Story (1995)
  *
  *  81 minutes
  *  March 20, 2001
  *  1995
  *  English
  *  Adventure|Animation|Children|Comedy|Fantasy
  *  Tom Hanks|Tim Allen|Don Rickles|Jim Varney|Wallace Shawn|John Ratzenberger|Annie Potts|John Morris|Erik von Detten|Laurie Metcalf|R. Lee Ermey|Sarah Freeman|Penn Jillette|Tom Hanks|Tim Allen|Don Rickles|Jim Varney|Wallace Shawn
  *  John Lasseter
  */
case class Movie(val mid:Int,val name:String,val descri:String,
                 val timelong:String,val cal_issue:String,val shoot:String,
                 val language:String,val genres :String,val actors:String,val directors:String)
/**
  * -----用戶對電影的評分數據集--------
  * 1,31,2.5,1260759144
  */
case class MovieRating(val uid:Int,val mid:Int,val score:Double,val timastamp:Int)

/**
  * --------用戶對電影的標簽數據集--------
  * 15,339,sandra 'boring' bullock,1138537770
  */
case class Tag(val uid:Int,val mid:Int,val tag:String,val timestamp:Int)

/**
  *
  * MongoDB配置對象
  * @param uri
  * @param db
  */
case class MongoConfig(val uri:String,val db:String)

/**
  * ES配置對象
  * @param httpHosts
  * @param transportHosts:保存的是所有ES節點的信息
  * @param clusterName
  */
case class EsConfig(val httpHosts:String,val transportHosts:String,val index:String,val clusterName:String)

/**
  * recs的二次封裝數據類
  * @param mid
  * @param res
  */
case class Recommendation(mid:Int,res:Double)

/**
  * Key-Value封裝數據類
  * @param genres
  * @param recs
  */
case class GenresRecommendation(genres:String,recs:Seq[Recommendation])
//注:Seq-Sequence是一個特質,可以理解成一個列表;Recommendation是一個實現類

case class UserRecs(uid:Int,recs:Seq[Recommendation])

/**
  * 定義電影相似度
  * @param mid
  * @param recs
  * 注:Seq-Sequence是一個特質,可以理解成一個列表;Recommendation是一個自定義實現類
  */
case class MoviesRecs(mid:Int,recs:Seq[Recommendation])

2.StreamingRecommender類

package streamingRecommender

import com.mongodb.casbah
import com.mongodb.casbah.MongoClient
import com.mongodb.casbah.commons.MongoDBObject
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.kafka.common.serialization.StringDeserializer
import redis.clients.jedis.Jedis

import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
/**
  * @Author : ASUS and xinrong
  * @Version : 2020/9/25
  *         實時推薦部分
  */
object ConnHelper{
  lazy val jedis=new Jedis("192.168.212.21")
  lazy val mongoClient=MongoClient(casbah.MongoClientURI("mongodb://192.168.212.21:27017/recom"))
}
object StreamingRecommender {
  //聲明
  val MAX_USER_RATINGS_NUM=20 //從Redis中獲取多少個用戶的評分
  val MAX_SIM_MOVIES_NUM=20 //相似電影候選表中取多少個電影
  val MONGODB_MOVIE_RECES_COLLECTION="MovieRecs"
  val MONGODB_RATING_COLLECTION="Rating"
  val MONGODB_STREAM_RECS_COLLECTION="StreamRecs" //實時推薦寫入哪張表

  def main(args: Array[String]): Unit = {
    //一、聲明Spark的環境、Kafka和MongoDB的相關信息--------------------------------------------------------------------
    val config = Map(
      "spark.core" -> "local[3]",
      "kafka.topic" -> "recom",
      "mongo.uri" -> "mongodb://192.168.212.21:27017/recom",
      "mongo.db" -> "recom"
    )

    val sparkConf = new SparkConf().setAppName("StreamingRecommender").setMaster(config("spark.core"))
    val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    val sc = sparkSession.sparkContext
    //使用SparkStreaming將連續的數據轉化成不連續的RDD
    //指定采樣時間:2秒
    val ssc = new StreamingContext(sc, Seconds(2))

    //定義隱式參數用於連接MongoDB
    implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))
    import sparkSession.implicits._ //制作共享變量--這個變量保存了含有所有電影的相似度矩陣
    val simMovieMatrix = sparkSession
      .read
      .option("uri", mongoConfig.uri)
      .option("collection", MONGODB_MOVIE_RECES_COLLECTION)
      .format("com.mongodb.spark.sql")
      .load() //load()之后,數據由DataFrameReader變成DataFrame
      .as[MoviesRecs]
      .rdd //數據變成RDD[MoviesRecs]
      .map { //改變數據格式
      items =>
        (items.mid, items.recs.map(x => (x.mid, x.res)).toMap)
    }.collectAsMap() //將整體變成Map(映射的形式)--Map[Int,Map[Int,Double]]
    //將共享變量(廣播變量)共享出去
    val simMovieMatrixBroadCast = sc.broadcast(simMovieMatrix)
    //val abc=sc.makeRDD(1 to 2)
    // abc.map(x=>simMovieMatrixBroadCast.value.get(1)).count()
    //**************************** Kafka的配置信息 ***********************************
    //存放Kafka的配置信息(基本上不太會改動)
    val kafkaconfig = Map(
      "bootstrap.servers" -> "192.168.212.21:9092", //Kafa的IP
      "key.deserializer" -> classOf[StringDeserializer], //編碼解碼工具
      "value.deserializer" -> classOf[StringDeserializer], //編碼解碼工具
      "group.id" -> "recomgroup" // 消費者組--注意:需要在Kafka的配置文件-server.properties里進行配置
    )

    //二、連接Kafka將數據獲取進來-------------------------------------------------------------------------------------- //1.連接Kafka(直連的方式)
    //1)LocationStrategies--從...取數據,一般用:PreferConsistent(一般都是固定的)
    //查看其原理啥的參考:https://blog.csdn.net/Dax1n/article/details/61917718
    //2)ConsumerStrategies:和消費有關的
    //Subscribe后面最好加上:[String,String](不加也可)
    val kafkaStream = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](Array(config("kafka.topic")), kafkaconfig))

    //三、接收評分流(Kafka中-實時數據)--UID | MID | Score | TIMESTAMP-------------------------------------------------------------------- //注意:Scala和java 里面符號"|"都需要用轉義符進行轉義
    //將數據重新組織了一下--用戶ID,電影ID,評分,時間戳
    //kafkaStream是DStream
    val ratingStream = kafkaStream.map {
      items =>
        val strings = items.value().split("\\|")
        (strings(0).toInt, strings(1).toInt, strings(2).toDouble, strings(3).toInt)
    }
    //操作每一個RDD
    ratingStream.foreachRDD {
      rdd =>
        rdd.map {
          case (uid, mid, score, timestamp) =>
            //##################################  實時計算邏輯的實現  #################################
            //1)從redis中獲取最近這段時間的M次評分(這里M=20)--Redis中-歷史數據 //getUserRecentlyRating(評分次數,用戶ID,定義的lazy變量)
            val userRecentlyRatings = getUserRecentlyRating(MAX_USER_RATINGS_NUM, uid, ConnHelper.jedis)
            //2)獲取最近瀏覽電影r最相似的M個電影--用共享變量的方式(電影之間的相似度矩陣已經在離線部分求過) //getTopSimMovies(評分次數,電影ID,用戶ID,共享變量--這個變量保存了含有所有電影的相似度矩陣)
            //simMovieMatrixBroadCast---BroadCast[Map[Int,Map[Int,Double]]]
            //simMovieMatrixBroadCast.value---Map[Int,Map[Int,Double]]
            val simTopMovies = getTopSimMovies(MAX_SIM_MOVIES_NUM, uid, mid, simMovieMatrixBroadCast.value)

            //3)計算待選電影的推薦優先級(就是實現那個數學分析公式) //computMovieScores(共享變量-保存了含有所有電影的相似度矩陣,
            // 從redis中獲取當前最近的M次評分(這里M=20),獲取電影P最相似的K個電影)
            val streamRecs = computMovieScores(simMovieMatrixBroadCast.value, userRecentlyRatings, simTopMovies)

            //4)將數據保存到MongoDB中
 saveRecsToMongoDB(uid, streamRecs)
        }.count() //觸發計算才有顯示,對count()的結果我們不感興趣,只是用他觸發計算
        //運行Spark Streaming 啟動流式計算
        ssc.start() //不會停的,等待手動停止
        ssc.awaitTermination()
    }

    /**
      * 從Redis 取數據--當前最近的(新加入的-實時的)num次評分
      * 注:從Kafka中抽取的實時數據里面的評分-score並沒有用上,用的只是里面的uid、mid,這里就是根據kafka中最近觀看電影的用戶名在Redis的數據中進行匹配,從而找到對應的評分記錄
      * @param num   評分個數
      * @param uid   誰評的分數
      * @param jedis 連接Redis的工具(客戶端)
      * @return
      */
    def getUserRecentlyRating(num: Int, uid: Int, jedis: Jedis) = {
      //Redis中保存的數據格式:
      //lpush uid:1 1129:2.0 1172:4.0 1263:2.0 1287:2.0 1293:2.0 1339:3.5 1343:2.0 1371:2.5
      //.lrange()是因為獲取Redis中的數據時,是這樣的命令行,EG:192.168.212.21:6379> lrange uid:1 0 5
      //其中,start這里是0,stop這里是5
      //注意:jedis是Java里面的,.map()是Scala里面的東西,如果要將java 里面的東西用到Scala里面,需要引入:
      // import scala.collection.JavaConversions._
      jedis.lrange("uid" + uid.toString, 0, num - 1).map {
        items =>
          val strings = items.split("\\:")
          (strings(0).trim.toInt, strings(1).trim.toDouble)
      }.toArray
    }

    /**
      * 取出和當前電影r相似的前num個的相似電影
      *
      * @param num
      * @param mid
      * @param uid
      * @param simMovies
      * @param mongoConfig
      */
    def getTopSimMovies(num: Int, uid: Int, mid: Int, simMovies: collection.Map[Int, Map[Int, Double]])(implicit mongoConfig: MongoConfig) = {
      //1.從共享變量的電影相似度矩陣中獲取和當前電影的所有相似電影
      //.get(mid)之后的allSimMovies的數據類型是:Option[Map[Int,Double]] //.get之后allSimMovies的數據類型變成:Map[Int,Double]
      val allSimMovies = simMovies.get(mid).get.toArray //再.toArray之后allSimMovies的數據類型是:Array[(Int,Double)]

      //2.獲取用戶已經觀看過的電影
      //從Ratings表里面取出
      val usedMovies = ConnHelper.mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION)
        .find(MongoDBObject("uid" -> uid)).toArray.map {
        items =>
          items.get("mid").toString.toInt
      }
      //3.過濾掉已經評分的電影、排序輸出(降序排序)
      allSimMovies.filter(x => !usedMovies.contains(x._1)).sortWith(_._2 > _._2).take(num).map(x => x._1)
    }

    /**
      * 計算待選電影的推薦分數
      *
      * @param simMovies           電影的相似度矩陣(歷史數據)
      * @param userRecentlyRatings 用戶最近對電影的評分-20個
      * @param topSimMovies        當前電影最相近的電影-20個
      */
    def computMovieScores(simMovies: collection.Map[Int, Map[Int, Double]], userRecentlyRatings: Array[(Int, Double)], topSimMovies: Array[Int]) = {
      //保存每一個待選電影和最近評分的每個電影的權重得分
      val score = ArrayBuffer[(Int, Double)]()
      //保存每一個電影的增強因子數 //Key-mid ,Value-有多少個
      val increMap = mutable.HashMap[Int, Int]()
      //保存每一個電影的減弱因子數 //Key-mid ,Value-有多少個
      val decreMap = mutable.HashMap[Int, Int]()
      //用戶最近的評分 和 當前電影最相近的電影 要做 雙重for循環
      //topSimMovies對應圖中的A(B),userRecentlyRatings的對應圖中的X Y Z
      for (topSimMovie <- topSimMovies; userRecentlyRating <- userRecentlyRatings) {
        //userRecentlyRating._1 就相當於電影r ; topSimMovie 就相當於電影q
        val simScore = getMoviesSimScore(simMovies, userRecentlyRating._1, topSimMovie)
        if (simScore > 0.6) {
          //score也就是公式中除號上面和號右側的部分
          score += ((userRecentlyRating._1, simScore * userRecentlyRating._2))
          if (userRecentlyRating._2 >= 3) {
            //增強因子起作用
            increMap(topSimMovie) = increMap.getOrDefault(topSimMovie, 0) + 1
          } else {
            //否則減弱因子起作用
            decreMap(topSimMovie) = increMap.getOrDefault(topSimMovie, 0) + 1
          }
        }
      }
      score.groupBy(_._1).map {
        case (mid, sims) =>
          (mid, sims.map(_._2).sum / sims.length + log(increMap(mid)) - log(decreMap(mid)))
      }.toArray //變成一個數組保存
    }

    /**
      * 自寫log()--實現lg()
      *
      * @param m
      * @return
      */
    def log(m: Int) = {
      //loh(2)、log(10) 都是可以的,感覺按照公式的話應該寫成log(10)
      math.log(m) / math.log(10)
    }

    /**
      * 獲取電影之間的相似度
      *
      * @param simMovies       和之前電影p最相似的電影集合
      * @param userRatingMovie 用戶最近給最近電影的評分中的一個
      * @param topSimMovie     和當前電影最相似的電影集合中的一個
      */
    def getMoviesSimScore(simMovies: collection.Map[Int, Map[Int, Double]], userRatingMovie: Int, topSimMovie: Int) = {
      //模式匹配
      //注意寫Some,這樣最后得出結果的數據類型才會和simMovies里面的相同,都是Double
 simMovies.get(topSimMovie) match {
        case Some(sim) => sim.get(userRatingMovie) match {
          case Some(score) => score case None => 0.0
        }
        case None => 0.0 //None是Some的對立面
      }
    }

    /**
      * 向數據庫中寫入信息
      *
      * @param uid
      * @param streamRecs
      * @param mongoConfig
      */
    def saveRecsToMongoDB(uid: Int, streamRecs: Array[(Int, Double)])(implicit mongoConfig: MongoConfig) = {
      val streamRecsCollect = ConnHelper.mongoClient(mongoConfig.db)(MONGODB_STREAM_RECS_COLLECTION)
      //先刪除
      streamRecsCollect.findAndRemove(MongoDBObject("uid"->uid))
      //再插入
      //將數據從(Int,Double)(Int,Double)(Int,Double)(Int,Double)
      //轉換成Int:Double|Int:Double|Int:Double|Int:Double
      streamRecsCollect.insert(MongoDBObject("uid"->uid,"recs"->streamRecs.map(x=>x._1+":"+x._2).mkString("|")))
      println("save to mongodb")
    }
  }
}

 二、圖解

1.實時部分的過程圖

這里藍色部分是用API實現的,藍色的使用Kafka Stream這部分在企業中不常用。

 

 2.實時推薦部分程序實現的思維導圖

 三、文字描述(由數學分析公式推導程序思路)

公式中重要組成部分的解釋、求法

注:這里藍色的代表歷史數據,橘色的代表實時數據

1) sim(q,r)

求出“和p電影最為相似電影集合中的某個電影-q”和"最近觀看的電影-r"之間的相似度

(1)"q"所在的集合S(取前num個)是由方法(二)得出的

     /**
      * 方法(二)取出和當前電影r相似的前num個的相似電影
      *
      * @param num
      * @param mid
      * @param uid
      * @param simMovies
      * @param mongoConfig
      */
    def getTopSimMovies(num: Int, uid: Int, mid: Int, simMovies: collection.Map[Int, Map[Int, Double]])(implicit mongoConfig: MongoConfig) = {
      //1.從共享變量的電影相似度矩陣中獲取和當前電影的所有相似電影 //.get(mid)之后的allSimMovies的數據類型是:Option[Map[Int,Double]]
      //.get之后allSimMovies的數據類型變成:Map[Int,Double]
      val allSimMovies = simMovies.get(mid).get.toArray //再.toArray之后allSimMovies的數據類型是:Array[(Int,Double)]

      //2.獲取用戶已經觀看過的電影 //從Ratings表里面取出
      val usedMovies = ConnHelper.mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION)
        .find(MongoDBObject("uid" -> uid)).toArray.map {
        items =>
          items.get("mid").toString.toInt
      }
      //3.過濾掉已經評分的電影、排序輸出(降序排序)
      allSimMovies.filter(x => !usedMovies.contains(x._1)).sortWith(_._2 > _._2).take(num).map(x => x._1)
    }

(2)求出電影q和電影r之間的相似度

進行數據匹配--在“和電影p相似的電影集合”中匹配“和 當前瀏覽的電影r最相似的電影集合中的某個電影-q

                      --方法二求出的那個集合中的電影 (Map[Int, Map[Int, Double]]匹配的就是紅色子的部分;

接着再對匹配成功的電影再次進行數據匹配--再匹配當前觀看過的電影-r  (Map[Int, Map[Int, Double]]匹配的就是紅色子的部分;

最后得出電影q和r之間的相似度。(Map[Int, Map[Int, Double]]就是紅色子的部分

     /**
      * 方法(三)獲取電影之間的相似度
      *
      * @param simMovies       和之前電影p相似的電影集合
      * @param userRatingMovie 用戶最近給最近電影的評分中的一個--取的是mid (r)
      * @param topSimMovie     和當前電影最相似的電影集合中的一個(q) */
    def getMoviesSimScore(simMovies: collection.Map[Int, Map[Int, Double]], userRatingMovie: Int, topSimMovie: Int) = {
      //模式匹配
      //注意寫Some,這樣最后得出結果的數據類型才會和simMovies里面的相同,都是Double
 simMovies.get(topSimMovie) match {
        case Some(sim) => sim.get(userRatingMovie) match {
          case Some(score) => score
          case None => 0.0
        }
        case None => 0.0 //None是Some的對立面
      }
    }

2)Rr

用戶對最近觀看電影的評分

它們由方法(一)得出的

是根據Kafka中獲取的實時數據(uid),和Redis中的歷史數據得到的

     /**
      * 方法(一)從Redis 取數據--當前最近的(新加入的)num次評分
      *
      * @param num   評分個數
      * @param uid   誰評的分數
      * @param jedis 連接Redis的工具(客戶端)
      * @return
      */
    def getUserRecentlyRating(num: Int, uid: Int, jedis: Jedis) = {
      //Redis中保存的數據格式:
      //lpush uid:1 1129:2.0 1172:4.0 1263:2.0 1287:2.0 1293:2.0 1339:3.5 1343:2.0 1371:2.5
      //.lrange()是因為獲取Redis中的數據時,是這樣的命令行,EG:192.168.212.21:6379> lrange uid:1 0 5
      //其中,start這里是0,stop這里是5
      //注意:jedis是Java里面的,.map()是Scala里面的東西,如果要將java 里面的東西用到Scala里面,需要引入:
      // import scala.collection.JavaConversions._
      jedis.lrange("uid" + uid.toString, 0, num - 1).map {
        items =>
          val strings = items.split("\\:")
          (strings(0).trim.toInt, strings(1).trim.toDouble)
      }.toArray
    }

3)RK(r屬於RK)

用戶u按時間順序最近的K個評分

在求當前電影r相似的前num個的相似電影(方法二)的時候已經考慮到,其實公式中就是RK在體現“最近... ...個”的條件。

4)sim_sum

qRK中電影的相似度大於最小閾值的個數

這個和后面的5)可放在一起解釋(就是公式徹底實現的部分)

5)incount、recount

incount: RK中與電影q相似的、且本身(電影r)評分較高(>=3)的電影個數

recount:RK中與電影q相似的、且本身(電影r評分較低(<3)的電影個數

下面這段程序就在“計算待選電影的推薦分數”的方法里:

 if (simScore > 0.6) {
          //score也就是公式中除號上面和號右側的部分
          score += ((userRecentlyRating._1, simScore * userRecentlyRating._2))
          if (userRecentlyRating._2 >= 3) {
            //增強因子起作用
            increMap(topSimMovie) = increMap.getOrDefault(topSimMovie, 0) + 1
          } else {
            //否則減弱因子起作用
            decreMap(topSimMovie) = increMap.getOrDefault(topSimMovie, 0) + 1
          }
        }
      }
      score.groupBy(_._1).map {
        case (mid, sims) =>
          (mid, sims.map(_._2).sum / sims.length + log(increMap(mid)) - log(decreMap(mid)))
      }.toArray //變成一個數組保存

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM