首先我们的数据源如下:
tbDate这张表记录的是时间信息;
tbStockDetail这张表记录的订单的详细信息;
tbStock这张表将订单与实践和地点两个维度的信息连接在一起。
数据属性如下:
每个订单可能包含多个货品,每个订单可以产生多次交易,不同的货品有不同的单价。也就是说:
tbStock与tbStockDetail是一对多的关系,ordernumber与itemid是一对多的关系
加载数据
数据存放于txt文件中;用SparkContext提供的textfile方法一行一行地将书读出,封装为case类,建立一个RDD。再将这个包含case类的RDD转换成Dataset,在Dataset的基础上,建立视图
创建case类
case class tbDate(dateid:String,//日期 years:String,//年月 theyear:Int,//年 month:Int,//月 day:Int,//天 weekday:Int,//周天 week:Int,//周 quarter:Int,//季度 period:Int,//旬 halfmonth:Int)//半月 //订单号,位置id,时期id case class tbStock(ordernumber:String,locationid:String,dateid:String) //订单号,行号?,货品id,数量,价格,销售额 case class tbStockDetail(ordernumber:String, rownum:Int, itemid:String, number:Int, price:Double, amount:Double)
读入数据
这里将数据的读入代码抽取出来封装为三个方法,在主类中调用这三个方法
object ReadFromFile { def readDate(spark:SparkSession,path:String):Dataset[tbDate] ={ //引入隐式转换 import spark.implicits._ //接下来读入tbDate.txt数据,形成RDD,并转换成DataSet val tbDateDS: Dataset[tbDate] = spark.sparkContext.textFile(path) .map { x => { val words: Array[String] = x.split("\\s{1,}") //封装成tbDate对象 tbDate( words(0).trim(), words(1).trim(), words(2).trim.toInt, words(3).trim.toInt, words(4).trim.toInt, words(5).trim.toInt, words(6).trim.toInt, words(7).trim.toInt, words(8).trim.toInt, words(9).trim.toInt ) } }.toDS() tbDateDS.show() tbDateDS } def readStock(spark:SparkSession,path:String) : Dataset[tbStock] = { //引入隐式转换 import spark.implicits._ //接下来读入tbStock.txt数据,形成RDD,并转换成DataSet val tbStockDS: Dataset[tbStock] = spark.sparkContext.textFile(path) .map { x => { val words: Array[String] = x.split("\\s{1,}") tbStock(words(0).trim, words(1).trim, words(2).trim) } }.toDS() tbStockDS.show() tbStockDS } def readStockDetail(spark:SparkSession,path:String) : Dataset[tbStockDetail] = { //引入隐式转换 import spark.implicits._ //接下来读入tbStockDetail.txt数据,形成RDD,并转换成DataSet val dbStockDeatilDS = spark.sparkContext.textFile(path) .map { x => { val words: Array[String] = x.split("\\s{1,}") tbStockDetail( words(0).trim, words(1).trim.toInt, words(2).trim, words(3).trim.toInt, words(4).trim.toDouble, words(5).trim.toDouble ) } }.toDS() dbStockDeatilDS.show dbStockDeatilDS } }
注册表
//然后创建Dataset val tbDateDS: Dataset[tbDate] = ReadFromFile.readDate(spark, "E:/idea/spark3/in/tbDate.txt") val tbStockDS: Dataset[tbStock] = ReadFromFile.readStock(spark, "E:/idea/spark3/in/tbStock.txt") val tbStockDetailDS: Dataset[tbStockDetail] = ReadFromFile.readStockDetail(spark, "E:/idea/spark3/in/tbStockDetail.txt") //注册表 tbDateDS.createOrReplaceTempView("dateTable") tbStockDS.createOrReplaceTempView("stockTable") tbStockDetailDS.createOrReplaceTempView("stockDetailTable")
计算所有订单中每年的销售单数、销售总额
查询涉及到的字段有年份theyear,count(distinct ordernumber),sum(amount),而且是以年份来分组;
sql语句如下:
/* 将三个表进行连接,并按照年份来分组 count(ordernumber)得到销售单数 sum(amount)得到销售总额 */ val sql1 = "SELECT c.theyear, COUNT(DISTINCT a.ordernumber), SUM(b.amount) FROM " + "stockTable a JOIN stockDetailTable b ON a.ordernumber = b.ordernumber " + "JOIN dateTable c ON a.dateid = c.dateid " + "GROUP BY c.theyear " + "ORDER BY c.theyear" spark.sql(sql1).show
计算所有订单每年最大金额订单的销售额
最后要得出的字段有
年份,订单号,销售额
比如在2007年,订单b的销售额达到2000,是最大的;
比如在2008年,订单d的销售额达到4000,是最大的;
第一步,查询出三个字段:某个订单在某个年份的销售总额是多少
比如:
订单a在2007年销售总额是1000
订单b在2007年销售总额是2000
订单c在2008年销售总额是3000
订单d在2008年销售总额是4000
第二步,以年份为分组,在分组内求最大值
sql语句如下:
val sql2 = "SELECT a.ordernumber,c.theyear,sum(b.amount) AS sumOfAmount FROM " + "stockTable a JOIN stockDetailTable b ON a.ordernumber = b.ordernumber " + "JOIN dateTable c ON a.dateid = c.dateid " + "group by a.ordernumber,c.theyear" spark.sql(sql2).createOrReplaceTempView("tmpOfSumYear");
val sql3 = "SELECT * FROM tmpOfSumYear a WHERE sumOfAmount = " +
"(SELECT MAX(sumOfAmount) FROM tmpOfSumYear WHERE theyear = a.theyear GROUP BY theyear)"
spark.sql(sql3).show()
计算所有订单中每年最畅销的货品
也就是找出每年销售额最大的货品,即为年度最畅销货品。由于这个需求与上一个高度雷同,仅仅把订单id改成货品id就可实现。所以不再讨论。
坑:
1)在导入数据的时候,要注意数据的格式,比如错误的空格、空行,会导致莫名其妙的问题。
2)关于group by语句引发的语法错误,见博客https://www.cnblogs.com/chxyshaodiao/p/12411819.html
3)注意在建表的时候,要避开一些数据库中的关键字,比如表名为order,字段为year。