目標1:掌握Spark SQL原理
目標2:掌握DataFrame/DataSet數據結構和使用方式
目標3:熟練使用Spark SQL完成計算任務
1. Spark SQL概述
1.1. Spark SQL的前世今生
Shark是一個為Spark設計的大規模數據倉庫系統,它與Hive兼容。Shark建立在Hive的代碼基礎上,並通過將Hive的部分物理執行計划交換出來。這個方法使得Shark的用戶可以加速Hive的查詢,但是Shark繼承了Hive的大且復雜的代碼使得Shark很難優化和維護,同時Shark依賴於Spark的版本。隨着我們遇到了性能優化的上限,以及集成SQL的一些復雜的分析功能,我們發現Hive的MapReduce設計的框架限制了Shark的發展。在2014年7月1日的Spark Summit上,Databricks宣布終止對Shark的開發,將重點放到Spark SQL上。
1.2. 什么是Spark SQL
Spark SQL是Spark用來處理結構化數據的一個模塊,它提供了一個編程抽象叫做DataFrame並且作為分布式SQL查詢引擎的作用。
相比於Spark RDD API,Spark SQL包含了對結構化數據和在其上運算的更多信息,Spark SQL使用這些信息進行了額外的優化,使對結構化數據的操作更加高效和方便。
有多種方式去使用Spark SQL,包括SQL、DataFrames API和Datasets API。但無論是哪種API或者是編程語言,它們都是基於同樣的執行引擎,因此你可以在不同的API之間隨意切換,它們各有各的特點,看你喜歡那種風格。
1.3. 為什么要學習Spark SQL
我們已經學習了Hive,它是將Hive SQL轉換成MapReduce然后提交到集群中去執行,大大簡化了編寫MapReduce程序的復雜性,由於MapReduce這種計算模型執行效率比較慢,所以Spark SQL應運而生,它是將Spark SQL轉換成RDD,然后提交到集群中去運行,執行效率非常快!
1.易整合
將sql查詢與spark程序無縫混合,可以使用java、scala、python、R等語言的API操作。
2.統一的數據訪問
以相同的方式連接到任何數據源。
3.兼容Hive
支持hiveSQL的語法。
4.標准的數據連接
可以使用行業標准的JDBC或ODBC連接。
2. DataFrame
2.1. 什么是DataFrame
DataFrame的前身是SchemaRDD,從Spark 1.3.0開始SchemaRDD更名為DataFrame。與SchemaRDD的主要區別是:DataFrame不再直接繼承自RDD,而是自己實現了RDD的絕大多數功能。你仍舊可以在DataFrame上調用rdd方法將其轉換為一個RDD。
在Spark中,DataFrame是一種以RDD為基礎的分布式數據集,類似於傳統數據庫的二維表格,DataFrame帶有Schema元信息,即DataFrame所表示的二維表數據集的每一列都帶有名稱和類型,但底層做了更多的優化。DataFrame可以從很多數據源構建,比如:已經存在的RDD、結構化文件、外部數據庫、Hive表。
2.2. DataFrame與RDD的區別
RDD可看作是分布式的對象的集合,Spark並不知道對象的詳細模式信息,DataFrame可看作是分布式的Row對象的集合,其提供了由列組成的詳細模式信息(就是列的名稱和類型),使得Spark SQL可以進行某些形式的執行優化。DataFrame和普通的RDD的邏輯框架區別如下所示:
上圖直觀地體現了DataFrame和RDD的區別。
左側的RDD[Person]雖然以Person為類型參數,但Spark框架本身不了解 Person類的內部結構。
而右側的DataFrame卻提供了詳細的結構信息,使得Spark SQL可以清楚地知道該數據集中包含哪些列,每列的名稱和類型各是什么,DataFrame多了數據的結構信息,即schema。這樣看起來就像一張表了,DataFrame還配套了新的操作數據的方法,DataFrame API(如df.select())和SQL(select id, name from xx_table where ...)。
此外DataFrame還引入了off-heap,意味着JVM堆以外的內存, 這些內存直接受操作系統管理(而不是JVM)。Spark能夠以二進制的形式序列化數據(不包括結構)到off-heap中, 當要操作數據時, 就直接操作off-heap內存. 由於Spark理解schema, 所以知道該如何操作。
RDD是分布式的Java對象的集合。DataFrame是分布式的Row對象的集合。DataFrame除了提供了比RDD更豐富的算子以外,更重要的特點是提升執行效
率、減少數據讀取以及執行計划的優化。
有了DataFrame這個高一層的抽象后,我們處理數據更加簡單了,甚至可以用SQL來處理數據了,對開發者來說,易用性有了很大的提升。
不僅如此,通過DataFrame API或SQL處理數據,會自動經過Spark 優化器(Catalyst)的優化,即使你寫的程序或SQL不高效,也可以運行的很快。
2.3. DataFrame與RDD的優缺點
2.3.1RDD的優缺點:
優點:
(1)編譯時類型安全
編譯時就能檢查出類型錯誤
(2)面向對象的編程風格
直接通過對象調用方法的形式來操作數據
缺點:
(1)序列化和反序列化的性能開銷
無論是集群間的通信, 還是IO操作都需要對對象的結構和數據進行序列化和反序列化。
(2)GC的性能開銷
頻繁的創建和銷毀對象, 勢必會增加GC
2.3.2DataFrame的優缺點:
優點:
DataFrame通過引入schema和off-heap(不在堆里面的內存,指的是除了不在堆的內存,使用操作系統上的內存),解決了RDD的缺點, Spark通過schame就能夠讀懂數據, 因此在通信和IO時就只需要序列化和反序列化數據, 而結構的部分就可以省略了;
缺點:
通過off-heap引入,可以快速的操作數據,避免大量的GC。但是卻丟了RDD的優點,DataFrame不是類型安全的, API也不是面向對象風格的。
3.RDD、DataFrame、DataSet區別
在spark中,RDD、DataFrame、Dataset是最常用的數據類型,本博文給出筆者在使用的過程中體會到的區別和各自的優勢
3.1共性:
1、RDD、DataFrame、Dataset全都是spark平台下的分布式彈性數據集,為處理超大型數據提供便利
2、三者都有惰性機制,在進行創建、轉換,如map方法時,不會立即執行,只有在遇到Action如foreach時,三者才會開始遍歷運算,極端情況下,如果代碼里面有創建、轉換,但是后面沒有在Action中使用對應的結果,在執行時會被直接跳過,如
1 2 3 4 5 6 7 8 |
val sparkconf = new SparkConf().setMaster("local").setAppName("test").set("spark.port.maxRetries","1000") val spark = SparkSession.builder().config(sparkconf).getOrCreate() val rdd=spark.sparkContext.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))
rdd.map{line=> println("運行") line._1 } |
map中的println("運行")並不會運行
3、三者都會根據spark的內存情況自動緩存運算,這樣即使數據量很大,也不用擔心會內存溢出
4、三者都有partition的概念,如
1 2 3 4 5 6 7 8 |
var predata=data.repartition(24).mapPartitions{ PartLine => { PartLine.map{ line => println(“轉換操作”) } } } |
這樣對每一個分區進行操作時,就跟在操作數組一樣,不但數據量比較小,而且可以方便的將map中的運算結果拿出來,如果直接用map,map中對外面的操作是無效的,如
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
val rdd=spark.sparkContext.parallelize(Seq(("a", 1), ("b", 1), ("a", 1))) var flag=0 val test=rdd.map{line=> println("運行") flag+=1 println(flag) line._1 } println(test.count) println(flag) /** 運行 1 運行 2 運行 3 3 0 * */ |
不使用partition時,對map之外的操作無法對map之外的變量造成影響
5、三者有許多共同的函數,如filter,排序等
6、在對DataFrame和Dataset進行操作許多操作都需要這個包進行支持
1 2 |
import spark.implicits._ //這里的spark是SparkSession的變量名 |
7、DataFrame和Dataset均可使用模式匹配獲取各個字段的值和類型
DataFrame:
1 2 3 4 5 6 7 |
testDF.map{ case Row(col1:String,col2:Int)=> println(col1);println(col2) col1 case _=> "" } |
為了提高穩健性,最好后面有一個_通配操作,這里提供了DataFrame一個解析字段的方法
Dataset:
1 2 3 4 5 6 7 8 |
case class Coltest(col1:String,col2:Int)extends Serializable //定義字段名和類型 testDS.map{ case Coltest(col1:String,col2:Int)=> println(col1);println(col2) col1 case _=> "" } |
3.2區別:
RDD:
1、RDD一般和spark mlib同時使用
2、RDD不支持sparksql操作
DataFrame:
1、與RDD和Dataset不同,DataFrame每一行的類型固定為Row,只有通過解析才能獲取各個字段的值,如
1 2 3 4 5 |
testDF.foreach{ line => val col1=line.getAs[String]("col1") val col2=line.getAs[String]("col2") } |
每一列的值沒法直接訪問
2、DataFrame與Dataset一般與spark ml同時使用
3、DataFrame與Dataset均支持sparksql的操作,比如select,groupby之類,還能注冊臨時表/視窗,進行sql語句操作,如
1 2 |
dataDF.createOrReplaceTempView("tmp") spark.sql("select ROW,DATE from tmp where DATE is not null order by DATE").show(100,false) |
4、DataFrame與Dataset支持一些特別方便的保存方式,比如保存成csv,可以帶上表頭,這樣每一列的字段名一目了然
1 2 3 4 5 6 |
//保存 val saveoptions = Map("header" -> "true", "delimiter" -> "\t", "path" -> "hdfs://172.xx.xx.xx:9000/test") datawDF.write.format("com.databricks.spark.csv").mode(SaveMode.Overwrite).options(saveoptions).save() //讀取 val options = Map("header" -> "true", "delimiter" -> "\t", "path" -> "hdfs://172.xx.xx.xx:9000/test") val datarDF= spark.read.options(options).format("com.databricks.spark.csv").load() |
利用這樣的保存方式,可以方便的獲得字段名和列的對應,而且分隔符(delimiter)可以自由指定
Dataset:
這里主要對比Dataset和DataFrame,因為Dataset和DataFrame擁有完全相同的成員函數,區別只是每一行的數據類型不同
DataFrame也可以叫Dataset[Row],每一行的類型是Row,不解析,每一行究竟有哪些字段,各個字段又是什么類型都無從得知,只能用上面提到的getAS方法或者共性中的第七條提到的模式匹配拿出特定字段
而Dataset中,每一行是什么類型是不一定的,在自定義了case class之后可以很自由的獲得每一行的信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
case class Coltest(col1:String,col2:Int)extends Serializable //定義字段名和類型 /** rdd ("a", 1) ("b", 1) ("a", 1) * */ val test: Dataset[Coltest]=rdd.map{line=> Coltest(line._1,line._2) }.toDS test.map{ line=> println(line.col1) println(line.col2) } |
可以看出,Dataset在需要訪問列中的某個字段時是非常方便的,然而,如果要寫一些適配性很強的函數時,如果使用Dataset,行的類型又不確定,可能是各種case class,無法實現適配,這時候用DataFrame即Dataset[Row]就能比較好的解決問題
3.3轉化:
RDD、DataFrame、Dataset三者有許多共性,有各自適用的場景常常需要在三者之間轉換
DataFrame/Dataset轉RDD:
這個轉換很簡單
1 2 |
val rdd1=testDF.rdd val rdd2=testDS.rdd |
RDD轉DataFrame:
1 2 3 4 |
import spark.implicits._ val testDF = rdd.map {line=> (line._1,line._2) }.toDF("col1","col2") |
一般用元組把一行的數據寫在一起,然后在toDF中指定字段名
RDD轉Dataset:
1 2 3 4 5 |
import spark.implicits._ case class Coltest(col1:String,col2:Int)extends Serializable //定義字段名和類型 val testDS = rdd.map {line=> Coltest(line._1,line._2) }.toDS |
可以注意到,定義每一行的類型(case class)時,已經給出了字段名和類型,后面只要往case class里面添加值即可
Dataset轉DataFrame:
這個也很簡單,因為只是把case class封裝成Row
1 2 |
import spark.implicits._ val testDF = testDS.toDF |
DataFrame轉Dataset:
1 2 3 |
import spark.implicits._ case class Coltest(col1:String,col2:Int)extends Serializable //定義字段名和類型 val testDS = testDF.as[Coltest] |
這種方法就是在給出每一列的類型后,使用as方法,轉成Dataset,這在數據類型是DataFrame又需要針對各個字段處理時極為方便
特別注意:
在使用一些特殊的操作時,一定要加上 import spark.implicits._ 不然toDF、toDS無法使用
4. 讀取數據源創建DataFrame
4.1 讀取文本文件創建DataFrame
在spark2.0版本之前,Spark SQL中SQLContext是創建DataFrame和執行SQL的入口,可以利用hiveContext通過hive sql語句操作hive表數據,兼容hive操作,並且hiveContext繼承自SQLContext。在spark2.0之后,這些都統一於SparkSession,SparkSession 封裝了 SparkContext,SqlContext,通過SparkSession可以獲取到SparkConetxt,SqlContext對象。
(1)在本地創建一個文件,有三列,分別是id、name、age,用空格分隔,然后上傳到hdfs上。person.txt內容為:
1 zhangsan 20 2 lisi 29 3 wangwu 25 4 zhaoliu 30 5 tianqi 35 6 kobe 40 |
上傳數據文件到HDFS上:
hdfs dfs -put person.txt /
先執行 spark-shell --master local[2]
(2)在spark shell執行下面命令,讀取數據,將每一行的數據使用列分隔符分割
val lineRDD= sc.textFile("/person.txt").map(_.split(" "))
(3)定義case class(相當於表的schema)
case class Person(id:Int, name:String, age:Int)
(4)將RDD和case class關聯
val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
(5)將RDD轉換成DataFrame
val personDF = personRDD.toDF
(6)對DataFrame進行處理
personDF.show
personDF.printSchema
(7)不使用樣例類構建DF,自定義schema
scala> val peopleDF = sc.textFile("/people.txt").map(_.split(" ")).map(x=>(x(0),x(1),x(2))).toDF("id","name","age")
peopleDF: org.apache.spark.sql.DataFrame = [id: string, name: string ... 1 more field]
(8)通過SparkSession構建DataFrame
使用spark-shell中已經初始化好的SparkSession對象spark生成DataFrame
val dataFrame=spark.read.text("/person.txt")
4.2 讀取json文件創建DataFrame
(1)數據文件
使用spark安裝包下的
/opt/bigdata/spark/examples/src/main/resources/people.json文件
(2)在spark shell執行下面命令,讀取數據
val jsonDF= spark.read.json("file:///opt/bigdata/spark/examples/src/main/resources/people.json")
(3)接下來就可以使用DataFrame的函數操作
4.3 讀取parquet列式存儲格式文件創建DataFrame
(1)數據文件
使用spark安裝包下的
/opt/bigdata/spark/examples/src/main/resources/users.parquet文件
(2)在spark shell執行下面命令,讀取數據
val parquetDF=spark.read.parquet("file:///opt/bigdata/spark/examples/src/main/resources/users.parquet")
(3)接下來就可以使用DataFrame的函數操作
5.DataSet
5.1. 什么是DataSet
DataSet是分布式的數據集合,Dataset提供了強類型支持,也是在RDD的每行數據加了類型約束。DataSet是在Spark1.6中添加的新的接口。它集中了RDD的優點(強類型和可以用強大lambda函數)以及使用了Spark SQL優化的執行引擎。DataSet可以通過JVM的對象進行構建,可以用函數式的轉換(map/flatmap/filter)進行多種操作。
5.2. DataFrame、DataSet、RDD的區別
假設RDD中的兩行數據長這樣:
那么DataFrame中的數據長這樣:
那么Dataset中的數據長這樣:
或者長這樣(每行數據是個Object):
DataSet包含了DataFrame的功能,Spark2.0中兩者統一,DataFrame表示為DataSet[Row],即DataSet的子集。
(1)DataSet可以在編譯時檢查類型
(2)並且是面向對象的編程接口
相比DataFrame,Dataset提供了編譯時類型檢查,對於分布式程序來講,提交一次作業太費勁了(要編譯、打包、上傳、運行),到提交到集群運行時才發現錯誤,這會浪費大量的時間,這也是引入Dataset的一個重要原因。
5.3. DataFrame與DataSet互相轉換
DataFrame和DataSet可以相互轉化。
(1)DataFrame轉為 DataSet
df.as[ElementType] 這樣可以把DataFrame轉化為DataSet。
(2)DataSet轉為DataFrame
ds.toDF() 這樣可以把DataSet轉化為DataFrame。
5.4. 創建DataSet
(1)通過spark.createDataset創建
(2)通toDS方法生成DataSet
(3)通過DataFrame轉化生成
使用as[類型]轉換為DataSet