[Big Data] Spark SQL Study Notes


 

1. Spark SQL Overview

1.1 What is Spark SQL

Spark SQL is a Spark module for processing structured data. It provides two programming abstractions, DataFrame and DataSet, and also acts as a distributed SQL query engine.

We have already learned Hive, which converts Hive SQL into MapReduce jobs and submits them to the cluster for execution; this greatly simplifies writing MapReduce programs, but the MapReduce computing model is rather slow. Hence Spark SQL was born: it converts Spark SQL queries into RDDs and submits them to the cluster, so execution is very fast!

1.2 Features of Spark SQL

1) Easy to integrate

2) Unified data access

3) Compatible with Hive

4) Standard data connectivity

1.3 What is a DataFrame

Like an RDD, a DataFrame is a distributed data container. However, a DataFrame is more like a two-dimensional table in a traditional database: in addition to the data, it also records the structural information of the data, i.e. the schema. Like Hive, DataFrame also supports nested data types (struct, array, and map). From the point of view of API usability, the DataFrame API provides a set of high-level relational operations that is friendlier and has a lower barrier to entry than the functional RDD API.
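As a hedged illustration of what such schema information looks like (the column names below are made up for the example), a nested schema with struct, array, and map fields can be declared explicitly with the types from org.apache.spark.sql.types:

import org.apache.spark.sql.types._

// Hypothetical schema with nested struct, array, and map columns
val personSchema = StructType(Seq(
  StructField("name", StringType),
  StructField("address", StructType(Seq(
    StructField("city", StringType),
    StructField("zip", StringType)))),
  StructField("phones", ArrayType(StringType)),
  StructField("props", MapType(StringType, StringType))
))
personSchema.printTreeString()   // prints the nested column structure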

 

The figure above intuitively shows the difference between DataFrame and RDD. The RDD[Person] on the left takes Person as its type parameter, but the Spark framework itself knows nothing about the internal structure of the Person class. The DataFrame on the right, by contrast, provides detailed structural information, so Spark SQL knows exactly which columns the dataset contains and what the name and type of each column is. A DataFrame provides a schema view of the data and can be treated like a table in a database. DataFrames are also lazily executed, and they perform better than RDDs, mainly because of:

Optimized execution plans: query plans are optimized by the Spark Catalyst optimizer.

Take, for example, the query illustrated below.

To explain query optimization, consider the population analysis example shown in the figure: two DataFrames are constructed, joined, and then filtered. Executing this plan as-is would not be efficient, because join is an expensive operation and may produce a large dataset. If the filter can be pushed down below the join, so that each DataFrame is filtered first and the smaller filtered results are joined, the execution time can be shortened significantly. And that is exactly what Spark SQL's query optimizer does. In short, logical query plan optimization is the process of using equivalence transformations based on relational algebra to replace high-cost operations with low-cost ones.
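As a hedged sketch of this (usersDF, eventsDF, and the userId/age columns are hypothetical), the logical and physical plans, including the pushed-down filter, can be inspected with explain:

// Join two hypothetical DataFrames and then filter; explain(true) prints the
// logical plans before and after optimization plus the physical plan, where
// Catalyst has moved the filter below the join.
val joined = usersDF.join(eventsDF, "userId").filter($"age" > 21)
joined.explain(true)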

1.4 What is a DataSet

1) An extension of the DataFrame API and the newest data abstraction in Spark.

2) A user-friendly API style that has both compile-time type-safety checks and the query-optimization benefits of DataFrame.

3) Datasets support encoders, which avoid deserializing the entire object when accessing off-heap data, improving efficiency.

4) Case classes are used to define the structure of the data in a Dataset; the name of each case class attribute maps directly to a field name in the DataSet.

5) DataFrame is a special case of Dataset: DataFrame = Dataset[Row], so a DataFrame can be converted to a Dataset with the as method. Row is a type, just like Car or Person; all table-structure information is represented with Row.

6) DataSet is strongly typed; for example there can be a Dataset[Car] or a Dataset[Person].

7) A DataFrame only knows the field names, not their types, so such errors cannot be checked at compile time; for example you can apply subtraction to a String column and only get an error at execution time. A DataSet knows both the field names and the field types, so it has stricter error checking, much like the difference between a JSON object and a class instance. A minimal sketch of this difference follows.
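A minimal sketch of that difference, assuming the Person case class used later in these notes and a spark-shell session with the implicits imported:

import spark.implicits._
case class Person(name: String, age: Long)

val df = Seq(("Andy", 32L)).toDF("name", "age")   // untyped: DataFrame = Dataset[Row]
// df.select($"name" - 1)     // compiles, but the problem only shows up when the job runs
val ds = df.as[Person]                            // typed: Dataset[Person]
// ds.map(p => p.name - 1)    // rejected at compile time: String has no '-' method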

2. Spark SQL Programming

2.1 SparkSession: the new entry point

In older versions, Spark SQL provided two SQL query entry points: SQLContext, for Spark's own SQL queries, and HiveContext, for queries against Hive.

SparkSession is the newest SQL query entry point. It is essentially a combination of SQLContext and HiveContext, so any API available on SQLContext and HiveContext is also available on SparkSession. SparkSession wraps a sparkContext internally, so the actual computation is carried out by the sparkContext.
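In spark-shell the spark object is already created for you; in a standalone program a SparkSession is built roughly like this (the app name and master value are placeholders):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("placeholder-app")   // hypothetical application name
  .master("local[*]")           // placeholder master; usually set via spark-submit instead
  .getOrCreate()

val sc = spark.sparkContext     // the wrapped SparkContext that actually runs the jobs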

2.2 DataFrame

2.2.1 Creating a DataFrame

In Spark SQL, SparkSession is the entry point for creating DataFrames and executing SQL. There are three ways to create a DataFrame: from a Spark data source; by converting an existing RDD; or by querying a Hive table.

1) From a Spark data source

(1) View the file formats supported by Spark's data sources

scala> spark.read.

csv   format   jdbc   json   load   option   options   orc   parquet   schema   table   text   textFile

(2) Create a DataFrame by reading a JSON file

scala> val df = spark.read.json("/opt/module/spark/examples/src/main/resources/people.json")

df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

(3) Show the result

scala> df.show

+----+-------+

| age|   name|

+----+-------+

|null|Michael|

|  30|   Andy|

|  19| Justin|

+----+-------+

2) By converting an existing RDD

This is discussed in detail in Section 2.5.

3) By querying a Hive table

This is discussed in detail in Section 3.3.

2.2.2 SQL-style syntax (primary)

1) Create a DataFrame

scala> val df = spark.read.json("/opt/module/spark/examples/src/main/resources/people.json")

df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

2) Create a temporary view from the DataFrame

scala> df.createOrReplaceTempView("people")

3) Query the full table with a SQL statement

scala> val sqlDF = spark.sql("SELECT * FROM people")

sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

4) Show the result

scala> sqlDF.show

+----+-------+

| age|   name|

+----+-------+

|null|Michael|

|  30|   Andy|

|  19| Justin|

+----+-------+

Note: temporary views are scoped to the Session; once the Session ends, the view is gone. If you want a view that is valid across the whole application, use a global temporary view. Note that a global temporary view must be accessed with its full path, e.g. global_temp.people.

5) Create a global temporary view from the DataFrame

scala> df.createGlobalTempView("people")

6) Query the full table with a SQL statement

scala> spark.sql("SELECT * FROM global_temp.people").show()

+----+-------+

| age|   name|

+----+-------+

|null|Michael|

|  30|   Andy|

|  19| Justin|

+----+-------+

 

scala> spark.newSession().sql("SELECT * FROM global_temp.people").show()

+----+-------+

| age|   name|

+----+-------+

|null|Michael|

|  30|   Andy|

|  19| Justin|

+----+-------+

2.2.3 DSL-style syntax (secondary)

1) Create a DataFrame (reading the same JSON file as before)

scala> val df = spark.read.json("/opt/module/spark/examples/src/main/resources/people.json")

df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

2) View the schema of the DataFrame

scala> df.printSchema

root

 |-- age: long (nullable = true)

 |-- name: string (nullable = true)

3) Select only the "name" column

scala> df.select("name").show()

+-------+

|   name|

+-------+

|Michael|

|   Andy|

| Justin|

+-------+

4) Select the "name" column and "age + 1"

scala> df.select($"name", $"age" + 1).show()

+-------+---------+

|   name|(age + 1)|

+-------+---------+

|Michael|     null|

|   Andy|       31|

| Justin|       20|

+-------+---------+

5) Filter rows where "age" is greater than 21

scala> df.filter($"age" > 21).show()

+---+----+

|age|name|

+---+----+

| 30|Andy|

+---+----+

6) Group by "age" and count the rows

scala> df.groupBy("age").count().show()

+----+-----+

| age|count|

+----+-----+

|  19|     1|

|null|     1|

|  30|     1|

+----+-----+

2.2.4 Converting an RDD to a DataFrame

Note: any operation between an RDD and a DF or DS requires import spark.implicits._ (here spark is not a package name but the name of the SparkSession object).

Prerequisite: import the implicit conversions and create an RDD

scala> import spark.implicits._

import spark.implicits._

 

scala> val peopleRDD = sc.textFile("examples/src/main/resources/people.txt")

peopleRDD: org.apache.spark.rdd.RDD[String] = examples/src/main/resources/people.txt MapPartitionsRDD[3] at textFile at <console>:27

1) By specifying the column names manually

scala> peopleRDD.map{x=>val para = x.split(",");(para(0),para(1).trim.toInt)}.toDF("name","age")

res1: org.apache.spark.sql.DataFrame = [name: string, age: int]

2) Via reflection (requires a case class)

(1) Create a case class

scala> case class People(name:String, age:Int)

(2) Convert the RDD to a DataFrame using the case class

scala> peopleRDD.map{ x => val para = x.split(",");People(para(0),para(1).trim.toInt)}.toDF

res2: org.apache.spark.sql.DataFrame = [name: string, age: int]

3) Programmatically (for reference)

(1) Import the required types

scala> import org.apache.spark.sql.types._

import org.apache.spark.sql.types._

(2) Create the schema

scala> val structType: StructType = StructType(StructField("name", StringType) :: StructField("age", IntegerType) :: Nil)

structType: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(age,IntegerType,true))

(3) Import the Row type

scala> import org.apache.spark.sql.Row

import org.apache.spark.sql.Row

(4) Create an RDD[Row] from the data

scala> val data = peopleRDD.map{ x => val para = x.split(",");Row(para(0),para(1).trim.toInt)}

data: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[6] at map at <console>:33

(5) Create the DataFrame from the data and the schema

scala> val dataFrame = spark.createDataFrame(data, structType)

dataFrame: org.apache.spark.sql.DataFrame = [name: string, age: int]

2.2.5 Converting a DataFrame to an RDD

Simply call the rdd method.

1) Create a DataFrame

scala> val df = spark.read.json("/opt/module/spark/examples/src/main/resources/people.json")

df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

2) Convert the DataFrame to an RDD

scala> val dfToRDD = df.rdd

dfToRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[19] at rdd at <console>:29

3) Collect the RDD

scala> dfToRDD.collect

res13: Array[org.apache.spark.sql.Row] = Array([Michael, 29], [Andy, 30], [Justin, 19])

2.3 DataSet

A Dataset is a strongly typed collection of data, so the corresponding type information has to be provided.

2.3.1 Creation

1) Create a case class

scala> case class Person(name: String, age: Long)

defined class Person

2) Create a DataSet

scala> val caseClassDS = Seq(Person("Andy", 32)).toDS()

caseClassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]

2.3.2 Converting an RDD to a DataSet

Spark SQL can automatically convert an RDD containing case classes into a DataFrame: the case class defines the table schema, and the case class attribute names become the column names via reflection. Case classes can also contain complex structures such as Seqs or Arrays, as sketched below.
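As a hedged illustration of that last point (the Order case class and its values are made up, reusing identifiers from the later practice section), a case class with a Seq field converts to a Dataset directly:

// Hypothetical case class with a nested Seq field
case class Order(ordernumber: String, itemids: Seq[String])

val orderDS = Seq(Order("BYSL00000893", Seq("FS527258160501", "FS527258169701"))).toDS()
orderDS.printSchema()
// root
//  |-- ordernumber: string (nullable = true)
//  |-- itemids: array (nullable = true)
//  |    |-- element: string (containsNull = true)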

1) Create an RDD

scala> val peopleRDD = sc.textFile("examples/src/main/resources/people.txt")

peopleRDD: org.apache.spark.rdd.RDD[String] = examples/src/main/resources/people.txt MapPartitionsRDD[3] at textFile at <console>:27

2) Create a case class

scala> case class Person(name: String, age: Long)

defined class Person

3) Convert the RDD to a DataSet

scala> peopleRDD.map(line => {val para = line.split(",");Person(para(0),para(1).trim.toInt)}).toDS()

res8: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]

2.3.3 Converting a DataSet to an RDD

Simply call the rdd method.

1) Create a DataSet

scala> val DS = Seq(Person("Andy", 32)).toDS()

DS: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]

2) Convert the DataSet to an RDD

scala> DS.rdd

res11: org.apache.spark.rdd.RDD[Person] = MapPartitionsRDD[15] at rdd at <console>:28

2.4 Interoperating between DataFrame and DataSet

1. Converting a DataFrame to a DataSet

1) Create a DataFrame

scala> val df = spark.read.json("examples/src/main/resources/people.json")

df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

2) Create a case class

scala> case class Person(name: String, age: Long)

defined class Person

3) Convert the DataFrame to a DataSet

scala> df.as[Person]

res14: org.apache.spark.sql.Dataset[Person] = [age: bigint, name: string]

2. Converting a DataSet to a DataFrame

1) Create a case class

scala> case class Person(name: String, age: Long)

defined class Person

2) Create a DataSet

scala> val ds = Seq(Person("Andy", 32)).toDS()

ds: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]

3) Convert the DataSet to a DataFrame

scala> val df = ds.toDF

df: org.apache.spark.sql.DataFrame = [name: string, age: bigint]

4) Show the result

scala> df.show

+----+---+

|name|age|

+----+---+

|Andy| 32|

+----+---+

2.4.1 DataSet to DataFrame

This is simple, because it just wraps the case class into Row.

1) Import the implicit conversions

import spark.implicits._

2) Convert

val testDF = testDS.toDF

2.4.2 DataFrame to DataSet

1) Import the implicit conversions

import spark.implicits._

2) Define a case class

case class Coltest(col1: String, col2: Int) extends Serializable // defines the field names and types

3) Convert

val testDS = testDF.as[Coltest]

This method uses the as method to convert to a Dataset once the type of each column is given. It is extremely convenient when the data is a DataFrame but individual fields need to be processed. When using these operations, be sure to add import spark.implicits._, otherwise toDF and toDS are not available.

2.5 RDD, DataFrame, and DataSet

In Spark SQL, Spark provides us with two new abstractions, DataFrame and DataSet. How do they differ from RDD? First, in terms of when they were introduced:

RDD (Spark 1.0) -> DataFrame (Spark 1.3) -> Dataset (Spark 1.6)

If the same data is given to each of these three structures and the same computation is performed, they all produce the same result. What differs is their execution efficiency and execution style.

In later Spark versions, DataSet is expected to gradually replace RDD and DataFrame as the only API interface.
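A minimal sketch of the same filter expressed in all three APIs, assuming the people.txt file used earlier and a spark-shell session:

import spark.implicits._
case class Person(name: String, age: Long)

val base = sc.textFile("examples/src/main/resources/people.txt")
  .map(_.split(",")).map(a => (a(0), a(1).trim.toInt))

val adultsRDD = base.filter(_._2 > 21)                                   // RDD: functional style
val adultsDF  = base.toDF("name", "age").filter($"age" > 21)             // DataFrame: named columns, Catalyst-optimized
val adultsDS  = base.map { case (n, a) => Person(n, a) }.toDS().filter(_.age > 21)   // Dataset: typed rows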

2.5.1 What the three have in common

1. RDD, DataFrame, and Dataset are all distributed, resilient datasets on the Spark platform, and make it convenient to process very large data.

2. All three are lazily evaluated: creating them or applying transformations such as map does not execute anything immediately; the computation only starts when an action such as foreach is encountered.

3. All three automatically cache or spill intermediate results according to Spark's memory situation, so even very large datasets need not cause out-of-memory errors.

4. All three have the concept of partitions.

5. All three share many common functions, such as filter and sorting.

6. Many operations on DataFrame and Dataset require the following import:

import spark.implicits._

7. Both DataFrame and Dataset can use pattern matching to obtain the value and type of each field:

DataFrame:

testDF.map {
  case Row(col1: String, col2: Int) =>
    println(col1); println(col2)
    col1
  case _ =>
    ""
}

Dataset:

case class Coltest(col1: String, col2: Int) extends Serializable // defines the field names and types

testDS.map {
  case Coltest(col1: String, col2: Int) =>
    println(col1); println(col2)
    col1
  case _ =>
    ""
}

2.5.2 Differences among the three

1. RDD:

1) RDDs are generally used together with Spark MLlib.

2) RDDs do not support Spark SQL operations.

2. DataFrame:

1) Unlike RDD and Dataset, each row of a DataFrame has the fixed type Row, and column values cannot be accessed directly; the individual fields can only be obtained by parsing, for example:

testDF.foreach { line =>
  val col1 = line.getAs[String]("col1")
  val col2 = line.getAs[Int]("col2")   // col2 was declared as Int in Coltest
}

2) DataFrame and Dataset are generally not used together with Spark MLlib.

3) Both DataFrame and Dataset support Spark SQL operations such as select and groupby, and they can also be registered as temporary views for SQL queries, for example:

dataDF.createOrReplaceTempView("tmp")

spark.sql("select  ROW,DATE from tmp where DATE is not null order by DATE").show(100,false)

4) DataFrame and Dataset also support some particularly convenient save formats, such as CSV with a header, so the field name of each column is clear at a glance:

// save
val saveoptions = Map("header" -> "true", "delimiter" -> "\t", "path" -> "hdfs://hadoop102:9000/test")
datawDF.write.format("com.atguigu.spark.csv").mode(SaveMode.Overwrite).options(saveoptions).save()

// load
val options = Map("header" -> "true", "delimiter" -> "\t", "path" -> "hdfs://hadoop102:9000/test")
val datarDF = spark.read.options(options).format("com.atguigu.spark.csv").load()

With this way of saving, the mapping between field names and columns is easy to obtain, and the delimiter can be freely specified.

3. Dataset:

1) Dataset and DataFrame have exactly the same member functions; the only difference is the type of each row.

2) A DataFrame can also be written as Dataset[Row]: each row has type Row, and without parsing there is no way to know which fields a row contains or what type each field is; you can only use the getAs method mentioned above, or the pattern matching from item 7 of the common features, to pull out specific fields. In a Dataset, the type of each row is not fixed: after defining a case class you can freely access the information of every row.

case class Coltest(col1: String, col2: Int) extends Serializable // defines the field names and types

/**
 * rdd contents:
 * ("a", 1)
 * ("b", 1)
 * ("a", 1)
 */
val test: Dataset[Coltest] = rdd.map { line =>
  Coltest(line._1, line._2)
}.toDS

test.map { line =>
  println(line.col1)
  println(line.col2)
}

As you can see, a Dataset is very convenient when a particular field of each row needs to be accessed. However, when writing functions that should adapt to arbitrary data, the row type of a Dataset is not fixed (it could be any case class), so such functions cannot adapt; in that case DataFrame, i.e. Dataset[Row], solves the problem nicely, as sketched below.
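A short sketch of that trade-off (the helper names are made up; Coltest is the case class defined above):

import org.apache.spark.sql.{DataFrame, Dataset}
import spark.implicits._   // provides the encoder needed by the typed version

// Schema-agnostic: accepts a DataFrame with any columns, e.g. drop rows containing nulls
def dropNullRows(df: DataFrame): DataFrame = df.na.drop()

// Schema-bound: only accepts Dataset[Coltest], but gives typed access to its fields
def firstColumn(ds: Dataset[Coltest]): Dataset[String] = ds.map(_.col1)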

2.6 Creating a Spark SQL program in IDEA

Packaging and running the program in IDEA works the same way as for Spark Core; the Maven dependencies need this additional entry:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.1.1</version>
</dependency>

The program is as follows:

package com.atguigu.sparksql

import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory

object HelloWorld {

  def main(args: Array[String]) {
    // Create the SparkSession and set the application name
    val spark = SparkSession
      .builder().master("local[*]")
      .appName("Spark SQL basic example")
      .getOrCreate()

    // For implicit conversions like converting RDDs to DataFrames
    import spark.implicits._

    val df = spark.read.json("file:/F:/hajima.json")

    // Displays the content of the DataFrame to stdout
    df.show()

    df.filter($"age" > 21).show()

    df.createOrReplaceTempView("persons")

    spark.sql("SELECT * FROM persons where age > 21").show()

    spark.stop()
  }

}

2.7 User-defined functions

In the shell window, users can define their own functions through spark.udf.

2.7.1 User-defined UDF

scala> val df = spark.read.json("examples/src/main/resources/people.json")

df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

 

scala> df.show()

+----+-------+

| age|   name|

+----+-------+

|null|Michael|

|  30|   Andy|

|  19| Justin|

+----+-------+

 

 

scala> spark.udf.register("addName", (x:String)=> "Name:"+x)

res5: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))

 

scala> df.createOrReplaceTempView("people")

 

scala> spark.sql("Select addName(name), age from people").show()

+-----------------+----+

|UDF:addName(name)| age|

+-----------------+----+

|     Name:Michael|null|

|        Name:Andy|  30|

|      Name:Justin|  19|

+-----------------+----+

2.7.2 User-defined aggregate functions

Both the strongly typed Dataset and the weakly typed DataFrame provide built-in aggregate functions such as count(), countDistinct(), avg(), max(), and min(). Beyond these, users can define their own aggregate functions.

Weakly typed user-defined aggregate function: extend UserDefinedAggregateFunction to implement one. The example below is a custom aggregate function that computes the average salary.

import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession

object MyAverage extends UserDefinedAggregateFunction {
  // Data type of the input arguments of this aggregate function
  def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)
  // Data types of the values in the aggregation buffer
  def bufferSchema: StructType = {
    StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
  }
  // Data type of the returned value
  def dataType: DataType = DoubleType
  // Whether this function always returns the same output for the same input
  def deterministic: Boolean = true
  // Initialization
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    // running total of salaries
    buffer(0) = 0L
    // running count of salaries
    buffer(1) = 0L
  }
  // Update the buffer with a new input row (within the same executor)
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      buffer(0) = buffer.getLong(0) + input.getLong(0)
      buffer(1) = buffer.getLong(1) + 1
    }
  }
  // Merge buffers coming from different executors
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }
  // Compute the final result
  def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
}

// Register the function
spark.udf.register("myAverage", MyAverage)

val df = spark.read.json("examples/src/main/resources/employees.json")
df.createOrReplaceTempView("employees")
df.show()
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
result.show()
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+

Strongly typed user-defined aggregate function: extend Aggregator to implement one. Again, this example computes the average salary.

import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.SparkSession
// Since this is strongly typed, case classes are involved
case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)

object MyAverage extends Aggregator[Employee, Average, Double] {
  // A data structure holding the salary total and the salary count, both initialized to 0
  def zero: Average = Average(0L, 0L)
  // Combine two values to produce a new value. For performance, the function may modify `buffer`
  // and return it instead of constructing a new object
  def reduce(buffer: Average, employee: Employee): Average = {
    buffer.sum += employee.salary
    buffer.count += 1
    buffer
  }
  // Merge intermediate results from different executors
  def merge(b1: Average, b2: Average): Average = {
    b1.sum += b2.sum
    b1.count += b2.count
    b1
  }
  // Compute the final output
  def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
  // Encoder for the intermediate value type, which is a case class.
  // Encoders.product is the encoder for Scala tuples and case classes
  def bufferEncoder: Encoder[Average] = Encoders.product
  // Encoder for the final output value type
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

import spark.implicits._


val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee]
ds.show()
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

// Convert the function to a `TypedColumn` and give it a name
val averageSalary = MyAverage.toColumn.name("average_salary")
val result = ds.select(averageSalary)
result.show()
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+

3. Spark SQL Data Sources

3.1 Generic load/save methods

3.1.1 Manually specifying options

Spark SQL's DataFrame interface supports operating on a variety of data sources. A DataFrame can be operated on in RDD fashion, or it can be registered as a temporary view; once registered, SQL queries can be executed against it.

Spark SQL's default data source is the Parquet format. When the data source is a Parquet file, Spark SQL can perform all operations on it conveniently. The default data source format can be changed through the configuration option spark.sql.sources.default.

val df = spark.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")

When the data source is not in Parquet format, the format must be specified manually. Data source formats need their fully qualified name (e.g. org.apache.spark.sql.parquet); for built-in formats the short name is enough: json, parquet, jdbc, orc, libsvm, csv, text.

Data can be loaded generically with the read.load method provided by SparkSession and saved with write and save.

val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
peopleDF.write.format("parquet").save("hdfs://hadoop102:9000/namesAndAges.parquet")

Beyond that, SQL can be run directly on files:

val sqlDF = spark.sql("SELECT * FROM parquet.`hdfs://hadoop102:9000/namesAndAges.parquet`")

sqlDF.show()

scala> val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")

peopleDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

 

scala> peopleDF.write.format("parquet").save("hdfs://hadoop102:9000/namesAndAges.parquet")

 

scala> peopleDF.show()

+----+-------+

| age|   name|

+----+-------+

|null|Michael|

|  30|   Andy|

|  19| Justin|

+----+-------+

 

scala> val sqlDF = spark.sql("SELECT * FROM parquet.`hdfs://hadoop102:9000/namesAndAges.parquet`")

17/09/05 04:21:11 WARN ObjectStore: Failed to get database parquet, returning NoSuchObjectException

sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

 

scala> sqlDF.show()

+----+-------+

| age|   name|

+----+-------+

|null|Michael|

|  30|   Andy|

|  19| Justin|

+----+-------+

3.1.2 File save options

A SaveMode can be used for save operations; it defines how existing data is handled. Note that these save modes do not use any locking and are not atomic. Also, when Overwrite is used, the old data is deleted before the new data is written. The SaveMode options are:

Scala/Java                        | Any Language       | Meaning
SaveMode.ErrorIfExists (default)  | "error" (default)  | Throw an error if the file already exists
SaveMode.Append                   | "append"           | Append to the existing data
SaveMode.Overwrite                | "overwrite"        | Overwrite the existing data
SaveMode.Ignore                   | "ignore"           | Ignore the operation if data already exists
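A brief usage sketch (peopleDF is the DataFrame read in 3.1.1; the output paths are placeholders): the mode is passed through DataFrameWriter.mode, either as the SaveMode value or as its string form from the table above:

import org.apache.spark.sql.SaveMode

// append to existing data at a placeholder path
peopleDF.write.mode(SaveMode.Append).parquet("hdfs://hadoop102:9000/people_out.parquet")

// equivalent string form, overwriting whatever is already there
peopleDF.write.mode("overwrite").format("json").save("hdfs://hadoop102:9000/people_out.json")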

3.2 JSON Files

Spark SQL can automatically infer the schema of a JSON dataset and load it as a Dataset[Row]. A JSON file can be loaded with SparkSession.read.json().

Note: this JSON file is not a conventional JSON file; each line must be a complete JSON string.

{"name":"Michael"}

{"name":"Andy", "age":30}

{"name":"Justin", "age":19}

 

// Primitive types (Int, String, etc) and Product types (case classes) encoders are
// supported by importing this when creating a Dataset.
import spark.implicits._

// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files
val path = "examples/src/main/resources/people.json"
val peopleDF = spark.read.json(path)

// The inferred schema can be visualized using the printSchema() method
peopleDF.printSchema()
// root
//  |-- age: long (nullable = true)
//  |-- name: string (nullable = true)

// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by spark
val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
// +------+
// |  name|
// +------+
// |Justin|
// +------+

// Alternatively, a DataFrame can be created for a JSON dataset represented by
// a Dataset[String] storing one JSON object per string
val otherPeopleDataset = spark.createDataset(
"""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val otherPeople = spark.read.json(otherPeopleDataset)
otherPeople.show()
// +---------------+----+
// |        address|name|
// +---------------+----+
// |[Columbus,Ohio]| Yin|
// +---------------+----+

3.3 Parquet Files

Parquet is a popular columnar storage format that can efficiently store records with nested fields. It is widely used in the Hadoop ecosystem and supports all of Spark SQL's data types. Spark SQL provides methods for reading and writing Parquet files directly.

import spark.implicits._

val peopleDF = spark.read.json("examples/src/main/resources/people.json")

peopleDF.write.parquet("hdfs://hadoop102:9000/people.parquet")

val parquetFileDF = spark.read.parquet("hdfs://hadoop102:9000/people.parquet")

parquetFileDF.createOrReplaceTempView("parquetFile")


val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attributes(0)).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

3.4 JDBC

Spark SQL can create a DataFrame by reading data from a relational database over JDBC; after a series of computations on the DataFrame, the data can also be written back to the relational database.

Note: the relevant database driver must be placed on Spark's classpath.

$ bin/spark-shell --master spark://hadoop102:7077 --jars mysql-connector-java-5.1.27-bin.jar

 

// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://hadoop102:3306/rdd")
  .option("dbtable", "rddtable")
  .option("user", "root")
  .option("password", "000000")
  .load()

import java.util.Properties

val connectionProperties = new Properties()
connectionProperties.put("user", "root")
connectionProperties.put("password", "hive")
val jdbcDF2 = spark.read
  .jdbc("jdbc:mysql://hadoop102:3306/rdd", "rddtable", connectionProperties)

// Saving data to a JDBC source
jdbcDF.write
  .format("jdbc")
  .option("url", "jdbc:mysql://hadoop102:3306/rdd")
  .option("dbtable", "dftable")
  .option("user", "root")
  .option("password", "000000")
  .save()

jdbcDF2.write
  .jdbc("jdbc:mysql://hadoop102:3306/mysql", "db", connectionProperties)

3.5 Hive

Apache Hive is the SQL engine on Hadoop, and Spark SQL can be compiled with or without Hive support. With Hive support, Spark SQL can access Hive tables, use UDFs (user-defined functions), and use the Hive query language (HiveQL/HQL). It is important to note that including the Hive libraries in Spark SQL does not require Hive to be installed beforehand. In general it is best to compile Spark SQL with Hive support so that these features are available. If you downloaded a binary distribution of Spark, it should already have been built with Hive support.

To connect Spark SQL to an existing Hive deployment, you must copy hive-site.xml into Spark's configuration directory ($SPARK_HOME/conf). Spark SQL can run even without a deployed Hive. Note that if Hive is not deployed, Spark SQL creates its own Hive metastore in the current working directory, called metastore_db. Also, if you create tables with the HiveQL CREATE TABLE statement (not CREATE EXTERNAL TABLE), the tables are placed in the /user/hive/warehouse directory of the default file system (HDFS if a configured hdfs-site.xml is on the classpath, otherwise the local file system).

import java.io.File

import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession

case class Record(key: Int, value: String)

// warehouseLocation points to the default location for managed databases and tables
val warehouseLocation = new File("spark-warehouse").getAbsolutePath

val spark = SparkSession
.builder()
.appName("Spark Hive Example")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport()
.getOrCreate()

import spark.implicits._
import spark.sql

sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL
sql("SELECT * FROM src").show()
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...

// Aggregation queries are also supported.
sql("SELECT COUNT(*) FROM src").show()
// +--------+
// |count(1)|
// +--------+
// |    500 |
// +--------+

// The results of SQL queries are themselves DataFrames and support all normal functions.
val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

// The items in DataFrames are of type Row, which allows you to access each column by ordinal.
val stringsDS = sqlDF.map {
case Row(key: Int, value: String) => s"Key: $key, Value: $value"
}
stringsDS.show()
// +--------------------+
// |               value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...

// You can also use DataFrames to create temporary views within a SparkSession.
val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
recordsDF.createOrReplaceTempView("records")

// Queries can then join DataFrame data with data stored in Hive.
sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// |  2| val_2|  2| val_2|
// |  4| val_4|  4| val_4|
// |  5| val_5|  5| val_5|

3.5.1 Using the embedded Hive

To use the embedded Hive, nothing extra needs to be done; just use it directly. (Optionally: --conf spark.sql.warehouse.dir=)

Note: if you use the internal Hive, then from Spark 2.0 onward spark.sql.warehouse.dir specifies the location of the data warehouse. If you need to use HDFS as the path, core-site.xml and hdfs-site.xml must be added to Spark's conf directory; otherwise the warehouse directory is created only on the master node, and queries will fail because the files cannot be found. In that case, to switch to HDFS, delete the metastore and restart the cluster.
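For example (the HDFS warehouse path below is only a placeholder), the warehouse location can be set when starting the shell:

$ bin/spark-shell --master spark://hadoop102:7077 \
  --conf spark.sql.warehouse.dir=hdfs://hadoop102:9000/user/hive/warehouse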

3.5.2 Using an external Hive

To connect to an external, already-deployed Hive, the following steps are needed.

1) Copy or symlink hive-site.xml from Hive into the conf directory under the Spark installation.

2) Start spark-shell, making sure to include the JDBC client for accessing the Hive metastore database.

$ bin/spark-shell --master spark://hadoop102:7077 --jars mysql-connector-java-5.1.27-bin.jar

3.5.3 Running the Spark SQL CLI

The Spark SQL CLI makes it easy to run the Hive metastore service locally and execute queries from the command line. Start the Spark SQL CLI from the Spark directory with:

./bin/spark-sql

To configure Hive, replace hive-site.xml under conf/.

4. Spark SQL in Practice

4.1 The data

The dataset is a merchandise transaction dataset.

Each order may contain multiple items, each order can generate multiple transactions, and different items have different unit prices.

4.2 Loading the data

tbStock

scala> case class tbStock(ordernumber:String,locationid:String,dateid:String) extends Serializable

defined class tbStock

 

scala> val tbStockRdd = spark.sparkContext.textFile("tbStock.txt")

tbStockRdd: org.apache.spark.rdd.RDD[String] = tbStock.txt MapPartitionsRDD[1] at textFile at <console>:23

 

scala> val tbStockDS = tbStockRdd.map(_.split(",")).map(attr=>tbStock(attr(0),attr(1),attr(2))).toDS

tbStockDS: org.apache.spark.sql.Dataset[tbStock] = [ordernumber: string, locationid: string ... 1 more field]

 

scala> tbStockDS.show()

+------------+----------+---------+

| ordernumber|locationid|   dateid|

+------------+----------+---------+

|BYSL00000893|      ZHAO|2007-8-23|

|BYSL00000897|      ZHAO|2007-8-24|

|BYSL00000898|      ZHAO|2007-8-25|

|BYSL00000899|      ZHAO|2007-8-26|

|BYSL00000900|      ZHAO|2007-8-26|

|BYSL00000901|      ZHAO|2007-8-27|

|BYSL00000902|      ZHAO|2007-8-27|

|BYSL00000904|      ZHAO|2007-8-28|

|BYSL00000905|      ZHAO|2007-8-28|

|BYSL00000906|      ZHAO|2007-8-28|

|BYSL00000907|      ZHAO|2007-8-29|

|BYSL00000908|      ZHAO|2007-8-30|

|BYSL00000909|      ZHAO| 2007-9-1|

|BYSL00000910|      ZHAO| 2007-9-1|

|BYSL00000911|      ZHAO|2007-8-31|

|BYSL00000912|      ZHAO| 2007-9-2|

|BYSL00000913|      ZHAO| 2007-9-3|

|BYSL00000914|      ZHAO| 2007-9-3|

|BYSL00000915|      ZHAO| 2007-9-4|

|BYSL00000916|      ZHAO| 2007-9-4|

+------------+----------+---------+

only showing top 20 rows

 

tbStockDetail:

scala> case class tbStockDetail(ordernumber:String, rownum:Int, itemid:String, number:Int, price:Double, amount:Double) extends Serializable

defined class tbStockDetail

 

scala> val tbStockDetailRdd = spark.sparkContext.textFile("tbStockDetail.txt")

tbStockDetailRdd: org.apache.spark.rdd.RDD[String] = tbStockDetail.txt MapPartitionsRDD[13] at textFile at <console>:23

 

scala> val tbStockDetailDS = tbStockDetailRdd.map(_.split(",")).map(attr=> tbStockDetail(attr(0),attr(1).trim().toInt,attr(2),attr(3).trim().toInt,attr(4).trim().toDouble, attr(5).trim().toDouble)).toDS

tbStockDetailDS: org.apache.spark.sql.Dataset[tbStockDetail] = [ordernumber: string, rownum: int ... 4 more fields]

 

scala> tbStockDetailDS.show()

+------------+------+--------------+------+-----+------+

| ordernumber|rownum|        itemid|number|price|amount|

+------------+------+--------------+------+-----+------+

|BYSL00000893|     0|FS527258160501|    -1|268.0|-268.0|

|BYSL00000893|     1|FS527258169701|     1|268.0| 268.0|

|BYSL00000893|     2|FS527230163001|     1|198.0| 198.0|

|BYSL00000893|     3|24627209125406|     1|298.0| 298.0|

|BYSL00000893|     4|K9527220210202|     1|120.0| 120.0|

|BYSL00000893|     5|01527291670102|     1|268.0| 268.0|

|BYSL00000893|     6|QY527271800242|     1|158.0| 158.0|

|BYSL00000893|     7|ST040000010000|     8|  0.0|   0.0|

|BYSL00000897|     0|04527200711305|     1|198.0| 198.0|

|BYSL00000897|     1|MY627234650201|     1|120.0| 120.0|

|BYSL00000897|     2|01227111791001|     1|249.0| 249.0|

|BYSL00000897|     3|MY627234610402|     1|120.0| 120.0|

|BYSL00000897|     4|01527282681202|     1|268.0| 268.0|

|BYSL00000897|     5|84126182820102|     1|158.0| 158.0|

|BYSL00000897|     6|K9127105010402|     1|239.0| 239.0|

|BYSL00000897|     7|QY127175210405|     1|199.0| 199.0|

|BYSL00000897|     8|24127151630206|     1|299.0| 299.0|

|BYSL00000897|     9|G1126101350002|     1|158.0| 158.0|

|BYSL00000897|    10|FS527258160501|     1|198.0| 198.0|

|BYSL00000897|    11|ST040000010000|    13|  0.0|   0.0|

+------------+------+--------------+------+-----+------+

only showing top 20 rows

 

tbDate:

scala> case class tbDate(dateid:String, years:Int, theyear:Int, month:Int, day:Int, weekday:Int, week:Int, quarter:Int, period:Int, halfmonth:Int) extends Serializable

defined class tbDate

 

scala> val tbDateRdd = spark.sparkContext.textFile("tbDate.txt")

tbDateRdd: org.apache.spark.rdd.RDD[String] = tbDate.txt MapPartitionsRDD[20] at textFile at <console>:23

 

scala> val tbDateDS = tbDateRdd.map(_.split(",")).map(attr=> tbDate(attr(0),attr(1).trim().toInt, attr(2).trim().toInt,attr(3).trim().toInt, attr(4).trim().toInt, attr(5).trim().toInt, attr(6).trim().toInt, attr(7).trim().toInt, attr(8).trim().toInt, attr(9).trim().toInt)).toDS

tbDateDS: org.apache.spark.sql.Dataset[tbDate] = [dateid: string, years: int ... 8 more fields]

 

scala> tbDateDS.show()

+---------+------+-------+-----+---+-------+----+-------+------+---------+

|   dateid| years|theyear|month|day|weekday|week|quarter|period|halfmonth|

+---------+------+-------+-----+---+-------+----+-------+------+---------+

| 2003-1-1|200301|   2003|    1|  1|      3|   1|      1|     1|        1|

| 2003-1-2|200301|   2003|    1|  2|      4|   1|      1|     1|        1|

| 2003-1-3|200301|   2003|    1|  3|      5|   1|      1|     1|        1|

| 2003-1-4|200301|   2003|    1|  4|      6|   1|      1|     1|        1|

| 2003-1-5|200301|   2003|    1|  5|      7|   1|      1|     1|        1|

| 2003-1-6|200301|   2003|    1|  6|      1|   2|      1|     1|        1|

| 2003-1-7|200301|   2003|    1|  7|      2|   2|      1|     1|        1|

| 2003-1-8|200301|   2003|    1|  8|      3|   2|      1|     1|        1|

| 2003-1-9|200301|   2003|    1|  9|      4|   2|      1|     1|        1|

|2003-1-10|200301|   2003|    1| 10|      5|   2|      1|     1|        1|

|2003-1-11|200301|   2003|    1| 11|      6|   2|      1|     2|        1|

|2003-1-12|200301|   2003|    1| 12|      7|   2|      1|     2|        1|

|2003-1-13|200301|   2003|    1| 13|      1|   3|      1|     2|        1|

|2003-1-14|200301|   2003|    1| 14|      2|   3|      1|     2|        1|

|2003-1-15|200301|   2003|    1| 15|      3|   3|      1|     2|        1|

|2003-1-16|200301|   2003|    1| 16|      4|   3|      1|     2|        2|

|2003-1-17|200301|   2003|    1| 17|      5|   3|      1|     2|        2|

|2003-1-18|200301|   2003|    1| 18|      6|   3|      1|     2|        2|

|2003-1-19|200301|   2003|    1| 19|      7|   3|      1|     2|        2|

|2003-1-20|200301|   2003|    1| 20|      1|   4|      1|     2|        2|

+---------+------+-------+-----+---+-------+----+-------+------+---------+

only showing top 20 rows

Register the temporary views:

scala> tbStockDS.createOrReplaceTempView("tbStock")

 

scala> tbDateDS.createOrReplaceTempView("tbDate")

 

scala> tbStockDetailDS.createOrReplaceTempView("tbStockDetail")

4.3 Compute the number of sales orders and the total sales amount per year

Count the number of sales orders and the total sales amount for each year across all orders.

Join the three tables, then use count(distinct a.ordernumber) for the number of orders and sum(b.amount) for the total sales amount.

 

SELECT c.theyear, COUNT(DISTINCT a.ordernumber), SUM(b.amount)

FROM tbStock a

JOIN tbStockDetail b ON a.ordernumber = b.ordernumber

JOIN tbDate c ON a.dateid = c.dateid

GROUP BY c.theyear

ORDER BY c.theyear

 

spark.sql("SELECT c.theyear, COUNT(DISTINCT a.ordernumber), SUM(b.amount) FROM tbStock a JOIN tbStockDetail b ON a.ordernumber = b.ordernumber JOIN tbDate c ON a.dateid = c.dateid GROUP BY c.theyear ORDER BY c.theyear").show

The result is as follows:

+-------+---------------------------+--------------------+                      

|theyear|count(DISTINCT ordernumber)|         sum(amount)|

+-------+---------------------------+--------------------+

|   2004|                       1094|   3268115.499199999|

|   2005|                       3828|1.3257564149999991E7|

|   2006|                       3772|1.3680982900000006E7|

|   2007|                       4885|1.6719354559999993E7|

|   2008|                       4861| 1.467429530000001E7|

|   2009|                       2619|   6323697.189999999|

|   2010|                         94|  210949.65999999997|

+-------+---------------------------+--------------------+

4.4 Compute the sales amount of the largest order in each year

Goal: find the sales amount of the largest order in each year:

1) For each year, compute the total sales amount of every order

SELECT a.dateid, a.ordernumber, SUM(b.amount) AS SumOfAmount

FROM tbStock a

JOIN tbStockDetail b ON a.ordernumber = b.ordernumber

GROUP BY a.dateid, a.ordernumber

 

spark.sql("SELECT a.dateid, a.ordernumber, SUM(b.amount) AS SumOfAmount FROM tbStock a JOIN tbStockDetail b ON a.ordernumber = b.ordernumber GROUP BY a.dateid, a.ordernumber").show

The result is as follows:

+----------+------------+------------------+

|    dateid| ordernumber|       SumOfAmount|

+----------+------------+------------------+

|  2008-4-9|BYSL00001175|             350.0|

| 2008-5-12|BYSL00001214|             592.0|

| 2008-7-29|BYSL00011545|            2064.0|

|  2008-9-5|DGSL00012056|            1782.0|

| 2008-12-1|DGSL00013189|             318.0|

|2008-12-18|DGSL00013374|             963.0|

|  2009-8-9|DGSL00015223|            4655.0|

| 2009-10-5|DGSL00015585|            3445.0|

| 2010-1-14|DGSL00016374|            2934.0|

| 2006-9-24|GCSL00000673|3556.1000000000004|

| 2007-1-26|GCSL00000826| 9375.199999999999|

| 2007-5-24|GCSL00001020| 6171.300000000002|

|  2008-1-8|GCSL00001217|            7601.6|

| 2008-9-16|GCSL00012204|            2018.0|

| 2006-7-27|GHSL00000603|            2835.6|

|2006-11-15|GHSL00000741|           3951.94|

|  2007-6-6|GHSL00001149|               0.0|

| 2008-4-18|GHSL00001631|              12.0|

| 2008-7-15|GHSL00011367|             578.0|

|  2009-5-8|GHSL00014637|            1797.6|

+----------+------------+------------------+

2) Using the result of the previous step as a base table, join it with tbDate on dateid and compute the largest order sales amount for each year

SELECT theyear, MAX(c.SumOfAmount) AS SumOfAmount

FROM (SELECT a.dateid, a.ordernumber, SUM(b.amount) AS SumOfAmount

FROM tbStock a

JOIN tbStockDetail b ON a.ordernumber = b.ordernumber

GROUP BY a.dateid, a.ordernumber

) c

JOIN tbDate d ON c.dateid = d.dateid

GROUP BY theyear

ORDER BY theyear DESC

 

spark.sql("SELECT theyear, MAX(c.SumOfAmount) AS SumOfAmount FROM (SELECT a.dateid, a.ordernumber, SUM(b.amount) AS SumOfAmount FROM tbStock a JOIN tbStockDetail b ON a.ordernumber = b.ordernumber GROUP BY a.dateid, a.ordernumber ) c JOIN tbDate d ON c.dateid = d.dateid GROUP BY theyear ORDER BY theyear DESC").show

The result is as follows:

+-------+------------------+                                                    

|theyear|       SumOfAmount|

+-------+------------------+

|   2010|13065.280000000002|

|   2009|25813.200000000008|

|   2008|           55828.0|

|   2007|          159126.0|

|   2006|           36124.0|

|   2005|38186.399999999994|

|   2004| 23656.79999999997|

+-------+------------------+

4.5 Compute the best-selling item of each year

Goal: find the best-selling item for each year (the item whose sales amount is highest in a given year is that year's best-seller).

Step 1: compute the sales amount of each item in each year

SELECT c.theyear, b.itemid, SUM(b.amount) AS SumOfAmount

FROM tbStock a

JOIN tbStockDetail b ON a.ordernumber = b.ordernumber

JOIN tbDate c ON a.dateid = c.dateid

GROUP BY c.theyear, b.itemid

 

spark.sql("SELECT c.theyear, b.itemid, SUM(b.amount) AS SumOfAmount FROM tbStock a JOIN tbStockDetail b ON a.ordernumber = b.ordernumber JOIN tbDate c ON a.dateid = c.dateid GROUP BY c.theyear, b.itemid").show

The result is as follows:

+-------+--------------+------------------+                                     

|theyear|        itemid|       SumOfAmount|

+-------+--------------+------------------+

|   2004|43824480810202|           4474.72|

|   2006|YA214325360101|             556.0|

|   2006|BT624202120102|             360.0|

|   2007|AK215371910101|24603.639999999992|

|   2008|AK216169120201|29144.199999999997|

|   2008|YL526228310106|16073.099999999999|

|   2009|KM529221590106| 5124.800000000001|

|   2004|HT224181030201|2898.6000000000004|

|   2004|SG224308320206|           7307.06|

|   2007|04426485470201|14468.800000000001|

|   2007|84326389100102|           9134.11|

|   2007|B4426438020201|           19884.2|

|   2008|YL427437320101|12331.799999999997|

|   2008|MH215303070101|            8827.0|

|   2009|YL629228280106|           12698.4|

|   2009|BL529298020602|            2415.8|

|   2009|F5127363019006|             614.0|

|   2005|24425428180101|          34890.74|

|   2007|YA214127270101|             240.0|

|   2007|MY127134830105|          11099.92|

+-------+--------------+------------------+

Step 2: based on Step 1, compute the maximum single-item sales amount in each year

SELECT d.theyear, MAX(d.SumOfAmount) AS MaxOfAmount

FROM (SELECT c.theyear, b.itemid, SUM(b.amount) AS SumOfAmount

FROM tbStock a

JOIN tbStockDetail b ON a.ordernumber = b.ordernumber

JOIN tbDate c ON a.dateid = c.dateid

GROUP BY c.theyear, b.itemid

) d

GROUP BY d.theyear

 

spark.sql("SELECT d.theyear, MAX(d.SumOfAmount) AS MaxOfAmount FROM (SELECT c.theyear, b.itemid, SUM(b.amount) AS SumOfAmount FROM tbStock a JOIN tbStockDetail b ON a.ordernumber = b.ordernumber JOIN tbDate c ON a.dateid = c.dateid GROUP BY c.theyear, b.itemid ) d GROUP BY d.theyear").show

The result is as follows:

+-------+------------------+                                                    

|theyear|       MaxOfAmount|

+-------+------------------+

|   2007|           70225.1|

|   2006|          113720.6|

|   2004|53401.759999999995|

|   2009|           30029.2|

|   2005|56627.329999999994|

|   2010|            4494.0|

|   2008| 98003.60000000003|

+-------+------------------+

Step 3: join the per-item yearly sales from Step 1 with the yearly maximum from Step 2, on both year and amount, to obtain the row of the best-selling item for each year

SELECT DISTINCT e.theyear, e.itemid, f.MaxOfAmount

FROM (SELECT c.theyear, b.itemid, SUM(b.amount) AS SumOfAmount

FROM tbStock a

JOIN tbStockDetail b ON a.ordernumber = b.ordernumber

JOIN tbDate c ON a.dateid = c.dateid

GROUP BY c.theyear, b.itemid

) e

JOIN (SELECT d.theyear, MAX(d.SumOfAmount) AS MaxOfAmount

FROM (SELECT c.theyear, b.itemid, SUM(b.amount) AS SumOfAmount

FROM tbStock a

JOIN tbStockDetail b ON a.ordernumber = b.ordernumber

JOIN tbDate c ON a.dateid = c.dateid

GROUP BY c.theyear, b.itemid

) d

GROUP BY d.theyear

) f ON e.theyear = f.theyear

AND e.SumOfAmount = f.MaxOfAmount

ORDER BY e.theyear

 

spark.sql("SELECT DISTINCT e.theyear, e.itemid, f.maxofamount FROM (SELECT c.theyear, b.itemid, SUM(b.amount) AS sumofamount FROM tbStock a JOIN tbStockDetail b ON a.ordernumber = b.ordernumber JOIN tbDate c ON a.dateid = c.dateid GROUP BY c.theyear, b.itemid ) e JOIN (SELECT d.theyear, MAX(d.sumofamount) AS maxofamount FROM (SELECT c.theyear, b.itemid, SUM(b.amount) AS sumofamount FROM tbStock a JOIN tbStockDetail b ON a.ordernumber = b.ordernumber JOIN tbDate c ON a.dateid = c.dateid GROUP BY c.theyear, b.itemid ) d GROUP BY d.theyear ) f ON e.theyear = f.theyear AND e.sumofamount = f.maxofamount ORDER BY e.theyear").show

The result is as follows:

+-------+--------------+------------------+                                     

|theyear|        itemid|       maxofamount|

+-------+--------------+------------------+

|   2004|JY424420810101|53401.759999999995|

|   2005|24124118880102|56627.329999999994|

|   2006|JY425468460101|          113720.6|

|   2007|JY425468460101|           70225.1|

|   2008|E2628204040101| 98003.60000000003|

|   2009|YL327439080102|           30029.2|

|   2010|SQ429425090101|            4494.0|

+-------+--------------+------------------+

