第1章 Spark SQL 概述1.1 什么是 Spark SQL1.2 RDD vs DataFrames vs DataSet1.2.1 RDD1.2.2 DataFrame1.2.3 DataSet1.2.4 三者的共性1.2.5 三者的區別第2章 執行 Spark SQL 查詢2.1 命令行查詢流程2.2 IDEA 創建 Spark SQL 程序第3章 Spark SQL 解析3.1 新的起始點 SparkSession3.2 創建 DataFrames3.3 DataFrame 常用操作3.3.1 DSL 風格語法3.3.2 SQL 風格語法3.4 創建 DataSet3.5 DataFrame 和 RDD 互操作3.5.1 通過反射的方式獲取 Scheam3.5.2 通過編程的方式設置 Schema(StructType)3.6 類型之間的轉換總結3.7 用戶自定義函數3.7.1 用戶自定義 UDF 函數3.7.2 用戶自定義 UDAF 函數(即聚合函數)第4章 Spark SQL 數據源4.1 通用加載/保存方法4.1.1 手動指定選項4.1.2 文件保存選項4.2 Parquet 文件4.2.1 Parquet 讀寫4.2.2 解析分區信息4.2.3 Schema 合並4.3 Hive 數據庫4.3.1 內嵌 Hive 應用4.3.2 外部 Hive 應用4.4 JSON 數據集4.5 JDBC第5章 JDBC/ODBC 服務器第6章 運行 Spark SQL CLI第7章 Spark SQL 實戰7.1 數據說明7.2 加載數據7.3 計算所有訂單中每年的銷售單數、銷售總額7.4 計算所有訂單每年最大金額訂單的銷售額7.5 計算所有訂單中每年最暢銷貨品
第1章 Spark SQL 概述
1.1 什么是 Spark SQL
Spark SQL:http://spark.apache.org/sql/
![]()
Spark SQL 是 Spark 用來處理結構化數據的一個模塊,它提供了一個編程抽象叫做 DataFrame,並且作為分布式 SQL 查詢引擎的作用。
我們已經學習了 Hive,它是將 Hive SQL 轉換成 MapReduce 然后提交到集群上執行,大大簡化了編寫 MapReduce 的程序的復雜性,由於 MapReduce 這種計算模型執行效率比較慢。所以 Spark SQL 的應運而生,它是將 Spark SQL 轉換成 RDD,然后提交到集群執行,執行效率非常快!
![]()
Spark SQL 的特點:
1、易整合(易集成)
2、統一的數據訪問方式
3、兼容 Hive
4、標准的數據連接![]()
Spark SQL我們要學什么?SparkSQL 可以看做是一個轉換層,向下對接各種不同的結構化數據源,向上提供不同的數據訪問方式。
![]()
1.2 RDD vs DataFrames vs DataSet
Spark SQL 的數據抽象
![]()
在 SparkSQL 中 Spark 為我們提供了兩個新的抽象,分別是 DataFrame 和 DataSet。他們和 RDD 有什么區別呢?首先從版本的產生上來看:RDD(Spark1.0) —> DataFrame(Spark1.3) —> DataSet(Spark1.6)
如果同樣的數據都給到這三個數據結構,他們分別計算之后,都會給出相同的結果。不同是的他們的執行效率和執行方式。
在后期的 Spark 版本中,DataSet 會逐步取代 RDD 和 DataFrame 成為唯一的 API 接口。
![]()
1.2.1 RDD
RDD 彈性分布式數據集,Spark 計算的基石,為用戶屏蔽了底層對數據的復雜抽象和處理,為用戶提供了一組方便的數據轉換與求值方法。
RDD 是一個懶執行的不可變的可以支持 Lambda 表達式的並行數據集合。
RDD 的最大好處就是簡單,API 的人性化程度很高。
RDD 的劣勢是性能限制,它是一個 JVM 駐內存對象,這也就決定了存在 GC 的限制和數據增加時 Java 序列化成本的升高。
RDD 例子如下:![]()
1.2.2 DataFrame
與 RDD 類似,DataFrame 也是一個分布式數據容器。然而 DataFrame 更像傳統數據庫的二維表格,除了數據以外,還記錄數據的結構信息,即 schema。同時,與 Hive 類似,DataFrame 也支持嵌套數據類型(struct、array 和 map)。從 API 易用性的角度上看,DataFrame API 提供的是一套高層的關系操作,比函數式的 RDD API 要更加友好,門檻更低。由於與 R 和 Pandas 的 DataFrame 類似,Spark DataFrame 很好地繼承了傳統單機數據分析的開發體驗。
![]()
上圖直觀地體現了 DataFrame 和 RDD 的區別。左側的 RDD[Person] 雖然以 Person 為類型參數,但 Spark 框架本身不了解 Person 類的內部結構。而右側的 DataFrame 卻提供了詳細的結構信息,使得 Spark SQL 可以清楚地知道該數據集中包含哪些列,每列的名稱和類型各是什么。DataFrame 多了數據的結構信息,即 schema。RDD 是分布式的Java對象的集合。DataFrame 是分布式的Row對象的集合。DataFrame 除了提供了比 RDD 更豐富的算子以外,更重要的特點是提升執行效率、減少數據讀取以及執行計划的優化,比如 filter 下推、裁剪等。
DataFrame 是為數據提供了 Schema 的視圖。可以把它當做數據庫中的一張表來對待。
DataFrame 也是懶執行的。
性能上比 RDD 要高,主要有兩方面原因:
(1)定制化內存管理:數據以二進制的方式存在於非堆內存,節省了大量空間之外,還擺脫了 GC 的限制。![]()
(2) 優化的執行計划:查詢計划通過 Spark catalyst optimiser 進行優化。
![]()
比如下面一個例子:
![]()
人口數據分析的示例:
![]()
為了說明查詢優化,我們來看上圖展示的人口數據分析的示例。圖中構造了兩個 DataFrame,將它們 join 之后又做了一次 filter 操作。如果原封不動地執行這個執行計划,最終的執行效率是不高的。因為 join 是一個代價較大的操作,也可能會產生一個較大的數據集。如果我們能將 filter 下推到 join 下方,先對 DataFrame 進行過濾,再 join 過濾后的較小的結果集,便可以有效縮短執行時間。而 Spark SQL 的查詢優化器正是這樣做的。簡而言之,邏輯查詢計划優化就是一個利用基於關系代數的等價變換,將高成本的操作替換為低成本操作的過程。
得到的優化執行計划在轉換成物理執行計划的過程中,還可以根據具體的數據源的特性將過濾條件下推至數據源內。最右側的物理執行計划中 filter 之所以消失不見,就是因為融入了用於執行最終的讀取操作的表掃描節點內。
對於普通開發者而言,查詢優化器的意義在於,即便是經驗並不豐富的程序員寫出的次優的查詢,也可以被盡量轉換為高效的形式予以執行。
DataFrame 的劣勢在於在編譯期缺少類型安全檢查,導致運行時出錯。
1.2.3 DataSet
1)是 DataFrame API 的一個擴展,是 Spark 最新的數據抽象。
2)用戶友好的 API 風格,既具有類型安全檢查也具有 DataFrame 的查詢優化特性。
3)DataSet 支持編解碼器,當需要訪問非堆上的數據時可以避免反序列化整個對象,提高了效率。
4)樣例類被用來在 DataSet 中定義數據的結構信息,樣例類中每個屬性的名稱直接映射到 DataSet 中的字段名稱。
5)DataFrame 是 DataSet 的特列,type DataFrame = Dataset[Row] ,所以可以通過 as 方法將 DataFrame 轉換為 DataSet。Row 是一個類型,跟 Car、Person 這些的類型一樣,所有的表結構信息都用 Row 來表示。
6)DataSet 是強類型的。比如可以有 Dataset[Car],Dataset[Person],DataFrame 只是知道字段,但是不知道字段的類型,所以在執行這些操作的時候是沒辦法在編譯的時候檢查是否類型失敗的,比如你可以對一個 String 進行減法操作,在執行的時候才報錯,而 DataSet 不僅僅知道字段,而且知道字段類型,所以有更嚴格的錯誤檢查。就跟 JSON 對象和類對象之間的類比。![]()
RDD 讓我們能夠決定怎么做,而 DataFrame 和 DataSet 讓我們決定做什么,控制的粒度不一樣。![]()
1.2.4 三者的共性
1、RDD、DataFrame、DataSet 全都是 spark 平台下的分布式彈性數據集,為處理超大型數據提供便利。
2、三者都有惰性機制,在進行創建、轉換,如 map 方法時,不會立即執行,只有在遇到 action,如 foreach 時,三者才會開始遍歷運算,極端情況下,如果代碼里面有創建、轉換,但是后面沒有在 action 中使用對應的結果,在執行時會被直接跳過。
val sparkconf = new SparkConf().setMaster("local").setAppName("test").set("spark.port.maxRetries","1000")
val spark = SparkSession.builder().config(sparkconf).getOrCreate()
val rdd=spark.sparkContext.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))
// map 不運行
rdd.map { line =>
println("運行")
line._1
}
3、三者都會根據 spark 的內存情況自動緩存運算,這樣即使數據量很大,也不用擔心會內存溢出。
4、三者都有 partition 的概念。
5、三者有許多共同的函數,如 filter,排序等。
6、在對 DataFrame 和 DataSet 進行許多操作都需要這個包進行支持
import spark.implicits._
7、DataFrame 和 DataSet 均可使用模式匹配獲取各個字段的值和類型
DataFrame:
testDF.map {
case Row(col1: String, col2: Int) =>
println(col1)
println(col2)
col1
case _=>
""
}
DataSet:
case class Coltest(col1: String, col2: Int)extends Serializable // 定義字段名和類型
testDS.map {
case Coltest(col1: String, col2: Int) =>
println(col1)
println(col2)
col1
case _=>
""
}
1.2.5 三者的區別
RDD:
1、RDD 一般和 spark MLlib 同時使用
2、RDD 不支持 spark sql 操作
DataFrame:
1、與 RDD 和 DataSet 不同,DataFrame 每一行的類型固定為 Row,只有通過解析才能獲取各個字段的值,如
testDF.foreach{
line =>
val col1=line.getAs[String]("col1")
val col2=line.getAs[String]("col2")
}
每一列的值沒法直接訪問
2、DataFrame 與 DataSet 一般與 spark MLlib 同時使用
3、DataFrame 與 DataSet 均支持 spark sql 的操作,比如 select,groupby 之類,還能注冊臨時表/視窗,進行 sql 語句操作,如
dataDF.createOrReplaceTempView("tmp")
spark.sql("select ROW, DATE from tmp where DATE is not null order by DATE").show(100, false)
4、DataFrame 與 DataSet 支持一些特別方便的保存方式,比如 保存成 csv,可以帶上表頭,這樣每一列的字段名一目了然
// 保存
val saveoptions = Map("header" -> "true", "delimiter" -> "\t", "path" -> "hdfs://hadoop102:9000/test")
datawDF.write.format("com.atguigu.spark.csv").mode(SaveMode.Overwrite).options(saveoptions).save()
// 讀取
val options = Map("header" -> "true", "delimiter" -> "\t", "path" -> "hdfs://hadoop102:9000/test")
val datarDF= spark.read.options(options).format("com.atguigu.spark.csv").load()
利用這樣的保存方式,可以方便的獲得字段名和列的對應,而且分隔符(delimiter)可以自由指定。
DataSet:
DataSet 和 DataFrame 擁有完全相同的成員函數,區別只是每一行的數據類型不同。DataFrame 也可以叫 Dataset[Row],即每一行的類型是 Row,不解析,每一行究竟有哪些字段,各個字段又是什么類型都無從得知,只能用上面提到的 getAS 方法或者共性中的第七條提到的模式匹配拿出特定字段。而 DataSet 中,每一行是什么類型是不一定的,在自定義了 case class 之后可以很自由的獲得每一行的信息。
case class Coltest(col1: String, col2: Int) extends Serializable // 定義字段名和類型
/**
rdd
("a", 1)
("b", 1)
("a", 1)
**/
val test: Dataset[Coltest] = rdd.map { line =>
Coltest(line._1, line._2)
}.toDS
test.map{
line =>
println(line.col1)
println(line.col2)
}
可以看出,DataSet 在需要訪問列中的某個字段時是非常方便的,然而,如果要寫一些適配性很強的函數時,如果使用 DataSet,行的類型又不確定,可能是各種 case class,無法實現適配,這時候用 DataFrame,即 Dataset[Row] 就能比較好的解決問題。
第2章 執行 Spark SQL 查詢
2.1 命令行查詢流程
打開 spark-shell
例子:查詢大於 30 歲的用戶
創建如下 JSON 文件,注意 JSON 的格式:
{"name":"Michael", "age":30}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
操作步驟如下:
2.2 IDEA 創建 Spark SQL 程序
Spark SQL 在 IDEA 中程序的打包和運行方式都和 Spark Core 類似,Maven 依賴中需要添加新的依賴項:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
<!-- provided 表示編譯期可用,運行期不可用 -->
<!--<scope>provided</scope>-->
</dependency>
程序如下:
package com.atguigu.sparksql
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory
object HelloWorld {
val logger = LoggerFactory.getLogger(HelloWorld.getClass)
def main(args: Array[String]) {
// 創建 SparkSession 並設置 App 名稱
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate()
// 通過隱式轉換將 RDD 操作添加到 DataFrame 上
import spark.implicits._
// 通過 spark.read 操作讀取 JSON 數據
val df = spark.read.json("examples/src/main/resources/people.json")
// show 操作類似於 Action,將 DataFrame 直接打印到 Console 上
df.show()
// DSL 風格的使用方式:屬性的獲取方法 $
df.filter($"age" > 21).show()
//將 DataFrame 注冊為表
df.createOrReplaceTempView("persons")
// 執行 Spark SQL 查詢操作
spark.sql("select * from perosns where age > 21").show()
// 關閉資源
spark.stop()
}
}
第3章 Spark SQL 解析
3.1 新的起始點 SparkSession
在老的版本中,SparkSQL 提供兩種 SQL 查詢起始點,一個叫 SQLContext,用於 Spark 自己提供的 SQL 查詢,一個叫 HiveContext,用於連接 Hive 的查詢,SparkSession 是 Spark 最新的 SQL 查詢起始點,實質上是 SQLContext 和 HiveContext 的組合,所以在 SQLContext 和 HiveContext 上可用的 API 在 SparkSession 上同樣是可以使用的。SparkSession 內部封裝了 SparkContext,所以計算實際上是由 SparkContext 完成的。
import org.apache.spark.sql.SparkSession
// 創建 SparkSession 並設置 App 名稱
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate()
// 通過隱式轉換將 RDD 操作添加到 DataFrame 上
import spark.implicits._
SparkSession.builder 用於創建一個 SparkSession。
import spark.implicits._ 的引入是用於將 DataFrames 隱式轉換成 RDD,使 df 能夠使用 RDD 中的方法。
如果需要 Hive 支持,則需要以下創建語句:
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.enableHiveSupport()
.getOrCreate()
// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._
3.2 創建 DataFrames
在 Spark SQL 中 SparkSession 是創建 DataFrames 和執行 SQL 的入口,創建 DataFrames 有三種方式,一種是可以從一個存在的 RDD 進行轉換,還可以從 Hive Table 進行查詢返回,或者通過 Spark 的數據源進行創建。
1、從 Spark 數據源進行創建:
val df = spark.read.json("examples/src/main/resources/people.json")
// Displays the content of the DataFrame to stdout
df.show()
+---+-------+
|age| name|
+---+-------+
| 30|Michael|
| 30| Andy|
| 19| Justin|
+---+-------+
2、從 RDD 進行轉換:
/**
Michael, 29
Andy, 30
Justin, 19
**/
scala> val personRdd = sc.textFile("examples/src/main/resources/people.txt")
personRdd: org.apache.spark.rdd.RDD[String] = examples/src/main/resources/people.txt MapPartitionsRDD[18] at textFile at <console>:24
// 把每一行的數據用 "," 隔開,然后通過第二個 map 轉換成一個 Array 再通過 toDF 映射給 name 和 age
scala> val personDF3 = personRdd.map(_.split(",")).map(paras => (paras(0).trim(), paras(1).trim().toInt)).toDF("name", "age")
personDF3: org.apache.spark.sql.DataFrame = [name: string, age: int]
scala> personDF3.collect
res0: Array[org.apache.spark.sql.Row] = Array([Michael,29], [Andy,30], [Justin,19])
scala> personDF.show()
+-------+---+
| name|age|
+-------+---+
|Michael| 29|
| Andy| 30|
| Justin| 19|
+-------+---+
3、從 Hive Table 進行查詢返回,我們在數據源章節介紹。
3.3 DataFrame 常用操作
3.3.1 DSL 風格語法
// This import is needed to use the $-notation
import spark.implicits._
// Print the schema in a tree format
df.printSchema()
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
// Select only the "name" column
df.select("name").show()
+-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+
// Select everybody, but increment the age by 1
df.select($"name", $"age" + 1).show()
+-------+---------+
| name|(age + 1)|
+-------+---------+
|Michael| 31|
| Andy| 31|
| Justin| 20|
+-------+---------+
// Select person older than 21
df.filter($"age" > 21).show()
+---+-------+
|age| name|
+---+-------+
| 30|Michael|
| 30| Andy|
+---+-------+
// Count person by age
df.groupBy("age").count().show()
+---+-----+
|age|count|
+---+-----+
| 19| 1|
| 30| 2|
+---+-----+
3.3.2 SQL 風格語法
// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("persons")
val sqlDF = spark.sql("SELECT * FROM persons")
sqlDF.show()
+---+-------+
|age| name|
+---+-------+
| 30|Michael|
| 30| Andy|
| 19| Justin|
+---+-------+
// Register the DataFrame as a global temporary view
df.createGlobalTempView("persons")
// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.persons").show()
+---+-------+
|age| name|
+---+-------+
| 30|Michael|
| 30| Andy|
| 19| Justin|
+---+-------+
// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.persons").show()
+---+-------+
|age| name|
+---+-------+
| 30|Michael|
| 30| Andy|
| 19| Justin|
+---+-------+
臨時表是 Session 范圍內的,Session 退出后,表就失效了。如果想應用范圍內有效,可以使用全局表。注意:使用全局表時需要全路徑訪問,如:global_temp.persons
3.4 創建 DataSet
DataSet 是具有強類型的數據集合,需要提供對應的類型信息。
scala> case class Person(name: String, age: Long)
defined class Person
scala> val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
scala> caseClassDS.show()
+----+---+
|name|age|
+----+---+
|Andy| 32|
+----+---+
scala> val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS: org.apache.spark.sql.Dataset[Int] = [value: int]
scala> primitiveDS.map(_ + 1).collect()
res1: Array[Int] = Array(2, 3, 4)
scala> val path = "examples/src/main/resources/people.json"
path: String = examples/src/main/resources/people.json
scala> val peopleDS = spark.read.json(path).as[Person]
peopleDS: org.apache.spark.sql.Dataset[Person] = [age: bigint, name: string]
scala> peopleDS.show()
+---+-------+
|age| name|
+---+-------+
| 30|Michael|
| 30| Andy|
| 19| Justin|
+---+-------+
3.5 DataFrame 和 RDD 互操作
Spark SQL 支持通過兩種方式將存在的 RDD 轉換為 DataSet,轉換的過程中需要讓 DataSet 獲取 RDD 中的 Schema 信息。
主要有兩種方式:
第一種:是通過反射來獲取 RDD 中的 Schema 信息,這種方式適合於列名已知的情況下。
第二種:是通過編程接口的方式將 Schema 信息應用於 RDD,這種方式可以處理那種在運行時才能知道列的情況下。
3.5.1 通過反射的方式獲取 Scheam
Spark SQL 能夠自動將包含有 case 類的 RDD 轉換成 DataFrame,case 類定義了 table 的結構,case 類屬性通過反射變成了表的列名。case 類可以包含諸如 Seqs 或者 Array 等復雜的結構。
// For implicit conversions from RDDs to DataFrames
import spark.implicits._
// Create an RDD of Person objects from a text file, convert it to a Dataframe
val peopleDF = spark.sparkContext
.textFile("examples/src/main/resources/people.txt")
.map(_.split(","))
.map(attributes => Person(attributes(0).trim(), attributes(1).trim().toInt))
.toDF()
peopleDF.show
+-------+---+
| name|age|
+-------+---+
|Michael| 29|
| Andy| 30|
| Justin| 19|
+-------+---+
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people")
// SQL statements can be run by using the sql methods provided by Spark
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 25")
// The columns of a row in the result can be accessed by field index
teenagersDF.map(teenager => "Name: " + teenager(0)).show() // 通過 row 對象的索引進行訪問
+------------+
| value|
+------------+
|Name: Justin|
+------------+
// or by field name 通過 row 對象的 getAs 方法訪問
teenagersDF.map(teenager => "Name: " + teenager.getAs[Int](0)).show() // 以索引訪問
teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show() // 以列名訪問
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
// No pre-defined encoders for Dataset[Map[K,V]], define explicitly (沒有為數據集 [Map [K,V]] 預定義的編碼器明確定義)
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
// Primitive types and case classes can be also defined as (原始類型和樣例類也可以定義為)
// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()
// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T] (row.getValuesMap [T] 一次檢索多個列到 Map [String,T])
teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
// Array[Map[String,Any]] = Array(Map(name -> Justin, age -> 19))
3.5.2 通過編程的方式設置 Schema(StructType)
如果 case 類不能夠提前定義,可以通過下面三個步驟定義一個 DataFrame,步驟如下:
1、創建一個多行結構的 RDD。
2、創建用 StructType 來表示的行結構信息。
3、通過 SparkSession 提供的 createDataFrame 方法來應用 Schema。
import org.apache.spark.sql.types._
// Create an RDD
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")
// The schema is encoded in a string (schema 以字符串形式編碼)
val schemaString = "name age" // 實際開發中 schemaString 是動態生成的
// Generate the schema based on the string of schema (根據 schema 字符串生成 schema)
// 把 name 和 age 都設置成 StringType 類型
val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true)) // Array[org.apache.spark.sql.types.StructField]
val schema = StructType(fields)
// 把 name 設置成 StringType 類型,把 age 設置成 IntegerType 類型
// val fields = schemaString.split(" ").map(fieldName => fieldName match {
// case "name" => StructField(fieldName, StringType, nullable = true);
// case "age" => StructField(fieldName, IntegerType, nullable = true)
// }) // Array[org.apache.spark.sql.types.StructField]
// Convert records of the RDD (people) to Rows (將 RDD (people) 的記錄轉換為很多行)
import org.apache.spark.sql._
val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0).trim, attributes(1).trim)) // org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)
peopleDF: org.apache.spark.sql.DataFrame = [name: string, age: string] 注意:此時的 name 和 age 都是 StringType 類型
// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")
// SQL can be run over a temporary view created using DataFrames
val results = spark.sql("SELECT name FROM people")
// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name (可以通過字段索引或字段名稱訪問結果中行的列)
results.map(attributes => "Name: " + attributes(0)).show()
+-------------+
| value|
+-------------+
|Name: Michael|
| Name: Andy|
| Name: Justin|
+-------------+
results.show
+-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+
3.6 類型之間的轉換總結
RDD、DataFrame、Dataset 三者有許多共性,有各自適用的場景常常需要在三者之間轉換。
1、RDD -> DataFrame : rdd.map(para => (para(0).trim(), para(1).trim().toInt)).toDF("name", "age")
2、DataFrame -> RDD : df.rdd
注意輸出類型:res2: Array[org.apache.spark.sql.Row] = Array([Michael,29], [Andy,30], [Justin,19])
1、RDD -> DataSet : rdd.map(para => Person(para(0).trim(), para(1).trim().toInt)).toDS
2、DataSet -> RDD : ds.rdd
注意輸出類型:res5: Array[Person] = Array(Person(Michael,29), Person(Andy,30), Person(Justin,19))
1、DataFrame -> DataSet : df.as[Person]
2、DataSet -> DataFrame : ds.toDF
小結:
DataFrame/Dataset 轉 RDD:
val rdd1 = testDF.rdd
val rdd2 = testDS.rdd
RDD 轉 DataFrame:
import spark.implicits._
val testDF = rdd.map { line =>
(line._1, line._2)
}.toDF("col1", "col2")
一般用元組把一行的數據寫在一起,然后在 toDF 中指定字段名。
RDD 轉 DataSet:
import spark.implicits._
case class Coltest(col1:String, col2:Int) extends Serializable // 定義字段名和類型
val testDS = rdd.map { line =>
Coltest(line._1, line._2)
}.toDS
可以注意到,定義每一行的類型 case class 時,已經給出了字段名和類型,后面只要往 case class 里面添加值即可。
Dataset 轉 DataFrame:
這個也很簡單,因為只是把 case class 封裝成 Row。
import spark.implicits._
val testDF = testDS.toDF
DataFrame 轉 DataSet:
import spark.implicits._
case class Coltest(col1:String, col2:Int) extends Serializable // 定義字段名和類型
val testDS = testDF.as[Coltest]
這種方法就是在給出每一列的類型后,使用 as 方法,轉成 DataSet,這在數據類型是 DataFrame 又需要針對各個字段處理時極為方便。
在使用一些特殊的操作時,一定要加上 import spark.implicits._ 不然 toDF、toDS 無法使用。
3.7 用戶自定義函數
通過 spark.udf 功能用戶可以自定義函數。
3.7.1 用戶自定義 UDF 函數
scala> val df = spark.read.json("examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df.show()
+---+-------+
|age| name|
+---+-------+
| 30|Michael|
| 30| Andy|
| 19| Justin|
+---+-------+
scala> spark.udf.register("addName", (x: String) => "Name:" + x)
res5: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))
scala> df.createOrReplaceTempView("people")
scala> spark.sql("select addName(name), age from people").show()
+-----------------+---+
|UDF:addName(name)|age|
+-----------------+---+
| Name:Michael| 30|
| Name:Andy| 30|
| Name:Justin| 19|
+-----------------+---+
scala> spark.sql("select addName(name) as newName, age from people").show()
+------------+---+
| newName|age|
+------------+---+
|Name:Michael| 30|
| Name:Andy| 30|
| Name:Justin| 19|
+------------+---+
3.7.2 用戶自定義 UDAF 函數(即聚合函數)
強類型的 Dataset 和弱類型的 DataFrame 都提供了相關的聚合函數,如 count(),countDistinct(),avg(),max(),min()。除此之外,用戶可以設定自己的自定義聚合函數。
弱類型用戶自定義聚合函數
通過繼承 UserDefinedAggregateFunction 來實現用戶自定義聚合函數。下面展示一個求平均工資的自定義聚合函數:
package com.atguigu.spark
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
object MyAverage extends UserDefinedAggregateFunction {
// 聚合函數輸入參數的數據類型
def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil) // :: 用於的是向隊列的頭部追加數據,產生新的列表
// 聚合緩沖區中值的數據類型
def bufferSchema: StructType = {
StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil) // Nil 是一個空的 List,定義為 List[Nothing]
}
// 返回值的數據類型
def dataType: DataType = DoubleType
// 對於相同的輸入是否一直返回相同的輸出
def deterministic: Boolean = true
// 初始化
def initialize(buffer: MutableAggregationBuffer): Unit = {
// 存工資的總額
buffer(0) = 0L
// 存工資的個數
buffer(1) = 0L
}
// 相同 Execute 間的數據合並(同一分區)
def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
if (!input.isNullAt(0)) {
buffer(0) = buffer.getLong(0) + input.getLong(0)
buffer(1) = buffer.getLong(1) + 1
}
}
// 不同 Execute 間的數據合並(不同分區)
def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
}
// 計算最終結果
def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
def main(args: Array[String]) {
// 創建 SparkSession 並設置 App 名稱
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
// .config("spark.some.config.option", "some-value")
.master("local[*]") // 本地測試
.getOrCreate()
// For implicit conversions like converting RDDs to DataFrames
spark.udf.register("myAverage", MyAverage)
// val df = spark.read.json("examples/src/main/resources/employees.json")
val df = spark.read.json("D:\\learn\\JetBrains\\workspace_idea\\spark\\doc\\employees.json")
df.createOrReplaceTempView("employees")
df.show()
// +-------+------+
// | name|salary|
// +-------+------+
// |Michael| 3000|
// | Andy| 4500|
// | Justin| 3500|
// | Berta| 4000|
// +-------+------+
val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
result.show()
// +--------------+
// |average_salary|
// +--------------+
// | 3750.0|
// +--------------+
spark.stop()
}
}
強類型用戶自定義聚合函數
通過繼承 Aggregator 來實現強類型自定義聚合函數,同樣是求平均工資:
package com.atguigu.spark
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
// 既然是強類型,可能有 case 類
case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)
// 其中 Employee 是在應用聚合函數的時候傳入的對象,Average 是聚合函數在運行的時候內部需要的數據結構,Double 是聚合函數最終需要輸出的類型
object MyAverage extends Aggregator[Employee, Average, Double] {
// 定義一個數據結構,保存工資總數和工資總個數,初始都為0
def zero: Average = Average(0L, 0L)
// 相同 Execute 間的數據合並(同一分區)
def reduce(buffer: Average, employee: Employee): Average = {
buffer.sum += employee.salary
buffer.count += 1
buffer
}
// 聚合不同 Execute 的結果
def merge(b1: Average, b2: Average): Average = {
b1.sum += b2.sum
b1.count += b2.count
b1
}
// 計算最終結果
def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
// 設定之間值類型的編碼器,要轉換成 case 類
// Encoders.product 是進行 scala 元組和 case 類轉換的編碼器
def bufferEncoder: Encoder[Average] = Encoders.product
// 設定最終輸出值的編碼器
def outputEncoder: Encoder[Double] = Encoders.scalaDouble
def main(args: Array[String]) {
// 創建 SparkSession 並設置 App 名稱
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
// .config("spark.some.config.option", "some-value")
.master("local[*]") // 本地測試
.getOrCreate()
import spark.implicits._
// For implicit conversions like converting RDDs to DataFrames
// val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee]
val ds = spark.read.json("D:\\learn\\JetBrains\\workspace_idea\\spark\\doc\\employees.json").as[Employee]
ds.show()
// +-------+------+
// | name|salary|
// +-------+------+
// |Michael| 3000|
// | Andy| 4500|
// | Justin| 3500|
// | Berta| 4000|
// +-------+------+
val averageSalary = MyAverage.toColumn.name("average_salary")
val result = ds.select(averageSalary)
result.show()
// +--------------+
// |average_salary|
// +--------------+
// | 3750.0|
// +--------------+
spark.stop()
}
}
第4章 Spark SQL 數據源
4.1 通用加載/保存方法
4.1.1 手動指定選項
Spark SQL 的 DataFrame 接口支持多種數據源的操作。一個 DataFrame 可以進行 RDDs 方式的操作,也可以被注冊為臨時表。把 DataFrame 注冊為臨時表之后,就可以對該 DataFrame 執行 SQL 查詢。
Spark SQL 的默認數據源為 Parquet 格式。數據源為 Parquet 文件時,Spark SQL 可以方便的執行所有的操作。修改配置項 spark.sql.sources.default,可修改默認數據源格式。示例代碼如下:
val df = sqlContext.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
當數據源格式不是 parquet 格式文件時,需要手動指定數據源的格式。數據源格式需要指定全名(例如:org.apache.spark.sql.parquet),如果數據源格式為內置格式,則只需要指定簡稱定 json, parquet, jdbc, orc, libsvm, csv, text 來指定數據的格式。
可以通過 SparkSession 提供的 read.load 方法用於通用加載數據,使用 write 和 save 保存數據。 示例代碼如下:
val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
peopleDF.write.format("parquet").save("hdfs://hadoop102:9000/namesAndAges.parquet")
除此之外,還可以直接運行 SQL 在文件上。示例代碼如下:
val sqlDF = spark.sql("SELECT * FROM parquet.`hdfs://hadoop102:9000/namesAndAges.parquet`")
sqlDF.show()
spark-shell 下的演示代碼:
scala> val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json") // Spark SQL 的通用輸入模式
peopleDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> peopleDF.write.format("parquet").save("hdfs://hadoop102:9000/namesAndAges.parquet") // Spark SQL 的通用輸出模式
scala> peopleDF.show()
+---+-------+
|age| name|
+---+-------+
| 30|Michael|
| 30| Andy|
| 19| Justin|
+---+-------+
scala> val peopleDF = spark.read.json("examples/src/main/resources/people.json") // Spark SQL 的專業輸入模式
peopleDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> peopleDF.write.parquet("hdfs://hadoop102:9000/namesAndAges.parquet") // Spark SQL 的專業輸出模式
scala> peopleDF.show()
+---+-------+
|age| name|
+---+-------+
| 30|Michael|
| 30| Andy|
| 19| Justin|
+---+-------+
scala> val sqlDF = spark.sql("SELECT * FROM parquet.`hdfs://hadoop102:9000/namesAndAges.parquet`")
19/04/27 21:32:55 WARN ObjectStore: Failed to get database parquet, returning NoSuchObjectException
sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> sqlDF.show()
+---+-------+
|age| name|
+---+-------+
| 30|Michael|
| 30| Andy|
| 19| Justin|
+---+-------+
4.1.2 文件保存選項
可以采用 SaveMode 執行存儲操作,SaveMode 定義了對數據的處理模式。需要注意的是,
這些保存模式不使用任何鎖定,不是原子操作。此外,當使用 Overwrite 方式執行時,在輸出新數據之前原數據就已經被刪除。
SaveMode 詳細介紹如下表:![]()
4.2 Parquet 文件
Parquet 是一種流行的列式存儲格式,可以高效地存儲具有嵌套字段的記錄。
![]()
4.2.1 Parquet 讀寫
Parquet 格式經常在 Hadoop 生態圈中被使用,它也支持 Spark SQL 的全部數據類型。Spark SQL 提供了直接讀取和存儲 Parquet 格式文件的方法。示例代碼如下:
// Encoders for most common types are automatically provided by importing spark.implicits._
import spark.implicits._
val peopleDF = spark.read.json("examples/src/main/resources/people.json")
// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write.parquet("hdfs://hadoop102:9000/people.parquet")
// Read in the parquet file created above
// Parquet files are self-describing so the schema is preserved
// The result of loading a Parquet file is also a DataFrame
val parquetFileDF = spark.read.parquet("hdfs://hadoop102:9000/people.parquet")
// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 25")
namesDF.map(attributes => "Name: " + attributes(0)).show()
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
4.2.2 解析分區信息
對表進行分區是對數據進行優化的方式之一。在分區的表內,數據通過分區列將數據存儲在不同的目錄下。Parquet 數據源現在能夠自動發現並解析分區信息。例如,對人口數據進行分區存儲,分區列為 gender 和 country,使用下面的目錄結構:
path
└── to
└── table
├── gender=male
│ ├── ...
│ │
│ ├── country=US
│ │ └── data.parquet
│ ├── country=CN
│ │ └── data.parquet
│ └── ...
└── gender=female
├── ...
│
├── country=US
│ └── data.parquet
├── country=CN
│ └── data.parquet
└── ...
通過傳遞 path/to/table 給 SQLContext.read.parquet 或 SQLContext.read.load,Spark SQL 將自動解析分區信息。返回的 DataFrame 的 Schema 如下:
root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)
需要注意的是,
數據的分區列的數據類型是自動解析的。當前,支持數值類型和字符串類型。自動解析分區類型的參數為:spark.sql.sources.partitionColumnTypeInference.enabled,默認值為 true。如果想關閉該功能,直接將該參數設置為 disabled。此時,分區列數據格式將被默認設置為 String 類型,不再進行類型解析。
4.2.3 Schema 合並
像 ProtocolBuffer、Avro 和 Thrift 那樣,Parquet 也支持 Schema evolution(Schema 演變)。用戶可以先定義一個簡單的 Schema,然后逐漸的向 Schema 中增加列描述。通過這種方式,用戶可以獲取多個有不同 Schema 但相互兼容的 Parquet 文件。現在 Parquet 數據源能自動檢測這種情況,並合並這些文件的 schemas。
因為 Schema 合並是一個高消耗的操作,在大多數情況下並不需要,所以 Spark SQL 從 1.5.0 開始默認關閉了該功能。可以通過下面兩種方式開啟該功能:
當數據源為 Parquet 文件時,將數據源選項 mergeSchema 設置為 true。
設置全局 SQL 選項 spark.sql.parquet.mergeSchema 為 true。
示例代碼如下:
// sqlContext from the previous example is used in this example.
// This is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
// Create a simple DataFrame, stored into a partition directory
val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double")
df1.write.parquet("hdfs://hadoop102:9000/data/test_table/key=1")
// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple")
df2.write.parquet("hdfs://hadoop102:9000/data/test_table/key=2")
// Read the partitioned table
val df3 = sqlContext.read.option("mergeSchema", "true").parquet("hdfs://hadoop102:9000/data/test_table")
df3.printSchema()
// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths.
// root
// |-- single: int (nullable = true)
// |-- double: int (nullable = true)
// |-- triple: int (nullable = true)
// |-- key : int (nullable = true)
4.3 Hive 數據庫
Apache Hive 是 Hadoop 上的 SQL 引擎,Spark SQL 編譯時可以包含 Hive 支持,也可以不包含。包含 Hive 支持的 Spark SQL 可以支持 Hive 表訪問、UDF(用戶自定義函數) 以及 Hive 查詢語言 (HiveQL/HQL) 等。需要強調的一點是,
如果要在 Spark SQL 中包含 Hive 的庫,並不需要事先安裝 Hive。一般來說,最好還是在編譯 Spark SQL 時引入 Hive 支持,這樣就可以使用這些特性了。如果你下載的是二進制版本的 Spark,它應該已經在編譯時添加了對 Hive 支持。
若要把 Spark SQL 連接到一個部署好的 Hive 上,你必須把 hive-site.xml 復制到 Spark 的配置文件目錄中($SPARK_HOME/conf)。即使沒有部署好 Hive,Spark SQL 也可以運行。需要注意的是,如果你沒有部署好 Hive,Spark SQL 會在當前的工作目錄中創建出自己的 Hive 元數據倉庫,叫作 metastore_db。此外,如果你嘗試使用 HiveQL 中的 CREATE TABLE (並非 CREATE EXTERNAL TABLE) 語句來創建表,這些表會被放在你默認的文件系統中的/user/hive/warehouse目錄中 (如果你的 classpath 中有配好的 hdfs-site.xml,默認的文件系統就是 HDFS,否則就是本地文件系統)。示例代碼如下:
import java.io.File
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
case class Record(key: Int, value: String)
// warehouseLocation points to the default location for managed databases and tables
val warehouseLocation = new File("spark-warehouse").getAbsolutePath
val spark = SparkSession
.builder()
.appName("Spark Hive Example")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport()
.getOrCreate()
import spark.implicits._
import spark.sql
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
// Queries are expressed in HiveQL
sql("SELECT * FROM src").show()
// +---+-------+
// |key| value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ......
// Aggregation queries are also supported.
sql("SELECT COUNT(*) FROM src").show()
// +--------+
// |count(1)|
// +--------+
// | 500 |
// +--------+
// The results of SQL queries are themselves DataFrames and support all normal functions.
val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
// The items in DataFrames are of type Row, which allows you to access each column by ordinal.
val stringsDS = sqlDF.map {
case Row(key: Int, value: String) => s"Key: $key, Value: $value"
}
stringsDS.show()
// +--------------------+
// | value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ......
// You can also use DataFrames to create temporary views within a SparkSession.
val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
recordsDF.createOrReplaceTempView("records")
// Queries can then join DataFrame data with data stored in Hive.
sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// | 2| val_2| 2| val_2|
// | 4| val_4| 4| val_4|
// | 5| val_5| 5| val_5|
// ......
4.3.1 內嵌 Hive 應用
先做兩個准備工作:
(1)為了方便以后的操作,我們先將 /opt/module/hive/conf 目錄下的 hive-site.xml 和 /opt/module/hadoop-2.7.2/etc/hadoop 目錄下的 core-site.xml、hdfs-site.xml 拷貝至 /opt/module/spark-2.1.1-bin-hadoop2.7 目錄下,然后分發至其他機器節點。以后我們就操作 /opt/module/spark-2.1.1-bin-hadoop2.7 目錄下的文件就好了!
(2)由於我們使用 /opt/module/spark-2.1.1-bin-hadoop2.7/bin/spark-sql 和 /opt/module/spark-2.1.1-bin-hadoop2.7/bin/spark-shell 時打出的日志很多,影響觀看,所以我們修改下日志的輸出級別 INFO 為 WARN,然后分發至其他機器節點。
[atguigu@hadoop102 conf]$ pwd
/opt/module/spark-2.1.1-bin-hadoop2.7/conf
[atguigu@hadoop102 conf]$ cp log4j.properties.template log4j.properties
[atguigu@hadoop102 conf]$ vim log4j.properties
[atguigu@hadoop102 conf]$ xsync log4j.properties
將 log4j.rootCategory=INFO, console 修改為 log4j.rootCategory=WARN, console
1、Spark 內置有 Hive,Spark 2.1.1 內置的 Hive 是 1.2.1。
2、如果要使用內嵌的 Hive,什么都不用做,直接用就可以了。但是呢,此時的我們只能創建表,且表放在本地的 spark-warehouse 目錄中,如果查詢表的話會報錯,原因是:本地有 spark-warehouse 目錄,而其他機器節點沒有 spark-warehouse 目錄,自然無法訪問表了。
解決辦法如下:需要將 core-site.xml 和 hdfs-site.xml 拷貝到 spark 的 conf 目錄下,然后分發至其他機器節點。如果 spark 路徑下發現有 metastore_db 和 spark-warehouse,刪除掉。然后重啟集群。
3、在你第一次啟動創建 metastore 的時候,你需要指定 spark.sql.warehouse.dir 這個參數(Spark 2.x 版本的新內容):
比如:bin/spark-shell --conf spark.sql.warehouse.dir=hdfs://hadoop102:9000/spark_warehouse,之后我們再使用 bin/spark-sql 就可以了。此時我們創建的表放在 HDFS 集群上,那么就可以查詢表了。
4、注意:如果在 load 數據的時候,需要將數據放到 HDFS 上。
4.3.2 外部 Hive 應用
如果想連接外部已經部署好的 Hive,需要通過以下幾個步驟:
1) 將 Hive 中的 hive-site.xml 拷貝或者軟連接到 Spark 安裝目錄下的 conf 目錄下。
2) 打開 spark-shell,注意帶上訪問 Hive 元數據庫的 JDBC 客戶端 或者 如果 hive 的 metestore 使用的是 mysql 數據庫,那么需要將 mysql 的 jdbc 驅動包放到 spark 的 jars 目錄下。
$ bin/spark-shell --master spark://hadoop102:7077 --jars mysql-connector-java-5.1.27-bin.jar
4.4 JSON 數據集
Spark SQL 能夠自動推測 JSON 數據集的結構,並將它加載為一個 Dataset[Row]. 可以通過 SparkSession.read.json() 去加載一個 Dataset[String] 或者一個 JSON 文件。
注意:這個 JSON 文件不是一個傳統的 JSON 文件,每一行都得是一個 JSON 串。
示例 JSON 文件如下:
{"name":"Michael", "age":30}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
示例代碼如下:
// Primitive types (Int, String, etc) and Product types (case classes) encoders are
// supported by importing this when creating a Dataset.
import spark.implicits._
// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files
val path = "examples/src/main/resources/people.json"
val peopleDF = spark.read.json(path)
// The inferred schema can be visualized using the printSchema() method
peopleDF.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")
// SQL statements can be run by using the sql methods provided by spark
val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
// +------+
// | name|
// +------+
// |Justin|
// +------+
// Alternatively, a DataFrame can be created for a JSON dataset represented by
// a Dataset[String] storing one JSON object per string
val otherPeopleDataset = spark.createDataset(
"""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val otherPeople = spark.read.json(otherPeopleDataset)
otherPeople.show()
// +---------------+----+
// | address|name|
// +---------------+----+
// |[Columbus,Ohio]| Yin|
// +---------------+----+
4.5 JDBC
Spark SQL 可以通過 JDBC 從關系型數據庫中讀取數據的方式創建 DataFrame,通過對 DataFrame 一系列的計算后,還可以將數據再寫回關系型數據庫中。
注意:需要將相關的數據庫驅動放到 spark 的類路徑下。
$ bin/spark-shell --master spark://hadoop102:7077 --jars mysql-connector-java-5.1.27-bin.jar
示例代碼:
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://hadoop102:3306/mysql").option("dbtable", "db").option("user", "root").option("password", "123456").load()
val connectionProperties = new Properties()
connectionProperties.put("user", "root")
connectionProperties.put("password", "hive")
val jdbcDF2 = spark.read.jdbc("jdbc:mysql://hadoop102:3306/mysql", "db", connectionProperties)
// Saving data to a JDBC source
jdbcDF.write
.format("jdbc")
.option("url", "jdbc:mysql://hadoop102:3306/mysql")
.option("dbtable", "db")
.option("user", "root")
.option("password", "123456")
.save()
jdbcDF2.write.jdbc("jdbc:mysql://hadoop102:3306/mysql", "db", connectionProperties)
// Specifying create table column data types on write
jdbcDF.write
.option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
.jdbc("jdbc:mysql://hadoop102:3306/mysql", "db", connectionProperties)
第5章 JDBC/ODBC 服務器
Spark SQL 也提供 JDBC 連接支持,這對於讓商業智能(BI)工具連接到 Spark 集群上以及在多用戶間共享一個集群的場景都非常有用。JDBC 服務器作為一個獨立的 Spark 驅動器程序運行,可以在多用戶之間共享。任意一個客戶端都可以在內存中緩存數據表,對表進行查詢。集群的資源以及緩存數據都在所有用戶之間共享。
Spark SQL 的 JDBC 服務器與 Hive 中的 HiveServer2 相一致。由於使用了 Thrift 通信協議,它也被稱為 “Thrift server”。
服務器可以通過 Spark 目錄中的 sbin/start-thriftserver.sh 啟動。這個 腳本接受的參數選項大多與 spark-submit 相同。默認情況下,服務器會在 localhost:10000 上進行監聽,我們可以通過環境變量(HIVE_SERVER2_THRIFT_PORT和HIVE_SERVER2_THRIFT_BIND_HOST)修改這些設置,也可以通過 Hive 配置選項( hive.server2.thrift.port 和 hive.server2.thrift.bind.host )來修改。你也可以通過命令行參數 --hiveconf property=value 來設置 Hive 選項。
./sbin/start-thriftserver.sh \
--hiveconf hive.server2.thrift.port=<listening-port> \
--hiveconf hive.server2.thrift.bind.host=<listening-host> \
--master <master-uri>
......
./bin/beeline
beeline> !connect jdbc:hive2://hadoop102:10000
在 Beeline 客戶端中,你可以使用標准的 HiveQL 命令來創建、列舉以及查詢數據表。參考文檔鏈接:https://www.cnblogs.com/chenmingjun/p/10428809.html#_label1_5
Hive 的 JDBC 訪問:
[atguigu@hadoop102 spark-2.1.1-bin-hadoop2.7]$ pwd
/opt/module/spark-2.1.1-bin-hadoop2.7
[atguigu@hadoop102 spark-2.1.1-bin-hadoop2.7]$ ./sbin/start-thriftserver.sh
starting org.apache.spark.sql.hive.thriftserver.HiveThriftServer2, logging to /opt/module/spark-2.1.1-bin-hadoop2.7/logs/spark-atguigu-org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-1-hadoop102.out
[atguigu@hadoop102 spark-2.1.1-bin-hadoop2.7]$ ./bin/beeline
Beeline version 1.2.1.spark2 by Apache Hive
beeline> !connect jdbc:hive2://hadoop102:10000
Connecting to jdbc:hive2://hadoop102:10000(回車)
Enter username for jdbc:hive2://hadoop102:10000: atguigu(回車)
Enter password for jdbc:hive2://hadoop102:10000: (直接回車)
Connected to: Spark SQL (version 2.1.1)
Driver: Hive JDBC (version 1.2.1.spark2)
Transaction isolation: TRANSACTION_REPEATABLE_READ
0: jdbc:hive2://hadoop102:10000> show tables;
+-----------+---------------------+--------------+--+
| database | tableName | isTemporary |
+-----------+---------------------+--------------+--+
| default | event_logs20151220 | false |
| default | student22 | false |
+-----------+---------------------+--------------+--+
2 rows selected (0.519 seconds)
0: jdbc:hive2://hadoop102:10000>
第6章 運行 Spark SQL CLI
Spark SQL CLI 可以很方便的在本地運行 Hive 元數據服務以及從命令行執行查詢任務。需要注意的是,Spark SQL CLI 不能與 Thrift JDBC 服務交互。
在 Spark 目錄下執行如下命令啟動 Spark SQL CLI:
[atguigu@hadoop102 spark-2.1.1-bin-hadoop2.7]$ pwd
/opt/module/spark-2.1.1-bin-hadoop2.7
[atguigu@hadoop102 spark-2.1.1-bin-hadoop2.7]$ ./bin/spark-sql
如下圖所示:
配置外部 Hive 需要替換 conf/ 下的 hive-site.xml 。
第7章 Spark SQL 實戰
7.1 數據說明
數據集是貨品交易數據集。
每個訂單可能包含多個貨品,每個訂單可以產生多次交易,不同的貨品有不同的單價。
7.2 加載數據
tbStock:
scala> case class tbStock(ordernumber: String, locationid: String, dateid: String) extends Serializable
defined class tbStock
scala> val tbStockRdd = spark.sparkContext.textFile("tbStock.txt")
tbStockRdd: org.apache.spark.rdd.RDD[String] = tbStock.txt MapPartitionsRDD[1] at textFile at <console>:23
scala> val tbStockDS = tbStockRdd.map(_.split(",")).map(attr => tbStock(attr(0), attr(1), attr(2))).toDS
tbStockDS: org.apache.spark.sql.Dataset[tbStock] = [ordernumber: string, locationid: string ... 1 more field]
scala> tbStockDS.show()
+------------+----------+---------+
| ordernumber|locationid| dataid|
+------------+----------+---------+
|BYSL00000893| ZHAO|2007-8-23|
|BYSL00000897| ZHAO|2007-8-24|
|BYSL00000898| ZHAO|2007-8-25|
|BYSL00000899| ZHAO|2007-8-26|
|BYSL00000900| ZHAO|2007-8-26|
|BYSL00000901| ZHAO|2007-8-27|
|BYSL00000902| ZHAO|2007-8-27|
|BYSL00000904| ZHAO|2007-8-28|
|BYSL00000905| ZHAO|2007-8-28|
|BYSL00000906| ZHAO|2007-8-28|
|BYSL00000907| ZHAO|2007-8-29|
|BYSL00000908| ZHAO|2007-8-30|
|BYSL00000909| ZHAO| 2007-9-1|
|BYSL00000910| ZHAO| 2007-9-1|
|BYSL00000911| ZHAO|2007-8-31|
|BYSL00000912| ZHAO| 2007-9-2|
|BYSL00000913| ZHAO| 2007-9-3|
|BYSL00000914| ZHAO| 2007-9-3|
|BYSL00000915| ZHAO| 2007-9-4|
|BYSL00000916| ZHAO| 2007-9-4|
+------------+----------+---------+
only showing top 20 rows
tbStockDetail:
scala> case class tbStockDetail(ordernumber: String, rownum: Int, itemid: String, number: Int, price: Double, amount: Double) extends Serializable
defined class tbStockDetail
scala> val tbStockDetailRdd = spark.sparkContext.textFile("tbStockDetail.txt")
tbStockDetailRdd: org.apache.spark.rdd.RDD[String] = tbStockDetail.txt MapPartitionsRDD[13] at textFile at <console>:23
scala> val tbStockDetailDS = tbStockDetailRdd.map(_.split(",")).map(attr => tbStockDetail(attr(0), attr(1).trim().toInt, attr(2), attr(3).trim().toInt, attr(4).trim().toDouble, attr(5).trim().toDouble)).toDS
tbStockDetailDS: org.apache.spark.sql.Dataset[tbStockDetail] = [ordernumber: string, rownum: int ... 4 more fields]
scala> tbStockDetailDS.show()
+------------+------+--------------+------+-----+------+
| ordernumber|rownum| itemid|number|price|amount|
+------------+------+--------------+------+-----+------+
|BYSL00000893| 0|FS527258160501| -1|268.0|-268.0|
|BYSL00000893| 1|FS527258169701| 1|268.0| 268.0|
|BYSL00000893| 2|FS527230163001| 1|198.0| 198.0|
|BYSL00000893| 3|24627209125406| 1|298.0| 298.0|
|BYSL00000893| 4|K9527220210202| 1|120.0| 120.0|
|BYSL00000893| 5|01527291670102| 1|268.0| 268.0|
|BYSL00000893| 6|QY527271800242| 1|158.0| 158.0|
|BYSL00000893| 7|ST040000010000| 8| 0.0| 0.0|
|BYSL00000897| 0|04527200711305| 1|198.0| 198.0|
|BYSL00000897| 1|MY627234650201| 1|120.0| 120.0|
|BYSL00000897| 2|01227111791001| 1|249.0| 249.0|
|BYSL00000897| 3|MY627234610402| 1|120.0| 120.0|
|BYSL00000897| 4|01527282681202| 1|268.0| 268.0|
|BYSL00000897| 5|84126182820102| 1|158.0| 158.0|
|BYSL00000897| 6|K9127105010402| 1|239.0| 239.0|
|BYSL00000897| 7|QY127175210405| 1|199.0| 199.0|
|BYSL00000897| 8|24127151630206| 1|299.0| 299.0|
|BYSL00000897| 9|G1126101350002| 1|158.0| 158.0|
|BYSL00000897| 10|FS527258160501| 1|198.0| 198.0|
|BYSL00000897| 11|ST040000010000| 13| 0.0| 0.0|
+------------+------+--------------+------+-----+------+
only showing top 20 rows
tbDate:
scala> case class tbDate(dateid: String, years: Int, theyear: Int, month: Int, day: Int, weekday: Int, week: Int, quarter: Int, period: Int, halfmonth: Int) extends Serializable
defined class tbDate
scala> val tbDateRdd = spark.sparkContext.textFile("tbDate.txt")
tbDateRdd: org.apache.spark.rdd.RDD[String] = tbDate.txt MapPartitionsRDD[20] at textFile at <console>:23
scala> val tbDateDS = tbDateRdd.map(_.split(",")).map(attr => tbDate(attr(0), attr(1).trim().toInt, attr(2).trim().toInt, attr(3).trim().toInt, attr(4).trim().toInt, attr(5).trim().toInt, attr(6).trim().toInt, attr(7).trim().toInt, attr(8).trim().toInt, attr(9).trim().toInt)).toDS
tbDateDS: org.apache.spark.sql.Dataset[tbDate] = [dateid: string, years: int ... 8 more fields]
scala> tbDateDS.show()
+---------+------+-------+-----+---+-------+----+-------+------+---------+
| dateid| years|theyear|month|day|weekday|week|quarter|period|halfmonth|
+---------+------+-------+-----+---+-------+----+-------+------+---------+
| 2003-1-1|200301| 2003| 1| 1| 3| 1| 1| 1| 1|
| 2003-1-2|200301| 2003| 1| 2| 4| 1| 1| 1| 1|
| 2003-1-3|200301| 2003| 1| 3| 5| 1| 1| 1| 1|
| 2003-1-4|200301| 2003| 1| 4| 6| 1| 1| 1| 1|
| 2003-1-5|200301| 2003| 1| 5| 7| 1| 1| 1| 1|
| 2003-1-6|200301| 2003| 1| 6| 1| 2| 1| 1| 1|
| 2003-1-7|200301| 2003| 1| 7| 2| 2| 1| 1| 1|
| 2003-1-8|200301| 2003| 1| 8| 3| 2| 1| 1| 1|
| 2003-1-9|200301| 2003| 1| 9| 4| 2| 1| 1| 1|
|2003-1-10|200301| 2003| 1| 10| 5| 2| 1| 1| 1|
|2003-1-11|200301| 2003| 1| 11| 6| 2| 1| 2| 1|
|2003-1-12|200301| 2003| 1| 12| 7| 2| 1| 2| 1|
|2003-1-13|200301| 2003| 1| 13| 1| 3| 1| 2| 1|
|2003-1-14|200301| 2003| 1| 14| 2| 3| 1| 2| 1|
|2003-1-15|200301| 2003| 1| 15| 3| 3| 1| 2| 1|
|2003-1-16|200301| 2003| 1| 16| 4| 3| 1| 2| 2|
|2003-1-17|200301| 2003| 1| 17| 5| 3| 1| 2| 2|
|2003-1-18|200301| 2003| 1| 18| 6| 3| 1| 2| 2|
|2003-1-19|200301| 2003| 1| 19| 7| 3| 1| 2| 2|
|2003-1-20|200301| 2003| 1| 20| 1| 4| 1| 2| 2|
+---------+------+-------+-----+---+-------+----+-------+------+---------+
only showing top 20 rows
注冊為臨時表:
scala> tbStockDS.createOrReplaceTempView("tbStock")
scala> tbDateDS.createOrReplaceTempView("tbDate")
scala> tbStockDetailDS.createOrReplaceTempView("tbStockDetail")
7.3 計算所有訂單中每年的銷售單數、銷售總額
統計所有訂單中每年的銷售單數、銷售總額
三個表連接后以 count(distinct a.ordernumber) 計銷售單數,以 sum(b.amount) 計銷售總額
示例代碼:
SELECT c.theyear, COUNT(DISTINCT a.ordernumber), SUM(b.amount)
FROM tbStock a
JOIN tbStockDetail b ON a.ordernumber = b.ordernumber
JOIN tbDate c ON a.dateid = c.dateid
GROUP BY c.theyear
ORDER BY c.theyear
spark.sql("SELECT c.theyear, COUNT(DISTINCT a.ordernumber), SUM(b.amount) FROM tbStock a JOIN tbStockDetail b ON a.ordernumber = b.ordernumber JOIN tbDate c ON a.dateid = c.dateid GROUP BY c.theyear ORDER BY c.theyear").show
輸出結果如下:
+-------+---------------------------+--------------------+
|theyear|count(DISTINCT ordernumber)| sum(amount)|
+-------+---------------------------+--------------------+
| 2004| 1094| 3268115.499199999|
| 2005| 3828|1.3257564149999991E7|
| 2006| 3772|1.3680982900000006E7|
| 2007| 4885|1.6719354559999993E7|
| 2008| 4861| 1.467429530000001E7|
| 2009| 2619| 6323697.189999999|
| 2010| 94| 210949.65999999997|
+-------+---------------------------+--------------------+
7.4 計算所有訂單每年最大金額訂單的銷售額
目標:統計每年最大金額訂單的銷售額。
第一步、統計每年,每個訂單一共有多少銷售額
示例代碼:
SELECT a.dateid, a.ordernumber, SUM(b.amount) AS SumOfAmount
FROM tbStock a
JOIN tbStockDetail b ON a.ordernumber = b.ordernumber
GROUP BY a.dateid, a.ordernumber
spark.sql("SELECT a.dateid, a.ordernumber, SUM(b.amount) AS SumOfAmount FROM tbStock a JOIN tbStockDetail b ON a.ordernumber = b.ordernumber GROUP BY a.dateid, a.ordernumber").show
輸出結果如下:
+----------+------------+------------------+
| dateid| ordernumber| SumOfAmount|
+----------+------------+------------------+
| 2008-4-9|BYSL00001175| 350.0|
| 2008-5-12|BYSL00001214| 592.0|
| 2008-7-29|BYSL00011545| 2064.0|
| 2008-9-5|DGSL00012056| 1782.0|
| 2008-12-1|DGSL00013189| 318.0|
|2008-12-18|DGSL00013374| 963.0|
| 2009-8-9|DGSL00015223| 4655.0|
| 2009-10-5|DGSL00015585| 3445.0|
| 2010-1-14|DGSL00016374| 2934.0|
| 2006-9-24|GCSL00000673|3556.1000000000004|
| 2007-1-26|GCSL00000826| 9375.199999999999|
| 2007-5-24|GCSL00001020| 6171.300000000002|
| 2008-1-8|GCSL00001217| 7601.6|
| 2008-9-16|GCSL00012204| 2018.0|
| 2006-7-27|GHSL00000603| 2835.6|
|2006-11-15|GHSL00000741| 3951.94|
| 2007-6-6|GHSL00001149| 0.0|
| 2008-4-18|GHSL00001631| 12.0|
| 2008-7-15|GHSL00011367| 578.0|
| 2009-5-8|GHSL00014637| 1797.6|
+----------+------------+------------------+
第二步、以上一步查詢結果為基礎表,和表 tbDate 使用 dateid join,求出每年最大金額訂單的銷售額
示例代碼:
SELECT theyear, MAX(c.SumOfAmount) AS SumOfAmount
FROM (SELECT a.dateid, a.ordernumber, SUM(b.amount) AS SumOfAmount
FROM tbStock a
JOIN tbStockDetail b ON a.ordernumber = b.ordernumber
GROUP BY a.dateid, a.ordernumber
) c
JOIN tbDate d ON c.dateid = d.dateid
GROUP BY theyear
ORDER BY theyear DESC
spark.sql("SELECT theyear, MAX(c.SumOfAmount) AS SumOfAmount FROM (SELECT a.dateid, a.ordernumber, SUM(b.amount) AS SumOfAmount FROM tbStock a JOIN tbStockDetail b ON a.ordernumber = b.ordernumber GROUP BY a.dateid, a.ordernumber ) c JOIN tbDate d ON c.dateid = d.dateid GROUP BY theyear ORDER BY theyear DESC").show
輸出結果如下:
+-------+------------------+
|theyear| SumOfAmount|
+-------+------------------+
| 2010|13065.280000000002|
| 2009|25813.200000000008|
| 2008| 55828.0|
| 2007| 159126.0|
| 2006| 36124.0|
| 2005|38186.399999999994|
| 2004| 23656.79999999997|
+-------+------------------+
7.5 計算所有訂單中每年最暢銷貨品
目標:統計每年最暢銷貨品(哪個貨品銷售額 amount 在當年最高,哪個就是最暢銷貨品)。
第一步、求出每年每個貨品的銷售額
示例代碼:
SELECT c.theyear, b.itemid, SUM(b.amount) AS SumOfAmount
FROM tbStock a
JOIN tbStockDetail b ON a.ordernumber = b.ordernumber
JOIN tbDate c ON a.dateid = c.dateid
GROUP BY c.theyear, b.itemid
spark.sql("SELECT c.theyear, b.itemid, SUM(b.amount) AS SumOfAmount FROM tbStock a JOIN tbStockDetail b ON a.ordernumber = b.ordernumber JOIN tbDate c ON a.dateid = c.dateid GROUP BY c.theyear, b.itemid").show
輸出結果如下:
+-------+--------------+------------------+
|theyear| itemid| SumOfAmount|
+-------+--------------+------------------+
| 2004|43824480810202| 4474.72|
| 2006|YA214325360101| 556.0|
| 2006|BT624202120102| 360.0|
| 2007|AK215371910101|24603.639999999992|
| 2008|AK216169120201|29144.199999999997|
| 2008|YL526228310106|16073.099999999999|
| 2009|KM529221590106| 5124.800000000001|
| 2004|HT224181030201|2898.6000000000004|
| 2004|SG224308320206| 7307.06|
| 2007|04426485470201|14468.800000000001|
| 2007|84326389100102| 9134.11|
| 2007|B4426438020201| 19884.2|
| 2008|YL427437320101|12331.799999999997|
| 2008|MH215303070101| 8827.0|
| 2009|YL629228280106| 12698.4|
| 2009|BL529298020602| 2415.8|
| 2009|F5127363019006| 614.0|
| 2005|24425428180101| 34890.74|
| 2007|YA214127270101| 240.0|
| 2007|MY127134830105| 11099.92|
+-------+--------------+------------------+
第二步、在第一步的基礎上,統計每年單個貨品中的最大金額
示例代碼:
SELECT d.theyear, MAX(d.SumOfAmount) AS MaxOfAmount
FROM (SELECT c.theyear, b.itemid, SUM(b.amount) AS SumOfAmount
FROM tbStock a
JOIN tbStockDetail b ON a.ordernumber = b.ordernumber
JOIN tbDate c ON a.dateid = c.dateid
GROUP BY c.theyear, b.itemid
) d
GROUP BY d.theyear
spark.sql("SELECT d.theyear, MAX(d.SumOfAmount) AS MaxOfAmount FROM (SELECT c.theyear, b.itemid, SUM(b.amount) AS SumOfAmount FROM tbStock a JOIN tbStockDetail b ON a.ordernumber = b.ordernumber JOIN tbDate c ON a.dateid = c.dateid GROUP BY c.theyear, b.itemid ) d GROUP BY d.theyear").show
輸出結果如下:
+-------+------------------+
|theyear| MaxOfAmount|
+-------+------------------+
| 2007| 70225.1|
| 2006| 113720.6|
| 2004|53401.759999999995|
| 2009| 30029.2|
| 2005|56627.329999999994|
| 2010| 4494.0|
| 2008| 98003.60000000003|
+-------+------------------+
第三步、用最大銷售額和統計好的每個貨品的銷售額 join,以及用年 join,得到最暢銷貨品那一行信息
示例代碼:
SELECT DISTINCT e.theyear, e.itemid, f.MaxOfAmount
FROM (SELECT c.theyear, b.itemid, SUM(b.amount) AS SumOfAmount
FROM tbStock a
JOIN tbStockDetail b ON a.ordernumber = b.ordernumber
JOIN tbDate c ON a.dateid = c.dateid
GROUP BY c.theyear, b.itemid
) e
JOIN (SELECT d.theyear, MAX(d.SumOfAmount) AS MaxOfAmount
FROM (SELECT c.theyear, b.itemid, SUM(b.amount) AS SumOfAmount
FROM tbStock a
JOIN tbStockDetail b ON a.ordernumber = b.ordernumber
JOIN tbDate c ON a.dateid = c.dateid
GROUP BY c.theyear, b.itemid
) d
GROUP BY d.theyear
) f ON e.theyear = f.theyear
AND e.SumOfAmount = f.MaxOfAmount
ORDER BY e.theyear
spark.sql("SELECT DISTINCT e.theyear, e.itemid, f.maxofamount FROM (SELECT c.theyear, b.itemid, SUM(b.amount) AS sumofamount FROM tbStock a JOIN tbStockDetail b ON a.ordernumber = b.ordernumber JOIN tbDate c ON a.dateid = c.dateid GROUP BY c.theyear, b.itemid ) e JOIN (SELECT d.theyear, MAX(d.sumofamount) AS maxofamount FROM (SELECT c.theyear, b.itemid, SUM(b.amount) AS sumofamount FROM tbStock a JOIN tbStockDetail b ON a.ordernumber = b.ordernumber JOIN tbDate c ON a.dateid = c.dateid GROUP BY c.theyear, b.itemid ) d GROUP BY d.theyear ) f ON e.theyear = f.theyear AND e.sumofamount = f.maxofamount ORDER BY e.theyear").show
輸出結果如下:
+-------+--------------+------------------+
|theyear| itemid| maxofamount|
+-------+--------------+------------------+
| 2004|JY424420810101|53401.759999999995|
| 2005|24124118880102|56627.329999999994|
| 2006|JY425468460101| 113720.6|
| 2007|JY425468460101| 70225.1|
| 2008|E2628204040101| 98003.60000000003|
| 2009|YL327439080102| 30029.2|
| 2010|SQ429425090101| 4494.0|
+-------+--------------+------------------+
