Spark SQL
一、Spark SQL基礎
1、Spark SQL簡介

Spark SQL是Spark用來處理結構化數據的一個模塊,它提供了一個編程抽象叫做DataFrame並且作為分布式SQL查詢引擎的作用。
http://spark.apache.org/sql/
為什么要學習Spark SQL?我們已經學習了Hive,它是將Hive SQL轉換成MapReduce然后提交到集群上執行,大大簡化了編寫MapReduce的程序的復雜性,由於MapReduce這種計算模型執行效率比較慢。所以Spark SQL的應運而生,它是將Spark SQL轉換成RDD,然后提交到集群執行,執行效率非常快!同時Spark SQL也支持從Hive中讀取數據。
Spark SQL的特點:
1.容易整合(集成)
2.統一的數據訪問方式
3.兼容Hive
4.標准的數據連接
2、基本概念:Datasets和DataFrames
DataFrame
DataFrame是組織成命名列的數據集。它在概念上等同於關系數據庫中的表,但在底層具有更豐富的優化。DataFrames可以從各種來源構建,
例如:
結構化數據文件
hive中的表
外部數據庫或現有RDDs
DataFrame API支持的語言有Scala,Java,Python和R。
從上圖可以看出,DataFrame多了數據的結構信息,即schema。RDD是分布式的 Java對象的集合。DataFrame是分布式的Row對象的集合。DataFrame除了提供了比RDD更豐富的算子以外,更重要的特點是提升執行效率、減少數據讀取以及執行計划的優化
Datasets
Dataset是數據的分布式集合。Dataset是在Spark 1.6中添加的一個新接口,是DataFrame之上更高一級的抽象。它提供了RDD的優點(強類型化,使用強大的lambda函數的能力)以及Spark SQL優化后的執行引擎的優點。一個Dataset 可以從JVM對象構造,然后使用函數轉換(map, flatMap,filter等)去操作。 Dataset API 支持Scala和Java。 Python不支持Dataset API。
3、測試數據
使用員工表的數據,並已經將其保存到了HDFS上。
emp.csv
dept.csv
4、創建DataFrames
(*)通過Case Class創建DataFrames
① 定義case class(相當於表的結構:Schema)
注意:由於mgr和comm列中包含null值,簡單起見,將對應的case class類型定義為String
② 將HDFS上的數據讀入RDD,並將RDD與case Class關聯
③ 將RDD轉換成DataFrames
④ 通過DataFrames查詢數據
(*)使用SparkSession
① 什么是SparkSession
Apache Spark 2.0引入了SparkSession,其為用戶提供了一個統一的切入點來使用Spark的各項功能,並且允許用戶通過它調用DataFrame和Dataset相關API來編寫Spark程序。最重要的是,它減少了用戶需要了解的一些概念,使得我們可以很容易地與Spark交互。
在2.0版本之前,與Spark交互之前必須先創建SparkConf和SparkContext。然而在Spark 2.0中,我們可以通過SparkSession來實現同樣的功能,而不需要顯式地創建SparkConf, SparkContext 以及 SQLContext,因為這些對象已經封裝在SparkSession中。
② 創建StructType,來定義Schema結構信息
注意,需要:import org.apache.spark.sql.types._
③ 讀入數據並且切分數據
④ 將RDD中的數據映射成Row
注意,需要:import org.apache.spark.sql.Row
⑤ 創建DataFrames
val df = spark.createDataFrame(rowRDD,myschema)
再舉一個例子,使用JSon文件來創建DataFame
① 源文件:$SPARK_HOME/examples/src/main/resources/people.json
② val df = spark.read.json("源文件")
③ 查看數據和Schema信息
5、DataFrame操作
DataFrame操作也稱為無類型的Dataset操作
(*)查詢所有的員工姓名
(*)查詢所有的員工姓名和薪水,並給薪水加100塊錢
(*)查詢工資大於2000的員工
(*)求每個部門的員工人數
完整的例子,請參考:
http://spark.apache.org/docs/2.1.0/api/scala/index.html#org.apache.spark.sql.Dataset
(*)在DataFrame中使用SQL語句
① 將DataFrame注冊成表(視圖):df.createOrReplaceTempView("emp")
② 執行查詢:spark.sql("select * from emp").show
spark.sql("select * from emp where deptno=10").show
spark.sql("select deptno,sum(sal) from emp group by deptno").show
6、Global Temporary View
上面使用的是一個在Session生命周期中的臨時views。在Spark SQL中,如果你想擁有一個臨時的view,並想在不同的Session中共享,而且在application的運行周期內可用,那么就需要創建一個全局的臨時view。並記得使用的時候加上global_temp作為前綴來引用它,因為全局的臨時view是綁定到系統保留的數據庫global_temp上。
① 創建一個普通的view和一個全局的view
df.createOrReplaceTempView("emp1")
df.createGlobalTempView("emp2")
② 在當前會話中執行查詢,均可查詢出結果。
spark.sql("select * from emp1").show
spark.sql("select * from global_temp.emp2").show
③ 開啟一個新的會話,執行同樣的查詢
spark.newSession.sql("select * from emp1").show (運行出錯)
spark.newSession.sql("select * from global_temp.emp2").show
7、創建Datasets
DataFrame的引入,可以讓Spark更好的處理結構數據的計算,但其中一個主要的問題是:缺乏編譯時類型安全。為了解決這個問題,Spark采用新的Dataset API (DataFrame API的類型擴展)。
Dataset是一個分布式的數據收集器。這是在Spark1.6之后新加的一個接口,兼顧了RDD的優點(強類型,可以使用功能強大的lambda)以及Spark SQL的執行器高效性的優點。所以可以把DataFrames看成是一種特殊的Datasets,即:Dataset(Row)
(*)創建DataSet,方式一:使用序列
1、定義case class
case class MyData(a:Int,b:String)
2、生成序列,並創建DataSet
val ds = Seq(MyData(1,"Tom"),MyData(2,"Mary")).toDS
3、查看結果
(*)創建DataSet,方式二:使用JSON數據
1、定義case class
case class Person(name: String, gender: String)
2、通過JSON數據生成DataFrame
val df = spark.read.json(sc.parallelize("""{"gender": "Male", "name": "Tom"}""" :: Nil))
3、將DataFrame轉成DataSet
df.as[Person].show
df.as[Person].collect
(*)創建DataSet,方式三:使用HDFS數據
1、讀取HDFS數據,並創建DataSet
val linesDS = spark.read.text("hdfs://hadoop111:9000/data/data.txt").as[String]
2、對DataSet進行操作:分詞后,查詢長度大於3的單詞
val words = linesDS.flatMap(_.split(" ")).filter(_.length > 3)
words.show
words.collect
3、執行WordCount程序
val result = linesDS.flatMap(_.split(" ")).map((_,1)).groupByKey(x => x._1).count
result.show
排序:result.orderBy($"value").show
8、Datasets的操作案例
emp.json

(*)使用emp.json 生成DataFrame
val empDF = spark.read.json("/root/resources/emp.json")
查詢工資大於3000的員工
empDF.where($"sal" >= 3000).show
(*)創建case class
case class Emp(empno:Long,ename:String,job:String,hiredate:String,mgr:String,sal:Long,comm:String,deptno:Long)
(*)生成DataSets,並查詢數據
val empDS = empDF.as[Emp]
查詢工資大於3000的員工
empDS.filter(_.sal > 3000).show
查看10號部門的員工
empDS.filter(_.deptno == 10).show
(*)多表查詢
1、創建部門表
val deptRDD=sc.textFile("/root/temp/dept.csv").map(_.split(","))
case class Dept(deptno:Int,dname:String,loc:String)
val deptDS = deptRDD.map(x=>Dept(x(0).toInt,x(1),x(2))).toDS
2、創建員工表
case class Emp(empno:Int,ename:String,job:String,mgr:String,hiredate:String,sal:Int,comm:String,deptno:Int)
val empRDD = sc.textFile("/root/temp/emp.csv").map(_.split(","))
val empDS = empRDD.map(x => Emp(x(0).toInt,x(1),x(2),x(3),x(4),x(5).toInt,x(6),x(7).toInt)).toDS
3、執行多表查詢:等值鏈接
val result = deptDS.join(empDS,"deptno")
另一種寫法:注意有三個等號
val result = deptDS.joinWith(empDS,deptDS("deptno")=== empDS("deptno"))
joinWith和join的區別是連接后的新Dataset的schema會不一樣
(*)查看執行計划:result.explain
二、使用數據源
1、通用的Load/Save函數
(*)什么是parquet文件?
Parquet是列式存儲格式的一種文件類型,列式存儲有以下的核心:
l 可以跳過不符合條件的數據,只讀取需要的數據,降低IO數據量。
l 壓縮編碼可以降低磁盤存儲空間。由於同一列的數據類型是一樣的,可以使用更高效的壓縮編碼(例如Run Length Encoding和Delta Encoding)進一步節約存儲空間。
l 只讀取需要的列,支持向量運算,能夠獲取更好的掃描性能。
l Parquet格式是Spark SQL的默認數據源,可通過spark.sql.sources.default配置
(*)通用的Load/Save函數
l 讀取Parquet文件
val usersDF = spark.read.load("/root/resources/users.parquet")
l 查詢Schema和數據
l 查詢用戶的name和喜愛顏色,並保存
usersDF.select($"name",$"favorite_color").write.save("/root/result/parquet")
l 驗證結果
(*)顯式指定文件格式:加載json格式
l 直接加載:val usersDF = spark.read.load("/root/resources/people.json")
會出錯
l val usersDF = spark.read.format("json").load("/root/resources/people.json")
(*)存儲模式(Save Modes)
可以采用SaveMode執行存儲操作,SaveMode定義了對數據的處理模式。需要注意的是,這些保存模式不使用任何鎖定,不是原子操作。此外,當使用Overwrite方式執行時,在輸出新數據之前原數據就已經被刪除。SaveMode詳細介紹如下表:
Demo:
usersDF.select($"name").write.save("/root/result/parquet1")
--> 出錯:因為/root/result/parquet1已經存在
usersDF.select($"name").write.mode("overwrite").save("/root/result/parquet1")
(*)將結果保存為表
usersDF.select($"name").write.saveAsTable("table1")
也可以進行分區、分桶等操作:partitionBy、bucketBy
2、Parquet文件
Parquet是一個列格式而且用於多個數據處理系統中。Spark SQL提供支持對於Parquet文件的讀寫,也就是自動保存原始數據的schema。當寫Parquet文件時,所有的列被自動轉化為nullable,因為兼容性的緣故。
(*)案例:
讀入json格式的數據,將其轉換成parquet格式,並創建相應的表來使用SQL進行查詢。
(*)Schema的合並:
Parquet支持Schema evolution(Schema演變,即:合並)。用戶可以先定義一個簡單的Schema,然后逐漸的向Schema中增加列描述。通過這種方式,用戶可以獲取多個有不同Schema但相互兼容的Parquet文件。
Demo:
3、JSON Datasets
Spark SQL能自動解析JSON數據集的Schema,讀取JSON數據集為DataFrame格式。讀取JSON數據集方法為SQLContext.read().json()。該方法將String格式的RDD或JSON文件轉換為DataFrame。
需要注意的是,這里的JSON文件不是常規的JSON格式。JSON文件每一行必須包含一個獨立的、自滿足有效的JSON對象。如果用多行描述一個JSON對象,會導致讀取出錯。讀取JSON數據集示例如下:
(*)Demo1:使用Spark自帶的示例文件 --> people.json 文件
定義路徑:
val path ="/root/resources/people.json"
讀取Json文件,生成DataFrame:
val peopleDF = spark.read.json(path)
打印Schema結構信息:
peopleDF.printSchema()
創建臨時視圖:
peopleDF.createOrReplaceTempView("people")
執行查詢
spark.sql("SELECT name FROM people WHERE age=19").show
4、使用JDBC
Spark SQL同樣支持通過JDBC讀取其他數據庫的數據作為數據源。
Demo演示:使用Spark SQL讀取Oracle數據庫中的表。
l 啟動Spark Shell的時候,指定Oracle數據庫的驅動
spark-shell --master spark://spark81:7077 \\
--jars /root/temp/ojdbc6.jar \\
--driver-class-path /root/temp/ojdbc6.jar
讀取Oracle數據庫中的數據
(*)方式一:
val oracleDF = spark.read.format("jdbc").
option("url","jdbc:oracle:thin:@192.168.88.101:1521/orcl.example.com").
option("dbtable","scott.emp").
option("user","scott").
option("password","tiger").
load
(*)方式二:
導入需要的類:
import java.util.Properties
定義屬性:
val oracleprops = new Properties()
oracleprops.setProperty("user","scott")
oracleprops.setProperty("password","tiger")
讀取數據:
val oracleEmpDF =
spark.read.jdbc("jdbc:oracle:thin:@192.168.88.101:1521/orcl.example.com",
"scott.emp",oracleprops)
注意:下面是讀取Oracle 10g(Windows 上)的步驟
5、使用Hive Table
l 首先,搭建好Hive的環境(需要Hadoop)
l 配置Spark SQL支持Hive
只需要將以下文件拷貝到$SPARK_HOME/conf的目錄下,即可
$HIVE_HOME/conf/hive-site.xml
$HADOOP_CONF_DIR/core-site.xml
$HADOOP_CONF_DIR/hdfs-site.xml
l 使用Spark Shell操作Hive
啟動Spark Shell的時候,需要使用--jars指定mysql的驅動程序
創建表
spark.sql("create table src (key INT, value STRING) row format delimited fields terminated by ','")
導入數據
spark.sql("load data local path '/root/temp/data.txt' into table src")
查詢數據
spark.sql("select * from src").show
l 使用spark-sql操作Hive
啟動spark-sql的時候,需要使用--jars指定mysql的驅動程序
操作Hive
show tables;
select * from 表;
三、性能優化
1、在內存中緩存數據
性能調優主要是將數據放入內存中操作。通過spark.cacheTable("tableName")或者dataFrame.cache()。使用spark.uncacheTable("tableName")來從內存中去除table。
Demo案例:
(*)從Oracle數據庫中讀取數據,生成DataFrame
val oracleDF = spark.read.format("jdbc")
.option("url","jdbc:oracle:thin:@192.168.88.101:1521/orcl.example.com")
.option("dbtable","scott.emp")
.option("user","scott")
.option("password","tiger").load
(*)將DataFrame注冊成表: oracleDF.registerTempTable("emp")
(*)執行查詢,並通過Web Console監控執行的時間
spark.sql("select * from emp").show
(*)將表進行緩存,並查詢兩次,並通過Web Console監控執行的時間
spark.sqlContext.cacheTable("emp")
(*)清空緩存:
spark.sqlContext.cacheTable("emp")
spark.sqlContext.clearCache
2、性能優化相關參數
1.將數據緩存到內存中的相關優化參數
(1)spark.sql.inMemoryColumnarStorage.compressed
默認為 true
Spark SQL 將會基於統計信息自動地為每一列選擇一種壓縮編碼方式。
(2)spark.sql.inMemoryColumnarStorage.batchSize
默認值:10000
緩存批處理大小。緩存數據時, 較大的批處理大小可以提高內存利用率和壓縮率,但同時也會帶來 OOM(Out Of Memory)的風險。
2.其他性能相關的配置選項(不過不推薦手動修改,可能在后續版本自動的自適應修改)
(1)spark.sql.files.maxPartitionBytes
默認值:128 MB
讀取文件時單個分區可容納的最大字節數
(2)spark.sql.files.openCostInBytes
默認值:4M
打開文件的估算成本, 按照同一時間能夠掃描的字節數來測量。當往一個分區寫入多個文件的時候會使用。高估更好, 這樣的話小文件分區將比大文件分區更快 (先被調度)。
(3)spark.sql.autoBroadcastJoinThreshold
默認值:10M
用於配置一個表在執行 join 操作時能夠廣播給所有 worker 節點的最大字節大小。通過將這個值設置為 -1 可以禁用廣播。注意,當前數據統計僅支持已經運行了 ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan 命令的 Hive Metastore 表。
(4)spark.sql.shuffle.partitions
默認值:200
用於配置 join 或聚合操作混洗(shuffle)數據時使用的分區數。
四、在IDEA中開發Spark SQL程序
1、指定Schema格式
package sparksql import org.apache.spark.sql.{Row, SQLContext, SaveMode, SparkSession} import org.apache.spark.sql.types._ import org.apache.spark.{SparkConf, SparkContext} object SpecifyingSchema { def main(args: Array[String]) { //創建Spark Session對象 val spark = SparkSession.builder().master("local").appName("UnderstandingSparkSession").getOrCreate() //從指定的地址創建RDD val personRDD = spark.sparkContext.textFile("D:\\temp\\student.txt").map(_.split(" ")) //通過StructType直接指定每個字段的schema val schema = StructType( List( StructField("id", IntegerType, true), StructField("name", StringType, true), StructField("age", IntegerType, true) ) ) //將RDD映射到rowRDD val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt)) //將schema信息應用到rowRDD上 val personDataFrame = spark.createDataFrame(rowRDD, schema) //注冊表 personDataFrame.createOrReplaceTempView("t_person") //執行SQL val df = spark.sql("select * from t_person order by age desc limit 4") //顯示結果 df.show() //停止Spark Context spark.stop() } }
2、使用case class
package demo import org.apache.spark.sql.SparkSession //使用case class object Demo2 { def main(args: Array[String]): Unit = { //創建SparkSession val spark = SparkSession.builder().master("local").appName("My Demo 1").getOrCreate() //從指定的文件中讀取數據,生成對應的RDD val lineRDD = spark.sparkContext.textFile("d:\\temp\\student.txt").map(_.split(" ")) //將RDD和case class 關聯 val studentRDD = lineRDD.map( x => Student(x(0).toInt,x(1),x(2).toInt)) //生成 DataFrame,通過RDD 生成DF,導入隱式轉換 import spark.sqlContext.implicits._ val studentDF = studentRDD.toDF //注冊表 視圖 studentDF.createOrReplaceTempView("student") //執行SQL spark.sql("select * from student").show() spark.stop() } } //case class 一定放在外面 case class Student(stuID:Int,stuName:String,stuAge:Int)
3、就數據保存到數據庫
package demo import java.util.Properties import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} //作用:讀取本地一個文件, 生成對應 DataFrame,注冊表 object Demo1 { def main(args: Array[String]): Unit = { //創建SparkSession val spark = SparkSession.builder().master("local").appName("My Demo 1").getOrCreate() //從指定的文件中讀取數據,生成對應的RDD val personRDD = spark.sparkContext.textFile("d:\\temp\\student.txt").map(_.split(" ")) //創建schema ,通過StructType val schema = StructType( List( StructField("personID",IntegerType,true), StructField("personName",StringType,true), StructField("personAge",IntegerType,true) ) ) //將RDD映射到Row RDD 行的數據上 val rowRDD = personRDD.map(p => Row(p(0).toInt,p(1).trim,p(2).toInt)) //生成DataFrame val personDF = spark.createDataFrame(rowRDD,schema) //將DF注冊成表 personDF.createOrReplaceTempView("myperson") //執行SQL val result = spark.sql("select * from myperson") //顯示 //result.show() //將結果保存到oracle中 val props = new Properties() props.setProperty("user","scott") props.setProperty("password","tiger") result.write.jdbc("jdbc:oracle:thin:@192.168.88.101:1521/orcl.example.com","scott.myperson",props) //如果表已經存在,append的方式數據 //result.write.mode("append").jdbc("jdbc:oracle:thin:@192.168.88.101:1521/orcl.example.com","scott.myperson",props) //停止spark context spark.stop() } }