Spark詳解(06) - SparkSQL
Spark SQL概述
什么是Spark SQL
Spark SQL是Spark用於結構化數據(Structured Data)處理的Spark模塊。
(1)半結構化數據(日志數據): 001 zhangsan 18
(2)結構化數據(數據庫數據):
id |
name |
age |
001 |
zhangsan |
18 |
為什么要有Spark SQL
Hive on Spark:Hive既作為存儲元數據又負責SQL的解析優化,語法是HQL語法,執行引擎變成了Spark,Spark負責采用RDD執行。
Spark on Hive:Hive只作為存儲元數據,Spark負責SQL解析優化,語法是Spark SQL語法,Spark負責采用優化后的RDD執行。
Spark SQL原理
Spark SQL它提供了2個編程抽象,DataFrame、DataSet。(類似Spark Core中的RDD)
什么是DataFrame
1)DataFrame是一種以RDD為基礎的分布式數據集,類似於傳統數據庫中的二維表格。
2)DataFrame與RDD的主要區別在於,前者帶有schema元信息,即DataFrame所表示的二維表數據集的每一列都帶有名稱和類型。
左側的RDD[Person]雖然以Person為類型參數,但Spark框架本身不了解Person類的內部結構。而右側的DataFrame卻提供了詳細的結構信息,使得Spark SQL可以清楚地知道該數據集中包含哪些列,每列的名稱和類型各是什么。
3)Spark SQL性能上比RDD要高。因為Spark SQL了解數據內部結構,從而對藏於DataFrame背后的數據源以及作用於DataFrame之上的變換進行了針對性的優化,最終達到大幅提升運行時效率的目標。反觀RDD,由於無從得知所存數據元素的具體內部結構,Spark Core只能在Stage層面進行簡單、通用的流水線優化。
什么是DataSet
DataSet是分布式數據集合。
DataSet是強類型的。比如可以有DataSet[Car],DataSet[User]。具有類型安全檢查
DataFrame是DataSet的特例,type DataFrame = DataSet[Row] ,Row是一個類型,跟Car、User這些的類型一樣,所有的表結構信息都用Row來表示。
RDD、DataFrame和DataSet之間關系
1)發展歷史
RDD(Spark1.0)=》Dataframe(Spark1.3)=》Dataset(Spark1.6)
如果同樣的數據都給到這三個數據結構,他們分別計算之后,都會給出相同的結果。不同是的他們的執行效率和執行方式。在后期的Spark版本中,DataSet有可能會逐步取代RDD和DataFrame成為唯一的API接口。
2)三者的共性
(1)RDD、DataFrame、DataSet全都是Spark平台下的分布式彈性數據集,為處理超大型數據提供便利
(2)三者都有惰性機制,在進行創建、轉換,如map方法時,不會立即執行,只有在遇到Action行動算子如foreach時,三者才會開始遍歷運算
(3)三者有許多共同的函數,如filter,排序等
(4)三者都會根據Spark的內存情況自動緩存運算
(5)三者都有分區的概念
Spark SQL的特點
1)易整合
無縫的整合了SQL查詢和Spark編程。
2)統一的數據訪問方式
使用相同的方式連接不同的數據源。
3)兼容Hive
在已有的倉庫上直接運行SQL或者HiveQL。
4)標准的數據連接
通過JDBC或者ODBC來連接
Spark SQL編程
本章重點學習如何使用DataFrame和DataSet進行編程,以及他們之間的關系和轉換,關於具體的SQL書寫不是本章的重點。
SparkSession新的起始點
在老的版本中,SparkSQL提供兩種SQL查詢起始點:
一個叫SQLContext,用於Spark自己提供的SQL查詢;
一個叫HiveContext,用於連接Hive的查詢。
SparkSession是Spark最新的SQL查詢起始點,實質上是SQLContext和HiveContext的組合,所以在SQLContext和HiveContext上可用的API在SparkSession上同樣是可以使用的。
SparkSession內部封裝了SparkContext,所以計算實際上是由SparkContext完成的。當使用spark-shell的時候,Spark框架會自動的創建一個名稱叫做Spark的SparkSession,就像以前可以自動獲取到一個sc來表示SparkContext。
DataFrame
DataFrame是一種以RDD為基礎的分布式數據集,類似於傳統數據庫中的二維表格。
創建DataFrame
在Spark SQL中SparkSession是創建DataFrame和執行SQL的入口,創建DataFrame有三種方式:
通過Spark的數據源進行創建;
從一個存在的RDD進行轉換;
還可以從Hive Table進行查詢返回。
1)從Spark數據源進行創建
(1)數據准備,在/opt/module/spark-local目錄下創建一個user.json文件
-
{"age":20,"name":"qiaofeng"}
-
{"age":19,"name":"xuzhu"}
-
{"age":18,"name":"duanyu"}
(2)查看Spark支持創建文件的數據源格式,使用tab鍵查看
scala> spark.read.
csv format jdbc json load option options orc parquet schema table text textFile
(3)讀取json文件創建DataFrame
scala> val df = spark.read.json("/opt/module/spark-local/user.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
注意:如果從內存中獲取數據,Spark可以知道數據類型具體是什么,如果是數字,默認作為Int處理;但是從文件中讀取的數字,不能確定是什么類型,所以用BigInt接收,可以和Long類型轉換,但是和Int不能進行轉換。
(4)查看DataFrame算子
scala> df.
(5)展示結果
2)從RDD進行轉換
參考章節:RDD與DataFrame相互轉換
3)Hive Table進行查詢返回
參考章節:與Hive交互
SQL風格語法
SQL語法風格是指查詢數據的時候使用SQL語句來查詢,這種風格的查詢必須要有臨時視圖或者全局視圖來輔助。
視圖:對特定表的數據的查詢結果重復使用。View只能查詢,不能修改和插入。
select * from t_user where age > 30 的查詢結果可以存儲在臨時表v_user_age中,方便在后面重復使用。例如:select * from v_user_age
1)臨時視圖
(1)創建一個DataFrame
scala> val df = spark.read.json("/opt/module/spark-local/user.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
(2)對DataFrame創建一個臨時視圖
scala> df.createOrReplaceTempView("user")
(3)通過SQL語句實現查詢全表
scala> val sqlDF = spark.sql("SELECT * FROM user")
sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
(4)結果展示
(5)求年齡的平均值
scala> val sqlDF = spark.sql("SELECT avg(age) from user")
sqlDF: org.apache.spark.sql.DataFrame = [avg(age): double]
(6)結果展示
scala> sqlDF.show
+--------+
|avg(age)|
+--------+
| 19.0|
+--------+
(7)創建一個新會話再執行,發現視圖找不到
scala> spark.newSession().sql("SELECT avg(age) from user ").show()
org.apache.spark.sql.AnalysisException: Table or view not found: user; line 1 pos 14;
注意:普通臨時視圖是Session范圍內的,如果想全局有效,可以創建全局臨時視圖。
2)全局視圖
(1)對於DataFrame創建一個全局視圖
scala> df.createGlobalTempView("user")
(2)通過SQL語句實現查詢全表
scala> spark.sql("SELECT * FROM global_temp.user").show()
+---+--------+
|age| name|
+---+--------+
| 20|qiaofeng|
| 19| xuzhu|
| 18| duanyu|
+---+--------+
(3)新建session,通過SQL語句實現查詢全表
scala> spark.newSession().sql("SELECT * FROM global_temp.user").show()
+---+--------+
|age| name|
+---+--------+
| 20|qiaofeng|
| 19| xuzhu|
| 18| duanyu|
+---+--------+
DSL風格語法
DataFrame提供一個特定領域語言(domain-specific language,DSL)去管理結構化的數據,可以在Scala,Java,Python和R中使用DSL,使用DSL語法風格不必去創建臨時視圖了。
1)創建一個DataFrame
scala> val df = spark.read.json("/opt/module/spark-local/user.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
2)查看DataFrame的Schema信息
scala> df.printSchema
root
|-- age: Long (nullable = true)
|-- name: string (nullable = true)
3)只查看"name"列數據
scala> df.select("name").show()
+--------+
| name|
+--------+
|qiaofeng|
| xuzhu|
| duanyu|
+--------+
4)查看年齡和姓名,且年齡大於18
scala> df.select("age","name").where("age>18").show
+---+--------+
|age| name|
+---+--------+
| 20|qiaofeng|
| 19| xuzhu|
+---+--------+
5)查看所有列
scala> df.select("*").show
+---+--------+
|age| name|
+---+--------+
| 20|qiaofeng|
| 19| xuzhu|
| 18| duanyu|
+---+--------+
6)查看"name"列數據以及"age+1"數據
注意:涉及到運算的時候,每列都必須使用$,或者采用引號表達式:單引號+字段名
scala> df.select($"name",$"age" + 1).show
scala> df.select('name, 'age + 1).show()
scala> df.select('name, 'age + 1 as "newage").show()
+--------+---------+
| name|(age + 1)|
+--------+---------+
|qiaofeng| 21|
| xuzhu| 20|
| duanyu| 19|
+--------+---------+
7)查看"age"大於"19"的數據
scala> df.filter("age>19").show
+---+--------+
|age| name|
+---+--------+
| 20|qiaofeng|
+---+--------+
8)按照"age"分組,查看數據條數
scala> df.groupBy("age").count.show
+---+-----+
|age|count|
+---+-----+
| 19| 1|
| 18| 1|
| 20| 1|
+---+-----+
DataSet
DataSet是具有強類型的數據集合,需要提供對應的類型信息。
創建DataSet(基本類型序列)
使用基本類型的序列創建DataSet
(1)將集合轉換為DataSet
scala> val ds = Seq(1,2,3,4,5,6).toDS
ds: org.apache.spark.sql.Dataset[Int] = [value: int]
(2)查看DataSet的值
scala> ds.show
+-----+
|value|
+-----+
| 1|
| 2|
| 3|
| 4|
| 5|
| 6|
+-----+
創建DataSet(樣例類序列)
使用樣例類序列創建DataSet
(1)創建一個User的樣例類
scala> case class User(name: String, age: Long)
defined class User
(2)將集合轉換為DataSet
scala> val caseClassDS = Seq(User("wangyuyan",2)).toDS()
caseClassDS: org.apache.spark.sql.Dataset[User] = [name: string, age: bigint]
(3)查看DataSet的值
scala> caseClassDS.show
+---------+---+
| name|age|
+---------+---+
|wangyuyan| 2|
+---------+---+
注意:在實際使用的時候,很少用到把序列轉換成DataSet,更多是通過RDD來得到DataSet
RDD、DataFrame、DataSet相互轉換
IDEA創建SparkSQL工程
1)創建一個maven工程SparkSQLTest
2)在項目SparkSQLTest上點擊右鍵,Add Framework Support=》勾選scala
3)在main下創建scala文件夾,並右鍵Mark Directory as Sources Root=>在scala下創建包名為com.zhangjk.sparksql
4)輸入文件夾准備:在新建的SparkSQLTest項目名稱上右鍵=》新建input文件夾=》在input文件夾上右鍵=》新建user.json。並輸入如下內容:
{"age":20,"name":"qiaofeng"}
{"age":19,"name":"xuzhu"}
{"age":18,"name":"duanyu"}
5)在pom.xml文件中添加如下依賴
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.0.0</version>
</dependency>
</dependencies>
6)代碼實現
-
package com.zhangjk.sparksql
-
-
import org.apache.spark.SparkConf
-
import org.apache.spark.sql.{DataFrame, SparkSession}
-
-
object SparkSQL01_inpu {
-
def main(args: Array[String]): Unit = {
-
// 1 創建上下文環境配置對象
-
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLTest")
-
// 2 創建SparkSession對象
-
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
-
// 3 讀取數據
-
val df: DataFrame = spark.read.json("input/user.json")
-
// 4 可視化
-
df.show()
-
// 5 釋放資源
-
spark.stop()
-
}
-
}
RDD與DataFrame相互轉換
1)RDD轉換為DataFrame
方式1:手動轉換:RDD.toDF("列名1", "列名2")
方式2:通過樣例類反射轉換:UserRDD.map{ x=>User(x._1,x._2) }.toDF()
2)DataFrame轉換為RDD
DataFrame.rdd
3)在input/目錄下准備user.txt並輸入如下內容:
qiaofeng,20
xuzhu,19
duanyu,18
4)代碼實現
-
package com.zhangjk.sparksql
-
-
import org.apache.spark.rdd.RDD
-
import org.apache.spark.{SparkConf, SparkContext}
-
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
-
-
object SparkSQL02_RDDAndDataFrame {
-
-
def main(args: Array[String]): Unit = {
-
-
//1.創建SparkConf並設置App名稱
-
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
-
//2.創建SparkContext,該對象是提交Spark App的入口
-
val sc: SparkContext = new SparkContext(conf)
-
-
//3.1 獲取數據
-
val LineRDD: RDD[String] = sc.textFile("input/user.txt")
-
//3.2 RDD准備完成
-
val userRDD: RDD[(String, Int)] = LineRDD.map {
-
line =>
-
val fields = line.split(",")
-
(fields(0), fields(1).trim.toInt)
-
}
-
-
//4. 創建SparkSession對象
-
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
-
-
//5.1 RDD和DataFrame、DataSet轉換必須要導的包
-
import spark.implicits._
-
//5.2 RDD轉換為DataFrame(手動轉)
-
userRDD.toDF("name","age").show
-
-
//5.3 RDD轉換為DataFrame(通過樣例類反射轉)
-
val userDataFrame: DataFrame = userRDD.map {
-
case (name, age) => User(name, age)
-
}.toDF()
-
userDataFrame.show()
-
-
//5.4 DataFrame 轉換為RDD
-
val uRDD: RDD[Row] = userDataFrame.rdd
-
uRDD.collect().foreach(println)
-
-
//6.關閉連接
-
sc.stop()
-
}
-
}
-
-
//樣例類
-
case class User(name:String, age:Long)
RDD與DataSet相互轉換
1)RDD轉換為DataSet
RDD.map { x => User(x._1, x._2) }.toDS()
SparkSQL能夠自動將包含有樣例類的RDD轉換成DataSet,樣例類定義了table的結構,樣例類屬性通過反射變成了表的列名。樣例類可以包含諸如Seq或者Array等復雜的結構。
2)DataSet轉換為RDD
DS.rdd
3)代碼實現
-
package com.zhangjk.sparksql
-
-
import org.apache.spark.rdd.RDD
-
import org.apache.spark.sql.{Dataset, SparkSession}
-
import org.apache.spark.{SparkConf, SparkContext}
-
-
object SparkSQL03_RDDAndDataSet {
-
def main(args: Array[String]): Unit = {
-
-
//1.創建SparkConf並設置App名稱
-
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
-
//2.創建SparkContext,該對象是提交Spark App的入口
-
val sc: SparkContext = new SparkContext(conf)
-
-
//3.1 獲取數據
-
val lineRDD: RDD[String] = sc.textFile("input/user.txt")
-
-
//4. 創建SparkSession對象
-
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
-
-
//5.1 RDD和DataFrame、DataSet轉換必須要導的包
-
import spark.implicits._
-
//5.2 RDD轉換為DataSet
-
val userDataSet: Dataset[User] = lineRDD.map {
-
line =>
-
val fields = line.split(",")
-
User(fields(0), fields(1).toInt)
-
}.toDS()
-
userDataSet.show()
-
-
//5.3 DataSet轉換為RDD
-
val userRDD: RDD[User] = userDataSet.rdd
-
userRDD.collect().foreach(println)
-
-
//6.關閉連接
-
sc.stop()
-
}
-
}
-
-
case class User(name:String,age:Long)
DataFrame與DataSet相互轉換
1)DataFrame轉為DataSet
df.as[User]
2)DataSet轉為DataFrame
ds.toDF
3)代碼實現
-
package com.zhangjk.sparksql
-
-
import org.apache.spark.SparkConf
-
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
-
-
object SparkSQL04_DataFrameAndDataSet {
-
-
def main(args: Array[String]): Unit = {
-
// 1 創建上下文環境配置對象
-
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLTest")
-
// 2 創建SparkSession對象
-
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
-
-
// 3 讀取數據
-
val df: DataFrame = spark.read.json("input/user.json")
-
-
//4.1 RDD和DataFrame、DataSet轉換必須要導的包
-
import spark.implicits._
-
// 4.2 DataFrame 轉換為DataSet
-
val userDataSet: Dataset[User] = df.as[User]
-
userDataSet.show()
-
-
// 4.3 DataSet轉換為DataFrame
-
val userDataFrame: DataFrame = userDataSet.toDF()
-
userDataFrame.show()
-
-
// 5 釋放資源
-
spark.stop()
-
}
-
}
-
-
case class User(name: String,age: Long)
用戶自定義函數
UDF
1)UDF:一行進入,一行出
2)代碼實現
-
package com.zhangjk.sparksql
-
-
import org.apache.spark.SparkConf
-
import org.apache.spark.sql.{DataFrame, SparkSession}
-
-
object SparkSQL05_UDF{
-
-
def main(args: Array[String]): Unit = {
-
-
// 1 創建上下文環境配置對象
-
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLTest")
-
// 2 創建SparkSession對象
-
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
-
// 3 讀取數據
-
val df: DataFrame = spark.read.json("input/user.json")
-
-
// 4 創建DataFrame臨時視圖
-
df.createOrReplaceTempView("user")
-
-
// 5 注冊UDF函數。功能:在數據前添加字符串"Name:"
-
spark.udf.register("addName", (x:String) => "Name:"+ x)
-
-
// 6 調用自定義UDF函數
-
spark.sql("select addName(name) as newName, age from user").show()
-
-
// 7 釋放資源
-
spark.stop()
-
}
-
}
UDAF
1)UDAF:輸入多行,返回一行。
2)Spark3.x推薦使用extends Aggregator自定義UDAF,屬於強類型的Dataset方式。
3)Spark2.x使用extends UserDefinedAggregateFunction,屬於弱類型的DataFrame
4)需求:實現求平均年齡
(1)自定義聚合函數實現-強類型
-
package com.zhangjk.sparksql
-
-
import org.apache.spark.SparkConf
-
import org.apache.spark.sql.expressions.Aggregator
-
import org.apache.spark.sql.{DataFrame, Encoder, Encoders, SparkSession, functions}
-
-
object SparkSQL06_UDAF {
-
-
def main(args: Array[String]): Unit = {
-
-
// 1 創建上下文環境配置對象
-
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLTest")
-
-
// 2 創建SparkSession對象
-
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
-
-
// 3 讀取數據
-
val df: DataFrame = spark.read.json("input/user.json")
-
-
// 4 創建DataFrame臨時視圖
-
df.createOrReplaceTempView("user")
-
-
// 5 注冊UDAF
-
spark.udf.register("myAvg", functions.udaf(new MyAvgUDAF()))
-
-
// 6 調用自定義UDAF函數
-
spark.sql("select myAvg(age) from user").show()
-
-
// 7 釋放資源
-
spark.stop()
-
}
-
}
-
-
//輸入數據類型
-
case class Buff(var sum: Long, var count: Long)
-
-
/**
-
* 1,20歲; 2,19歲; 3,18歲
-
* IN:聚合函數的輸入類型:Long
-
* BUF:
-
* OUT:聚合函數的輸出類型:Double (18+19+20) / 3
-
*/
-
class MyAvgUDAF extends Aggregator[Long, Buff, Double] {
-
-
// 初始化緩沖區
-
override def zero: Buff = Buff(0L, 0L)
-
-
// 將輸入的年齡和緩沖區的數據進行聚合
-
override def reduce(buff: Buff, age: Long): Buff = {
-
buff.sum = buff.sum + age
-
buff.count = buff.count + 1
-
buff
-
}
-
-
// 多個緩沖區數據合並
-
override def merge(buff1: Buff, buff2: Buff): Buff = {
-
buff1.sum = buff1.sum + buff2.sum
-
buff1.count = buff1.count + buff2.count
-
buff1
-
}
-
-
// 完成聚合操作,獲取最終結果
-
override def finish(buff: Buff): Double = {
-
buff.sum.toDouble / buff.count
-
}
-
-
// SparkSQL對傳遞的對象的序列化操作(編碼)
-
// 自定義類型就是product 自帶類型根據類型選擇
-
override def bufferEncoder: Encoder[Buff] = Encoders.product
-
-
override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
-
}
(2)自定義聚合函數實現-弱類型(已過時--了解)
-
package com.zhangjk.sparksql
-
-
import org.apache.spark.SparkConf
-
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
-
import org.apache.spark.sql._
-
import org.apache.spark.sql.types.{DataType, DoubleType, LongType, StructField, StructType}
-
-
object SparkSQL07_UDAF{
-
-
def main(args: Array[String]): Unit = {
-
-
// 1 創建上下文環境配置對象
-
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLTest")
-
-
// 2 創建SparkSession對象
-
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
-
-
// 3 讀取數據
-
val df: DataFrame = spark.read.json("input/user.json")
-
-
// 4 注冊UDAF
-
spark.udf.register("myAvg",new MyAvgUDAF())
-
-
// 5 創建DataFrame臨時視圖
-
df.createOrReplaceTempView("user")
-
-
// 6 調用自定義UDAF函數
-
spark.sql("select myAvg(age) from user").show()
-
-
// 7 釋放資源
-
spark.stop()
-
}
-
-
class MyAvgUDAF extends UserDefinedAggregateFunction {
-
-
// 聚合函數輸入參數的數據類型:age(Long)
-
override def inputSchema: StructType = {
-
StructType(Array(
-
StructField("age",LongType)
-
))
-
}
-
-
// 聚合函數緩沖區中值的數據類型(age,count)
-
override def bufferSchema: StructType = {
-
StructType(Array(
-
StructField("sum",LongType),
-
StructField("count",LongType)
-
))
-
}
-
-
// 函數返回值的數據類型
-
override def dataType: DataType = DoubleType
-
-
// 穩定性:對於相同的輸入是否一直返回相同的輸出。
-
override def deterministic: Boolean = true
-
-
// 函數緩沖區初始化
-
override def initialize(buffer: MutableAggregationBuffer): Unit = {
-
// 存年齡的總和
-
buffer.update(0, 0L)
-
-
// 存年齡的個數
-
buffer.update(1, 0L)
-
}
-
-
// 更新緩沖區中的數據
-
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
-
-
if (!input.isNullAt(0)) {
-
buffer(0) = buffer.getLong(0) + input.getLong(0)
-
buffer(1) = buffer.getLong(1) + 1
-
}
-
}
-
-
// 合並緩沖區
-
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
-
buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
-
buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
-
}
-
-
// 計算最終結果
-
override def evaluate(buffer: Row): Any = {
-
buffer.getLong(0).toDouble / buffer.getLong(1)
-
}
-
}
-
}
UDTF(無)
輸入一行,返回多行(這種方式在Hive中有);
SparkSQL中沒有UDTF,Spark中用flatMap即可實現該功能
SparkSQL數據的加載與保存
加載數據
1)加載數據通用方法
spark.read.load是加載數據的通用方法
2)代碼實現
-
package com.zhangjk.sparksql
-
-
import org.apache.spark.SparkConf
-
import org.apache.spark.sql._
-
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
-
import org.apache.spark.sql.types._
-
-
object SparkSQL08_Load{
-
-
def main(args: Array[String]): Unit = {
-
-
// 1 創建上下文環境配置對象
-
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLTest")
-
// 2 創建SparkSession對象
-
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
-
-
// 3.1 spark.read直接讀取數據:csv format jdbc json load option
-
// options orc parquet schema table text textFile
-
// 注意:加載數據的相關參數需寫到上述方法中,
-
// 如:textFile需傳入加載數據的路徑,jdbc需傳入JDBC相關參數。
-
spark.read.json("input/user.json").show()
-
-
// 3.2 format指定加載數據類型
-
// spark.read.format("…")[.option("…")].load("…")
-
// format("…"):指定加載的數據類型,包括"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"
-
// load("…"):在"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"格式下需要傳入加載數據路徑
-
// option("…"):在"jdbc"格式下需要傳入JDBC相應參數,url、user、password和dbtable
-
spark.read.format("json").load ("input/user.json").show
-
-
// 4 釋放資源
-
spark.stop()
-
}
-
}
保存數據
1)保存數據通用方法
df.write.save是保存數據的通用方法
2)代碼實現
-
package com.zhangjk.sparksql
-
-
import org.apache.spark.SparkConf
-
import org.apache.spark.sql._
-
-
object SparkSQL09_Save{
-
-
def main(args: Array[String]): Unit = {
-
-
// 1 創建上下文環境配置對象
-
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLTest")
-
-
// 2 創建SparkSession對象
-
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
-
-
// 3 獲取數據
-
val df: DataFrame = spark.read.json("input/user.json")
-
-
// 4.1 df.write.保存數據:csv jdbc json orc parquet textFile… …
-
// 注意:保存數據的相關參數需寫到上述方法中。如:textFile需傳入加載數據的路徑,JDBC需傳入JDBC相關參數。
-
// 默認保存為parquet文件(可以修改conf.set("spark.sql.sources.default","json"))
-
df.write.save("output")
-
-
// 默認讀取文件parquet
-
spark.read.load("output").show()
-
-
// 4.2 format指定保存數據類型
-
// df.write.format("…")[.option("…")].save("…")
-
// format("…"):指定保存的數據類型,包括"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"。
-
// save ("…"):在"csv"、"orc"、"parquet"和"textFile"格式下需要傳入保存數據的路徑。
-
// option("…"):在"jdbc"格式下需要傳入JDBC相應參數,url、user、password和dbtable
-
df.write.format("json").save("output2")
-
-
// 4.3 可以指定為保存格式,直接保存,不需要再調用save了
-
df.write.json("output1")
-
-
// 4.4 如果文件已經存在則追加
-
df.write.mode("append").json("output2")
-
-
// 如果文件已經存在則忽略
-
df.write.mode("ignore").json("output2")
-
-
// 如果文件已經存在則覆蓋
-
df.write.mode("overwrite").json("output2")
-
-
// 默認default:如果文件已經存在則拋出異常
-
// path file:/E:/ideaProject2/SparkSQLTest/output2 already exists.;
-
df.write.mode("error").json("output2")
-
-
// 5 釋放資源
-
spark.stop()
-
}
-
}
-
與MySQL交互
1)導入依賴
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.27</version>
</dependency>
2)從MySQL讀數據
-
import org.apache.spark.SparkConf
-
import org.apache.spark.sql._
-
-
object SparkSQL10_MySQL_Read{
-
-
def main(args: Array[String]): Unit = {
-
-
// 1 創建上下文環境配置對象
-
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLTest")
-
-
// 2 創建SparkSession對象
-
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
-
-
// 3.1 通用的load方法讀取
-
val df: DataFrame = spark.read.format("jdbc")
-
.option("url", "jdbc:mysql://hadoop102:3306/gmall")
-
.option("driver", "com.mysql.jdbc.Driver")
-
.option("user", "root")
-
.option("password", "000000")
-
.option("dbtable", "user_info")
-
.load()
-
-
// 3.2 創建視圖
-
df.createOrReplaceTempView("user")
-
-
// 3.3 查詢想要的數據
-
spark.sql("select id, name from user").show()
-
-
// 4 釋放資源
-
spark.stop()
-
}
-
}
3)向MySQL寫數據
-
import org.apache.spark.SparkConf
-
import org.apache.spark.rdd.RDD
-
import org.apache.spark.sql._
-
-
object SparkSQL11_MySQL_Write {
-
-
def main(args: Array[String]): Unit = {
-
-
// 1 創建上下文環境配置對象
-
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLTest")
-
-
// 2 創建SparkSession對象
-
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
-
-
// 3 准備數據
-
// 注意:id是主鍵,不能和MySQL數據庫中的id重復
-
val rdd: RDD[User] = spark.sparkContext.makeRDD(List(User(3000, "zhangsan"), User(3001, "lisi")))
-
-
val ds: Dataset[User] = rdd.toDS
-
-
// 4 向MySQL中寫入數據
-
ds.write
-
.format("jdbc")
-
.option("url", "jdbc:mysql://hadoop102:3306/gmall")
-
.option("user", "root")
-
.option("password", "000000")
-
.option("dbtable", "user_info")
-
.mode(SaveMode.Append)
-
.save()
-
-
// 5 釋放資源
-
spark.stop()
-
}
-
-
case class User(id: Int, name: String)
-
}
與Hive交互
SparkSQL可以采用內嵌Hive,也可以采用外部Hive。企業開發中,通常采用外部Hive。
內嵌Hive應用
內嵌Hive,元數據存儲在Derby數據庫。
1)如果使用Spark內嵌的Hive,則什么都不用做,直接使用即可。
[bin/spark-shell
scala> spark.sql("show tables").show
注意:執行完后,發現多了$SPARK_HOME/metastore_db和derby.log,用於存儲元數據
2)創建一個數據庫
scala> spark.sql("create table user(id int, name string)")
注意:執行完后,發現多了$SPARK_HOME/spark-warehouse/user,用於存儲數據庫數據
3)查看數據庫
scala> spark.sql("show tables").show
4)向表中插入數據
scala> spark.sql("insert into user values(1,'zs')")
5)查詢數據
scala> spark.sql("select * from user").show
注意:然而在實際使用中,幾乎沒有任何人會使用內置的Hive,因為元數據存儲在derby數據庫,不支持多客戶端訪問。
3.4.2 外部Hive應用
如果Spark要接管Hive外部已經部署好的Hive,需要通過以下幾個步驟。
0)為了說明內嵌Hive和外部Hive區別:刪除內嵌Hive的metastore_db和spark-warehouse
rm -rf metastore_db/ spark-warehouse/
1)確定原有Hive是正常工作的
sbin/start-dfs.sh
sbin/start-yarn.sh
bin/hive
2)需要把hive-site.xml拷貝到spark的conf/目錄下
cp hive-site.xml /opt/module/spark-local/conf/
3)如果以前hive-site.xml文件中,配置過Tez相關信息,注釋掉(不是必須)
4)把MySQL的驅動copy到Spark的jars/目錄下
cp mysql-connector-java-5.1.48.jar /opt/module/spark-local/jars/
5)需要提前啟動hive服務,/opt/module/hive/bin/hiveservices.sh start(不是必須)
6)如果訪問不到HDFS,則需把core-site.xml和hdfs-site.xml拷貝到conf/目錄(不是必須)
7)啟動 spark-shell
bin/spark-shell
8)查詢表
scala> spark.sql("show tables").show
9)創建一個數據庫
scala> spark.sql("create table user(id int, name string)")
10)向表中插入數據
scala> spark.sql("insert into user values(1,'zs')")
11)查詢數據
scala> spark.sql("select * from user").show
3.4.3 運行Spark SQL CLI
Spark SQL CLI可以很方便的在本地運行Hive元數據服務以及從命令行執行查詢任務。在Spark目錄下執行如下命令啟動Spark SQL CLI,直接執行SQL語句,類似Hive窗口。
bin/spark-sql
spark-sql (default)> show tables;
IDEA操作Hive
1)添加依賴
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.27</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>3.0.0</version>
</dependency>
</dependencies>
2)拷貝hive-site.xml到resources目錄(如果需要操作Hadoop,需要拷貝hdfs-site.xml、core-site.xml、yarn-site.xml)
3)代碼實現
-
import org.apache.spark.SparkConf
-
import org.apache.spark.sql._
-
-
object SparkSQL12_Hive {
-
-
def main(args: Array[String]): Unit = {
-
-
// 設置訪問用戶名(權限問題)
-
System.setProperty("HADOOP_USER_NAME","hadoop")
-
-
// 1 創建上下文環境配置對象
-
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLTest")
-
// 2 創建SparkSession對象
-
val spark: SparkSession = SparkSession.builder().enableHiveSupport().config(conf).getOrCreate()
-
-
import spark.implicits._
-
-
// 3 連接外部Hive,並進行操作
-
spark.sql("show tables").show()
-
spark.sql("create table user3(id int, name string)")
-
spark.sql("insert into user3 values(1,'zs')")
-
spark.sql("select * from user3").show
-
-
// 4 釋放資源
-
spark.stop()
-
}
-
}