在之前的分享中,曾系統地介紹了Spark的基本原理和使用方式,感興趣的可以翻看之前的分享文章。在本篇分享中,將介紹一個完整的項目案例,該案例會真實還原企業中SparkSQL的開發流程,手把手教你構建一個基於SparkSQL的分析系統。為了講解方便,我會對代碼進行拆解,完整的代碼已上傳至GitHub,想看完整代碼可以去clone,記得給個Star。以下是全文,希望本文對你有所幫助。
https://github.com/jiamx/spark_project_practise
公眾號『大數據技術與數倉』,回復『資料』領取大數據資料包
項目介紹
數據集介紹
使用MovieLens的名稱為ml-25m.zip的數據集,使用的文件時movies.csv和ratings.csv,上述文件的下載地址為:
http://files.grouplens.org/datasets/movielens/ml-25m.zip
- movies.csv
該文件是電影數據,對應的為維表數據,大小為2.89MB,包括6萬多部電影,其數據格式為[movieId,title,genres],分別對應**[電影id,電影名稱,電影所屬分類]**,樣例數據如下所示:逗號分隔
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
- ratings.csv
該文件為定影評分數據,對應為事實表數據,大小為646MB,其數據格式為:[userId,movieId,rating,timestamp],分別對應**[用戶id,電影id,評分,時間戳]**,樣例數據如下所示:逗號分隔
1,296,5,1147880044
項目代碼結構
需求分析
- 需求1:查找電影評分個數超過5000,且平均評分較高的前十部電影名稱及其對應的平均評分
- 需求2:查找每個電影類別及其對應的平均評分
- 需求3:查找被評分次數較多的前十部電影
代碼講解
- DemoMainApp
該類是程序執行的入口,主要是獲取數據源,轉換成DataFrame,並調用封裝好的業務邏輯類。
object DemoMainApp {
// 文件路徑
private val MOVIES_CSV_FILE_PATH = "file:///e:/movies.csv"
private val RATINGS_CSV_FILE_PATH = "file:///e:/ratings.csv"
def main(args: Array[String]): Unit = {
// 創建spark session
val spark = SparkSession
.builder
.master("local[4]")
.getOrCreate
// schema信息
val schemaLoader = new SchemaLoader
// 讀取Movie數據集
val movieDF = readCsvIntoDataSet(spark, MOVIES_CSV_FILE_PATH, schemaLoader.getMovieSchema)
// 讀取Rating數據集
val ratingDF = readCsvIntoDataSet(spark, RATINGS_CSV_FILE_PATH, schemaLoader.getRatingSchema)
// 需求1:查找電影評分個數超過5000,且平均評分較高的前十部電影名稱及其對應的平均評分
val bestFilmsByOverallRating = new BestFilmsByOverallRating
//bestFilmsByOverallRating.run(movieDF, ratingDF, spark)
// 需求2:查找每個電影類別及其對應的平均評分
val genresByAverageRating = new GenresByAverageRating
//genresByAverageRating.run(movieDF, ratingDF, spark)
// 需求3:查找被評分次數較多的前十部電影
val mostRatedFilms = new MostRatedFilms
mostRatedFilms.run(movieDF, ratingDF, spark)
spark.close()
}
/** * 讀取數據文件,轉成DataFrame * * @param spark * @param path * @param schema * @return */
def readCsvIntoDataSet(spark: SparkSession, path: String, schema: StructType) = {
val dataSet = spark.read
.format("csv")
.option("header", "true")
.schema(schema)
.load(path)
dataSet
}
}
- Entry
該類為實體類,封裝了數據源的樣例類和結果表的樣例類
class Entry {
}
case class Movies(
movieId: String, // 電影的id
title: String, // 電影的標題
genres: String // 電影類別
)
case class Ratings(
userId: String, // 用戶的id
movieId: String, // 電影的id
rating: String, // 用戶評分
timestamp: String // 時間戳
)
// 需求1MySQL結果表
case class tenGreatestMoviesByAverageRating(
movieId: String, // 電影的id
title: String, // 電影的標題
avgRating: String // 電影平均評分
)
// 需求2MySQL結果表
case class topGenresByAverageRating(
genres: String, //電影類別
avgRating: String // 平均評分
)
// 需求3MySQL結果表
case class tenMostRatedFilms(
movieId: String, // 電影的id
title: String, // 電影的標題
ratingCnt: String // 電影被評分的次數
)
- SchemaLoader
該類封裝了數據集的schema信息,主要用於讀取數據源是指定schema信息
class SchemaLoader {
// movies數據集schema信息
private val movieSchema = new StructType()
.add("movieId", DataTypes.StringType, false)
.add("title", DataTypes.StringType, false)
.add("genres", DataTypes.StringType, false)
// ratings數據集schema信息
private val ratingSchema = new StructType()
.add("userId", DataTypes.StringType, false)
.add("movieId", DataTypes.StringType, false)
.add("rating", DataTypes.StringType, false)
.add("timestamp", DataTypes.StringType, false)
def getMovieSchema: StructType = movieSchema
def getRatingSchema: StructType = ratingSchema
}
- JDBCUtil
該類封裝了連接MySQL的邏輯,主要用於連接MySQL,在業務邏輯代碼中會使用該工具類獲取MySQL連接,將結果數據寫入到MySQL中。
object JDBCUtil {
val dataSource = new ComboPooledDataSource()
val user = "root"
val password = "123qwe"
val url = "jdbc:mysql://localhost:3306/mydb"
dataSource.setUser(user)
dataSource.setPassword(password)
dataSource.setDriverClass("com.mysql.jdbc.Driver")
dataSource.setJdbcUrl(url)
dataSource.setAutoCommitOnClose(false)
// 獲取連接
def getQueryRunner(): Option[QueryRunner]={
try {
Some(new QueryRunner(dataSource))
}catch {
case e:Exception =>
e.printStackTrace()
None
}
}
}
需求1實現
- BestFilmsByOverallRating
需求1實現的業務邏輯封裝。該類有一個run()方法,主要是封裝計算邏輯。
/** * 需求1:查找電影評分個數超過5000,且平均評分較高的前十部電影名稱及其對應的平均評分 */
class BestFilmsByOverallRating extends Serializable {
def run(moviesDataset: DataFrame, ratingsDataset: DataFrame, spark: SparkSession) = {
import spark.implicits._
// 將moviesDataset注冊成表
moviesDataset.createOrReplaceTempView("movies")
// 將ratingsDataset注冊成表
ratingsDataset.createOrReplaceTempView("ratings")
// 查詢SQL語句
val ressql1 =
""" |WITH ratings_filter_cnt AS ( |SELECT | movieId, | count( * ) AS rating_cnt, | avg( rating ) AS avg_rating |FROM | ratings |GROUP BY | movieId |HAVING | count( * ) >= 5000 |), |ratings_filter_score AS ( |SELECT | movieId, -- 電影id | avg_rating -- 電影平均評分 |FROM ratings_filter_cnt |ORDER BY avg_rating DESC -- 平均評分降序排序 |LIMIT 10 -- 平均分較高的前十部電影 |) |SELECT | m.movieId, | m.title, | r.avg_rating AS avgRating |FROM | ratings_filter_score r |JOIN movies m ON m.movieId = r.movieId """.stripMargin
val resultDS = spark.sql(ressql1).as[tenGreatestMoviesByAverageRating]
// 打印數據
resultDS.show(10)
resultDS.printSchema()
// 寫入MySQL
resultDS.foreachPartition(par => par.foreach(insert2Mysql(_)))
}
/** * 獲取連接,調用寫入MySQL數據的方法 * * @param res */
private def insert2Mysql(res: tenGreatestMoviesByAverageRating): Unit = {
lazy val conn = JDBCUtil.getQueryRunner()
conn match {
case Some(connection) => {
upsert(res, connection)
}
case None => {
println("Mysql連接失敗")
System.exit(-1)
}
}
}
/** * 封裝將結果寫入MySQL的方法 * 執行寫入操作 * * @param r * @param conn */
private def upsert(r: tenGreatestMoviesByAverageRating, conn: QueryRunner): Unit = {
try {
val sql =
s""" |REPLACE INTO `ten_movies_averagerating`( |movieId, |title, |avgRating |) |VALUES |(?,?,?) """.stripMargin
// 執行insert操作
conn.update(
sql,
r.movieId,
r.title,
r.avgRating
)
} catch {
case e: Exception => {
e.printStackTrace()
System.exit(-1)
}
}
}
}
需求1結果
- 結果表建表語句
CREATE TABLE `ten_movies_averagerating` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '自增id',
`movieId` int(11) NOT NULL COMMENT '電影id',
`title` varchar(100) NOT NULL COMMENT '電影名稱',
`avgRating` decimal(10,2) NOT NULL COMMENT '平均評分',
`update_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '更新時間',
PRIMARY KEY (`id`),
UNIQUE KEY `movie_id_UNIQUE` (`movieId`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
- 統計結果
平均評分最高的前十部電影如下:
movieId | title | avgRating |
---|---|---|
318 | Shawshank Redemption, The (1994) | 4.41 |
858 | Godfather, The (1972) | 4.32 |
50 | Usual Suspects, The (1995) | 4.28 |
1221 | Godfather: Part II, The (1974) | 4.26 |
527 | Schindler’s List (1993) | 4.25 |
2019 | Seven Samurai (Shichinin no samurai) (1954) | 4.25 |
904 | Rear Window (1954) | 4.24 |
1203 | 12 Angry Men (1957) | 4.24 |
2959 | Fight Club (1999) | 4.23 |
1193 | One Flew Over the Cuckoo’s Nest (1975) | 4.22 |
上述電影評分對應的電影中文名稱為:
英文名稱 | 中文名稱 |
---|---|
Shawshank Redemption, The (1994) | 肖申克的救贖 |
Godfather, The (1972) | 教父1 |
Usual Suspects, The (1995) | 非常嫌疑犯 |
Godfather: Part II, The (1974) | 教父2 |
Schindler’s List (1993) | 辛德勒的名單 |
Seven Samurai (Shichinin no samurai) (1954) | 七武士 |
Rear Window (1954) | 后窗 |
12 Angry Men (1957) | 十二怒漢 |
Fight Club (1999) | 搏擊俱樂部 |
One Flew Over the Cuckoo’s Nest (1975) | 飛越瘋人院 |
需求2實現
- GenresByAverageRating
需求2實現的業務邏輯封裝。該類有一個run()方法,主要是封裝計算邏輯。
**
* 需求2:查找每個電影類別及其對應的平均評分
*/
class GenresByAverageRating extends Serializable {
def run(moviesDataset: DataFrame, ratingsDataset: DataFrame, spark: SparkSession) = {
import spark.implicits._
// 將moviesDataset注冊成表
moviesDataset.createOrReplaceTempView("movies")
// 將ratingsDataset注冊成表
ratingsDataset.createOrReplaceTempView("ratings")
val ressql2 =
""" |WITH explode_movies AS ( |SELECT | movieId, | title, | category |FROM | movies lateral VIEW explode ( split ( genres, "\\|" ) ) temp AS category |) |SELECT | m.category AS genres, | avg( r.rating ) AS avgRating |FROM | explode_movies m | JOIN ratings r ON m.movieId = r.movieId |GROUP BY | m.category | """.stripMargin
val resultDS = spark.sql(ressql2).as[topGenresByAverageRating]
// 打印數據
resultDS.show(10)
resultDS.printSchema()
// 寫入MySQL
resultDS.foreachPartition(par => par.foreach(insert2Mysql(_)))
}
/** * 獲取連接,調用寫入MySQL數據的方法 * * @param res */
private def insert2Mysql(res: topGenresByAverageRating): Unit = {
lazy val conn = JDBCUtil.getQueryRunner()
conn match {
case Some(connection) => {
upsert(res, connection)
}
case None => {
println("Mysql連接失敗")
System.exit(-1)
}
}
}
/** * 封裝將結果寫入MySQL的方法 * 執行寫入操作 * * @param r * @param conn */
private def upsert(r: topGenresByAverageRating, conn: QueryRunner): Unit = {
try {
val sql =
s""" |REPLACE INTO `genres_average_rating`( |genres, |avgRating |) |VALUES |(?,?) """.stripMargin
// 執行insert操作
conn.update(
sql,
r.genres,
r.avgRating
)
} catch {
case e: Exception => {
e.printStackTrace()
System.exit(-1)
}
}
}
}
需求2結果
- 結果表建表語句
CREATE TABLE genres_average_rating (
`id` INT ( 11 ) NOT NULL AUTO_INCREMENT COMMENT '自增id',
`genres` VARCHAR ( 100 ) NOT NULL COMMENT '電影類別',
`avgRating` DECIMAL ( 10, 2 ) NOT NULL COMMENT '電影類別平均評分',
`update_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '更新時間',
PRIMARY KEY ( `id` ),
UNIQUE KEY `genres_UNIQUE` ( `genres` )
) ENGINE = INNODB DEFAULT CHARSET = utf8;
- 統計結果
共有20個電影分類,每個電影分類的平均評分為:
genres | avgRating |
---|---|
Film-Noir | 3.93 |
War | 3.79 |
Documentary | 3.71 |
Crime | 3.69 |
Drama | 3.68 |
Mystery | 3.67 |
Animation | 3.61 |
IMAX | 3.6 |
Western | 3.59 |
Musical | 3.55 |
Romance | 3.54 |
Adventure | 3.52 |
Thriller | 3.52 |
Fantasy | 3.51 |
Sci-Fi | 3.48 |
Action | 3.47 |
Children | 3.43 |
Comedy | 3.42 |
(no genres listed) | 3.33 |
Horror | 3.29 |
電影分類對應的中文名稱為:
分類 | 中文名稱 |
---|---|
Film-Noir | 黑色電影 |
War | 戰爭 |
Documentary | 紀錄片 |
Crime | 犯罪 |
Drama | 歷史劇 |
Mystery | 推理 |
Animation | 動畫片 |
IMAX | 巨幕電影 |
Western | 西部電影 |
Musical | 音樂 |
Romance | 浪漫 |
Adventure | 冒險 |
Thriller | 驚悚片 |
Fantasy | 魔幻電影 |
Sci-Fi | 科幻 |
Action | 動作 |
Children | 兒童 |
Comedy | 喜劇 |
(no genres listed) | 未分類 |
Horror | 恐怖 |
需求3實現
-
MostRatedFilms
需求3實現的業務邏輯封裝。該類有一個run()方法,主要是封裝計算邏輯。
/** * 需求3:查找被評分次數較多的前十部電影. */
class MostRatedFilms extends Serializable {
def run(moviesDataset: DataFrame, ratingsDataset: DataFrame,spark: SparkSession) = {
import spark.implicits._
// 將moviesDataset注冊成表
moviesDataset.createOrReplaceTempView("movies")
// 將ratingsDataset注冊成表
ratingsDataset.createOrReplaceTempView("ratings")
val ressql3 =
""" |WITH rating_group AS ( | SELECT | movieId, | count( * ) AS ratingCnt | FROM ratings | GROUP BY movieId |), |rating_filter AS ( | SELECT | movieId, | ratingCnt | FROM rating_group | ORDER BY ratingCnt DESC | LIMIT 10 |) |SELECT | m.movieId, | m.title, | r.ratingCnt |FROM | rating_filter r |JOIN movies m ON r.movieId = m.movieId | """.stripMargin
val resultDS = spark.sql(ressql3).as[tenMostRatedFilms]
// 打印數據
resultDS.show(10)
resultDS.printSchema()
// 寫入MySQL
resultDS.foreachPartition(par => par.foreach(insert2Mysql(_)))
}
/** * 獲取連接,調用寫入MySQL數據的方法 * * @param res */
private def insert2Mysql(res: tenMostRatedFilms): Unit = {
lazy val conn = JDBCUtil.getQueryRunner()
conn match {
case Some(connection) => {
upsert(res, connection)
}
case None => {
println("Mysql連接失敗")
System.exit(-1)
}
}
}
/** * 封裝將結果寫入MySQL的方法 * 執行寫入操作 * * @param r * @param conn */
private def upsert(r: tenMostRatedFilms, conn: QueryRunner): Unit = {
try {
val sql =
s""" |REPLACE INTO `ten_most_rated_films`( |movieId, |title, |ratingCnt |) |VALUES |(?,?,?) """.stripMargin
// 執行insert操作
conn.update(
sql,
r.movieId,
r.title,
r.ratingCnt
)
} catch {
case e: Exception => {
e.printStackTrace()
System.exit(-1)
}
}
}
}
需求3結果
- 結果表創建語句
CREATE TABLE ten_most_rated_films (
`id` INT ( 11 ) NOT NULL AUTO_INCREMENT COMMENT '自增id',
`movieId` INT ( 11 ) NOT NULL COMMENT '電影Id',
`title` varchar(100) NOT NULL COMMENT '電影名稱',
`ratingCnt` INT(11) NOT NULL COMMENT '電影被評分的次數',
`update_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '更新時間',
PRIMARY KEY ( `id` ),
UNIQUE KEY `movie_id_UNIQUE` ( `movieId` )
) ENGINE = INNODB DEFAULT CHARSET = utf8;
- 統計結果
movieId | title | ratingCnt |
---|---|---|
356 | Forrest Gump (1994) | 81491 |
318 | Shawshank Redemption, The (1994) | 81482 |
296 | Pulp Fiction (1994) | 79672 |
593 | Silence of the Lambs, The (1991) | 74127 |
2571 | Matrix, The (1999) | 72674 |
260 | Star Wars: Episode IV - A New Hope (1977) | 68717 |
480 | Jurassic Park (1993) | 64144 |
527 | Schindler’s List (1993) | 60411 |
110 | Braveheart (1995) | 59184 |
2959 | Fight Club (1999) | 58773 |
評分次數較多的電影對應的中文名稱為:
英文名稱 | 中文名稱 |
---|---|
Forrest Gump (1994) | 阿甘正傳 |
Shawshank Redemption, The (1994) | 肖申克的救贖 |
Pulp Fiction (1994) | 低俗小說 |
Silence of the Lambs, The (1991) | 沉默的羔羊 |
Matrix, The (1999) | 黑客帝國 |
Star Wars: Episode IV - A New Hope (1977) | 星球大戰 |
Jurassic Park (1993) | 侏羅紀公園 |
Schindler’s List (1993) | 辛德勒的名單 |
Braveheart (1995) | 勇敢的心 |
Fight Club (1999) | 搏擊俱樂部 |
總結
本文主要是基於SparkSQL對MovieLens數據集進行統計分析,完整實現了三個需求,並給對每個需求都給出了詳細的代碼實現和結果分析。本案例還原了企業使用SparkSQL進行實現數據統計的基本流程,通過本文,或許你對SparkSQL的應用有了更加深刻的認識,希望本文對你有所幫助。
公眾號『大數據技術與數倉』,回復『資料』領取大數據資料包