scala spark 示例代碼


1.   導入隱式轉換

import spark.implicits._

2. 讀取 / 存儲 mongodb 數據並轉換為對象 df (不 as 轉換也是 DataFrame 對象,但一般會習慣轉換一下在進行操作)

case class Rating(val uid: Int, val mid: Int, val score: Double, val timestamp: Int)

val ratingsDF = spark.read
      .option("uri", mongoConfig.uri)
      .option("collection", MONGO_RATINGS_COLLECTION)
      .format("com.mongodb.spark.sql")
      .load()
      .as[Rating]
      .toDF()
  count.write
    .option("uri", mongoConfig.uri)
    .option("collection", MONGO_MOVIE_SCORE_COUNT)
    .format("com.mongodb.spark.sql")
    .mode("overwrite")
    .save() 

3. 將 DataFrame 轉換為 sql 表進行操作,   如果例如有時間格式化等功能需要加入 sql 語句中,需要 注冊一個 UDF 函數 來操作

   //統計以月為單位每個電影的評分數
    val simpleDateFormat = new SimpleDateFormat("yyyyMM")
    //注冊一個UDF函數, 用於將timestamp轉換成年月形式
    spark.udf.register("changeDate", (x:Int) => simpleDateFormat.format(new Date(x*1000L)).toInt)
    val mouthCountDF = spark.sql("select mid, count(mid) count, changeDate(timestamp) as yearmouth from ratings group by yearmouth,mid order by count desc")

4. 將 2 個 RDD 通過某個字段進行 join

//統計每種電影類別中評分最高的10個電影
val movieWithScore = movieDF.join(avgMoviesDF, Seq("mid", "mid"))

5. 將 list 轉化為 RDD

val genres = List("Action", "Adventure", "Animation", "Comedy", "Ccrime", "Documentary", "Drama", "Family", "Fantasy", "Foreign", "History", "Horror", "Music", "Mystery", "Romance", "Science", "Tv", "Thriller", "War", "Western") 
val genresRDD = spark.sparkContext.makeRDD(genres)

6. 過濾 某個 RDD 中包含另一個 RDD 的數據

  genresRDD.cartesian(movieWithScore.rdd)
      .filter{case (genres, row) =>
         row.getAs[String]("genres").contains(genres)
    }

7. 如果一個RDD 格式為    RDD[(String, Iterable[(Integer, Double)])],  這樣的格式,需要對 rdd 中的迭代器中的某個字段進行排序, 例如 Double 這個字段, 並且只取前 10 條記錄

  .map{case (genres, item) =>
      (GenresRecommendation(genres, item.toList.sortWith(_._2 > _._2).take(10).map(x => Recommendation(x._1, x._2))))
    }

8. 推薦算法  用戶相似度推薦算法 (根據用戶對各個電影的評分,計算出用戶除了評分電影外,還有可能對那些電影感興趣,包含計算的字段就是 用戶,電影,評分)

//1.首先需要做一個 ALS訓練模型 model,那么久需要創建一個訓練集, 訓練集 為 Rating(org.apache.spark.mllib.recommendation.Rating) 對象, 字段為 3個,如下,: x._1, x._2, x._3   
x._1 為 uid 用戶id, x._2 為 mid 電影 id, x._3 為評分 這樣的三個字段, 所以需要先將 ratingsRDD 轉化成這3個字段的rdd,
val trainData = ratingsRDD.map(x => Rating(x._1, x._2, x._3))
//2.設置訓練參數
val (rank, iterations, lambda) = (50, 10, 0.01)  

  //3.創建訓練 ALS模型,  也就是后面要進行相似度計算的時候要以這個model為原模型進行匹配計算
  val model = ALS.train(trainData, rank, iterations, lambda)

 //4.要使用這個 model 進行計算,還需要有一個格式為 RDD[(Int, Int)] 這樣一個待計算的 Rdd,因為 model 模型創建的時候是  uid, mid, score,  所以這個RDD 第一個 Int 應該是uid, 第二個RDD 是 mid

 所以要先獲得這樣一個 RDD,  首先獲得一個 RDD[Int] 格式的 userRDD,  然后獲得一個 RDD[Int] 格式的 movieRDD, 再做 笛卡爾積 就可以獲得 RDD[(Int, Int)]

  val userMovies = userRDD.cartesian(movieRDD)

  //5.使用 predict 方法計算相似度

 val preRatings = model.predict(userMovies)

 //轉換成我們希望的推薦對象矩陣 (計算出的 preRatings 是一個Rating對象, 通過如下字段就能拿到我們想要的數據, 格式為 RDD[(Int, (Int, Double))],也就是 RDD[(uid, (mid, score))]),然后再通過

  需求轉換成我們想要的格式活對象
 val userRecommender = preRatings.map(rating => (rating.user, (rating.product, rating.rating)))
  .groupByKey()
  .map {
     case (uid, recs) => UserRecommender(uid, recs.toList.sortWith(_._2 > _._2).take(20).map(x => Recommenderation(x._1, x._2)))
  }.toDF()

  

 9. 電影相似度推薦算法 (電影相似度是 電影矩陣中  uid, mid, score, 用戶對很多電影進行過評分,根據用戶對電影的喜好計算出那些電影比較相似, 所以還是使用上面的 model 來進行計算)

//1. model.productFeatures 將會獲得一個  RDD[(Int, Array[Double])] 的 RDD, 需要將 Array[Double] 轉化為 DoubleMatrix
val movieFeatures = model.productFeatures.map {
    case (mid, freatures) =>
      (mid, new DoubleMatrix(freatures))
  }

  //電影相識度矩陣計算
  val movieRecommender = movieFeatures.cartesian(movieFeatures)
    .filter { case (a, b) => a._1 != b._1 }
    .map {
      case (a, b) =>
    val simSocore = this.consinSim(a._2, b._2)
    (a._1, (b._1, simSocore))
    }.filter(_._2._2 > 0.6)
    .groupByKey()
    .map {
      case (mid, recs) => (MovieRecommender(mid, recs.toList.sortWith(_._2 > _._2).map(x => Recommenderation(x._1, x._2))))
    }.toDF()

  def consinSim(movie1: DoubleMatrix, movie2: DoubleMatrix): Double = {
    movie1.dot(movie2) / (movie1.norm2() * movie2.norm2())
  }

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM