SparkSQL(六)——案例實戰


首先我們的數據源如下:

 

tbDate這張表記錄的是時間信息;

tbStockDetail這張表記錄的訂單的詳細信息;

tbStock這張表將訂單與實踐和地點兩個維度的信息連接在一起。

數據屬性如下:

每個訂單可能包含多個貨品,每個訂單可以產生多次交易,不同的貨品有不同的單價。也就是說:

tbStock與tbStockDetail是一對多的關系,ordernumber與itemid是一對多的關系

加載數據

數據存放於txt文件中;用SparkContext提供的textfile方法一行一行地將書讀出,封裝為case類,建立一個RDD。再將這個包含case類的RDD轉換成Dataset,在Dataset的基礎上,建立視圖

創建case類

case class tbDate(dateid:String,//日期
                  years:String,//年月
                  theyear:Int,//
                  month:Int,//
                  day:Int,//
                  weekday:Int,//周天
                  week:Int,//
                  quarter:Int,//季度
                  period:Int,//
                  halfmonth:Int)//半月

//訂單號,位置id,時期id
case class tbStock(ordernumber:String,locationid:String,dateid:String)

//訂單號,行號?,貨品id,數量,價格,銷售額
case class tbStockDetail(ordernumber:String, rownum:Int, itemid:String, number:Int, price:Double, amount:Double)

 

讀入數據

這里將數據的讀入代碼抽取出來封裝為三個方法,在主類中調用這三個方法

object ReadFromFile {


  def readDate(spark:SparkSession,path:String):Dataset[tbDate] ={

    //引入隱式轉換
    import spark.implicits._

    //接下來讀入tbDate.txt數據,形成RDD,並轉換成DataSet
    val tbDateDS: Dataset[tbDate] = spark.sparkContext.textFile(path)
      .map {
        x => {
          val words: Array[String] = x.split("\\s{1,}")
          //封裝成tbDate對象
          tbDate(
            words(0).trim(),
            words(1).trim(),
            words(2).trim.toInt,
            words(3).trim.toInt,
            words(4).trim.toInt,
            words(5).trim.toInt,
            words(6).trim.toInt,
            words(7).trim.toInt,
            words(8).trim.toInt,
            words(9).trim.toInt
          )
        }
      }.toDS()

    tbDateDS.show()
    tbDateDS
  }

  def readStock(spark:SparkSession,path:String) : Dataset[tbStock] = {

    //引入隱式轉換
    import spark.implicits._

    //接下來讀入tbStock.txt數據,形成RDD,並轉換成DataSet
    val tbStockDS: Dataset[tbStock] = spark.sparkContext.textFile(path)
      .map {
        x => {
          val words: Array[String] = x.split("\\s{1,}")
          tbStock(words(0).trim, words(1).trim, words(2).trim)
        }
      }.toDS()

    tbStockDS.show()
    tbStockDS
  }

  def readStockDetail(spark:SparkSession,path:String) : Dataset[tbStockDetail] = {

    //引入隱式轉換
    import spark.implicits._

    //接下來讀入tbStockDetail.txt數據,形成RDD,並轉換成DataSet
    val dbStockDeatilDS = spark.sparkContext.textFile(path)
      .map {
        x => {
          val words: Array[String] = x.split("\\s{1,}")
          tbStockDetail(
            words(0).trim,
            words(1).trim.toInt,
            words(2).trim,
            words(3).trim.toInt,
            words(4).trim.toDouble,
            words(5).trim.toDouble
          )
        }
      }.toDS()

    dbStockDeatilDS.show
    dbStockDeatilDS
  }
}

注冊表

//然后創建Dataset
    val tbDateDS: Dataset[tbDate] = ReadFromFile.readDate(spark, "E:/idea/spark3/in/tbDate.txt")
    val tbStockDS: Dataset[tbStock] = ReadFromFile.readStock(spark, "E:/idea/spark3/in/tbStock.txt")
    val tbStockDetailDS: Dataset[tbStockDetail] = ReadFromFile.readStockDetail(spark, "E:/idea/spark3/in/tbStockDetail.txt")

    //注冊表
    tbDateDS.createOrReplaceTempView("dateTable")
    tbStockDS.createOrReplaceTempView("stockTable")
    tbStockDetailDS.createOrReplaceTempView("stockDetailTable")

 

計算所有訂單中每年的銷售單數、銷售總額

查詢涉及到的字段有年份theyear,count(distinct ordernumber),sum(amount),而且是以年份來分組;

sql語句如下:

 

/*
    將三個表進行連接,並按照年份來分組
    count(ordernumber)得到銷售單數
    sum(amount)得到銷售總額
    */
    val sql1 = "SELECT c.theyear, COUNT(DISTINCT a.ordernumber), SUM(b.amount) FROM " +
      "stockTable a JOIN stockDetailTable b ON a.ordernumber = b.ordernumber " +
      "JOIN dateTable c ON a.dateid = c.dateid " +
      "GROUP BY c.theyear " +
      "ORDER BY c.theyear"

    spark.sql(sql1).show

 

 

計算所有訂單每年最大金額訂單的銷售額 

最后要得出的字段有
年份,訂單號,銷售額
比如在2007年,訂單b的銷售額達到2000,是最大的;
比如在2008年,訂單d的銷售額達到4000,是最大的;

第一步,查詢出三個字段:某個訂單在某個年份的銷售總額是多少
比如:
訂單a在2007年銷售總額是1000
訂單b在2007年銷售總額是2000
訂單c在2008年銷售總額是3000
訂單d在2008年銷售總額是4000

第二步,以年份為分組,在分組內求最大值

sql語句如下:

val sql2 = "SELECT a.ordernumber,c.theyear,sum(b.amount) AS sumOfAmount FROM " +
      "stockTable a JOIN stockDetailTable b ON a.ordernumber = b.ordernumber " +
      "JOIN dateTable c ON a.dateid = c.dateid " +
      "group by a.ordernumber,c.theyear"

    spark.sql(sql2).createOrReplaceTempView("tmpOfSumYear");
val sql3 = "SELECT * FROM tmpOfSumYear a  WHERE sumOfAmount = " +
"(SELECT MAX(sumOfAmount) FROM tmpOfSumYear WHERE theyear = a.theyear GROUP BY theyear)"
 spark.sql(sql3).show() 

 

計算所有訂單中每年最暢銷的貨品

也就是找出每年銷售額最大的貨品,即為年度最暢銷貨品。由於這個需求與上一個高度雷同,僅僅把訂單id改成貨品id就可實現。所以不再討論。

 

坑:

1)在導入數據的時候,要注意數據的格式,比如錯誤的空格、空行,會導致莫名其妙的問題。

2)關於group by語句引發的語法錯誤,見博客https://www.cnblogs.com/chxyshaodiao/p/12411819.html

3)注意在建表的時候,要避開一些數據庫中的關鍵字,比如表名為order,字段為year。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM