電影推薦系統項目的數據處理部分


這個項目的整體業務邏輯是通過Spring進行搭建,並部署在Tomcat上的。業務產生的數據一部分被存儲到mongoDB並用於spark sql和ml的離線計算。另一部分被傳送到Flume,經kafka到達spark streaming進行實時計算。還有一部分數據存儲到redis,同樣運用到spark streaming上。本文主要關注spark相關的部分。項目的原始實現主要基於RDD,而且有不少低效的代碼實現。本文在此基礎上對80%的spark相關代碼進行了重寫,使新的實現在運行效率上提高了兩倍以上、內存使用減少了幾倍、代碼量也減少近一半。

項目架構

綜合業務:Spring、Tomcat

數據存儲:業務數據MongoDB、搜索服務數據ES、緩存數據Redis

離線和實施推薦:Spark DF、ML、Streaming

消息服務:Kafka

  1. 【數據加載】

    數據加載服務,主要用於項目的數據初始化,用於將三個數據集(Movies【電影的數據集】、Rating【用戶對於電影的評分】、Tags【用戶對於電影的標簽】)初始化到Mongodb數據庫以及ElasticSearch里面。

  2. 【離線推薦】

    • 通過Azkaban周期性的調度【離線統計服務】和【離線推薦服務】。
    • 【離線統計服務】
      • 最熱電影統計算法  =>  RateMoreMovies表中
      • 當前最熱電影統計算法   =>  RateMoreRecentlyMovies 表中
      • 電影的平均評分算法   =>  AvgMovies 表中
      • 電影類別TOP10推薦   =>   GenresTopMovies表中。
    • 【離線推薦服務】
      • 通過Spark ALS算法計算模型的訓練
      • 基於Model產生電影相似度矩陣   =>  MovieRecs表中
      • 基於Model產生用戶推薦矩陣    =>  UserRecs表中
  3. 【實時推薦】

    • 當一個用戶完成了對電影的評分之后,觸發實時推薦,后台服務將評分數據實時寫入到LOG日志中。
    • Kafka會通過kafkaStream程序將log隊列中的數據進行格式化,然后將數據推送到recommender的隊列中
    • Spark Streaming實時推薦程序讀取kafka中推送過來的數據,配合讀取Redis緩存中的數據,運行實時推薦算法【基於模型的推薦】,把結果數據寫入到MongoDB的StraemRecs表中
  4. 【推薦結果的聚合】

    • 綜合業務服務會根據一定的比例聚合 【離線推薦服務(ALS的協同過濾)】、【基於內容的推薦(基於ElasticSearch More like this功能)】、【實時推薦服務(基於模型的推薦)】這些結果。
    • 聚合完成之后,將結果返回到用戶端。

前期工作:數據加載

利用Spark SQL將數據分別導入到 MongoDB 和 ElasticSearch

目標

  • MongoDB

    1. 將Movie【電影數據集】/ Rating【用戶對電影的評分數據集】/ Tag【用戶對電影的標簽數據集】數據集分別加載到MongoDB數據庫中的Movie / Rating / Tag表中
  • ElasticSearch

    1. 需要將Movie【電影數據集】加載到ElasticSearch名叫Movie的Index中。需要將Tag數據和Movie數據融合。

步驟

  1. 先新建一個Maven項目,將依賴添加好。
  2. 分析數據集Movie、Rating、Tag
  3. Spark DF加載數據集
  4. 將DF加載到MongoDB中:
    • 將原來的Collection全部刪除
    • 通過DF的write方法將數據寫入
    • 創建數據庫索引
    • 關閉MongoDB連接
  5. 將DF加載到ElasticSearch中:
    • 將存在的Index刪除掉,然后創建新的Index
    • 通過DF的write方法將數據寫入
  6. 關閉Spark集群

數據格式說明

// MONGO_MOVIE_COLLECTION,包含電影ID、描述、時長、發行日期、拍攝日期、語言、類型、演員、導演
case class Movie(mid: Int, name: String, descri: String, timelong: String, issue: String, shoot: String, language: String, genres: String, actors: String, directors: String)

// MONGO_RATING_COLLECTION,包含用戶ID、電影ID、用戶對電影的評分、用戶評分時間
case class MovieRating(uid: Int, mid: Int, score: Double, timestamp: Int)

// MONGO_TAG_COLLECTION,包含用戶ID、電影ID、標簽的具體內容、用戶打標簽時間
case class Tag(uid: Int, mid: Int, tag: String, timestamp: Int)

加載數據到MongoDB

// 由於上面三種數據的加載處理步驟一樣,這里就只展示其中之一的MovieRating。
// 先構建schema,盡管spark有推斷功能,但是為了穩定,還是自己定義schema比較好。
val ratingSchema =  StructType(Array(
  StructField("uid", IntegerType, true),
  StructField("mid", IntegerType, true),
  StructField("score", DoubleType, true),
  StructField("timestamp", IntegerType, true)
  ))

// 讀取數據
val ratingDF = spark.read
  .format("csv")
  .schema(ratingSchema)
  .load(RATING_DATA_PATH)

// 新建一個到MongoDB的連接
val mongoClient = MongoClient(MongoClientURI(mongodbConfig.uri))
// 如果MongoDB中有對應的數據庫,那么應該刪除
mongoClient(mongodbConfig.db)(MONGO_RATING_COLLECTION).dropCollection()

// 寫入數據
ratingDF
      .write
      .option("uri", mongodbConfig.uri)
      .option("collection", MONGO_RATING_COLLECTION)
      .mode("overwrite")
      .format(MONGO_DRIVER_CLASS)
      .save()

// 對數據表建索引
mongoClient(mongodbConfig.db)(MONGO_RATING_COLLECTION).createIndex(MongoDBObject("uid" -> 1))

// 關閉MongoDB連接
mongoClient.close()

將數據寫入ES

這里先講Tag數據進行轉換,並與movie表合並,最后將這個表加載到ES。

// 對tag數據進行處理,使之變為下面格式
/**
* |MID | Tags |
* |1   | tag1|tag2|tag3|tag4....|
*/
val newTag = tagDF.groupBy($"mid")
  .agg(concat_ws("|", collect_set($"tag")).as("tags"))
  .select("mid", "tags")

// 需要將處理后的Tag數據,和Moive數據融合,產生新的Movie數據
val joinExpression = movieDF.col("mid") === newTag.col("mid")
val movieWithTagsDF = movieDF.join(newTag, joinExpression, "left")

// 需要將新的Movie數據保存到ES中
//新建一個配置
val settings = Settings.builder()
  .put("cluster.name", eSConfig.clustername)
  .build()

//新建一個ES的客戶端
val esClient = new PreBuiltTransportClient(settings)

//需要將 TransportHosts 添加到 esClient 中
val REGEX_HOST_PORT = "(.+):(\\d+)".r
eSConfig.transportHosts.split(",")
  .foreach {
    // 通過正則表達提取
    case REGEX_HOST_PORT(host: String, port: String) =>
      // 將所有的 es節點 的地址都加入到 esClient 中
      esClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), port.toInt))
  }

//需要清除掉ES中遺留的數據
if (esClient.admin().indices().exists(
  new IndicesExistsRequest(eSConfig.index)).actionGet().isExists) {
  esClient.admin().indices().delete(new DeleteIndexRequest(eSConfig.index))
}
esClient.admin().indices().create(new CreateIndexRequest(eSConfig.index))

//將數據寫入到ES中
movieDF
  .write
  .option("es.nodes", eSConfig.httpHosts)
  .option("es.http.timeout", "100m")
  .option("es.mapping.id", "mid") // es地址中取得的id值會被賦值到 _id ,這和 mid 不一樣,所以用 mapping 做轉換
  .mode("overwrite")
  .format(ES_DRIVER_CLASS)
  .save(eSConfig.index + "/" + ES_TYPE)

離線推薦

統計推薦

目標

  1. 優質電影

    獲取所有歷史數據中評分最多的電影的集合,統計每個電影的評分數 => RateMoreMovies

  2. 熱門電影

    認為按照月來統計,這個月中評分數量最多的電影我們認為是熱門電影,統計每個月中的每個電影的評分數量 => RateMoreRecentlyMovie

  3. 統計電影的平均評分

    將Rating數據集中所有的用戶評分數據進行平均,計算每一個電影的平均評分 => AverageMovies

  4. 統計每種類別電影的TOP10電影

    將每種類別的電影中評分最高的10個電影分別計算出來, => GenresTopMovies

步驟

  1. 新建一個項目StaticticsRecommender

  2. 統計所有歷史數據中電影的評分個數

    通過Rating數據集,用mid進行groupby操作,count計算總數

  3. 統計以月為單位的電影的評分個數

    • 利用from_unixtime對timestamp進行轉換,結果格式為yyyyMM
    • 通過groupby 年月,mid 來完成統計
  4. 統計每個電影評分得分
    通過Rating數據集,用戶mid進行group by操作,avg計算評分

  5. 統計每種類別中評分最高的10個電影

    • 需要通過JOIN操作將電影的平均評分數據和Movie數據集進行合並,產生MovieWithScore數據集
    • 利用split和explode將MovieWithScore拆開為以一種Genre為開頭的MovieWithScore
    • 此處有兩種方案(這里展示第一種,但最好的是建立Aggregator,在ALS離線推薦中介紹):
      • 通過Genre作為Key,進行groupByKey操作,將相同電影類別的電影進行聚集並按評分取TOP10。(TopN利用PriorityQueue實現)
      • 利用repartitionAndSortWithinPartitions、mapPartition和groupSorted(輪子)實現分Genre的TOP10。
    • 上面數據轉化為DF后輸出到MongoDB中

離線計算由原來的RDD改為基於Spark DataFrame的實現,而DF本身比RDD具有更多的默認優化,比如Tungsten運用了堆外內存,減少GC開銷、序列化比RDD的Java默認序列化更加高效、全代碼生成等優勢。上述是效率提升是理論上的,下面是實際上的。

首先,DF相比於RDD能這大大地減少了代碼量。下面的功能就是簡單地統計每部電影的評分數、以月為單位統計每部電影的評分數、每部電影平均分、不同類型電影的評分top10(即分組topn)。這里需要連接電影數據集和評分集,前者包含了電影類型,后者包含平均分。原代碼使用RDD的groupByKey,這是一種低效的操作,因為它不會進行map-side聚合,而且分組后使用map,直接調用sortWith對每個分組進行排序,再用slice進行取top10。這種實現並不好,在Scala中運用sortWith雖然方便,但是效率很低,它比toArray.sort慢不少。slice也是,用take更高效。但實際上這里RDD的實現也不需要用全排序再取前10。利用aggregateByKey加優先隊列,即堆排的實現,效率更高。其用時由原來的20多秒變為6秒左右。當然,在DF中可用windowfunction來實現topn,但是windowfunction的底層實現是先shuffle然后sort,最后根據所定義的窗口進行相應的聚合。這一過程同樣沒有map-side聚合,所以盡管數據量少時速度更快,但在數據量大時或許不是一個好的topn實現。其實我覺得最好的topn實現應該是自定義一個aggregator,這個在離線推薦部分會提到。

// 統計所有歷史數據中電影的評分個數
val rateMoreMoviesDF = ratingDF
  .groupBy($"mid")
  .agg(count($"mid"))

// 統計以月為單位的電影評分個數
val dateFormat = "yyyyMM"
val rateMoreRecentLyMovies = ratingDF
  .withColumn("yearmouth", from_unixtime($"timestamp", dateFormat))
  .groupBy($"yearmouth", $"mid")
  .agg(count($"mid") as "count")
  .select($"mid", $"count", $"yearmouth")

// 統計每個電影的平均得分
val averageMoviesDF = ratingDF
  .select($"mid", $"score")
  .groupBy($"mid")
  .agg(avg($"score").as("avg"))

// 計算不同類別電影的Top10
val joinExpression = movieDF.col("mid") === averageMoviesDF.col("mid")
// 用 inner join 將沒有評分的電影會除去
val movieWithScoreRDD = movieDF
  .join(averageMoviesDF, joinExpression, "inner")
  .withColumn("splitted_genres", split($"genres", "\\|"))
  .withColumn("single_genre", explode($"splitted_genres"))
  .map(row => {
    (row.getAs[String]("single_genre"), (row.getAs[Int]("mid"), row.getAs[Double]("avg")))
  })
  .rdd

val ord = new Ordering[(Int, Double)] {
  override def compare(x: (Int, Double), y: (Int, Double)): Int = {
    val compare2 = x._2.compareTo(y._2)
    if (compare2 != 0) -compare2 else 0
  }
}

// 利用aggregateByKey和PriorityQueue實現map-side aggregation
val res = movieWithScoreRDD
  .aggregateByKey(new mutable.PriorityQueue[(Int, Double)]()(ord))((acc, v) => {
  if (acc.length < 10) {
    acc.enqueue(v)
    acc
  } else {
    acc += ord.min(acc.dequeue(), v)
  }
},
  (acc1, acc2) => {
    val acc0 = acc1 ++ acc2
    while (acc0.length > 10) {
      acc0.dequeue()
    }
    acc0
  }).toDF()

ALS離線推薦

目標

  1. 訓練ALS推薦模型並計算出屬於用戶各自的Top20電影推薦
  2. 計算電影相似度矩陣

步驟

  1. 訓練ALS推薦模型

    • 實例化ALS模型,按常規的模型訓練流程訓練模型后調用recommendForAllUsers完成推薦。
  2. 計算電影相似度矩陣

    • 獲取電影的特征矩陣,轉換成DoubleMatrix
    • 電影的特征矩陣之間做笛卡爾積,通過余弦相似度計算兩個電影的相似度
    • 將數據通過GroupBy處理后,輸出
  3. ALS模型的參數選擇

    • 通過計算ALS的均方根誤差來判斷參數的優劣程度

ALS模型需要三列,用戶id、電影id和評分,並從中計算出用戶矩陣和電影矩陣。項目需要給每個用戶推薦20部電影,此處我直接使用了2.2提供的recommendForAllUsers函數便直接得到了結果。但后續項目還要求求出電影的相似度矩陣,用於后續實時推薦計算。這個算法spark沒有現成的實現,但是思路根recommendForAllUsers是一樣的。就是先求電影矩陣自身的cross join,然后求每部電影對除其自身外的其他所有電影的余弦相似度。公式為兩個向量的點積除以兩個向量長度的乘積。然后取出每部電影中,余弦相似度前100的電影,這樣子構建出電影的相似度矩陣。我根據spark的recommendForAll的源碼,重新實現了這一功能,這比原代碼的實現快上1倍,而且cross join產生的中間數據的內存也減少了幾倍,由幾G到了幾十M。實際上這里也得出了一個cross join的優化方式.....然后就是取每部電影的前100相似的電影。這里同樣是分組TopN,但源碼使用了私有的TopNAggregator。同樣我也實現了一個,這是我覺得在數據量大時最好的TopN實現。首先它基於DS,也享受了DF的部分優化(比如Catalyst、序列化等)這是優於rdd的aggregateByKey的部分,然后,它包含了map-side聚合,減少了shuffle期間需要傳輸的數據,這是優於windowfunc的部分。

// 訓練模型的一般步驟
val alsModel = new ALS()
val paramGrid = new ParamGridBuilder()
  .addGrid(alsModel.rank, Seq(5))
  .addGrid(alsModel.regParam, Seq(1.0))
  .build()

val regEval = new RegressionEvaluator()
  .setLabelCol("score")
  .setPredictionCol("prediction")
  .setMetricName("rmse")

val tvs = new TrainValidationSplit()
  .setTrainRatio(0.5)
  .setEstimatorParamMaps(paramGrid)
  .setEstimator(alsModel)
  .setEvaluator(regEval)

val trainedModel = tvs.fit(trainData)
val bestModel = trainedModel.bestModel.asInstanceOf[ALS]
val fittedALS = bestModel.fit(trainData)

// 2.2.0 新增方法,直接可以對訓練集中的 user 或 item 進行推薦
val perUserPredictions = fittedALS.recommendForAllUsers(20)

// 計算電影相似度矩陣,Spark2.2只有根據用戶推薦電影,或者反過來,並沒有更具電影推薦電影的,但是思路其實是一樣的,這里根據spark源碼中recommendForAllUsers的原理實現電影相似度的計算。
// 提取item因子
val movieFeatures = fittedALS.itemFactors
// cross join優化的預處理
val ready2Crossjoin = movieFeatures.as[(Int, Array[Float])]
  .mapPartitions(_.grouped(4096))

val ratings = ready2Crossjoin
  .crossJoin(ready2Crossjoin)
  .as[(Seq[(Int, Array[Float])], Seq[(Int, Array[Float])])]
  .flatMap {
    case (mf1Iter, mf2Iter) =>
      val m1 = mf1Iter.size
      val m2 = math.min(mf2Iter.size, 100)
      var i = 0
      val output = new Array[(Int, Int, Float)](m1 * m2)
      val pq = mutable.PriorityQueue[(Int, Float)]()(Ordering.by(-_._2))
      val vectorOp = new F2jBLAS
      mf1Iter.foreach { case (m1Id, mf1Factor) =>
        mf2Iter.foreach { case (m2Id, mf2Factor) =>
          if (m1Id == m2Id) {
            // do nothing
          } else {
            val simScore = consinSim(ALSRank, vectorOp, mf1Factor, mf2Factor)
            if (pq.length < m2) {
              pq.enqueue((m2Id, simScore))
            } else {
              val temp = pq.dequeue()
              pq += (if (temp._2 > simScore) temp else (m2Id, simScore))
            }
          }
        }
        pq.foreach { case (mf2Id, score) =>
          output(i) = (m1Id, mf2Id, score)
          i += 1
        }
        pq.clear()
      }
      output.toSeq
  }

// TopNAggregator在下面進行介紹。
val topNAggregator = new TopNAggregator[Int, Int, Float](10, Ordering.by(-_._2))
val rowRes = ratings.as[(Int, Int, Float)]
  .groupByKey(_._1)
  .agg(topNAggregator.toColumn)
  .toDF("moviceID", "similar_movieID")

// 用於轉換數據格式,實際上是將“_2”中的FloatType轉換為DoubleType
val arrayType = ArrayType(
  new StructType()
    .add("_1", IntegerType)
    .add("_2", DoubleType)
)

// 得出結果
val res = rowRes.select($"moviceID", $"similar_movieID".cast(arrayType))

TopNAggregator的實現

class TopNAggregator[K1: TypeTag, K2: TypeTag, V: TypeTag](num: Int, ord: Ordering[(K2, V)])
  extends Aggregator[(K1, K2, V), mutable.PriorityQueue[(K2, V)], Array[(K2, V)]] {

  override def zero: mutable.PriorityQueue[(K2, V)] = new mutable.PriorityQueue[(K2, V)]()(ord)

  override def reduce(q: mutable.PriorityQueue[(K2, V)],
                       a: (K1, K2, V)): mutable.PriorityQueue[(K2, V)] = {
    if (q.size < num) {
      q += ((a._2, a._3))
    } else {
      q += ord.min((a._2, a._3), q.dequeue)
    }
  }

  override def merge(q1: mutable.PriorityQueue[(K2, V)],
                      q2: mutable.PriorityQueue[(K2, V)]): mutable.PriorityQueue[(K2, V)] = {
    q1 ++= q2
    while (q1.length > num) {
      q1.dequeue()
    }
    q1
  }

  override def finish(r: mutable.PriorityQueue[(K2, V)]): Array[(K2, V)] = {
    r.toArray.sorted(ord.reverse)
  }

  override def bufferEncoder: Encoder[mutable.PriorityQueue[(K2, V)]] = {
    Encoders.kryo[mutable.PriorityQueue[(K2, V)]]
  }

  override def outputEncoder: Encoder[Array[(K2, V)]] = ExpressionEncoder[Array[(K2, V)]]()
}

實時推薦

日志預處理:構建kafka流,從大量日志數據中過濾出特定前綴的用戶評分log,並傳到recommender topic讓下面的spark streaming接收。

  1. 構建LogProcessor實現Processor接口,實現對於數據的處理
  2. 構建StreamsConfig配置數據
  3. 構建TopologyBuilder來設置數據處理拓撲關系,就是輸入、處理器和輸出的關系。
  4. 構建KafkaStreams來啟動整個處理

前提

  1. 在Redis集群中存儲了每一個用戶最近對電影的K次評分。實時算法可以快速獲取。
  2. 離線推薦算法已經將電影相似度矩陣提前計算到了MongoDB中。
  3. Kafka已經獲取到了用戶實時的評分數據。

讀取MonogDB中的相似度矩陣然后廣播。讀取kafka stream,提取里面的uid、mid、score評分、timestamp,即某個用戶對某部電影的進行評分從而產生的日志,然后foreachRDD,對每條日志執行下面三個計算。從被廣播的相似度矩陣中獲取這次被評分的電影P最相似的K個電影(當然,實現還會從評分集中獲取用戶已看過的電影,並將這些電影過濾掉)這些作為推薦的候選電影。然后獲取用戶最近N次電影評分,實際是從redis中讀取,這些數據用於調整候選電影的優先級別。后面更多就是利用Scala實現算法的思想了,和spark等技術相關性不大。首先新建一個數組存儲候選電影的評分、兩個map存儲候選電影的增強和減弱因子。然后對每個候選電影進行遍歷,遍歷中再遍歷用戶最近評分的電影。針對每個候選電影,看用戶最近評分的電影中有沒有與它相似的,有的話返回其在相似度表中的分數,沒有就返回0.0。然后如果相似度大於0.6,就將這個相似度作為權重,乘以用戶對其相似電影的評分。得出候選電影的初步評分。然后根據用戶對該相似電影的評分,如果大於3,則候選電影的增強因子+1,否則在減弱因子中-1。最后計算所有部候選電影的初步評分的均值,並加上其增強因子的對數,減去其減弱因子的對數,得出最終得分,並把這個得分表插入到MongoDB用於用戶評分后的推薦。

// 獲取當前最近的M次電影評分。實際是從redis中讀取,結果為Array[(Int, Double)],int是mid,double是評分
val userRecentlyRatings = getUserRecentRatings(ConnHelper.jedis, uid, MAX_USER_RATINGS_COUNT)

// 獲取這次被評分的電影P最相似的K個電影。結果為Array[Int]。思路是先從廣播中的map獲取相似電影Array[(Int, Double)],然后從MONGO_RATING_COLLECTION中獲取用戶已看過的電影,得到Array[Int],最后從相似的電影中filter用戶沒看過的,且只返回mid。
// simMovies實際上就是待推薦電影(僅僅根據相似度表得出的結果)。從電影相似度表中找出與用戶本次評分的電影相似的電影,並去掉用戶已經看過的。
val simMovies = getTopSimMovies(MAX_SIM_MOVIES_NUM, mid, uid, simMoviesBV.value)

// 計算待選電影的推薦優先級別
val streamRecs = computeMovieScores(simMoviesBV.value, userRecentlyRatings, simMovies)
// 具體實現如下
def computeMovieScores(simMovies: Map[Int, Array[(Int, Double)]], userRecentlyRatings: Array[(Int, Double)],
                       topSimMovie: Array[Int]): Array[(Int, Double)] ={
  // 用於保存每個待選電影和最近評分的每一個電影的權重得分
  val score = ArrayBuffer[(Int, Double)]()

  // 用於保存每個電影的增強因子
  val increMap = mutable.HashMap[Int, Int]()

  // 用於保存每個電影的減弱因子
  val decreMap = mutable.HashMap[Int, Int]()

  for (topSimMovie <- topSimMovie; userRecentlyRating <- userRecentlyRatings) {
    // 針對每個topSimMovie,看與它相似的電影中有沒有用戶最近評分的電影,有的話返回其在相似度表中的分數,沒有就返回0.0。具體實現看下面1
    val simScore = getMoviesSimScore(simMovies, userRecentlyRating._1, topSimMovie)
    if (simScore > 0.6) {
      // topSimMovie的相似電影中如果有被用戶最近評過分的電影,且在相似度表的分數大於0.6,就將這個分數 * 用戶對topSimMovie相似電影中的某部電影的分數,並存儲到score
      score += ((topSimMovie, simScore * userRecentlyRating._2))
      // 根據用戶最近的評分對每一個topSimMovie的評分進行調整。
      if (userRecentlyRating._2 > 3) {
        increMap(topSimMovie) = increMap.getOrElse(topSimMovie, 0) + 1
      } else {
        decreMap(topSimMovie) = decreMap.getOrElse(topSimMovie, 0) + 1
      }
    }
  }
  // 最后分組聚合score,同時利用increMap和decreMap對分數進行調整
  score.groupBy(_._1)
    .map{case (mid, sims) => (mid, sims.map(_._2).sum / sims.length + log(increMap(mid)) - log(decreMap(mid)))}
    .toArray
}

// 1
def getMoviesSimScore(simMovies: Map[Int, Array[(Int, Double)]], userRatingMovie: Int, topSimMovie: Int): Double ={
  simMovies.get(topSimMovie) match {
    case Some(sim) => // sim:與某部電影相似的一列電影,包含mid和score
      var res = 0.0
      sim.foreach(smid =>
        res = if (smid._1 == userRatingMovie) smid._2 else 0.0
      )
      res

    case None => 0.0
  }
}

// 最后將結果存儲到MongoDB


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM