Spark詳解(06) - SparkSQL


Spark詳解(06) - SparkSQL

Spark SQL概述

什么是Spark SQL

Spark SQL是Spark用於結構化數據(Structured Data)處理的Spark模塊。

(1)半結構化數據(日志數據): 001    zhangsan     18

(2)結構化數據(數據庫數據):

id

name

age

001

zhangsan

18

為什么要有Spark SQL

Hive on Spark:Hive既作為存儲元數據又負責SQL的解析優化,語法是HQL語法,執行引擎變成了Spark,Spark負責采用RDD執行。

Spark on Hive:Hive只作為存儲元數據,Spark負責SQL解析優化,語法是Spark SQL語法,Spark負責采用優化后的RDD執行。

Spark SQL原理

Spark SQL它提供了2個編程抽象,DataFrame、DataSet。(類似Spark Core中的RDD)

什么是DataFrame

1DataFrame是一種以RDD為基礎的分布式數據集,類似於傳統數據庫中的二維表格。

2DataFrameRDD的主要區別在於,前者帶有schema元信息,即DataFrame所表示的二維表數據集的每一列都帶有名稱和類型。

左側的RDD[Person]雖然以Person為類型參數,但Spark框架本身不了解Person類的內部結構。而右側的DataFrame卻提供了詳細的結構信息,使得Spark SQL可以清楚地知道該數據集中包含哪些列,每列的名稱和類型各是什么。

3Spark SQL性能上比RDD要高。因為Spark SQL了解數據內部結構,從而對藏於DataFrame背后的數據源以及作用於DataFrame之上的變換進行了針對性的優化,最終達到大幅提升運行時效率的目標。反觀RDD,由於無從得知所存數據元素的具體內部結構,Spark Core只能在Stage層面進行簡單、通用的流水線優化。

什么是DataSet

DataSet是分布式數據集合。

DataSet是強類型的。比如可以有DataSet[Car]DataSet[User]。具有類型安全檢查

DataFrameDataSet的特例type DataFrame = DataSet[Row] Row是一個類型,跟CarUser這些的類型一樣,所有的表結構信息都用Row來表示。

RDD、DataFrame和DataSet之間關系

1)發展歷史

RDDSpark1.0=DataframeSpark1.3=DatasetSpark1.6

如果同樣的數據都給到這三個數據結構,他們分別計算之后,都會給出相同的結果。不同是的他們的執行效率和執行方式。在后期的Spark版本中,DataSet有可能會逐步取代RDDDataFrame成為唯一的API接口

2)三者的共性

1RDDDataFrameDataSet全都是Spark平台下的分布式彈性數據集,為處理超大型數據提供便利

2)三者都有惰性機制,在進行創建、轉換,如map方法時,不會立即執行,只有在遇到Action行動算子如foreach時,三者才會開始遍歷運算

3)三者有許多共同的函數,如filter,排序等

4)三者都會根據Spark的內存情況自動緩存運算

5)三者都有分區的概念

Spark SQL的特點

1)易整合

無縫的整合了SQL查詢和Spark編程。

2)統一的數據訪問方式

使用相同的方式連接不同的數據源。

3)兼容Hive

在已有的倉庫上直接運行SQL或者HiveQL

4)標准的數據連接

通過JDBC或者ODBC來連接

Spark SQL編程

本章重點學習如何使用DataFrameDataSet進行編程以及他們之間的關系和轉換,關於具體的SQL書寫不是本章的重點。

SparkSession新的起始點

在老的版本中,SparkSQL提供兩種SQL查詢起始點:

一個叫SQLContext,用於Spark自己提供的SQL查詢;

一個叫HiveContext,用於連接Hive的查詢。

SparkSessionSpark最新的SQL查詢起始點,實質上是SQLContextHiveContext的組合,所以在SQLContextHiveContext上可用的APISparkSession上同樣是可以使用的。

SparkSession內部封裝了SparkContext,所以計算實際上是由SparkContext完成的。當使用spark-shell的時候,Spark框架會自動的創建一個名稱叫做SparkSparkSession,就像以前可以自動獲取到一個sc來表示SparkContext

DataFrame

DataFrame是一種以RDD為基礎的分布式數據集,類似於傳統數據庫中的二維表格。

創建DataFrame

Spark SQLSparkSession是創建DataFrame和執行SQL的入口,創建DataFrame有三種方式:

通過Spark的數據源進行創建;

從一個存在的RDD進行轉換;

還可以從Hive Table進行查詢返回。

1)從Spark數據源進行創建

1)數據准備,在/opt/module/spark-local目錄下創建一個user.json文件

  1. {"age":20,"name":"qiaofeng"}
  2. {"age":19,"name":"xuzhu"}
  3. {"age":18,"name":"duanyu"}

2)查看Spark支持創建文件的數據源格式,使用tab鍵查看

scala> spark.read.

csv format jdbc json load option options orc parquet schema table text textFile

3)讀取json文件創建DataFrame

scala> val df = spark.read.json("/opt/module/spark-local/user.json")

df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

注意:如果從內存中獲取數據,Spark可以知道數據類型具體是什么,如果是數字,默認作為Int處理;但是從文件中讀取的數字,不能確定是什么類型,所以用BigInt接收,可以和Long類型轉換,但是和Int不能進行轉換。

4)查看DataFrame算子

scala> df.

5)展示結果

2)從RDD進行轉換

參考章節:RDDDataFrame相互轉換

3Hive Table進行查詢返回

參考章節:與Hive交互

SQL風格語法

SQL語法風格是指查詢數據的時候使用SQL語句來查詢,這種風格的查詢必須要有臨時視圖或者全局視圖來輔助。

視圖:對特定表的數據的查詢結果重復使用View只能查詢,不能修改和插入。

select * from t_user where age > 30 的查詢結果可以存儲在臨時表v_user_age中,方便在后面重復使用。例如:select * from v_user_age

1)臨時視圖

1)創建一個DataFrame

scala> val df = spark.read.json("/opt/module/spark-local/user.json")

df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

2)對DataFrame創建一個臨時視圖

scala> df.createOrReplaceTempView("user")

3)通過SQL語句實現查詢全表

scala> val sqlDF = spark.sql("SELECT * FROM user")

sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

4)結果展示

5)求年齡的平均值

scala> val sqlDF = spark.sql("SELECT avg(age) from user")

sqlDF: org.apache.spark.sql.DataFrame = [avg(age): double]

6)結果展示

scala> sqlDF.show

+--------+

|avg(age)|

+--------+

| 19.0|

+--------+

7)創建一個新會話再執行,發現視圖找不到

scala> spark.newSession().sql("SELECT avg(age) from user ").show()

org.apache.spark.sql.AnalysisException: Table or view not found: user; line 1 pos 14;

注意:普通臨時視圖是Session范圍內的,如果想全局有效,可以創建全局臨時視圖。

2)全局視圖

1)對於DataFrame創建一個全局視圖

scala> df.createGlobalTempView("user")

2)通過SQL語句實現查詢全表

scala> spark.sql("SELECT * FROM global_temp.user").show()

+---+--------+

|age| name|

+---+--------+

| 20|qiaofeng|

| 19| xuzhu|

| 18| duanyu|

+---+--------+

3)新建session,通過SQL語句實現查詢全表

scala> spark.newSession().sql("SELECT * FROM global_temp.user").show()

+---+--------+

|age| name|

+---+--------+

| 20|qiaofeng|

| 19| xuzhu|

| 18| duanyu|

+---+--------+

DSL風格語法

DataFrame提供一個特定領域語言(domain-specific languageDSL)去管理結構化的數據,可以在ScalaJavaPythonR中使用DSL,使用DSL語法風格不必去創建臨時視圖了。

1)創建一個DataFrame

scala> val df = spark.read.json("/opt/module/spark-local/user.json")

df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

2)查看DataFrameSchema信息

scala> df.printSchema

root

|-- age: Long (nullable = true)

|-- name: string (nullable = true)

3)只查看"name"列數據

scala> df.select("name").show()

+--------+

| name|

+--------+

|qiaofeng|

| xuzhu|

| duanyu|

+--------+

4)查看年齡和姓名,且年齡大於18

scala> df.select("age","name").where("age>18").show

+---+--------+

|age| name|

+---+--------+

| 20|qiaofeng|

| 19| xuzhu|

+---+--------+

5)查看所有列

scala> df.select("*").show

+---+--------+

|age| name|

+---+--------+

| 20|qiaofeng|

| 19| xuzhu|

| 18| duanyu|

+---+--------+

6)查看"name"列數據以及"age+1"數據

注意:涉及到運算的時候,每列都必須使用$,或者采用引號表達式:單引號+字段名

scala> df.select($"name",$"age" + 1).show

scala> df.select('name, 'age + 1).show()

scala> df.select('name, 'age + 1 as "newage").show()

 

+--------+---------+

| name|(age + 1)|

+--------+---------+

|qiaofeng| 21|

| xuzhu| 20|

| duanyu| 19|

+--------+---------+

7)查看"age"大於"19"的數據

scala> df.filter("age>19").show

+---+--------+

|age| name|

+---+--------+

| 20|qiaofeng|

+---+--------+

8)按照"age"分組,查看數據條數

scala> df.groupBy("age").count.show

+---+-----+

|age|count|

+---+-----+

| 19| 1|

| 18| 1|

| 20| 1|

+---+-----+

DataSet

DataSet是具有強類型的數據集合,需要提供對應的類型信息。

創建DataSet(基本類型序列)

使用基本類型的序列創建DataSet

1)將集合轉換為DataSet

scala> val ds = Seq(1,2,3,4,5,6).toDS

ds: org.apache.spark.sql.Dataset[Int] = [value: int]

2)查看DataSet的值

scala> ds.show

+-----+

|value|

+-----+

| 1|

| 2|

| 3|

| 4|

| 5|

| 6|

+-----+

創建DataSet(樣例類序列)

使用樣例類序列創建DataSet

1)創建一個User的樣例類

scala> case class User(name: String, age: Long)

defined class User

2)將集合轉換為DataSet

scala> val caseClassDS = Seq(User("wangyuyan",2)).toDS()

caseClassDS: org.apache.spark.sql.Dataset[User] = [name: string, age: bigint]

3)查看DataSet的值

scala> caseClassDS.show

+---------+---+

| name|age|

+---------+---+

|wangyuyan| 2|

+---------+---+

注意:在實際使用的時候,很少用到把序列轉換成DataSet,更多是通過RDD來得到DataSet

RDD、DataFrame、DataSet相互轉換

IDEA創建SparkSQL工程

1)創建一個maven工程SparkSQLTest

2)在項目SparkSQLTest上點擊右鍵,Add Framework Support=》勾選scala

3)在main下創建scala文件夾,並右鍵Mark Directory as Sources Root=>scala下創建包名為com.zhangjk.sparksql

4)輸入文件夾准備:在新建的SparkSQLTest項目名稱上右鍵=》新建input文件夾=》在input文件夾上右鍵=》新建user.json。並輸入如下內容:

{"age":20,"name":"qiaofeng"}

{"age":19,"name":"xuzhu"}

{"age":18,"name":"duanyu"}

5)在pom.xml文件中添加如下依賴

<dependencies>

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-sql_2.12</artifactId>

<version>3.0.0</version>

</dependency>

</dependencies>

6)代碼實現

  1. package com.zhangjk.sparksql
  2.  
  3. import org.apache.spark.SparkConf
  4. import org.apache.spark.sql.{DataFrame, SparkSession}
  5.  
  6. object SparkSQL01_inpu {
  7.   def main(args: Array[String]): Unit = {
  8.     // 1 創建上下文環境配置對象
  9.     val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLTest")
  10.     // 2 創建SparkSession對象
  11.     val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
  12.     // 3 讀取數據
  13.     val df: DataFrame = spark.read.json("input/user.json")
  14.     // 4 可視化
  15.     df.show()
  16.     // 5 釋放資源
  17.     spark.stop()
  18.   }
  19. }

RDDDataFrame相互轉換

1RDD轉換為DataFrame

方式1:手動轉換:RDD.toDF("列名1", "列名2")

方式2:通過樣例類反射轉換:UserRDD.map{ x=>User(x._1,x._2) }.toDF()

2DataFrame轉換為RDD

DataFrame.rdd

3)在input/目錄下准備user.txt並輸入如下內容:

qiaofeng,20

xuzhu,19

duanyu,18

4)代碼實現

  1. package com.zhangjk.sparksql
  2.  
  3. import org.apache.spark.rdd.RDD
  4. import org.apache.spark.{SparkConf, SparkContext}
  5. import org.apache.spark.sql.{DataFrame, Row, SparkSession}
  6.  
  7. object SparkSQL02_RDDAndDataFrame {
  8.  
  9.     def main(args: Array[String]): Unit = {
  10.  
  11.         //1.創建SparkConf並設置App名稱
  12.         val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
  13.         //2.創建SparkContext,該對象是提交Spark App的入口
  14.         val sc: SparkContext = new SparkContext(conf)
  15.  
  16.         //3.1 獲取數據
  17.         val LineRDD: RDD[String] = sc.textFile("input/user.txt")
  18.         //3.2 RDD准備完成
  19.         val userRDD: RDD[(String, Int)] = LineRDD.map {
  20.             line =>
  21.                 val fields = line.split(",")
  22.                 (fields(0), fields(1).trim.toInt)
  23.         }
  24.  
  25.         //4. 創建SparkSession對象
  26.         val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
  27.  
  28.         //5.1 RDDDataFrameDataSet轉換必須要導的包
  29.         import spark.implicits._
  30.         //5.2 RDD轉換為DataFrame(手動轉)
  31.         userRDD.toDF("name","age").show
  32.  
  33.         //5.3 RDD轉換為DataFrame(通過樣例類反射轉)
  34.         val userDataFrame: DataFrame = userRDD.map {
  35.             case (name, age) => User(name, age)
  36.         }.toDF()
  37.         userDataFrame.show()
  38.  
  39.         //5.4 DataFrame 轉換為RDD
  40.         val uRDD: RDD[Row] = userDataFrame.rdd
  41.         uRDD.collect().foreach(println)
  42.  
  43.         //6.關閉連接
  44.         sc.stop()
  45.     }
  46. }
  47.  
  48. //樣例類
  49. case class User(name:String, age:Long)

RDD與DataSet相互轉換

1RDD轉換為DataSet

RDD.map { x => User(x._1, x._2) }.toDS()

SparkSQL能夠自動將包含有樣例類的RDD轉換成DataSet,樣例類定義了table的結構,樣例類屬性通過反射變成了表的列名。樣例類可以包含諸如Seq或者Array等復雜的結構。

2DataSet轉換為RDD

DS.rdd

3)代碼實現

  1. package com.zhangjk.sparksql
  2.  
  3. import org.apache.spark.rdd.RDD
  4. import org.apache.spark.sql.{Dataset, SparkSession}
  5. import org.apache.spark.{SparkConf, SparkContext}
  6.  
  7. object SparkSQL03_RDDAndDataSet {
  8.   def main(args: Array[String]): Unit = {
  9.  
  10.     //1.創建SparkConf並設置App名稱
  11.     val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
  12.     //2.創建SparkContext,該對象是提交Spark App的入口
  13.     val sc: SparkContext = new SparkContext(conf)
  14.  
  15.     //3.1 獲取數據
  16.     val lineRDD: RDD[String] = sc.textFile("input/user.txt")
  17.  
  18.     //4. 創建SparkSession對象
  19.     val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
  20.  
  21.     //5.1 RDDDataFrameDataSet轉換必須要導的包
  22.     import spark.implicits._
  23.     //5.2 RDD轉換為DataSet
  24.     val userDataSet: Dataset[User] = lineRDD.map {
  25.       line =>
  26.         val fields = line.split(",")
  27.         User(fields(0), fields(1).toInt)
  28.     }.toDS()
  29.     userDataSet.show()
  30.  
  31.     //5.3 DataSet轉換為RDD
  32.     val userRDD: RDD[User] = userDataSet.rdd
  33.     userRDD.collect().foreach(println)
  34.  
  35.     //6.關閉連接
  36.     sc.stop()
  37.   }
  38. }
  39.  
  40. case class User(name:String,age:Long)

DataFrame與DataSet相互轉換

1DataFrame轉為DataSet

df.as[User]

2DataSet轉為DataFrame

    ds.toDF

3)代碼實現

  1. package com.zhangjk.sparksql
  2.  
  3. import org.apache.spark.SparkConf
  4. import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
  5.  
  6. object SparkSQL04_DataFrameAndDataSet {
  7.  
  8.     def main(args: Array[String]): Unit = {
  9.         // 1 創建上下文環境配置對象
  10.         val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLTest")
  11.         // 2 創建SparkSession對象
  12.         val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
  13.  
  14.         // 3 讀取數據
  15.         val df: DataFrame = spark.read.json("input/user.json")
  16.  
  17.         //4.1 RDDDataFrameDataSet轉換必須要導的包
  18.         import spark.implicits._
  19.         // 4.2 DataFrame 轉換為DataSet
  20.         val userDataSet: Dataset[User] = df.as[User]
  21.         userDataSet.show()
  22.  
  23.         // 4.3 DataSet轉換為DataFrame
  24.         val userDataFrame: DataFrame = userDataSet.toDF()
  25.         userDataFrame.show()
  26.  
  27.         // 5 釋放資源
  28.         spark.stop()
  29.     }
  30. }
  31.  
  32. case class User(name: String,age: Long)

用戶自定義函數

UDF

1UDF:一行進入,一行出

2)代碼實現

  1. package com.zhangjk.sparksql
  2.  
  3. import org.apache.spark.SparkConf
  4. import org.apache.spark.sql.{DataFrame, SparkSession}
  5.  
  6. object SparkSQL05_UDF{
  7.  
  8.     def main(args: Array[String]): Unit = {
  9.  
  10.         // 1 創建上下文環境配置對象
  11.         val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLTest")
  12.         // 2 創建SparkSession對象
  13.         val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
  14.         // 3 讀取數據
  15.         val df: DataFrame = spark.read.json("input/user.json")
  16.  
  17.         // 4 創建DataFrame臨時視圖
  18.         df.createOrReplaceTempView("user")
  19.           
  20.         // 5 注冊UDF函數。功能:在數據前添加字符串"Name:"
  21.         spark.udf.register("addName", (x:String) => "Name:"+ x)
  22.  
  23.         // 6 調用自定義UDF函數
  24.         spark.sql("select addName(name) as newName, age from user").show()
  25.  
  26.         // 7 釋放資源
  27.         spark.stop()
  28.     }
  29. }

UDAF

1UDAF:輸入多行,返回一行。

2Spark3.x推薦使用extends Aggregator自定義UDAF,屬於強類型的Dataset方式。

3Spark2.x使用extends UserDefinedAggregateFunction,屬於弱類型的DataFrame

4)需求:實現求平均年齡

1)自定義聚合函數實現-強類型

  1. package com.zhangjk.sparksql
  2.  
  3. import org.apache.spark.SparkConf
  4. import org.apache.spark.sql.expressions.Aggregator
  5. import org.apache.spark.sql.{DataFrame, Encoder, Encoders, SparkSession, functions}
  6.  
  7. object SparkSQL06_UDAF {
  8.  
  9.     def main(args: Array[String]): Unit = {
  10.  
  11.         // 1 創建上下文環境配置對象
  12.         val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLTest")
  13.  
  14.         // 2 創建SparkSession對象
  15.         val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
  16.  
  17.         // 3 讀取數據
  18.         val df: DataFrame = spark.read.json("input/user.json")
  19.  
  20.         // 4 創建DataFrame臨時視圖
  21.         df.createOrReplaceTempView("user")
  22.           
  23.         // 5 注冊UDAF
  24.         spark.udf.register("myAvg", functions.udaf(new MyAvgUDAF()))
  25.  
  26.         // 6 調用自定義UDAF函數
  27.         spark.sql("select myAvg(age) from user").show()
  28.  
  29.         // 7 釋放資源
  30.         spark.stop()
  31.     }
  32. }
  33.  
  34. //輸入數據類型
  35. case class Buff(var sum: Longvar count: Long)
  36.  
  37. /**
  38.  * 1,20歲; 2,19歲; 3,18
  39.  * IN:聚合函數的輸入類型:Long
  40.  * BUF
  41.  * OUT:聚合函數的輸出類型:Double  (18+19+20) / 3
  42.  */
  43. class MyAvgUDAF extends Aggregator[LongBuffDouble] {
  44.  
  45.     // 初始化緩沖區
  46.     override def zero: Buff = Buff(0L0L)
  47.  
  48.     // 將輸入的年齡和緩沖區的數據進行聚合
  49.     override def reduce(buff: Buff, age: Long): Buff = {
  50.         buff.sum = buff.sum + age
  51.         buff.count = buff.count + 1
  52.         buff
  53.     }
  54.  
  55.     // 多個緩沖區數據合並
  56.     override def merge(buff1: Buff, buff2: Buff): Buff = {
  57.         buff1.sum = buff1.sum + buff2.sum
  58.         buff1.count = buff1.count + buff2.count
  59.         buff1
  60.     }
  61.  
  62.     // 完成聚合操作,獲取最終結果
  63.     override def finish(buff: Buff): Double = {
  64.         buff.sum.toDouble / buff.count
  65.     }
  66.  
  67.     // SparkSQL對傳遞的對象的序列化操作(編碼)
  68.     // 自定義類型就是product   自帶類型根據類型選擇
  69.     override def bufferEncoder: Encoder[Buff] = Encoders.product
  70.  
  71.     override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
  72. }

2)自定義聚合函數實現-弱類型(已過時--了解)

  1. package com.zhangjk.sparksql
  2.  
  3. import org.apache.spark.SparkConf
  4. import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
  5. import org.apache.spark.sql._
  6. import org.apache.spark.sql.types.{DataType, DoubleType, LongType, StructField, StructType}
  7.  
  8. object SparkSQL07_UDAF{
  9.  
  10.     def main(args: Array[String]): Unit = {
  11.  
  12.         // 1 創建上下文環境配置對象
  13.         val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLTest")
  14.  
  15.         // 2 創建SparkSession對象
  16.         val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
  17.  
  18.         // 3 讀取數據
  19.         val df: DataFrame = spark.read.json("input/user.json")
  20.  
  21.         // 4 注冊UDAF
  22.         spark.udf.register("myAvg",new MyAvgUDAF())
  23.  
  24.         // 5 創建DataFrame臨時視圖
  25.         df.createOrReplaceTempView("user")
  26.  
  27.         // 6 調用自定義UDAF函數
  28.         spark.sql("select myAvg(age) from user").show()
  29.  
  30.         // 7 釋放資源
  31.         spark.stop()
  32.     }
  33.  
  34.     class MyAvgUDAF extends UserDefinedAggregateFunction {
  35.  
  36.         // 聚合函數輸入參數的數據類型:age(Long)
  37.         override def inputSchema: StructType = {
  38.             StructType(Array(
  39.                 StructField("age",LongType)
  40.             ))
  41.         }
  42.  
  43.         // 聚合函數緩沖區中值的數據類型(age,count)
  44.         override def bufferSchema: StructType = {
  45.             StructType(Array(
  46.                 StructField("sum",LongType),
  47.                 StructField("count",LongType)
  48.             ))
  49.         }
  50.  
  51.         // 函數返回值的數據類型
  52.         override def dataType: DataType = DoubleType
  53.  
  54.         // 穩定性:對於相同的輸入是否一直返回相同的輸出。
  55.         override def deterministic: Boolean = true
  56.  
  57.         // 函數緩沖區初始化
  58.         override def initialize(buffer: MutableAggregationBuffer): Unit = {
  59.             // 存年齡的總和
  60.             buffer.update(00L)
  61.  
  62.             // 存年齡的個數
  63.             buffer.update(10L)
  64.         }
  65.  
  66.         // 更新緩沖區中的數據
  67.         override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
  68.  
  69.             if (!input.isNullAt(0)) {
  70.                 buffer(0) = buffer.getLong(0) + input.getLong(0)
  71.                 buffer(1) = buffer.getLong(1) + 1
  72.             }
  73.         }
  74.  
  75.         // 合並緩沖區
  76.         override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
  77.             buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
  78.             buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  79.         }
  80.  
  81.         // 計算最終結果
  82.         override def evaluate(buffer: Row): Any = {
  83.             buffer.getLong(0).toDouble / buffer.getLong(1)
  84.         }
  85.     }
  86. }

UDTF(無)

輸入一行,返回多行(這種方式在Hive中有);

SparkSQL中沒有UDTFSpark中用flatMap即可實現該功能

SparkSQL數據的加載與保存

加載數據

1)加載數據通用方法

spark.read.load是加載數據的通用方法

2)代碼實現

  1. package com.zhangjk.sparksql
  2.  
  3. import org.apache.spark.SparkConf
  4. import org.apache.spark.sql._
  5. import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
  6. import org.apache.spark.sql.types._
  7.  
  8. object SparkSQL08_Load{
  9.  
  10.     def main(args: Array[String]): Unit = {
  11.  
  12.         // 1 創建上下文環境配置對象
  13.         val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLTest")
  14.         // 2 創建SparkSession對象
  15.         val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
  16.  
  17.         // 3.1 spark.read直接讀取數據:csv   format   jdbc   json   load   option
  18.         // options   orc   parquet   schema   table   text   textFile
  19.         // 注意:加載數據的相關參數需寫到上述方法中,
  20.         // 如:textFile需傳入加載數據的路徑,jdbc需傳入JDBC相關參數。
  21.         spark.read.json("input/user.json").show()
  22.  
  23.         // 3.2 format指定加載數據類型
  24.         // spark.read.format("…")[.option("…")].load("…")
  25.         // format("…"):指定加載的數據類型,包括"csv""jdbc""json""orc""parquet""textFile"
  26.         // load("…"):在"csv""jdbc""json""orc""parquet""textFile"格式下需要傳入加載數據路徑
  27.         // option("…"):在"jdbc"格式下需要傳入JDBC相應參數,urluserpassworddbtable
  28.         spark.read.format("json").load ("input/user.json").show
  29.  
  30.         // 4 釋放資源
  31.         spark.stop()
  32.     }
  33. }

保存數據

1)保存數據通用方法

df.write.save是保存數據的通用方法

2)代碼實現

  1. package com.zhangjk.sparksql
  2.  
  3. import org.apache.spark.SparkConf
  4. import org.apache.spark.sql._
  5.  
  6. object SparkSQL09_Save{
  7.  
  8.   def main(args: Array[String]): Unit = {
  9.  
  10.     // 1 創建上下文環境配置對象
  11.     val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLTest")
  12.  
  13.     // 2 創建SparkSession對象
  14.     val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
  15.  
  16.     // 3 獲取數據
  17.     val df: DataFrame = spark.read.json("input/user.json")
  18.  
  19.     // 4.1 df.write.保存數據:csv  jdbc   json  orc   parquet textFile… …
  20.     // 注意:保存數據的相關參數需寫到上述方法中。如:textFile需傳入加載數據的路徑,JDBC需傳入JDBC相關參數。
  21.     // 默認保存為parquet文件(可以修改conf.set("spark.sql.sources.default","json")
  22.     df.write.save("output")
  23.  
  24.     // 默認讀取文件parquet
  25.     spark.read.load("output").show()
  26.  
  27.     // 4.2 format指定保存數據類型
  28.     // df.write.format("…")[.option("…")].save("…")
  29.     // format("…"):指定保存的數據類型,包括"csv""jdbc""json""orc""parquet""textFile"
  30.     // save ("…"):在"csv""orc""parquet""textFile"格式下需要傳入保存數據的路徑。
  31.     // option("…"):在"jdbc"格式下需要傳入JDBC相應參數,urluserpassworddbtable
  32.     df.write.format("json").save("output2")
  33.  
  34.     // 4.3 可以指定為保存格式,直接保存,不需要再調用save
  35.     df.write.json("output1")
  36.  
  37.     // 4.4 如果文件已經存在則追加
  38.     df.write.mode("append").json("output2")
  39.  
  40.     // 如果文件已經存在則忽略
  41.     df.write.mode("ignore").json("output2")
  42.  
  43.     // 如果文件已經存在則覆蓋
  44.     df.write.mode("overwrite").json("output2")
  45.  
  46.     // 默認default:如果文件已經存在則拋出異常
  47.     // path file:/E:/ideaProject2/SparkSQLTest/output2 already exists.;
  48.     df.write.mode("error").json("output2")
  49.  
  50.     // 5 釋放資源
  51.     spark.stop()
  52.   }
  53. }
  54.  

與MySQL交互

1)導入依賴

<dependency>

<groupId>mysql</groupId>

<artifactId>mysql-connector-java</artifactId>

<version>5.1.27</version>

</dependency>

2)從MySQL讀數據

  1. import org.apache.spark.SparkConf
  2. import org.apache.spark.sql._
  3.  
  4. object SparkSQL10_MySQL_Read{
  5.  
  6.     def main(args: Array[String]): Unit = {
  7.  
  8.         // 1 創建上下文環境配置對象
  9.         val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLTest")
  10.  
  11.         // 2 創建SparkSession對象
  12.         val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
  13.  
  14.         // 3.1 通用的load方法讀取
  15.         val df: DataFrame = spark.read.format("jdbc")
  16.             .option("url""jdbc:mysql://hadoop102:3306/gmall")
  17.             .option("driver""com.mysql.jdbc.Driver")
  18.             .option("user""root")
  19.             .option("password""000000")
  20.             .option("dbtable""user_info")
  21.             .load()
  22.  
  23.         // 3.2 創建視圖
  24.         df.createOrReplaceTempView("user")
  25.  
  26.         // 3.3 查詢想要的數據
  27.         spark.sql("select id, name from user").show()
  28.  
  29.         // 4 釋放資源
  30.         spark.stop()
  31.     }
  32. }

3)向MySQL寫數據

  1. import org.apache.spark.SparkConf
  2. import org.apache.spark.rdd.RDD
  3. import org.apache.spark.sql._
  4.  
  5. object SparkSQL11_MySQL_Write {
  6.  
  7.     def main(args: Array[String]): Unit = {
  8.  
  9.         // 1 創建上下文環境配置對象
  10.         val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLTest")
  11.  
  12.         // 2 創建SparkSession對象
  13.         val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
  14.  
  15.         // 3 准備數據
  16.         // 注意:id是主鍵,不能和MySQL數據庫中的id重復
  17.         val rdd: RDD[User] = spark.sparkContext.makeRDD(List(User(3000"zhangsan"), User(3001"lisi")))
  18.  
  19.         val ds: Dataset[User] = rdd.toDS
  20.  
  21.         // 4 MySQL中寫入數據
  22.         ds.write
  23.             .format("jdbc")
  24.             .option("url""jdbc:mysql://hadoop102:3306/gmall")
  25.             .option("user""root")
  26.             .option("password""000000")
  27.             .option("dbtable""user_info")
  28.             .mode(SaveMode.Append)
  29.             .save()
  30.  
  31.         // 5 放資源
  32.         spark.stop()
  33.     }
  34.  
  35.     case class User(id: Int, name: String)
  36. }

與Hive交互

SparkSQL可以采用內嵌Hive,也可以采用外部Hive企業開發中,通常采用外部Hive

內嵌Hive應用

內嵌Hive,元數據存儲在Derby數據庫。

1)如果使用Spark內嵌的Hive,則什么都不用做,直接使用即可。

[bin/spark-shell

 

scala> spark.sql("show tables").show

注意:執行完后,發現多了$SPARK_HOME/metastore_dbderby.log,用於存儲元數據

2)創建一個數據庫

scala> spark.sql("create table user(id int, name string)")

注意:執行完后,發現多了$SPARK_HOME/spark-warehouse/user,用於存儲數據庫數據

3)查看數據庫

scala> spark.sql("show tables").show

4)向表中插入數據

scala> spark.sql("insert into user values(1,'zs')")

5)查詢數據

scala> spark.sql("select * from user").show

注意:然而在實際使用中,幾乎沒有任何人會使用內置的Hive,因為元數據存儲在derby數據庫,不支持多客戶端訪問。

3.4.2 外部Hive應用

如果Spark要接管Hive外部已經部署好的Hive,需要通過以下幾個步驟。

0)為了說明內嵌Hive和外部Hive區別:刪除內嵌Hivemetastore_dbspark-warehouse

rm -rf metastore_db/ spark-warehouse/

1)確定原有Hive是正常工作的

sbin/start-dfs.sh

sbin/start-yarn.sh

bin/hive

2)需要把hive-site.xml拷貝到sparkconf/目錄下

cp hive-site.xml /opt/module/spark-local/conf/

3)如果以前hive-site.xml文件中,配置過Tez相關信息,注釋掉(不是必須)

4)把MySQL的驅動copySparkjars/目錄下

cp mysql-connector-java-5.1.48.jar /opt/module/spark-local/jars/

5)需要提前啟動hive服務,/opt/module/hive/bin/hiveservices.sh start(不是必須)

6)如果訪問不到HDFS,則需把core-site.xmlhdfs-site.xml拷貝到conf/目錄(不是必須)

7)啟動 spark-shell

bin/spark-shell

8)查詢表

scala> spark.sql("show tables").show

9)創建一個數據庫

scala> spark.sql("create table user(id int, name string)")

10)向表中插入數據

scala> spark.sql("insert into user values(1,'zs')")

11)查詢數據

scala> spark.sql("select * from user").show

3.4.3 運行Spark SQL CLI

Spark SQL CLI可以很方便的在本地運行Hive元數據服務以及從命令行執行查詢任務。在Spark目錄下執行如下命令啟動Spark SQL CLI,直接執行SQL語句,類似Hive窗口。

bin/spark-sql

 

spark-sql (default)> show tables;

IDEA操作Hive

1)添加依賴

<dependencies>

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-sql_2.12</artifactId>

<version>3.0.0</version>

</dependency>

 

<dependency>

<groupId>mysql</groupId>

<artifactId>mysql-connector-java</artifactId>

<version>5.1.27</version>

</dependency>

 

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-hive_2.12</artifactId>

<version>3.0.0</version>

</dependency>

</dependencies>

2)拷貝hive-site.xmlresources目錄(如果需要操作Hadoop,需要拷貝hdfs-site.xmlcore-site.xmlyarn-site.xml

3)代碼實現

  1. import org.apache.spark.SparkConf
  2. import org.apache.spark.sql._
  3.  
  4. object SparkSQL12_Hive {
  5.  
  6.     def main(args: Array[String]): Unit = {
  7.  
  8.         // 設置訪問用戶名(權限問題)
  9.         System.setProperty("HADOOP_USER_NAME","hadoop")
  10.  
  11.         // 1 創建上下文環境配置對象
  12.         val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLTest")
  13.         // 2 創建SparkSession對象
  14.         val spark: SparkSession = SparkSession.builder().enableHiveSupport().config(conf).getOrCreate()
  15.  
  16.         import spark.implicits._
  17.  
  18.         // 3 連接外部Hive,並進行操作
  19.         spark.sql("show tables").show()
  20.         spark.sql("create table user3(id int, name string)")
  21.         spark.sql("insert into user3 values(1,'zs')")
  22.         spark.sql("select * from user3").show
  23.  
  24.         // 4 釋放資源
  25.         spark.stop()
  26.     }
  27. }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM