首先我們的數據源如下:
tbDate這張表記錄的是時間信息;
tbStockDetail這張表記錄的訂單的詳細信息;
tbStock這張表將訂單與實踐和地點兩個維度的信息連接在一起。
數據屬性如下:
每個訂單可能包含多個貨品,每個訂單可以產生多次交易,不同的貨品有不同的單價。也就是說:
tbStock與tbStockDetail是一對多的關系,ordernumber與itemid是一對多的關系
加載數據
數據存放於txt文件中;用SparkContext提供的textfile方法一行一行地將書讀出,封裝為case類,建立一個RDD。再將這個包含case類的RDD轉換成Dataset,在Dataset的基礎上,建立視圖
創建case類
case class tbDate(dateid:String,//日期 years:String,//年月 theyear:Int,//年 month:Int,//月 day:Int,//天 weekday:Int,//周天 week:Int,//周 quarter:Int,//季度 period:Int,//旬 halfmonth:Int)//半月 //訂單號,位置id,時期id case class tbStock(ordernumber:String,locationid:String,dateid:String) //訂單號,行號?,貨品id,數量,價格,銷售額 case class tbStockDetail(ordernumber:String, rownum:Int, itemid:String, number:Int, price:Double, amount:Double)
讀入數據
這里將數據的讀入代碼抽取出來封裝為三個方法,在主類中調用這三個方法
object ReadFromFile { def readDate(spark:SparkSession,path:String):Dataset[tbDate] ={ //引入隱式轉換 import spark.implicits._ //接下來讀入tbDate.txt數據,形成RDD,並轉換成DataSet val tbDateDS: Dataset[tbDate] = spark.sparkContext.textFile(path) .map { x => { val words: Array[String] = x.split("\\s{1,}") //封裝成tbDate對象 tbDate( words(0).trim(), words(1).trim(), words(2).trim.toInt, words(3).trim.toInt, words(4).trim.toInt, words(5).trim.toInt, words(6).trim.toInt, words(7).trim.toInt, words(8).trim.toInt, words(9).trim.toInt ) } }.toDS() tbDateDS.show() tbDateDS } def readStock(spark:SparkSession,path:String) : Dataset[tbStock] = { //引入隱式轉換 import spark.implicits._ //接下來讀入tbStock.txt數據,形成RDD,並轉換成DataSet val tbStockDS: Dataset[tbStock] = spark.sparkContext.textFile(path) .map { x => { val words: Array[String] = x.split("\\s{1,}") tbStock(words(0).trim, words(1).trim, words(2).trim) } }.toDS() tbStockDS.show() tbStockDS } def readStockDetail(spark:SparkSession,path:String) : Dataset[tbStockDetail] = { //引入隱式轉換 import spark.implicits._ //接下來讀入tbStockDetail.txt數據,形成RDD,並轉換成DataSet val dbStockDeatilDS = spark.sparkContext.textFile(path) .map { x => { val words: Array[String] = x.split("\\s{1,}") tbStockDetail( words(0).trim, words(1).trim.toInt, words(2).trim, words(3).trim.toInt, words(4).trim.toDouble, words(5).trim.toDouble ) } }.toDS() dbStockDeatilDS.show dbStockDeatilDS } }
注冊表
//然后創建Dataset val tbDateDS: Dataset[tbDate] = ReadFromFile.readDate(spark, "E:/idea/spark3/in/tbDate.txt") val tbStockDS: Dataset[tbStock] = ReadFromFile.readStock(spark, "E:/idea/spark3/in/tbStock.txt") val tbStockDetailDS: Dataset[tbStockDetail] = ReadFromFile.readStockDetail(spark, "E:/idea/spark3/in/tbStockDetail.txt") //注冊表 tbDateDS.createOrReplaceTempView("dateTable") tbStockDS.createOrReplaceTempView("stockTable") tbStockDetailDS.createOrReplaceTempView("stockDetailTable")
計算所有訂單中每年的銷售單數、銷售總額
查詢涉及到的字段有年份theyear,count(distinct ordernumber),sum(amount),而且是以年份來分組;
sql語句如下:
/* 將三個表進行連接,並按照年份來分組 count(ordernumber)得到銷售單數 sum(amount)得到銷售總額 */ val sql1 = "SELECT c.theyear, COUNT(DISTINCT a.ordernumber), SUM(b.amount) FROM " + "stockTable a JOIN stockDetailTable b ON a.ordernumber = b.ordernumber " + "JOIN dateTable c ON a.dateid = c.dateid " + "GROUP BY c.theyear " + "ORDER BY c.theyear" spark.sql(sql1).show
計算所有訂單每年最大金額訂單的銷售額
最后要得出的字段有
年份,訂單號,銷售額
比如在2007年,訂單b的銷售額達到2000,是最大的;
比如在2008年,訂單d的銷售額達到4000,是最大的;
第一步,查詢出三個字段:某個訂單在某個年份的銷售總額是多少
比如:
訂單a在2007年銷售總額是1000
訂單b在2007年銷售總額是2000
訂單c在2008年銷售總額是3000
訂單d在2008年銷售總額是4000
第二步,以年份為分組,在分組內求最大值
sql語句如下:
val sql2 = "SELECT a.ordernumber,c.theyear,sum(b.amount) AS sumOfAmount FROM " + "stockTable a JOIN stockDetailTable b ON a.ordernumber = b.ordernumber " + "JOIN dateTable c ON a.dateid = c.dateid " + "group by a.ordernumber,c.theyear" spark.sql(sql2).createOrReplaceTempView("tmpOfSumYear");
val sql3 = "SELECT * FROM tmpOfSumYear a WHERE sumOfAmount = " +
"(SELECT MAX(sumOfAmount) FROM tmpOfSumYear WHERE theyear = a.theyear GROUP BY theyear)"
spark.sql(sql3).show()
計算所有訂單中每年最暢銷的貨品
也就是找出每年銷售額最大的貨品,即為年度最暢銷貨品。由於這個需求與上一個高度雷同,僅僅把訂單id改成貨品id就可實現。所以不再討論。
坑:
1)在導入數據的時候,要注意數據的格式,比如錯誤的空格、空行,會導致莫名其妙的問題。
2)關於group by語句引發的語法錯誤,見博客https://www.cnblogs.com/chxyshaodiao/p/12411819.html
3)注意在建表的時候,要避開一些數據庫中的關鍵字,比如表名為order,字段為year。