1. 導入隱式轉換
import spark.implicits._
2. 讀取 / 存儲 mongodb 數據並轉換為對象 df (不 as 轉換也是 DataFrame 對象,但一般會習慣轉換一下在進行操作)
case class Rating(val uid: Int, val mid: Int, val score: Double, val timestamp: Int) val ratingsDF = spark.read .option("uri", mongoConfig.uri) .option("collection", MONGO_RATINGS_COLLECTION) .format("com.mongodb.spark.sql") .load() .as[Rating] .toDF() count.write .option("uri", mongoConfig.uri) .option("collection", MONGO_MOVIE_SCORE_COUNT) .format("com.mongodb.spark.sql") .mode("overwrite") .save()
3. 將 DataFrame 轉換為 sql 表進行操作, 如果例如有時間格式化等功能需要加入 sql 語句中,需要 注冊一個 UDF 函數 來操作
//統計以月為單位每個電影的評分數 val simpleDateFormat = new SimpleDateFormat("yyyyMM") //注冊一個UDF函數, 用於將timestamp轉換成年月形式 spark.udf.register("changeDate", (x:Int) => simpleDateFormat.format(new Date(x*1000L)).toInt) val mouthCountDF = spark.sql("select mid, count(mid) count, changeDate(timestamp) as yearmouth from ratings group by yearmouth,mid order by count desc")
4. 將 2 個 RDD 通過某個字段進行 join
//統計每種電影類別中評分最高的10個電影 val movieWithScore = movieDF.join(avgMoviesDF, Seq("mid", "mid"))
5. 將 list 轉化為 RDD
val genres = List("Action", "Adventure", "Animation", "Comedy", "Ccrime", "Documentary", "Drama", "Family", "Fantasy", "Foreign", "History", "Horror", "Music", "Mystery", "Romance", "Science", "Tv", "Thriller", "War", "Western") val genresRDD = spark.sparkContext.makeRDD(genres)
6. 過濾 某個 RDD 中包含另一個 RDD 的數據
genresRDD.cartesian(movieWithScore.rdd) .filter{case (genres, row) => row.getAs[String]("genres").contains(genres) }
7. 如果一個RDD 格式為 RDD[(String, Iterable[(Integer, Double)])], 這樣的格式,需要對 rdd 中的迭代器中的某個字段進行排序, 例如 Double 這個字段, 並且只取前 10 條記錄
.map{case (genres, item) => (GenresRecommendation(genres, item.toList.sortWith(_._2 > _._2).take(10).map(x => Recommendation(x._1, x._2)))) }
8. 推薦算法 用戶相似度推薦算法 (根據用戶對各個電影的評分,計算出用戶除了評分電影外,還有可能對那些電影感興趣,包含計算的字段就是 用戶,電影,評分)
//1.首先需要做一個 ALS訓練模型 model,那么久需要創建一個訓練集, 訓練集 為 Rating(org.apache.spark.mllib.recommendation.Rating) 對象, 字段為 3個,如下,: x._1, x._2, x._3
x._1 為 uid 用戶id, x._2 為 mid 電影 id, x._3 為評分 這樣的三個字段, 所以需要先將 ratingsRDD 轉化成這3個字段的rdd,
val trainData = ratingsRDD.map(x => Rating(x._1, x._2, x._3))
//2.設置訓練參數
val (rank, iterations, lambda) = (50, 10, 0.01)
//3.創建訓練 ALS模型, 也就是后面要進行相似度計算的時候要以這個model為原模型進行匹配計算
val model = ALS.train(trainData, rank, iterations, lambda)
//4.要使用這個 model 進行計算,還需要有一個格式為 RDD[(Int, Int)] 這樣一個待計算的 Rdd,因為 model 模型創建的時候是 uid, mid, score, 所以這個RDD 第一個 Int 應該是uid, 第二個RDD 是 mid
所以要先獲得這樣一個 RDD, 首先獲得一個 RDD[Int] 格式的 userRDD, 然后獲得一個 RDD[Int] 格式的 movieRDD, 再做 笛卡爾積 就可以獲得 RDD[(Int, Int)]
val userMovies = userRDD.cartesian(movieRDD)
//5.使用 predict 方法計算相似度
val preRatings = model.predict(userMovies)
//轉換成我們希望的推薦對象矩陣 (計算出的 preRatings 是一個Rating對象, 通過如下字段就能拿到我們想要的數據, 格式為 RDD[(Int, (Int, Double))],也就是 RDD[(uid, (mid, score))]),然后再通過
需求轉換成我們想要的格式活對象
val userRecommender = preRatings.map(rating => (rating.user, (rating.product, rating.rating)))
.groupByKey()
.map {
case (uid, recs) => UserRecommender(uid, recs.toList.sortWith(_._2 > _._2).take(20).map(x => Recommenderation(x._1, x._2)))
}.toDF()
9. 電影相似度推薦算法 (電影相似度是 電影矩陣中 uid, mid, score, 用戶對很多電影進行過評分,根據用戶對電影的喜好計算出那些電影比較相似, 所以還是使用上面的 model 來進行計算)
//1. model.productFeatures 將會獲得一個 RDD[(Int, Array[Double])] 的 RDD, 需要將 Array[Double] 轉化為 DoubleMatrix
val movieFeatures = model.productFeatures.map { case (mid, freatures) => (mid, new DoubleMatrix(freatures)) }
//電影相識度矩陣計算
val movieRecommender = movieFeatures.cartesian(movieFeatures)
.filter { case (a, b) => a._1 != b._1 }
.map {
case (a, b) =>
val simSocore = this.consinSim(a._2, b._2)
(a._1, (b._1, simSocore))
}.filter(_._2._2 > 0.6)
.groupByKey()
.map {
case (mid, recs) => (MovieRecommender(mid, recs.toList.sortWith(_._2 > _._2).map(x => Recommenderation(x._1, x._2))))
}.toDF()
def consinSim(movie1: DoubleMatrix, movie2: DoubleMatrix): Double = {
movie1.dot(movie2) / (movie1.norm2() * movie2.norm2())
}