第十一篇|基於SparkSQL的電影分析項目實戰


在之前的分享中,曾系統地介紹了Spark的基本原理和使用方式,感興趣的可以翻看之前的分享文章。在本篇分享中,將介紹一個完整的項目案例,該案例會真實還原企業中SparkSQL的開發流程,手把手教你構建一個基於SparkSQL的分析系統。為了講解方便,我會對代碼進行拆解,完整的代碼已上傳至GitHub,想看完整代碼可以去clone,記得給個Star。以下是全文,希望本文對你有所幫助。

https://github.com/jiamx/spark_project_practise

公眾號『大數據技術與數倉』,回復『資料』領取大數據資料包

項目介紹

數據集介紹

使用MovieLens的名稱為ml-25m.zip的數據集,使用的文件時movies.csvratings.csv,上述文件的下載地址為:

http://files.grouplens.org/datasets/movielens/ml-25m.zip
  • movies.csv

該文件是電影數據,對應的為維表數據,大小為2.89MB,包括6萬多部電影,其數據格式為[movieId,title,genres],分別對應**[電影id,電影名稱,電影所屬分類]**,樣例數據如下所示:逗號分隔

1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
  • ratings.csv

該文件為定影評分數據,對應為事實表數據,大小為646MB,其數據格式為:[userId,movieId,rating,timestamp],分別對應**[用戶id,電影id,評分,時間戳]**,樣例數據如下所示:逗號分隔

1,296,5,1147880044

項目代碼結構

需求分析

  • 需求1:查找電影評分個數超過5000,且平均評分較高的前十部電影名稱及其對應的平均評分

  • 需求2:查找每個電影類別及其對應的平均評分

  • 需求3:查找被評分次數較多的前十部電影

代碼講解

  • DemoMainApp

該類是程序執行的入口,主要是獲取數據源,轉換成DataFrame,並調用封裝好的業務邏輯類。

object DemoMainApp {
  // 文件路徑
  private val MOVIES_CSV_FILE_PATH = "file:///e:/movies.csv"
  private val RATINGS_CSV_FILE_PATH = "file:///e:/ratings.csv"

  def main(args: Array[String]): Unit = {
    // 創建spark session
    val spark = SparkSession
      .builder
      .master("local[4]")
      .getOrCreate
    // schema信息
    val schemaLoader = new SchemaLoader
    // 讀取Movie數據集
    val movieDF = readCsvIntoDataSet(spark, MOVIES_CSV_FILE_PATH, schemaLoader.getMovieSchema)
    // 讀取Rating數據集
    val ratingDF = readCsvIntoDataSet(spark, RATINGS_CSV_FILE_PATH, schemaLoader.getRatingSchema)

    // 需求1:查找電影評分個數超過5000,且平均評分較高的前十部電影名稱及其對應的平均評分
    val bestFilmsByOverallRating = new BestFilmsByOverallRating
    //bestFilmsByOverallRating.run(movieDF, ratingDF, spark)

    // 需求2:查找每個電影類別及其對應的平均評分
    val genresByAverageRating = new GenresByAverageRating
    //genresByAverageRating.run(movieDF, ratingDF, spark)

    // 需求3:查找被評分次數較多的前十部電影
    val mostRatedFilms = new MostRatedFilms
    mostRatedFilms.run(movieDF, ratingDF, spark)

    spark.close()

  }
  /** * 讀取數據文件,轉成DataFrame * * @param spark * @param path * @param schema * @return */
  def readCsvIntoDataSet(spark: SparkSession, path: String, schema: StructType) = {

    val dataSet = spark.read
      .format("csv")
      .option("header", "true")
      .schema(schema)
      .load(path)
    dataSet
  }
}
  • Entry

該類為實體類,封裝了數據源的樣例類和結果表的樣例類

class Entry {

}

case class Movies(
                   movieId: String, // 電影的id
                   title: String, // 電影的標題
                   genres: String // 電影類別
                 )

case class Ratings(
                    userId: String, // 用戶的id
                    movieId: String, // 電影的id
                    rating: String, // 用戶評分
                    timestamp: String // 時間戳
                  )

// 需求1MySQL結果表
case class tenGreatestMoviesByAverageRating(
                                             movieId: String, // 電影的id
                                             title: String, // 電影的標題
                                             avgRating: String // 電影平均評分
                                           )

// 需求2MySQL結果表
case class topGenresByAverageRating(
                                     genres: String, //電影類別
                                     avgRating: String // 平均評分
                                   )

// 需求3MySQL結果表
case class tenMostRatedFilms(
                              movieId: String, // 電影的id
                              title: String, // 電影的標題
                              ratingCnt: String // 電影被評分的次數
                            )
  • SchemaLoader

該類封裝了數據集的schema信息,主要用於讀取數據源是指定schema信息

class SchemaLoader {
  // movies數據集schema信息
  private val movieSchema = new StructType()
    .add("movieId", DataTypes.StringType, false)
    .add("title", DataTypes.StringType, false)
    .add("genres", DataTypes.StringType, false)
 // ratings數據集schema信息
  private val ratingSchema = new StructType()
    .add("userId", DataTypes.StringType, false)
    .add("movieId", DataTypes.StringType, false)
    .add("rating", DataTypes.StringType, false)
    .add("timestamp", DataTypes.StringType, false)

  def getMovieSchema: StructType = movieSchema

  def getRatingSchema: StructType = ratingSchema
}
  • JDBCUtil

該類封裝了連接MySQL的邏輯,主要用於連接MySQL,在業務邏輯代碼中會使用該工具類獲取MySQL連接,將結果數據寫入到MySQL中。

object JDBCUtil {
  val dataSource = new ComboPooledDataSource()
  val user = "root"
  val password = "123qwe"
  val url = "jdbc:mysql://localhost:3306/mydb"

  dataSource.setUser(user)
  dataSource.setPassword(password)
  dataSource.setDriverClass("com.mysql.jdbc.Driver")
  dataSource.setJdbcUrl(url)
  dataSource.setAutoCommitOnClose(false)
// 獲取連接
  def getQueryRunner(): Option[QueryRunner]={
    try {
      Some(new QueryRunner(dataSource))
    }catch {
      case e:Exception =>
        e.printStackTrace()
        None
    }
  }
}

需求1實現

  • BestFilmsByOverallRating

需求1實現的業務邏輯封裝。該類有一個run()方法,主要是封裝計算邏輯。

/** * 需求1:查找電影評分個數超過5000,且平均評分較高的前十部電影名稱及其對應的平均評分 */
class BestFilmsByOverallRating extends Serializable {

  def run(moviesDataset: DataFrame, ratingsDataset: DataFrame, spark: SparkSession) = {
    import spark.implicits._

    // 將moviesDataset注冊成表
    moviesDataset.createOrReplaceTempView("movies")
    // 將ratingsDataset注冊成表
    ratingsDataset.createOrReplaceTempView("ratings")

    // 查詢SQL語句
    val ressql1 =
      """ |WITH ratings_filter_cnt AS ( |SELECT | movieId, | count( * ) AS rating_cnt, | avg( rating ) AS avg_rating |FROM | ratings |GROUP BY | movieId |HAVING | count( * ) >= 5000 |), |ratings_filter_score AS ( |SELECT | movieId, -- 電影id | avg_rating -- 電影平均評分 |FROM ratings_filter_cnt |ORDER BY avg_rating DESC -- 平均評分降序排序 |LIMIT 10 -- 平均分較高的前十部電影 |) |SELECT | m.movieId, | m.title, | r.avg_rating AS avgRating |FROM | ratings_filter_score r |JOIN movies m ON m.movieId = r.movieId """.stripMargin

    val resultDS = spark.sql(ressql1).as[tenGreatestMoviesByAverageRating]
    // 打印數據
    resultDS.show(10)
    resultDS.printSchema()
    // 寫入MySQL
    resultDS.foreachPartition(par => par.foreach(insert2Mysql(_)))
  }

  /** * 獲取連接,調用寫入MySQL數據的方法 * * @param res */
  private def insert2Mysql(res: tenGreatestMoviesByAverageRating): Unit = {
    lazy val conn = JDBCUtil.getQueryRunner()
    conn match {
      case Some(connection) => {
        upsert(res, connection)
      }
      case None => {
        println("Mysql連接失敗")
        System.exit(-1)
      }
    }
  }

  /** * 封裝將結果寫入MySQL的方法 * 執行寫入操作 * * @param r * @param conn */
  private def upsert(r: tenGreatestMoviesByAverageRating, conn: QueryRunner): Unit = {
    try {
      val sql =
        s""" |REPLACE INTO `ten_movies_averagerating`( |movieId, |title, |avgRating |) |VALUES |(?,?,?) """.stripMargin
      // 執行insert操作
      conn.update(
        sql,
        r.movieId,
        r.title,
        r.avgRating
      )
    } catch {
      case e: Exception => {
        e.printStackTrace()
        System.exit(-1)
      }
    }
  }
}

需求1結果

  • 結果表建表語句
CREATE TABLE `ten_movies_averagerating` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '自增id',
  `movieId` int(11) NOT NULL COMMENT '電影id',
  `title` varchar(100) NOT NULL COMMENT '電影名稱',
  `avgRating` decimal(10,2) NOT NULL COMMENT '平均評分',
  `update_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '更新時間',
  PRIMARY KEY (`id`),
  UNIQUE KEY `movie_id_UNIQUE` (`movieId`)
) ENGINE=InnoDB  DEFAULT CHARSET=utf8;
  • 統計結果

平均評分最高的前十部電影如下:

movieId title avgRating
318 Shawshank Redemption, The (1994) 4.41
858 Godfather, The (1972) 4.32
50 Usual Suspects, The (1995) 4.28
1221 Godfather: Part II, The (1974) 4.26
527 Schindler’s List (1993) 4.25
2019 Seven Samurai (Shichinin no samurai) (1954) 4.25
904 Rear Window (1954) 4.24
1203 12 Angry Men (1957) 4.24
2959 Fight Club (1999) 4.23
1193 One Flew Over the Cuckoo’s Nest (1975) 4.22

上述電影評分對應的電影中文名稱為:

英文名稱 中文名稱
Shawshank Redemption, The (1994) 肖申克的救贖
Godfather, The (1972) 教父1
Usual Suspects, The (1995) 非常嫌疑犯
Godfather: Part II, The (1974) 教父2
Schindler’s List (1993) 辛德勒的名單
Seven Samurai (Shichinin no samurai) (1954) 七武士
Rear Window (1954) 后窗
12 Angry Men (1957) 十二怒漢
Fight Club (1999) 搏擊俱樂部
One Flew Over the Cuckoo’s Nest (1975) 飛越瘋人院

需求2實現

  • GenresByAverageRating

需求2實現的業務邏輯封裝。該類有一個run()方法,主要是封裝計算邏輯。

**
  * 需求2:查找每個電影類別及其對應的平均評分
  */
class GenresByAverageRating extends Serializable {
  def run(moviesDataset: DataFrame, ratingsDataset: DataFrame, spark: SparkSession) = {
    import spark.implicits._
    // 將moviesDataset注冊成表
    moviesDataset.createOrReplaceTempView("movies")
    // 將ratingsDataset注冊成表
    ratingsDataset.createOrReplaceTempView("ratings")

    val ressql2 =
      """ |WITH explode_movies AS ( |SELECT | movieId, | title, | category |FROM | movies lateral VIEW explode ( split ( genres, "\\|" ) ) temp AS category |) |SELECT | m.category AS genres, | avg( r.rating ) AS avgRating |FROM | explode_movies m | JOIN ratings r ON m.movieId = r.movieId |GROUP BY | m.category | """.stripMargin

    val resultDS = spark.sql(ressql2).as[topGenresByAverageRating]

    // 打印數據
    resultDS.show(10)
    resultDS.printSchema()
    // 寫入MySQL
    resultDS.foreachPartition(par => par.foreach(insert2Mysql(_)))

  }

  /** * 獲取連接,調用寫入MySQL數據的方法 * * @param res */
  private def insert2Mysql(res: topGenresByAverageRating): Unit = {
    lazy val conn = JDBCUtil.getQueryRunner()
    conn match {
      case Some(connection) => {
        upsert(res, connection)
      }
      case None => {
        println("Mysql連接失敗")
        System.exit(-1)
      }
    }
  }

  /** * 封裝將結果寫入MySQL的方法 * 執行寫入操作 * * @param r * @param conn */
  private def upsert(r: topGenresByAverageRating, conn: QueryRunner): Unit = {
    try {
      val sql =
        s""" |REPLACE INTO `genres_average_rating`( |genres, |avgRating |) |VALUES |(?,?) """.stripMargin
      // 執行insert操作
      conn.update(
        sql,
        r.genres,
        r.avgRating
      )
    } catch {
      case e: Exception => {
        e.printStackTrace()
        System.exit(-1)
      }
    }
  }
}

需求2結果

  • 結果表建表語句
CREATE TABLE genres_average_rating (
    `id` INT ( 11 ) NOT NULL AUTO_INCREMENT COMMENT '自增id',
    `genres` VARCHAR ( 100 ) NOT NULL COMMENT '電影類別',
    `avgRating` DECIMAL ( 10, 2 ) NOT NULL COMMENT '電影類別平均評分',
    `update_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '更新時間',
PRIMARY KEY ( `id` ),
UNIQUE KEY `genres_UNIQUE` ( `genres` )
) ENGINE = INNODB DEFAULT CHARSET = utf8;
  • 統計結果

共有20個電影分類,每個電影分類的平均評分為:

genres avgRating
Film-Noir 3.93
War 3.79
Documentary 3.71
Crime 3.69
Drama 3.68
Mystery 3.67
Animation 3.61
IMAX 3.6
Western 3.59
Musical 3.55
Romance 3.54
Adventure 3.52
Thriller 3.52
Fantasy 3.51
Sci-Fi 3.48
Action 3.47
Children 3.43
Comedy 3.42
(no genres listed) 3.33
Horror 3.29

電影分類對應的中文名稱為:

分類 中文名稱
Film-Noir 黑色電影
War 戰爭
Documentary 紀錄片
Crime 犯罪
Drama 歷史劇
Mystery 推理
Animation 動畫片
IMAX 巨幕電影
Western 西部電影
Musical 音樂
Romance 浪漫
Adventure 冒險
Thriller 驚悚片
Fantasy 魔幻電影
Sci-Fi 科幻
Action 動作
Children 兒童
Comedy 喜劇
(no genres listed) 未分類
Horror 恐怖

需求3實現

  • MostRatedFilms

    需求3實現的業務邏輯封裝。該類有一個run()方法,主要是封裝計算邏輯。

/** * 需求3:查找被評分次數較多的前十部電影. */
class MostRatedFilms extends Serializable {
   def run(moviesDataset: DataFrame, ratingsDataset: DataFrame,spark: SparkSession) = {

     import spark.implicits._

     // 將moviesDataset注冊成表
     moviesDataset.createOrReplaceTempView("movies")
     // 將ratingsDataset注冊成表
     ratingsDataset.createOrReplaceTempView("ratings")

val ressql3 =
  """ |WITH rating_group AS ( | SELECT | movieId, | count( * ) AS ratingCnt | FROM ratings | GROUP BY movieId |), |rating_filter AS ( | SELECT | movieId, | ratingCnt | FROM rating_group | ORDER BY ratingCnt DESC | LIMIT 10 |) |SELECT | m.movieId, | m.title, | r.ratingCnt |FROM | rating_filter r |JOIN movies m ON r.movieId = m.movieId | """.stripMargin

     val resultDS = spark.sql(ressql3).as[tenMostRatedFilms]
     // 打印數據
     resultDS.show(10)
     resultDS.printSchema()
     // 寫入MySQL
     resultDS.foreachPartition(par => par.foreach(insert2Mysql(_)))

  }

  /** * 獲取連接,調用寫入MySQL數據的方法 * * @param res */
  private def insert2Mysql(res: tenMostRatedFilms): Unit = {
    lazy val conn = JDBCUtil.getQueryRunner()
    conn match {
      case Some(connection) => {
        upsert(res, connection)
      }
      case None => {
        println("Mysql連接失敗")
        System.exit(-1)
      }
    }
  }

  /** * 封裝將結果寫入MySQL的方法 * 執行寫入操作 * * @param r * @param conn */
  private def upsert(r: tenMostRatedFilms, conn: QueryRunner): Unit = {
    try {
      val sql =
        s""" |REPLACE INTO `ten_most_rated_films`( |movieId, |title, |ratingCnt |) |VALUES |(?,?,?) """.stripMargin
      // 執行insert操作
      conn.update(
        sql,
        r.movieId,
        r.title,
        r.ratingCnt
      )
    } catch {
      case e: Exception => {
        e.printStackTrace()
        System.exit(-1)
      }
    }
  }

}

需求3結果

  • 結果表創建語句
CREATE TABLE ten_most_rated_films (
    `id` INT ( 11 ) NOT NULL AUTO_INCREMENT COMMENT '自增id',
    `movieId` INT ( 11 ) NOT NULL COMMENT '電影Id',
    `title` varchar(100) NOT NULL COMMENT '電影名稱',
    `ratingCnt` INT(11) NOT NULL COMMENT '電影被評分的次數',
    `update_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '更新時間',
PRIMARY KEY ( `id` ),
UNIQUE KEY `movie_id_UNIQUE` ( `movieId` )
) ENGINE = INNODB DEFAULT CHARSET = utf8;
  • 統計結果
movieId title ratingCnt
356 Forrest Gump (1994) 81491
318 Shawshank Redemption, The (1994) 81482
296 Pulp Fiction (1994) 79672
593 Silence of the Lambs, The (1991) 74127
2571 Matrix, The (1999) 72674
260 Star Wars: Episode IV - A New Hope (1977) 68717
480 Jurassic Park (1993) 64144
527 Schindler’s List (1993) 60411
110 Braveheart (1995) 59184
2959 Fight Club (1999) 58773

評分次數較多的電影對應的中文名稱為:

英文名稱 中文名稱
Forrest Gump (1994) 阿甘正傳
Shawshank Redemption, The (1994) 肖申克的救贖
Pulp Fiction (1994) 低俗小說
Silence of the Lambs, The (1991) 沉默的羔羊
Matrix, The (1999) 黑客帝國
Star Wars: Episode IV - A New Hope (1977) 星球大戰
Jurassic Park (1993) 侏羅紀公園
Schindler’s List (1993) 辛德勒的名單
Braveheart (1995) 勇敢的心
Fight Club (1999) 搏擊俱樂部

總結

本文主要是基於SparkSQL對MovieLens數據集進行統計分析,完整實現了三個需求,並給對每個需求都給出了詳細的代碼實現和結果分析。本案例還原了企業使用SparkSQL進行實現數據統計的基本流程,通過本文,或許你對SparkSQL的應用有了更加深刻的認識,希望本文對你有所幫助。

公眾號『大數據技術與數倉』,回復『資料』領取大數據資料包


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM