Spark SQL 概述+RDD、DataFrame、DataSet區別+DataFrame和DataSet常用操作


 

目標1:掌握Spark SQL原理

目標2:掌握DataFrame/DataSet數據結構和使用方式

目標3:熟練使用Spark SQL完成計算任務

 

1. Spark SQL概述

1.1. Spark SQL的前世今生

        Shark是一個為Spark設計的大規模數據倉庫系統,它與Hive兼容。Shark建立在Hive的代碼基礎上,並通過將Hive的部分物理執行計划交換出來。這個方法使得Shark的用戶可以加速Hive的查詢,但是Shark繼承了Hive的大且復雜的代碼使得Shark很難優化和維護,同時Shark依賴於Spark的版本。隨着我們遇到了性能優化的上限,以及集成SQL的一些復雜的分析功能,我們發現Hive的MapReduce設計的框架限制了Shark的發展。在2014年7月1日的Spark Summit上,Databricks宣布終止對Shark的開發,將重點放到Spark SQL上。

1.2. 什么是Spark SQL

 

 

Spark SQL是Spark用來處理結構化數據的一個模塊,它提供了一個編程抽象叫做DataFrame並且作為分布式SQL查詢引擎的作用。

相比於Spark RDD API,Spark SQL包含了對結構化數據和在其上運算的更多信息,Spark SQL使用這些信息進行了額外的優化,使對結構化數據的操作更加高效和方便。

有多種方式去使用Spark SQL,包括SQL、DataFrames API和Datasets API。但無論是哪種API或者是編程語言,它們都是基於同樣的執行引擎,因此你可以在不同的API之間隨意切換,它們各有各的特點,看你喜歡那種風格。

1.3. 為什么要學習Spark SQL

我們已經學習了Hive,它是將Hive SQL轉換成MapReduce然后提交到集群中去執行,大大簡化了編寫MapReduce程序的復雜性,由於MapReduce這種計算模型執行效率比較慢,所以Spark SQL應運而生,它是將Spark SQL轉換成RDD,然后提交到集群中去運行,執行效率非常快!

1.易整合

 

 

將sql查詢與spark程序無縫混合,可以使用java、scala、python、R等語言的API操作。

 

2.統一的數據訪問

 

 

以相同的方式連接到任何數據源。

3.兼容Hive

 

 

支持hiveSQL的語法。

 

 

4.標准的數據連接

 

 

可以使用行業標准的JDBC或ODBC連接。

 

 

2. DataFrame

2.1. 什么是DataFrame

DataFrame的前身是SchemaRDD,從Spark 1.3.0開始SchemaRDD更名為DataFrame。與SchemaRDD的主要區別是:DataFrame不再直接繼承自RDD,而是自己實現了RDD的絕大多數功能。你仍舊可以在DataFrame上調用rdd方法將其轉換為一個RDD。

在Spark中,DataFrame是一種以RDD為基礎的分布式數據集,類似於傳統數據庫的二維表格,DataFrame帶有Schema元信息,即DataFrame所表示的二維表數據集的每一列都帶有名稱和類型,但底層做了更多的優化。DataFrame可以從很多數據源構建,比如:已經存在的RDD、結構化文件、外部數據庫、Hive表。

2.2. DataFrame與RDD的區別

RDD可看作是分布式的對象的集合,Spark並不知道對象的詳細模式信息,DataFrame可看作是分布式的Row對象的集合,其提供了由列組成的詳細模式信息(就是列的名稱和類型),使得Spark SQL可以進行某些形式的執行優化。DataFrame和普通的RDD的邏輯框架區別如下所示:

 

 

上圖直觀地體現了DataFrame和RDD的區別。

左側的RDD[Person]雖然以Person為類型參數,但Spark框架本身不了解 Person類的內部結構。

而右側的DataFrame卻提供了詳細的結構信息,使得Spark SQL可以清楚地知道該數據集中包含哪些列,每列的名稱和類型各是什么,DataFrame多了數據的結構信息,即schema。這樣看起來就像一張表了,DataFrame還配套了新的操作數據的方法,DataFrame API(如df.select())和SQL(select id, name from xx_table where ...)。

此外DataFrame還引入了off-heap,意味着JVM堆以外的內存, 這些內存直接受操作系統管理(而不是JVM)。Spark能夠以二進制的形式序列化數據(不包括結構)到off-heap中, 當要操作數據時, 就直接操作off-heap內存. 由於Spark理解schema, 所以知道該如何操作。

RDD是分布式的Java對象的集合。DataFrame是分布式的Row對象的集合。DataFrame除了提供了比RDD更豐富的算子以外,更重要的特點是提升執行效
率、減少數據讀取以及執行計划的優化。

有了DataFrame這個高一層的抽象后,我們處理數據更加簡單了,甚至可以用SQL來處理數據了,對開發者來說,易用性有了很大的提升。

不僅如此,通過DataFrame API或SQL處理數據,會自動經過Spark 優化器(Catalyst)的優化,即使你寫的程序或SQL不高效,也可以運行的很快。

 

2.3. DataFrame與RDD的優缺點

2.3.1RDD的優缺點:

優點:

(1)編譯時類型安全 
       編譯時就能檢查出類型錯誤

(2)面向對象的編程風格 
       直接通過對象調用方法的形式來操作數據

缺點:

(1)序列化和反序列化的性能開銷 
       無論是集群間的通信, 還是IO操作都需要對對象的結構和數據進行序列化和反序列化。

(2)GC的性能開銷 
       頻繁的創建和銷毀對象, 勢必會增加GC

2.3.2DataFrame的優缺點:

優點:

DataFrame通過引入schema和off-heap(不在堆里面的內存,指的是除了不在堆的內存,使用操作系統上的內存),解決了RDD的缺點, Spark通過schame就能夠讀懂數據, 因此在通信和IO時就只需要序列化和反序列化數據, 而結構的部分就可以省略了;

缺點:

通過off-heap引入,可以快速的操作數據,避免大量的GC。但是卻丟了RDD的優點,DataFrame不是類型安全的, API也不是面向對象風格的。

 

 

 

3.RDD、DataFrame、DataSet區別

在spark中,RDD、DataFrame、Dataset是最常用的數據類型,本博文給出筆者在使用的過程中體會到的區別和各自的優勢

 

3.1共性:

1、RDD、DataFrame、Dataset全都是spark平台下的分布式彈性數據集,為處理超大型數據提供便利

2、三者都有惰性機制,在進行創建、轉換,如map方法時,不會立即執行,只有在遇到Action如foreach時,三者才會開始遍歷運算,極端情況下,如果代碼里面有創建、轉換,但是后面沒有在Action中使用對應的結果,在執行時會被直接跳過,如

1

2

3

4

5

6

7

8

val sparkconf = new SparkConf().setMaster("local").setAppName("test").set("spark.port.maxRetries","1000")

val spark = SparkSession.builder().config(sparkconf).getOrCreate()

val rdd=spark.sparkContext.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))

 

rdd.map{line=>

  println("運行")

  line._1

}

map中的println("運行")並不會運行

3、三者都會根據spark的內存情況自動緩存運算,這樣即使數據量很大,也不用擔心會內存溢出

4、三者都有partition的概念,如

1

2

3

4

5

6

7

8

var predata=data.repartition(24).mapPartitions{

      PartLine => {

        PartLine.map{

          line =>

             println(“轉換操作”)

                            }

                         }

這樣對每一個分區進行操作時,就跟在操作數組一樣,不但數據量比較小,而且可以方便的將map中的運算結果拿出來,如果直接用map,map中對外面的操作是無效的,如

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

val rdd=spark.sparkContext.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))

    var flag=0

    val test=rdd.map{line=>

      println("運行")

      flag+=1

      println(flag)

      line._1

    }

println(test.count)

println(flag)

    /**

    運行

    1

    運行

    2

    運行

    3

    3

    0

   * */

不使用partition時,對map之外的操作無法對map之外的變量造成影響

5、三者有許多共同的函數,如filter,排序等

6、在對DataFrame和Dataset進行操作許多操作都需要這個包進行支持

1

2

import spark.implicits._

//這里的spark是SparkSession的變量名

7、DataFrame和Dataset均可使用模式匹配獲取各個字段的值和類型

DataFrame:

1

2

3

4

5

6

7

testDF.map{

      case Row(col1:String,col2:Int)=>

        println(col1);println(col2)

        col1

      case _=>

        ""

    }

為了提高穩健性,最好后面有一個_通配操作,這里提供了DataFrame一個解析字段的方法

Dataset:

1

2

3

4

5

6

7

8

case class Coltest(col1:String,col2:Int)extends Serializable //定義字段名和類型

    testDS.map{

      case Coltest(col1:String,col2:Int)=>

        println(col1);println(col2)

        col1

      case _=>

        ""

    }

  

3.2區別:

RDD:

1、RDD一般和spark mlib同時使用

2、RDD不支持sparksql操作

DataFrame:

1、與RDD和Dataset不同,DataFrame每一行的類型固定為Row,只有通過解析才能獲取各個字段的值,如

1

2

3

4

5

testDF.foreach{

  line =>

    val col1=line.getAs[String]("col1")

    val col2=line.getAs[String]("col2")

}

每一列的值沒法直接訪問

2、DataFrame與Dataset一般與spark ml同時使用

3、DataFrame與Dataset均支持sparksql的操作,比如select,groupby之類,還能注冊臨時表/視窗,進行sql語句操作,如

1

2

dataDF.createOrReplaceTempView("tmp")

spark.sql("select  ROW,DATE from tmp where DATE is not null order by DATE").show(100,false)

4、DataFrame與Dataset支持一些特別方便的保存方式,比如保存成csv,可以帶上表頭,這樣每一列的字段名一目了然

1

2

3

4

5

6

//保存

val saveoptions = Map("header" -> "true", "delimiter" -> "\t", "path" -> "hdfs://172.xx.xx.xx:9000/test")

datawDF.write.format("com.databricks.spark.csv").mode(SaveMode.Overwrite).options(saveoptions).save()

//讀取

val options = Map("header" -> "true", "delimiter" -> "\t", "path" -> "hdfs://172.xx.xx.xx:9000/test")

val datarDF= spark.read.options(options).format("com.databricks.spark.csv").load()

利用這樣的保存方式,可以方便的獲得字段名和列的對應,而且分隔符(delimiter)可以自由指定

Dataset:

這里主要對比Dataset和DataFrame,因為Dataset和DataFrame擁有完全相同的成員函數,區別只是每一行的數據類型不同

DataFrame也可以叫Dataset[Row],每一行的類型是Row,不解析,每一行究竟有哪些字段,各個字段又是什么類型都無從得知,只能用上面提到的getAS方法或者共性中的第七條提到的模式匹配拿出特定字段

而Dataset中,每一行是什么類型是不一定的,在自定義了case class之后可以很自由的獲得每一行的信息

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

case class Coltest(col1:String,col2:Int)extends Serializable //定義字段名和類型

/**

      rdd

      ("a", 1)

      ("b", 1)

      ("a", 1)

      * */

val test: Dataset[Coltest]=rdd.map{line=>

      Coltest(line._1,line._2)

    }.toDS

test.map{

      line=>

        println(line.col1)

        println(line.col2)

    }

可以看出,Dataset在需要訪問列中的某個字段時是非常方便的,然而,如果要寫一些適配性很強的函數時,如果使用Dataset,行的類型又不確定,可能是各種case class,無法實現適配,這時候用DataFrame即Dataset[Row]就能比較好的解決問題

3.3轉化:

RDD、DataFrame、Dataset三者有許多共性,有各自適用的場景常常需要在三者之間轉換

DataFrame/DatasetRDD

這個轉換很簡單

1

2

val rdd1=testDF.rdd

val rdd2=testDS.rdd

RDDDataFrame

1

2

3

4

import spark.implicits._

val testDF = rdd.map {line=>

      (line._1,line._2)

    }.toDF("col1","col2")

一般用元組把一行的數據寫在一起,然后在toDF中指定字段名

RDDDataset

1

2

3

4

5

import spark.implicits._

case class Coltest(col1:String,col2:Int)extends Serializable //定義字段名和類型

val testDS = rdd.map {line=>

      Coltest(line._1,line._2)

    }.toDS

可以注意到,定義每一行的類型(case class)時,已經給出了字段名和類型,后面只要往case class里面添加值即可

DatasetDataFrame

這個也很簡單,因為只是把case class封裝成Row

1

2

import spark.implicits._

val testDF = testDS.toDF

DataFrameDataset

1

2

3

import spark.implicits._

case class Coltest(col1:String,col2:Int)extends Serializable //定義字段名和類型

val testDS = testDF.as[Coltest]

這種方法就是在給出每一列的類型后,使用as方法,轉成Dataset,這在數據類型是DataFrame又需要針對各個字段處理時極為方便

特別注意:

在使用一些特殊的操作時,一定要加上 import spark.implicits._ 不然toDF、toDS無法使用

 

 

4. 讀取數據源創建DataFrame

4.1 讀取文本文件創建DataFrame

  在spark2.0版本之前,Spark SQL中SQLContext是創建DataFrame和執行SQL的入口,可以利用hiveContext通過hive sql語句操作hive表數據,兼容hive操作,並且hiveContext繼承自SQLContext。在spark2.0之后,這些都統一於SparkSession,SparkSession 封裝了 SparkContext,SqlContext,通過SparkSession可以獲取到SparkConetxt,SqlContext對象。

 

 

(1)在本地創建一個文件,有三列,分別是id、name、age,用空格分隔,然后上傳到hdfs上。person.txt內容為:

1 zhangsan 20

2 lisi 29

3 wangwu 25

4 zhaoliu 30

5 tianqi 35

6 kobe 40

上傳數據文件到HDFS上:

hdfs dfs -put person.txt  /

先執行 spark-shell --master local[2]

 

(2)在spark shell執行下面命令,讀取數據,將每一行的數據使用列分隔符分割

val lineRDD= sc.textFile("/person.txt").map(_.split(" "))

 

 

 

(3)定義case class(相當於表的schema)

case class Person(id:Int, name:String, age:Int)

 

 

(4)將RDD和case class關聯

val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))

 

 

(5)將RDD轉換成DataFrame

val personDF = personRDD.toDF

 

 

 

(6)對DataFrame進行處理

personDF.show

 

 

personDF.printSchema

 

 

(7)不使用樣例類構建DF,自定義schema

scala> val peopleDF = sc.textFile("/people.txt").map(_.split(" ")).map(x=>(x(0),x(1),x(2))).toDF("id","name","age")

peopleDF: org.apache.spark.sql.DataFrame = [id: string, name: string ... 1 more field]

(8)通過SparkSession構建DataFrame

使用spark-shell中已經初始化好的SparkSession對象spark生成DataFrame

val dataFrame=spark.read.text("/person.txt")

 

 

4.2 讀取json文件創建DataFrame

(1)數據文件

使用spark安裝包下的

/opt/bigdata/spark/examples/src/main/resources/people.json文件

 

(2)在spark shell執行下面命令,讀取數據

val jsonDF= spark.read.json("file:///opt/bigdata/spark/examples/src/main/resources/people.json")

 

(3)接下來就可以使用DataFrame的函數操作               

 

 

4.3 讀取parquet列式存儲格式文件創建DataFrame

(1)數據文件

使用spark安裝包下的

/opt/bigdata/spark/examples/src/main/resources/users.parquet文件

 

(2)在spark shell執行下面命令,讀取數據

val parquetDF=spark.read.parquet("file:///opt/bigdata/spark/examples/src/main/resources/users.parquet")

 

(3)接下來就可以使用DataFrame的函數操作

 

 

 

 

5.DataSet

 

5.1.  什么是DataSet

 

DataSet是分布式的數據集合,Dataset提供了強類型支持,也是在RDD的每行數據加了類型約束。DataSet是在Spark1.6中添加的新的接口。它集中了RDD的優點(強類型和可以用強大lambda函數)以及使用了Spark SQL優化的執行引擎。DataSet可以通過JVM的對象進行構建,可以用函數式的轉換(map/flatmap/filter)進行多種操作。

 

 

 

5.2.  DataFrame、DataSet、RDD的區別

 

假設RDD中的兩行數據長這樣:

 

 

那么DataFrame中的數據長這樣:

 

 

那么Dataset中的數據長這樣:

 

 

或者長這樣(每行數據是個Object):

 

DataSet包含了DataFrame的功能,Spark2.0中兩者統一,DataFrame表示為DataSet[Row],即DataSet的子集。

 

(1)DataSet可以在編譯時檢查類型

 

(2)並且是面向對象的編程接口

 

相比DataFrame,Dataset提供了編譯時類型檢查,對於分布式程序來講,提交一次作業太費勁了(要編譯、打包、上傳運行),到提交到集群運行時才發現錯誤,這會浪費大量的時間,這也是引入Dataset的一個重要原因。

 

5.3.  DataFrame與DataSet互相轉換

 

DataFrame和DataSet可以相互轉化。

 

(1)DataFrame轉為 DataSet

 

df.as[ElementType] 這樣可以把DataFrame轉化為DataSet。

 

(2)DataSet轉為DataFrame

 

ds.toDF() 這樣可以把DataSet轉化為DataFrame。

5.4.  創建DataSet

 

(1)通過spark.createDataset創建

 

 

(2)通toDS方法生成DataSet

 

(3)通過DataFrame轉化生成

  使用as[類型]轉換為DataSet

 

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM