一、spark SQL概述
1.1 什么是spark SQL
Spark SQL是Spark用來處理結構化數據的一個模塊,它提供了一個編程抽象叫做DataFrame並且作為分布式SQL查詢引擎的作用。類似於hive的作用。
1.2 spark SQL的特點
1、容易集成:安裝Spark的時候,已經集成好了。不需要單獨安裝。
2、統一的數據訪問方式:JDBC、JSON、Hive、parquet文件(一種列式存儲文件,是SparkSQL默認的數據源,hive中也支持)
3、完全兼容Hive。可以將Hive中的數據,直接讀取到Spark SQL中處理。
一般在生產中,基本都是使用hive做數據倉庫存儲數據,然后用spark從hive讀取數據進行處理。
4、支持標准的數據連接:JDBC、ODBC
5、計算效率比基於mr的hive高,而且hive2.x版本中,hive建議使用spark作為執行引擎
二、spark SQL基本原理
2.1 DataFrame和DataSet基本概念
2.1.1 DataFrame
DataFrame是組織成命名列的數據集。它在概念上等同於關系數據庫中的表,里面有表的結構以及數據,但在底層具有更豐富的優化。DataFrames可以從各種來源構建,
例如:
結構化數據文件
hive中的表
外部數據庫或現有RDDs
DataFrame API支持的語言有Scala,Java,Python和R。
比起RDD,DataFrame多了數據的結構信息,即schema。RDD是分布式的 Java對象的集合。DataFrame是分布式的Row對象的集合。DataFrame除了提供了比RDD更豐富的算子以外,更重要的特點是提升執行效率、減少數據讀取以及執行計划的優化。
2.1.2 DataSet
Dataset是一個分布式的數據收集器。這是在Spark1.6之后新加的一個接口,兼顧了RDD的優點(強類型,可以使用功能強大的lambda)以及Spark SQL的執行器高效性的優點。所以可以把DataFrames看成是一種特殊的Datasets,即:Dataset(Row)
2.2 創建DataFrame的方式
2.2.1 SparkSession對象
Apache Spark 2.0引入了SparkSession,其為用戶提供了一個統一的切入點來使用Spark的各項功能,並且允許用戶通過它調用DataFrame和Dataset相關API來編寫Spark程序。最重要的是,它減少了用戶需要了解的一些概念,使得我們可以很容易地與Spark交互。
在2.0版本之前,與Spark交互之前必須先創建SparkConf和SparkContext。然而在Spark 2.0中,我們可以通過SparkSession來實現同樣的功能,而不需要顯式地創建SparkConf, SparkContext 以及 SQLContext,因為這些對象已經封裝在SparkSession中。
要注意一點,在我用的這個spark版本中,直接使用new SQLContext() 來創建SQLContext對象,會顯示該方式已經被棄用了(IDEA中會顯示已棄用),建議使用SparkSession來獲取SQLContext對象。
2.2.2 通過case class樣本類
這種方式在scala中比較常用,因為case class是scala的特色
/**
表 t_stu 的結構為:
id name age
*/
object CreateDF {
def main(args: Array[String]): Unit = {
//這是最新的獲取SQLContext對象的方式
//2、創建SparkSession對象,設置master,appname
val spark = SparkSession.builder().master("local").appName("createDF case class").getOrCreate()
//3、通過spark獲取sparkContext對象,讀取數據
val lines = spark.sparkContext.textFile("G:\\test\\t_stu.txt").map(_.split(","))
//4、將數據映射到case class中,也就是數據映射到表的對應字段中
val tb = lines.map(t=>emp(t(0).toInt,t(1),t(2).toInt))
//這里必須要加上隱式轉換,否則無法調用 toDF 函數
import spark.sqlContext.implicits._
//5、生成df
val df2 = tb.toDF()
//相當於select name from t_stu
df1.select($"name").show()
//關閉spark對象
spark.stop()
}
}
/*1、定義case class,每個屬性對應表中的字段名以及類型
一般生產中為了方便,會全部定義為string類型,然后有需要的時候
才根據實際情況將string轉為需要的類型
這一步相當於定義表的結構
*/
case class emp(id:Int,name:String,age:Int)
總結步驟為:
1、定義case class,用來表結構 2、創建sparkSession對象,用來讀取數據 3、將rdd中的數據和case class映射 4、調用 toDF 函數將rdd轉為 DataFrame
2.2.3 通過StructType類
這種方式java比較常用
package SparkSQLExer
import org.apache
import org.apache.spark
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
/**
* 創建dataschema方式2:
* 通過spark session對象創建,表結構通過StructType創建
*/
object CreateDF02 {
def main(args: Array[String]): Unit = {
val sparkS = SparkSession.builder().master("local").appName("create schema").getOrCreate()
//1、通過StructType創建表結構schema,里面表的每個字段使用 StructField定義
val tbSchema = StructType(List(
StructField("id",DataTypes.IntegerType),
StructField("name",DataTypes.StringType),
StructField("age",DataTypes.IntegerType)
))
//2、讀取數據
var lines = sparkS.sparkContext.textFile("G:\\test\\t_stu.txt").map(_.split(","))
//3、將數據映射為ROW對象
val rdd1 = lines.map(t=>Row(t(0).toInt,t(1),t(2).toInt))
//4、創建表結構和表數據映射,返回的就是df
val df2 = sparkS.createDataFrame(rdd1, tbSchema)
//打印表結構
df2.printSchema()
sparkS.stop()
}
}
總結步驟為:
1、通過StructType創建表結構schema,里面表的每個字段使用 StructField定義 2、通過sparkSession.sparkContext讀取數據 3、將數據映射格式為Row對象 4、將StructType和數據Row對象映射,返回df
2.2.4 使用json等有表格式的文件類型
package SparkSQLExer
import org.apache.spark.sql.SparkSession
/**
* 創建df方式3:通過有格式的文件直接導入數據以及表結構,比如json格式的文件
* 返回的直接就是一個DF
*/
object CreateDF03 {
def main(args: Array[String]): Unit = {
val sparkS = SparkSession.builder().master("local").appName("create df through json").getOrCreate()
//讀取json方式1:
val jsonrdd1= sparkS.read.json("path")
//讀取json方式2:
val jsonrdd1= sparkS.read.format("json").load("path")
sparkS.stop()
}
}
這種方式比較簡單,就是直接讀取json文件而已
sparkS.read.xxxx讀取任意文件時,返回的都是DF對象
2.3 操作DataFrame
2.3.1 DSL語句
DSL語句其實就是將sql語句的一些操作轉為類似函數的方式去調用,比如:
df1.select("name").show
例子:
為了方便,直接在spark-shell里操作了,
spark-shell --master spark://bigdata121:7077 1、打印表結構 scala> df1.printSchema root |-- empno: integer (nullable = true) |-- ename: string (nullable = true) |-- job: string (nullable = true) |-- mgr: integer (nullable = true) |-- hiredate: string (nullable = true) |-- sal: integer (nullable = true) |-- comm: integer (nullable = true) |-- deptno: integer (nullable = true) 2、顯示當前df的表數據或者查詢結果的數據 scala> df1.show +-----+------+---------+----+----------+----+----+------+ |empno| ename| job| mgr| hiredate| sal|comm|deptno| +-----+------+---------+----+----------+----+----+------+ | 7369| SMITH| CLERK|7902|1980/12/17| 800| 0| 20| | 7499| ALLEN| SALESMAN|7698| 1981/2/20|1600| 300| 30| | 7521| WARD| SALESMAN|7698| 1981/2/22|1250| 500| 30| | 7566| JONES| MANAGER|7839| 1981/4/2|2975| 0| 20| | 7654|MARTIN| SALESMAN|7698| 1981/9/28|1250|1400| 30| | 7698| BLAKE| MANAGER|7839| 1981/5/1|2850| 0| 30| | 7782| CLARK| MANAGER|7839| 1981/6/9|2450| 0| 10| | 7788| SCOTT| ANALYST|7566| 1987/4/19|3000| 0| 20| | 7839| KING|PRESIDENT|7839|1981/11/17|5000| 0| 10| | 7844|TURNER| SALESMAN|7698| 1981/9/8|1500| 0| 30| | 7876| ADAMS| CLERK|7788| 1987/5/23|1100| 0| 20| | 7900| JAMES| CLERK|7698| 1981/12/3| 950| 0| 30| | 7902| FORD| ANALYST|7566| 1981/12/3|3000| 0| 20| | 7934|MILLER| CLERK|7782| 1982/1/23|1300| 0| 10| +-----+------+---------+----+----------+----+----+------+ 3、執行select, 相當於select xxx form xxx where xxx scala> df1.select("ename","sal").where("sal>2000").show +------+----+ | ename| sal| +------+----+ | SMITH| 800| | ALLEN|1600| | WARD|1250| | JONES|2975| |MARTIN|1250| | BLAKE|2850| | CLARK|2450| | SCOTT|3000| | KING|5000| |TURNER|1500| | ADAMS|1100| | JAMES| 950| | FORD|3000| |MILLER|1300| +------+----+ 4、對某些列進行操作 對某個指定進行操作時,需要加上$符號,然后后面才能操作 $代表 取出來以后,再做一些操作。 注意:這個 $ 的用法在ideal中無法正常使用,解決方法下面說 scala> df1.select($"ename",$"sal",$"sal"+100).show +------+----+-----------+ | ename| sal|(sal + 100)| +------+----+-----------+ | SMITH| 800| 900| | ALLEN|1600| 1700| | WARD|1250| 1350| | JONES|2975| 3075| |MARTIN|1250| 1350| | BLAKE|2850| 2950| | CLARK|2450| 2550| | SCOTT|3000| 3100| | KING|5000| 5100| |TURNER|1500| 1600| | ADAMS|1100| 1200| | JAMES| 950| 1050| | FORD|3000| 3100| |MILLER|1300| 1400| +------+----+-----------+ 5、過濾行 scala> df1.filter($"sal">2000).show +-----+-----+---------+----+----------+----+----+------+ |empno|ename| job| mgr| hiredate| sal|comm|deptno| +-----+-----+---------+----+----------+----+----+------+ | 7566|JONES| MANAGER|7839| 1981/4/2|2975| 0| 20| | 7698|BLAKE| MANAGER|7839| 1981/5/1|2850| 0| 30| | 7782|CLARK| MANAGER|7839| 1981/6/9|2450| 0| 10| | 7788|SCOTT| ANALYST|7566| 1987/4/19|3000| 0| 20| | 7839| KING|PRESIDENT|7839|1981/11/17|5000| 0| 10| | 7902| FORD| ANALYST|7566| 1981/12/3|3000| 0| 20| +-----+-----+---------+----+----------+----+----+------+ 6、分組以及計數 scala> df1.groupBy($"deptno").count.show +------+-----+ |deptno|count| +------+-----+ | 20|