Spark學習之Spark SQL


Spark SQL

一、Spark SQL基礎

1Spark SQL簡介

Spark SQLSpark用來處理結構化數據的一個模塊,它提供了一個編程抽象叫做DataFrame並且作為分布式SQL查詢引擎的作用。
http://spark.apache.org/sql/

為什么要學習Spark SQL我們已經學習了Hive,它是將Hive SQL轉換成MapReduce然后提交到集群上執行,大大簡化了編寫MapReduce的程序的復雜性,由於MapReduce這種計算模型執行效率比較慢。所以Spark SQL的應運而生,它是將Spark SQL轉換成RDD,然后提交到集群執行,執行效率非常快!同時Spark SQL也支持從Hive中讀取數據。

 

Spark SQL的特點:

1.容易整合(集成) 

2.統一的數據訪問方式

3.兼容Hive
 

4.標准的數據連接

2、基本概念:DatasetsDataFrames

  DataFrame

  DataFrame是組織成命名列的數據集。它在概念上等同於關系數據庫中的,但在底層具有更豐富的優化。DataFrames可以從各種來源構建,

例如:

  結構化數據文件

  hive中的表

  外部數據庫或現有RDDs

DataFrame API支持的語言有ScalaJavaPythonR

從上圖可以看出,DataFrame多了數據的結構信息,schemaRDD是分布式的 Java對象的集合。DataFrame是分布式的Row對象的集合。DataFrame除了提供了比RDD更豐富的算子以外,更重要的特點是提升執行效率、減少數據讀取以及執行計划的優化

  Datasets

  Dataset是數據的分布式集合。Dataset是在Spark 1.6中添加的一個新接口,是DataFrame之上更高一級的抽象。它提供了RDD的優點(強類型化,使用強大的lambda函數的能力)以及Spark SQL優化后的執行引擎的優點。一個Dataset 可以從JVM對象構造,然后使用函數轉換(mapflatMapfilter等)去操作。 Dataset API 支持ScalaJavaPython不支持Dataset API

3、測試數據

使用員工表的數據,並已經將其保存到了HDFS上。
emp.csv

dept.csv

4、創建DataFrames

*)通過Case Class創建DataFrames

① 定義case class(相當於表的結構:Schema

注意:由於mgrcomm列中包含null值,簡單起見,將對應的case class類型定義為String

② HDFS上的數據讀入RDD,並將RDDcase Class關聯 
 

③ RDD轉換成DataFrames

④ 通過DataFrames查詢數據
 

*)使用SparkSession

① 什么是SparkSession

Apache Spark 2.0引入了SparkSession,其為用戶提供了一個統一的切入點來使用Spark的各項功能,並且允許用戶通過它調用DataFrameDataset相關API來編寫Spark程序。最重要的是,它減少了用戶需要了解的一些概念,使得我們可以很容易地與Spark交互。

2.0版本之前,與Spark交互之前必須先創建SparkConfSparkContext然而在Spark 2.0中,我們可以通過SparkSession來實現同樣的功能,而不需要顯式地創建SparkConf, SparkContext 以及 SQLContext,因為這些對象已經封裝在SparkSession中。
 

② 創建StructType,來定義Schema結構信息
 

注意,需要:import org.apache.spark.sql.types._

③ 讀入數據並且切分數據
 

④ RDD中的數據映射成Row

注意,需要:import org.apache.spark.sql.Row

⑤ 創建DataFrames

val df = spark.createDataFrame(rowRDD,myschema)

 

再舉一個例子,使用JSon文件來創建DataFame

① 源文件:$SPARK_HOME/examples/src/main/resources/people.json

② val df = spark.read.json("源文件")

③ 查看數據和Schema信息
 

5DataFrame操作

DataFrame操作也稱為無類型的Dataset操作

*)查詢所有的員工姓名

*)查詢所有的員工姓名和薪水,並給薪水加100塊錢

*)查詢工資大於2000的員工

*)求每個部門的員工人數
 

完整的例子,請參考:

http://spark.apache.org/docs/2.1.0/api/scala/index.html#org.apache.spark.sql.Dataset 

*)在DataFrame中使用SQL語句

① 將DataFrame注冊成表(視圖):df.createOrReplaceTempView("emp")

② 執行查詢:spark.sql("select * from emp").show

              spark.sql("select * from emp where deptno=10").show

              spark.sql("select deptno,sum(sal) from emp group by deptno").show

 

6Global Temporary View

上面使用的是一個在Session生命周期中的臨時views。在Spark SQL中,如果你想擁有一個臨時的view,並想在不同的Session中共享,而且在application的運行周期內可用,那么就需要創建一個全局的臨時view。並記得使用的時候加上global_temp作為前綴來引用它,因為全局的臨時view是綁定到系統保留的數據庫global_temp上。

① 創建一個普通的view和一個全局的view

df.createOrReplaceTempView("emp1")

df.createGlobalTempView("emp2")

 

② 在當前會話中執行查詢,均可查詢出結果。

spark.sql("select * from emp1").show

spark.sql("select * from global_temp.emp2").show

 

③ 開啟一個新的會話,執行同樣的查詢

spark.newSession.sql("select * from emp1").show     (運行出錯)

spark.newSession.sql("select * from global_temp.emp2").show

 

7、創建Datasets

DataFrame的引入,可以讓Spark更好的處理結構數據的計算,但其中一個主要的問題是:缺乏編譯時類型安全。為了解決這個問題,Spark采用新的Dataset API (DataFrame API的類型擴展)
 

Dataset是一個分布式的數據收集器。這是在Spark1.6之后新加的一個接口,兼顧了RDD的優點(強類型,可以使用功能強大的lambda)以及Spark SQL的執行器高效性的優點。所以可以把DataFrames看成是一種特殊的Datasets,即:Dataset(Row)

 

*)創建DataSet,方式一:使用序列

1、定義case class

    case class MyData(a:Int,b:String)

 

2、生成序列,並創建DataSet

   val ds = Seq(MyData(1,"Tom"),MyData(2,"Mary")).toDS

 

3、查看結果
 

*)創建DataSet,方式二:使用JSON數據

1、定義case class

             case class Person(name: String, gender: String)

 

2、通過JSON數據生成DataFrame

             val df = spark.read.json(sc.parallelize("""{"gender": "Male", "name": "Tom"}""" :: Nil))

 

3、將DataFrame轉成DataSet

               df.as[Person].show

               df.as[Person].collect

 

*)創建DataSet,方式三:使用HDFS數據

1、讀取HDFS數據,並創建DataSet

                val linesDS = spark.read.text("hdfs://hadoop111:9000/data/data.txt").as[String]

 

2、對DataSet進行操作:分詞后,查詢長度大於3的單詞

                val words = linesDS.flatMap(_.split(" ")).filter(_.length > 3)

                words.show

                words.collect

 

            3、執行WordCount程序

             val result = linesDS.flatMap(_.split(" ")).map((_,1)).groupByKey(x => x._1).count

             result.show

             排序:result.orderBy($"value").show

 

8Datasets的操作案例
  emp.json

*)使用emp.json 生成DataFrame

val empDF = spark.read.json("/root/resources/emp.json")

            查詢工資大於3000的員工

            empDF.where($"sal" >= 3000).show

 

*)創建case class

case class Emp(empno:Long,ename:String,job:String,hiredate:String,mgr:String,sal:Long,comm:String,deptno:Long)

 

*)生成DataSets,並查詢數據

     val empDS = empDF.as[Emp]

 

     查詢工資大於3000的員工

     empDS.filter(_.sal > 3000).show

 

     查看10號部門的員工

     empDS.filter(_.deptno == 10).show

 

*)多表查詢

1、創建部門表

val deptRDD=sc.textFile("/root/temp/dept.csv").map(_.split(","))

case class Dept(deptno:Int,dname:String,loc:String)

val deptDS = deptRDD.map(x=>Dept(x(0).toInt,x(1),x(2))).toDS

 

2、創建員工表

case class Emp(empno:Int,ename:String,job:String,mgr:String,hiredate:String,sal:Int,comm:String,deptno:Int)

val empRDD = sc.textFile("/root/temp/emp.csv").map(_.split(","))

val empDS = empRDD.map(x => Emp(x(0).toInt,x(1),x(2),x(3),x(4),x(5).toInt,x(6),x(7).toInt)).toDS

 

3、執行多表查詢:等值鏈接

    val result = deptDS.join(empDS,"deptno")

    

    另一種寫法:注意有三個等號

    val result = deptDS.joinWith(empDS,deptDS("deptno")=== empDS("deptno"))

    joinWithjoin的區別是連接后的新Datasetschema會不一樣

 

*)查看執行計划:result.explain

 

二、使用數據源 

1、通用的Load/Save函數

*)什么是parquet文件?

Parquet是列式存儲格式的一種文件類型,列式存儲有以下的核心:

可以跳過不符合條件的數據,只讀取需要的數據,降低IO數據量。

壓縮編碼可以降低磁盤存儲空間。由於同一列的數據類型是一樣的,可以使用更高效的壓縮編碼(例如Run Length EncodingDelta Encoding)進一步節約存儲空間。

l 只讀取需要的列,支持向量運算,能夠獲取更好的掃描性能。

l Parquet格式是Spark SQL的默認數據源,可通過spark.sql.sources.default配置

 

*)通用的Load/Save函數

讀取Parquet文件

val usersDF = spark.read.load("/root/resources/users.parquet")

查詢Schema和數據

查詢用戶的name和喜愛顏色,並保存

usersDF.select($"name",$"favorite_color").write.save("/root/result/parquet")

 

l 驗證結果

*)顯式指定文件格式:加載json格式

直接加載:val usersDF = spark.read.load("/root/resources/people.json")

                      會出錯

l val usersDF = spark.read.format("json").load("/root/resources/people.json")

 

 

*)存儲模式(Save Modes

可以采用SaveMode執行存儲操作,SaveMode定義了對數據的處理模式。需要注意的是,這些保存模式不使用任何鎖定,不是原子操作。此外,當使用Overwrite方式執行時,在輸出新數據之前原數據就已經被刪除。SaveMode詳細介紹如下表:
 

Demo

  usersDF.select($"name").write.save("/root/result/parquet1")

--> 出錯:因為/root/result/parquet1已經存在

 

  usersDF.select($"name").write.mode("overwrite").save("/root/result/parquet1")

 

*)將結果保存為表

  usersDF.select($"name").write.saveAsTable("table1")

 

也可以進行分區、分桶等操作:partitionBybucketBy

2Parquet文件

Parquet是一個列格式而且用於多個數據處理系統中。Spark SQL提供支持對於Parquet文件的讀寫,也就是自動保存原始數據的schema。當寫Parquet文件時,所有的列被自動轉化為nullable,因為兼容性的緣故。

*)案例:

讀入json格式的數據,將其轉換成parquet格式,並創建相應的表來使用SQL進行查詢。

*Schema的合並:

Parquet支持Schema evolutionSchema演變,即:合並)。用戶可以先定義一個簡單的Schema,然后逐漸的向Schema中增加列描述。通過這種方式,用戶可以獲取多個有不同Schema但相互兼容的Parquet文件。

Demo:
 

3JSON Datasets

Spark SQL能自動解析JSON數據集的Schema,讀取JSON數據集為DataFrame格式。讀取JSON數據集方法為SQLContext.read().json()。該方法將String格式的RDDJSON文件轉換為DataFrame

需要注意的是,這里的JSON文件不是常規的JSON格式。JSON文件每一行必須包含一個獨立的、自滿足有效的JSON對象。如果用多行描述一個JSON對象,會導致讀取出錯。讀取JSON數據集示例如下:

*Demo1:使用Spark自帶的示例文件 --> people.json 文件

定義路徑:

val path ="/root/resources/people.json"

 

讀取Json文件,生成DataFrame

val peopleDF = spark.read.json(path)

 

打印Schema結構信息:

peopleDF.printSchema()

 

創建臨時視圖:

peopleDF.createOrReplaceTempView("people")

 

執行查詢

spark.sql("SELECT name FROM people WHERE age=19").show

 

4、使用JDBC

Spark SQL同樣支持通過JDBC讀取其他數據庫的數據作為數據源。

Demo演示:使用Spark SQL讀取Oracle數據庫中的表。

啟動Spark Shell的時候,指定Oracle數據庫的驅動

spark-shell --master spark://spark81:7077            \\

         --jars /root/temp/ojdbc6.jar              \\

         --driver-class-path /root/temp/ojdbc6.jar

 

  讀取Oracle數據庫中的數據

 

 

 

*)方式一:

            val oracleDF = spark.read.format("jdbc").

         option("url","jdbc:oracle:thin:@192.168.88.101:1521/orcl.example.com").

         option("dbtable","scott.emp").

         option("user","scott").

         option("password","tiger").

         load

 

*)方式二:

導入需要的類:

import java.util.Properties   

 

定義屬性:               

val oracleprops = new Properties()

oracleprops.setProperty("user","scott")

oracleprops.setProperty("password","tiger")

 

 

讀取數據:

val oracleEmpDF =

   spark.read.jdbc("jdbc:oracle:thin:@192.168.88.101:1521/orcl.example.com",

   "scott.emp",oracleprops)

 

注意:下面是讀取Oracle 10gWindows 上)的步驟

5、使用Hive Table

首先,搭建好Hive的環境(需要Hadoop

配置Spark SQL支持Hive

  只需要將以下文件拷貝到$SPARK_HOME/conf的目錄下,即可

  $HIVE_HOME/conf/hive-site.xml

  $HADOOP_CONF_DIR/core-site.xml

  $HADOOP_CONF_DIR/hdfs-site.xml

 

使用Spark Shell操作Hive

  啟動Spark Shell的時候,需要使用--jars指定mysql的驅動程序

  創建表

  spark.sql("create table src (key INT, value STRING) row format delimited fields terminated by ','")

 

  導入數據

spark.sql("load data local path '/root/temp/data.txt' into table src")

 

  查詢數據

spark.sql("select * from src").show

 

使用spark-sql操作Hive

  啟動spark-sql的時候,需要使用--jars指定mysql的驅動程序

  操作Hive

  show tables;

  select * from ;

 

 

三、性能優化

1、在內存中緩存數據

性能調優主要是將數據放入內存中操作。通過spark.cacheTable("tableName")或者dataFrame.cache()。使用spark.uncacheTable("tableName")來從內存中去除table

Demo案例:

*)從Oracle數據庫中讀取數據,生成DataFrame

     val oracleDF = spark.read.format("jdbc")

        .option("url","jdbc:oracle:thin:@192.168.88.101:1521/orcl.example.com")

        .option("dbtable","scott.emp")

        .option("user","scott")

        .option("password","tiger").load

 

*)將DataFrame注冊成表:    oracleDF.registerTempTable("emp")

 

*)執行查詢,並通過Web Console監控執行的時間

                 spark.sql("select * from emp").show
 

*)將表進行緩存,並查詢兩次,並通過Web Console監控執行的時間

     spark.sqlContext.cacheTable("emp")
 

*)清空緩存:

      spark.sqlContext.cacheTable("emp")

      spark.sqlContext.clearCache

2、性能優化相關參數

1.將數據緩存到內存中的相關優化參數

  (1)spark.sql.inMemoryColumnarStorage.compressed

  默認為 true

  Spark SQL 將會基於統計信息自動地為每一列選擇一種壓縮編碼方式。

 

  (2)spark.sql.inMemoryColumnarStorage.batchSize

  默認值:10000

  緩存批處理大小。緩存數據時, 較大的批處理大小可以提高內存利用率和壓縮率,但同時也會帶來 OOMOut Of Memory)的風險。

 

2.其他性能相關的配置選項(不過不推薦手動修改,可能在后續版本自動的自適應修改)

  (1)spark.sql.files.maxPartitionBytes

  默認值:128 MB

  讀取文件時單個分區可容納的最大字節數

 

  (2)spark.sql.files.openCostInBytes

  默認值:4M

  打開文件的估算成本, 按照同一時間能夠掃描的字節數來測量。當往一個分區寫入多個文件的時候會使用。高估更好, 這樣的話小文件分區將比大文件分區更快 (先被調度)

 

  (3)spark.sql.autoBroadcastJoinThreshold

  默認值:10M

  用於配置一個表在執行 join 操作時能夠廣播給所有 worker 節點的最大字節大小。通過將這個值設置為 -1 可以禁用廣播。注意,當前數據統計僅支持已經運行了 ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan 命令的 Hive Metastore 表。

 

  (4)spark.sql.shuffle.partitions

  默認值:200

  用於配置 join 或聚合操作混洗(shuffle)數據時使用的分區數。

四、在IDEA中開發Spark SQL程序

 

1、指定Schema格式

package sparksql

import org.apache.spark.sql.{Row, SQLContext, SaveMode, SparkSession}
import org.apache.spark.sql.types._
import org.apache.spark.{SparkConf, SparkContext}

object SpecifyingSchema {
  def main(args: Array[String]) {
      //創建Spark Session對象
    val spark = SparkSession.builder().master("local").appName("UnderstandingSparkSession").getOrCreate()

    //從指定的地址創建RDD
    val personRDD = spark.sparkContext.textFile("D:\\temp\\student.txt").map(_.split(" "))

    //通過StructType直接指定每個字段的schema
    val schema = StructType(
      List(
        StructField("id", IntegerType, true),
        StructField("name", StringType, true),
        StructField("age", IntegerType, true)
      )
    )
    //將RDD映射到rowRDD
    val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))

    //將schema信息應用到rowRDD上
    val personDataFrame = spark.createDataFrame(rowRDD, schema)

    //注冊表
    personDataFrame.createOrReplaceTempView("t_person")

    //執行SQL
    val df = spark.sql("select * from t_person order by age desc limit 4")

    //顯示結果
    df.show()

    //停止Spark Context
    spark.stop()
  }
}

 

2、使用case class 

package demo

import org.apache.spark.sql.SparkSession

//使用case class
object Demo2 {

  def main(args: Array[String]): Unit = {
    //創建SparkSession
    val spark = SparkSession.builder().master("local").appName("My Demo 1").getOrCreate()

    //從指定的文件中讀取數據,生成對應的RDD
    val lineRDD = spark.sparkContext.textFile("d:\\temp\\student.txt").map(_.split(" "))

    //將RDD和case class 關聯
    val studentRDD = lineRDD.map( x => Student(x(0).toInt,x(1),x(2).toInt))

    //生成 DataFrame,通過RDD 生成DF,導入隱式轉換
    import spark.sqlContext.implicits._
    val studentDF = studentRDD.toDF

    //注冊表 視圖
    studentDF.createOrReplaceTempView("student")

    //執行SQL
    spark.sql("select * from student").show()

    spark.stop()
  }
}

//case class 一定放在外面
case class Student(stuID:Int,stuName:String,stuAge:Int)

 

 

3、就數據保存到數據庫

package demo

import java.util.Properties

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}


//作用:讀取本地一個文件, 生成對應 DataFrame,注冊表
object Demo1 {

  def main(args: Array[String]): Unit = {
    //創建SparkSession
    val spark = SparkSession.builder().master("local").appName("My Demo 1").getOrCreate()

    //從指定的文件中讀取數據,生成對應的RDD
    val personRDD = spark.sparkContext.textFile("d:\\temp\\student.txt").map(_.split(" "))

    //創建schema ,通過StructType
    val schema = StructType(
        List(
          StructField("personID",IntegerType,true),
          StructField("personName",StringType,true),
          StructField("personAge",IntegerType,true)
        )
    )

    //將RDD映射到Row RDD 行的數據上
    val rowRDD = personRDD.map(p => Row(p(0).toInt,p(1).trim,p(2).toInt))

    //生成DataFrame
    val personDF = spark.createDataFrame(rowRDD,schema)

    //將DF注冊成表
    personDF.createOrReplaceTempView("myperson")

    //執行SQL
    val result = spark.sql("select * from myperson")

    //顯示
    //result.show()

    //將結果保存到oracle中
    val props = new Properties()
    props.setProperty("user","scott")
    props.setProperty("password","tiger")

    result.write.jdbc("jdbc:oracle:thin:@192.168.88.101:1521/orcl.example.com","scott.myperson",props)

    //如果表已經存在,append的方式數據
    //result.write.mode("append").jdbc("jdbc:oracle:thin:@192.168.88.101:1521/orcl.example.com","scott.myperson",props)


    //停止spark context
    spark.stop()
  }
}

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM