Spark SQL筆記——技術點匯總


目錄

· 概述

· 原理

    · 組成

    · 執行流程

    · 性能

· API

    · 應用程序模板

    · 通用讀寫方法

    · RDD轉為DataFrame

    · Parquet文件數據源

    · JSON文件數據源

    · Hive數據源

    · 數據庫JDBC數據源

    · DataFrame Operation

· 性能調優

    · 緩存數據

    · 參數調優

· 案例

    · 數據准備

    · 查詢部門職工數

    · 查詢各部門職工工資總數,並排序

    · 查詢各部門職工考勤信息


 

概述

1. Spark SQLSpark的結構化數據處理模塊。

2. Spark SQL特點

    a) 數據兼容:可從Hive表、外部數據庫(JDBC)、RDDParquet文件、JSON文件獲取數據,可通過Scala方法或SQL方式操作這些數據,並把結果轉回RDD

    b) 組件擴展:SQL語法解析器、分析器、優化器均可重新定義。

    c) 性能優化:內存列存儲、動態字節碼生成等優化技術,內存緩存數據。

    d) 多語言支持:ScalaJavaPythonR

原理

組成

1. Catalyst優化:優化處理查詢語句的整個過程,包括解析、綁定、優化、物理計划等,主要由關系代數(relation algebra)、表達式(expression)以及查詢優化(query optimization)組成。

2. Spark SQL內核:處理數據的輸入輸出,從不同數據源(結構化數據Parquet文件JSON文件、Hive表、外部數據庫、已有RDD)獲取數據,執行查詢(expression of queries),並將查詢結果輸出成DataFrame

3. Hive支持:對Hive數據的處理,主要包括HiveQLMetaStoreSerDesUDFs等。

執行流程

1. SqlParserSQL語句解析,生成Unresolved邏輯計划(未提取Schema信息);

2. Catalyst分析器結合數據字典(catalog)進行綁定,生成Analyzed邏輯計划,過程中Schema Catalog要提取Schema信息;

3. Catalyst優化器對Analyzed邏輯計划優化,按照優化規則得到Optimized邏輯計划;

4. 與Spark Planner交互,應用策略(strategy)到plan,使用Spark Planner將邏輯計划轉換成物理計划,然后調用next函數,生成可執行物理計划。

性能

1. 內存列式緩存:內存列式(in-memory columnar format)緩存(再次執行時無需重復讀取),僅掃描需要的列,並自動調整壓縮比使內存使用率和GC壓力最小化。

2. 動態代碼和字節碼生成技術:提升重復表達式求值查詢的速率。

3. Tungsten優化:

    a) 由Spark自己管理內存而不是JVM,避免了JVM GC帶來的性能損失。

    b) 內存中Java對象被存儲成Spark自己的二進制格式,直接在二進制格式上計算,省去序列化和反序列化時間;此格式更緊湊,節省內存空間。

API

應用程序模板

 1 import org.apache.spark.SparkConf
 2 import org.apache.spark.SparkContext
 3 import org.apache.spark.sql.SQLContext
 4 import org.apache.spark.sql.hive.HiveContext
 5 
 6 object Test {
 7   def main(args: Array[String]): Unit = {
 8     val conf = new SparkConf().setAppName("Test")
 9     val sc = new SparkContext(conf)
10     val sqlContext = new HiveContext(sc)
11     
12     // ...
13   }
14 }

通用讀寫方法

1. Spark SQL內置數據源短名稱有jsonparquetjdbc,默認parquet(通過“spark.sql.sources.default”配置)。

2. 保存模式:

Scala/Java

Python

說明

SaveMode.ErrorIfExists

"error"

默認,如果數據庫已經存在,拋出異常

SaveMode.Append

"append"

如果數據庫已經存在,追加DataFrame數據

SaveMode.Overwrite

"overwrite"

如果數據庫已經存在,重寫DataFrame數據

SaveMode.Ignore

"ignore"

如果數據庫已經存在,忽略DataFrame數據

3. 讀寫文件代碼(統一使用sqlContext.read和dataFrame.write)模板: 

1 val dataFrame = sqlContext.read.format("數據源名稱").load("文件路徑")
2 val newDataFrame = dataFrame // 操作數據得到新DataFrame
3 newDataFrame.write.format("數據源名稱").save("文件路徑")

RDD轉為DataFrame

1. 方法1

    a) 方法:使用反射機制推斷RDD Schema

    b) 場景:運行前知道Schema

    c) 特點:代碼簡潔。

    d) 示例:

 1 import org.apache.spark.SparkConf
 2 import org.apache.spark.SparkContext
 3 import org.apache.spark.sql.SQLContext
 4 
 5 object Test {
 6     
 7     def main(args: Array[String]): Unit = {
 8         val conf = new SparkConf().setAppName("Test")
 9         val sc = new SparkContext(conf)
10         val sqlContext = new SQLContext(sc)
11         
12         // 將一個RDD隱式轉換為一個DataFrame
13         import sqlContext.implicits._
14         // 使用case定義Schema(不能超過22個屬性)
15         case class Person(name: String, age: Int)
16         // 讀取文件創建MappedRDD,再將數據寫入Person類,隱式轉換為DataFrame
17         val peopleDF = sc.textFile("/test/people.csv").map(_.split(",")).map(cols => Person(cols(0), cols(1).trim.toInt)).toDF()
18         // DataFrame注冊臨時表
19         peopleDF.registerTempTable("table_people")
20         
21         // SQL
22         val teenagers = sqlContext.sql("select name, age from table_people where age >= 13 and age <= 19")
23         teenagers.collect.foreach(println)
24     }
25     
26 }

2. 方法2

    a) 方法:以編程方式定義RDD Schema

    b) 場景:運行前不知道Schema

    c) 示例:

 1 import org.apache.spark.SparkConf
 2 import org.apache.spark.SparkContext
 3 import org.apache.spark.sql.SQLContext
 4 import org.apache.spark.sql.types.StructType
 5 import org.apache.spark.sql.types.StringType
 6 import org.apache.spark.sql.types.StructField
 7 import org.apache.spark.sql.Row
 8 
 9 object Test {
10     
11     def main(args: Array[String]): Unit = {
12         val conf = new SparkConf().setAppName("Test")
13         val sc = new SparkContext(conf)
14         val sqlContext = new SQLContext(sc)
15         
16         // 將一個RDD隱式轉換為一個DataFrame
17         import sqlContext.implicits._
18         // 使用case定義Schema(不能超過22個屬性)
19         case class Person(name: String, age: Int)
20         // 讀取文件創建MappedRDD
21         val peopleFile = sc.textFile("/test/people.csv")
22         // 運行時從某處獲取的Schema結構
23         val schemaArray = Array("name", "age")
24         // 創建Schema
25         val schema = StructType(schemaArray.map(fieldName => StructField(fieldName, StringType, true)))
26         // 將文本轉為RDD
27         val rowRDD = peopleFile.map(_.split(",")).map(cols => Row(cols(0), cols(1).trim))
28         // 將Schema應用於RDD
29         val peopleDF = sqlContext.createDataFrame(rowRDD, schema)
30         // DataFrame注冊臨時表
31         peopleDF.registerTempTable("table_people")
32         
33         // SQL
34         val teenagers = sqlContext.sql("select name, age from table_people where age >= 13 and age <= 19")
35         teenagers.collect.foreach(println)
36     }
37     
38 }

Parquet文件數據源

1. Parquet優點:

    a) 高效、Parquet采用列式存儲避免讀入不需要的數據,具有極好的性能和GC

    b) 方便的壓縮和解壓縮,並具有極好的壓縮比例;

    c) 可直接讀寫Parquet文件,比磁盤更好的緩存效果。

2. Spark SQL支持根據Parquet文件自描述自動推斷Schema,生成DataFrame

3. 編程示例:

1 // 加載文件創建DataFrame
2 val peopleDF = sqlContext.read.load("/test/people.parquet")
3 peopleDF.printSchema
4 // DataFrame注冊臨時表
5 peopleDF.registerTempTable("table_people")
6 
7 // SQL
8 val teenagers = sqlContext.sql("select name, age from table_people where age >= 13 and age <= 19")
9 teenagers.collect.foreach(println)

4. 分區發現(partition discovery

    a) 與Hive分區表類似,通過分區列的值對表設置分區目錄,加載Parquet數據源可自動發現和推斷分區信息。

    b) 示例:有一個分區列為gendercountry的分區表,加載路徑“/path/to/table”可自動提取分區信息

path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            │
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...

    創建的DataFrame的Schema:

root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)

    c) 分區列數據類型:支持numeric和string類型的自動推斷,通過“spark.sql.sources.partitionColumnTypeInference.enabled”配置開啟或關閉(默認開啟),關閉后分區列全為string類型。

JSON文件數據源

1. Spark SQL支持根據JSON文件自描述自動推斷Schema,生成DataFrame

2. 示例:

1 // 加載文件創建DataFrame(JSON文件自描述Schema)
2 val peopleDF = sqlContext.read.format("json").load("/test/people.json")
3 peopleDF.printSchema
4 // DataFrame注冊臨時表
5 peopleDF.registerTempTable("table_people")
6 
7 // SQL
8 val teenagers = sqlContext.sql("select name, age from table_people where age >= 13 and age <= 19")
9 teenagers.collect.foreach(println)

Hive數據源

1. HiveContext

    a) 操作Hive數據源須創建SQLContext的子類HiveContext對象。

    b) Standalone集群:添加hive-site.xml$SPARK_HOME/conf目錄。

    c) YARN集群:添加hive-site.xml$YARN_CONF_DIR目錄;添加Hive元數據庫JDBC驅動jar文件到$HADOOP_HOME/lib目錄。

    d) 最簡單方法:通過spark-submit命令參數--file--jar參數分別指定hive-site.xmlHive元數據庫JDBC驅動jar文件。

    e) 未找到hive-site.xml:當前目錄下自動創建metastore_dbwarehouse目錄。

    f) 模板:

val sqlContext = new HiveContext(sc)

2. 使用HiveQL

    a) “spark.sql.dialect”配置:SQLContextsqlHiveContext支持sqlhiveql(默認)。

    b) 模板:

sqlContext.sql("HiveQL")

3. 支持Hive特性

    a) Hive查詢語句,包括selectgroup byorder bycluster bysort by

    b) Hive運算符,包括:關系運算符(=、!=<><>>=<=等)、算術運算符(+、-、*、/、%等)、邏輯運算符(and、&&、or、||等)、復雜類型構造函數、數據函數(sign、ln、cos等)、字符串函數(instr、length、printf);

    c) 用戶自定義函數(UDF);

    d) 用戶自定義聚合函數(UDAF);

    e) 用戶自定義序列化格式(SerDes);

    f) 連接操作,包括join、{left | right | full} outer join、left semi join、cross join

    g) 聯合操作(union);

    h) 子查詢:select col from (select a + b as col from t1) t2

    i) 抽樣(Sampling);

    j) 解釋(Explain);

    k) 分區表(Partitioned table);

    l) 所有Hive DDL操作函數,包括create table、create table as select、alter table

    m) 大多數Hive數據類型tinyint、smallint、int、bigint、boolean、float、double、string、binary、timestamp、date、array<>、map<>、struct<>

數據庫JDBC數據源

1. Spark SQL支持加載數據庫表生成DataFrame

2. 模板(注意:需要相關JDBC驅動jar文件)

val jdbcOptions = Map("url" -> "", "driver" -> "", "dbtable" -> "")
sqlContext.read.format("jdbc").options(jdbcOptions).load

3. JDBC參數

名稱

說明

url

The JDBC URL to connect to.

dbtable

The JDBC table that should be read. Note that anything that is valid in a FROM clause of a SQL query can be used. For example, instead of a full table you could also use a subquery in parentheses.

driver

The class name of the JDBC driver to use to connect to this URL.

partitionColumn, lowerBound, upperBound, numPartitions

These options must all be specified if any of them is specified. They describe how to partition the table when reading in parallel from multiple workers. partitionColumn must be a numeric column from the table in question. Notice that lowerBound and upperBound are just used to decide the partition stride, not for filtering the rows in table. So all rows in the table will be partitioned and returned.

fetchSize

The JDBC fetch size, which determines how many rows to fetch per round trip. This can help performance on JDBC drivers which default to low fetch size (eg. Oracle with 10 rows).

DataFrame Operation

1. 分類:

    a) DataFrameAction

名稱

說明

collect: Array[Row]

Array形式返回DataFrame的所有Row

collectAsList: List[Row]

List形式返回DataFrame的所有Row

count(): Long

返回DataFrameRow數目

first(): Row

返回第一個Row

head(): Row

返回第一個Row

show(): Unit

以表格形式顯示DataFrame的前20Row

take(n: Int): Array[Row]

返回DataFramenRow

    b) 基礎DataFrame函數(basic DataFrame functions)

名稱

說明

cache(): DataFrame.this.type

緩存DataFrame

columns: Array[String]

Array形式返回全部的列名

dtypes: Array[(String, String)]

Array形式返回全部的列名和數據類型

explain: Unit

打印physical plan到控制台

isLocal: Boolean

返回collecttake是否可以本地運行

persist(newLevel: StorageLevel: DataFrame.this.type

根據StorageLevel持久化

printSchema(): Unit

以樹格式打印Schema

registerTempTable(tableName: String): Unit

使用給定的名字注冊DataFrame為臨時表

schema: StructType

返回DataFrameSchema

toDF(colNames: String*): DataFrame

返回一個重新指定columnDataFrame

unpersist(): DataFrame.this.type

移除持久化

    c) 集成語言查詢(language integrated queries)

名稱

說明

agg(aggExpr: (String, String), aggExpr: (String, String)*): DataFrame

agg(exprs: Map[String, String]): DataFrame

agg(expr: Column, exprs: Column*): DataFrame

在整體DataFrame不分組聚合

apply(colName: String): Column

Column形式返回列名為colName的列

as(alias: String): DataFrame

as(alias: Symbol): DataFrame

以一個別名集方式返回一個新DataFrame

col(colName: String): Column

apply

cube(col: String, cols: String*): GroupedData

使用專門的列(以便聚合),給當前DataFrame創建一個多維數據集

distinct: DataFrame

Row去重,返回新DataFrame

drop(col: Column): DataFrame

刪除一個列,返回新DataFrame

except(other: DataFrame): DataFrame

集合差,返回新DataFrame

filter(conditionExpr: String): DataFrame

filter(condition: Column): DataFrame

使用給定的SQL表達式過濾

groupBy(col: String, cols: String*): GroupedData

使用給定的列分組DataFrame,以便能夠聚合

intersect(other: DataFrame): DataFrame

交集,返回新DataFrame

limit(n: Int): DataFrame

獲取前n行數據,返回新DataFrame

join(right: DataFrame):DataFrame

join(right: DataFrame, joinExprs: Column):DataFrame

join(right: DataFrame, joinExprs: Column, joinType: String):DataFrame

Join,第1個為笛卡爾積(Cross Join),第2個為Inner Join

orderBy(col: String, cols: String*):  DataFrame

orderBy(sortExprs: Columns*): DataFrame

使用給定表達式排序,返回新DataFrame

sample(withReplacement: Boolean, fraction: Double): DataFrame

使用隨機種子,抽樣部分行返回新DataFrame

select(col: String, cols: String*): DataFrame

select(cols: Column*): DataFrame

selectExpr(exprs: String*): DataFrame

選擇一個列集合

sort(col: String, cols: String*):  DataFrame

sort(sortExprs: Column*):  DataFrame

orderBy

unionAll(other: DataFrame): DataFrame

集合和,返回新DataFrame

where(conditionExpr: String): DataFrame

where(condition: Column): DataFrame

filter

withColumn(colName, col: Column)

添加新列,返回新DataFrame

withColumnRenamed(existingName: String, newName: String)

重命名列,返回新DataFrame

    d) 輸出操作

名稱

說明

write

保存DataFrame內容到外部文件存儲、Hive表:

dataFrame.write.save("路徑") // 默認Parquet數據源

dataFrame.write.format("數據源名稱").save("路徑")

dataFrame.write.saveAsTable("表名")

dataFrame.write.insertInto("表名")

    e) RDD Operation

    DataFrame本質是一個擁有多個分區的RDD,支持RDD OperationcoalesceflatMapforeachforeachPartitionjavaRDDmapmapPartitionsrepartitiontoJSONtoJavaRDD等。

性能調優

緩存數據

1. 內存列式(in-memory columnar format)緩存:Spark SQL僅掃描需要的列,並自動調整壓縮比使內存使用率和GC壓力最小化。

2. 相關配置:

名稱

默認值

說明

spark.sql.inMemoryColumnarStorage.compressed

true

true時,Spark SQL基於數據統計為每列自動選擇壓縮編碼

spark.sql.inMemoryColumnarStorage.batchSize

10000

控制內存列式緩存的批處理大小,大批量可提升內存使用率,但會增加內存溢出風險

3. 緩存/移除緩存代碼模板:

// 緩存方法1(lazy)
sqlContext.cacheTable("表名")
// 緩存方法2(lazy)
dataFrame.cache()
// 移除緩存(eager)
sqlContext.uncacheTable("表名")
// 注意:RDD的cache方法不是列式緩存
rdd.cache()

參數調優

名稱

默認值

說明

spark.sql.autoBroadcastJoinThreshold

10485760 (10MB)

當執行Join時,對一個將要被廣播到所有Worker的表配置最大字節,通過設置為-1禁止廣播

spark.sql.tungsten.enabled

true

配置是否開啟Tungsten優化,默認開啟

spark.sql.shuffle.partitions

200

當執行JoinAggregation進行Shuffle時,配置可用分區數

案例

數據准備

1. 數據結構

    a) 職工基本信息(people

字段

說明

name

姓名

id

ID

gender

性別

age

年齡

year

入職年份

position

職位

deptid

所在部門ID

    b) 部門基本信息(department)

字段

說明

name

名稱

deptid

ID

    c) 職工考勤信息(attendance)

字段

說明

id

職工ID

year

month

overtime

加班

latetime

遲到

absenteeism

曠工

leaveearlytime

早退小時

    d) 職工工資清單(salary)

字段

說明

id

職工ID

salary

工資

2. 建庫、建表(spark-shell方式)

1 sqlContext.sql("create database hrs")
2 sqlContext.sql("use hrs")
3 sqlContext.sql("create external table if not exists people(name string, id int, gender string, age int, year int, position string, deptid int) row format delimited fields terminated by ',' lines terminated by '\n' location '/test/hrs/people'")
4 sqlContext.sql("create external table if not exists department(name string, deptid int) row format delimited fields terminated by ',' lines terminated by '\n' location '/test/hrs/department'")
5 sqlContext.sql("create external table if not exists attendance(id int, year int, month int, overtime int, latetime int, absenteeism int, leaveearlytime int) row format delimited fields terminated by ',' lines terminated by '\n' location '/test/hrs/attendance'")
6 sqlContext.sql("create external table if not exists salary(id int, salary int) row format delimited fields terminated by ',' lines terminated by '\n' location '/test/hrs/salary'")

3. 測試數據
    a) 職工基本信息(people.csv)

Michael,1,male,37,2001,developer,2
Andy,2,female,33,2003,manager,1
Justin,3,female,23,2013,recruitingspecialist,3
John,4,male,22,2014,developer,2
Herry,5,male,27,2010,developer,1
Brewster,6,male,37,2001,manager,2
Brice,7,female,30,2003,manager,3
Justin,8,male,23,2013,recruitingspecialist,3
John,9,male,22,2014,developer,1
Herry,10,female,27,2010,recruitingspecialist,3

    b) 部門基本信息(department.csv)

manager,1
researchhanddevelopment,2
humanresources,3

    c) 職工考勤信息(attendance.csv)

1,2015,12,0,2,4,0
2,2015,8,5,0,5,3
3,2015,3,16,4,1,5
4,2015,3,0,0,0,0
5,2015,3,0,3,0,0
6,2015,3,32,0,0,0
7,2015,3,0,16,3,32
8,2015,19,36,0,0,0
9,2015,5,6,30,0,2
10,2015,10,6,56,40,0
1,2014,12,0,2,4,0
2,2014,8,5,0,5,3
3,2014,3,16,4,1,5
4,2014,3,0,0,0,0
5,2014,3,0,3,0,0
6,2014,3,32,0,0,0
7,2014,3,0,16,3,32
8,2014,19,36,0,0,0
9,2014,5,6,30,0,2
10,2014,10,6,56,40,0

    d) 職工工資清單(salary.csv)

1,5000
2,10000
3,6000
4,7000
5,5000
6,11000
7,12000
8,5500
9,6500
10,4500

4. 上傳數據文件至HDFS

hadoop fs -mkdir /test/hrs/people
hadoop fs -mkdir /test/hrs/department
hadoop fs -mkdir /test/hrs/attendance
hadoop fs -mkdir /test/hrs/salary
hadoop fs -put people.csv /test/hrs/people
hadoop fs -put department.csv /test/hrs/department
hadoop fs -put attendance.csv /test/hrs/attendance
hadoop fs -put salary.csv /test/hrs/salary

查詢部門職工數

1. HiveQL方式

1 sqlContext.sql("select d.name, count(p.id) from people p join department d on p.deptid = d.deptid group by d.name").show

2. Scala方式

1 val peopleDF = sqlContext.table("people")
2 val departmentDF = sqlContext.table("department")
3 peopleDF.join(departmentDF, peopleDF("deptid") === departmentDF("deptid")).groupBy(departmentDF("name")).agg(count(peopleDF("id")).as("cnt")).select(departmentDF("name"), col("cnt")).show

3. 結果

查詢各部門職工工資總數,並排序

1. HiveQL方式

1 sqlContext.sql("select d.name, sum(s.salary) as salarysum from people p join department d on p.deptid = d.deptid join salary s on p.id = s.id  group by d.name order by salarysum").show

2. Scala方式

1 val peopleDF = sqlContext.table("people")
2 val departmentDF = sqlContext.table("department")
3 val salaryDF = sqlContext.table("salary")
4 peopleDF.join(departmentDF, peopleDF("deptid") === departmentDF("deptid")).join(salaryDF, peopleDF("id") === salaryDF("id")).groupBy(departmentDF("name")).agg(sum(salaryDF("salary")).as("salarysum")).orderBy("salarysum").select(departmentDF("name"), col("salarysum")).show

3. 結果

查詢各部門職工考勤信息

1. HiveQL方式

1 sqlContext.sql("select d.name, ai.year, sum(ai.attinfo) from (select p.id, p.deptid, a.year, a.month, (a.overtime - a.latetime - a.absenteeism - a.leaveearlytime) as attinfo from attendance a join people p on a.id = p.id) ai join department d on ai.deptid = d.deptid group by d.name, ai.year").show

2. Scala方式

1 val attendanceDF = sqlContext.table("attendance")
2 val peopleDF = sqlContext.table("people")
3 val departmentDF = sqlContext.table("department")
4 val subqueryDF = attendanceDF.join(peopleDF, attendanceDF("id") === peopleDF("id")).select(peopleDF("id"), peopleDF("deptid"), attendanceDF("year"), attendanceDF("month"), (attendanceDF("overtime") - attendanceDF("latetime") - attendanceDF("absenteeism") - attendanceDF("leaveearlytime")).as("attinfo"))
5 subqueryDF.join(departmentDF, subqueryDF("deptid") === departmentDF("deptid")).groupBy(departmentDF("name"), subqueryDF("year")).agg(sum(subqueryDF("attinfo")).as("attinfosum")).select(departmentDF("name"), subqueryDF("year"), col("attinfosum")).show

3. 結果

作者:netoxi
出處:http://www.cnblogs.com/netoxi
本文版權歸作者和博客園共有,歡迎轉載,未經同意須保留此段聲明,且在文章頁面明顯位置給出原文連接。歡迎指正與交流。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM