Spark详解(06) - SparkSQL


Spark详解(06) - SparkSQL

Spark SQL概述

什么是Spark SQL

Spark SQL是Spark用于结构化数据(Structured Data)处理的Spark模块。

(1)半结构化数据(日志数据): 001    zhangsan     18

(2)结构化数据(数据库数据):

id

name

age

001

zhangsan

18

为什么要有Spark SQL

Hive on Spark:Hive既作为存储元数据又负责SQL的解析优化,语法是HQL语法,执行引擎变成了Spark,Spark负责采用RDD执行。

Spark on Hive:Hive只作为存储元数据,Spark负责SQL解析优化,语法是Spark SQL语法,Spark负责采用优化后的RDD执行。

Spark SQL原理

Spark SQL它提供了2个编程抽象,DataFrame、DataSet。(类似Spark Core中的RDD)

什么是DataFrame

1DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。

2DataFrameRDD的主要区别在于,前者带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。

左侧的RDD[Person]虽然以Person为类型参数,但Spark框架本身不了解Person类的内部结构。而右侧的DataFrame却提供了详细的结构信息,使得Spark SQL可以清楚地知道该数据集中包含哪些列,每列的名称和类型各是什么。

3Spark SQL性能上比RDD要高。因为Spark SQL了解数据内部结构,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。反观RDD,由于无从得知所存数据元素的具体内部结构,Spark Core只能在Stage层面进行简单、通用的流水线优化。

什么是DataSet

DataSet是分布式数据集合。

DataSet是强类型的。比如可以有DataSet[Car]DataSet[User]。具有类型安全检查

DataFrameDataSet的特例type DataFrame = DataSet[Row] Row是一个类型,跟CarUser这些的类型一样,所有的表结构信息都用Row来表示。

RDD、DataFrame和DataSet之间关系

1)发展历史

RDDSpark1.0=DataframeSpark1.3=DatasetSpark1.6

如果同样的数据都给到这三个数据结构,他们分别计算之后,都会给出相同的结果。不同是的他们的执行效率和执行方式。在后期的Spark版本中,DataSet有可能会逐步取代RDDDataFrame成为唯一的API接口

2)三者的共性

1RDDDataFrameDataSet全都是Spark平台下的分布式弹性数据集,为处理超大型数据提供便利

2)三者都有惰性机制,在进行创建、转换,如map方法时,不会立即执行,只有在遇到Action行动算子如foreach时,三者才会开始遍历运算

3)三者有许多共同的函数,如filter,排序等

4)三者都会根据Spark的内存情况自动缓存运算

5)三者都有分区的概念

Spark SQL的特点

1)易整合

无缝的整合了SQL查询和Spark编程。

2)统一的数据访问方式

使用相同的方式连接不同的数据源。

3)兼容Hive

在已有的仓库上直接运行SQL或者HiveQL

4)标准的数据连接

通过JDBC或者ODBC来连接

Spark SQL编程

本章重点学习如何使用DataFrameDataSet进行编程以及他们之间的关系和转换,关于具体的SQL书写不是本章的重点。

SparkSession新的起始点

在老的版本中,SparkSQL提供两种SQL查询起始点:

一个叫SQLContext,用于Spark自己提供的SQL查询;

一个叫HiveContext,用于连接Hive的查询。

SparkSessionSpark最新的SQL查询起始点,实质上是SQLContextHiveContext的组合,所以在SQLContextHiveContext上可用的APISparkSession上同样是可以使用的。

SparkSession内部封装了SparkContext,所以计算实际上是由SparkContext完成的。当使用spark-shell的时候,Spark框架会自动的创建一个名称叫做SparkSparkSession,就像以前可以自动获取到一个sc来表示SparkContext

DataFrame

DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。

创建DataFrame

Spark SQLSparkSession是创建DataFrame和执行SQL的入口,创建DataFrame有三种方式:

通过Spark的数据源进行创建;

从一个存在的RDD进行转换;

还可以从Hive Table进行查询返回。

1)从Spark数据源进行创建

1)数据准备,在/opt/module/spark-local目录下创建一个user.json文件

  1. {"age":20,"name":"qiaofeng"}
  2. {"age":19,"name":"xuzhu"}
  3. {"age":18,"name":"duanyu"}

2)查看Spark支持创建文件的数据源格式,使用tab键查看

scala> spark.read.

csv format jdbc json load option options orc parquet schema table text textFile

3)读取json文件创建DataFrame

scala> val df = spark.read.json("/opt/module/spark-local/user.json")

df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

注意:如果从内存中获取数据,Spark可以知道数据类型具体是什么,如果是数字,默认作为Int处理;但是从文件中读取的数字,不能确定是什么类型,所以用BigInt接收,可以和Long类型转换,但是和Int不能进行转换。

4)查看DataFrame算子

scala> df.

5)展示结果

2)从RDD进行转换

参考章节:RDDDataFrame相互转换

3Hive Table进行查询返回

参考章节:与Hive交互

SQL风格语法

SQL语法风格是指查询数据的时候使用SQL语句来查询,这种风格的查询必须要有临时视图或者全局视图来辅助。

视图:对特定表的数据的查询结果重复使用View只能查询,不能修改和插入。

select * from t_user where age > 30 的查询结果可以存储在临时表v_user_age中,方便在后面重复使用。例如:select * from v_user_age

1)临时视图

1)创建一个DataFrame

scala> val df = spark.read.json("/opt/module/spark-local/user.json")

df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

2)对DataFrame创建一个临时视图

scala> df.createOrReplaceTempView("user")

3)通过SQL语句实现查询全表

scala> val sqlDF = spark.sql("SELECT * FROM user")

sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

4)结果展示

5)求年龄的平均值

scala> val sqlDF = spark.sql("SELECT avg(age) from user")

sqlDF: org.apache.spark.sql.DataFrame = [avg(age): double]

6)结果展示

scala> sqlDF.show

+--------+

|avg(age)|

+--------+

| 19.0|

+--------+

7)创建一个新会话再执行,发现视图找不到

scala> spark.newSession().sql("SELECT avg(age) from user ").show()

org.apache.spark.sql.AnalysisException: Table or view not found: user; line 1 pos 14;

注意:普通临时视图是Session范围内的,如果想全局有效,可以创建全局临时视图。

2)全局视图

1)对于DataFrame创建一个全局视图

scala> df.createGlobalTempView("user")

2)通过SQL语句实现查询全表

scala> spark.sql("SELECT * FROM global_temp.user").show()

+---+--------+

|age| name|

+---+--------+

| 20|qiaofeng|

| 19| xuzhu|

| 18| duanyu|

+---+--------+

3)新建session,通过SQL语句实现查询全表

scala> spark.newSession().sql("SELECT * FROM global_temp.user").show()

+---+--------+

|age| name|

+---+--------+

| 20|qiaofeng|

| 19| xuzhu|

| 18| duanyu|

+---+--------+

DSL风格语法

DataFrame提供一个特定领域语言(domain-specific languageDSL)去管理结构化的数据,可以在ScalaJavaPythonR中使用DSL,使用DSL语法风格不必去创建临时视图了。

1)创建一个DataFrame

scala> val df = spark.read.json("/opt/module/spark-local/user.json")

df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

2)查看DataFrameSchema信息

scala> df.printSchema

root

|-- age: Long (nullable = true)

|-- name: string (nullable = true)

3)只查看"name"列数据

scala> df.select("name").show()

+--------+

| name|

+--------+

|qiaofeng|

| xuzhu|

| duanyu|

+--------+

4)查看年龄和姓名,且年龄大于18

scala> df.select("age","name").where("age>18").show

+---+--------+

|age| name|

+---+--------+

| 20|qiaofeng|

| 19| xuzhu|

+---+--------+

5)查看所有列

scala> df.select("*").show

+---+--------+

|age| name|

+---+--------+

| 20|qiaofeng|

| 19| xuzhu|

| 18| duanyu|

+---+--------+

6)查看"name"列数据以及"age+1"数据

注意:涉及到运算的时候,每列都必须使用$,或者采用引号表达式:单引号+字段名

scala> df.select($"name",$"age" + 1).show

scala> df.select('name, 'age + 1).show()

scala> df.select('name, 'age + 1 as "newage").show()

 

+--------+---------+

| name|(age + 1)|

+--------+---------+

|qiaofeng| 21|

| xuzhu| 20|

| duanyu| 19|

+--------+---------+

7)查看"age"大于"19"的数据

scala> df.filter("age>19").show

+---+--------+

|age| name|

+---+--------+

| 20|qiaofeng|

+---+--------+

8)按照"age"分组,查看数据条数

scala> df.groupBy("age").count.show

+---+-----+

|age|count|

+---+-----+

| 19| 1|

| 18| 1|

| 20| 1|

+---+-----+

DataSet

DataSet是具有强类型的数据集合,需要提供对应的类型信息。

创建DataSet(基本类型序列)

使用基本类型的序列创建DataSet

1)将集合转换为DataSet

scala> val ds = Seq(1,2,3,4,5,6).toDS

ds: org.apache.spark.sql.Dataset[Int] = [value: int]

2)查看DataSet的值

scala> ds.show

+-----+

|value|

+-----+

| 1|

| 2|

| 3|

| 4|

| 5|

| 6|

+-----+

创建DataSet(样例类序列)

使用样例类序列创建DataSet

1)创建一个User的样例类

scala> case class User(name: String, age: Long)

defined class User

2)将集合转换为DataSet

scala> val caseClassDS = Seq(User("wangyuyan",2)).toDS()

caseClassDS: org.apache.spark.sql.Dataset[User] = [name: string, age: bigint]

3)查看DataSet的值

scala> caseClassDS.show

+---------+---+

| name|age|

+---------+---+

|wangyuyan| 2|

+---------+---+

注意:在实际使用的时候,很少用到把序列转换成DataSet,更多是通过RDD来得到DataSet

RDD、DataFrame、DataSet相互转换

IDEA创建SparkSQL工程

1)创建一个maven工程SparkSQLTest

2)在项目SparkSQLTest上点击右键,Add Framework Support=》勾选scala

3)在main下创建scala文件夹,并右键Mark Directory as Sources Root=>scala下创建包名为com.zhangjk.sparksql

4)输入文件夹准备:在新建的SparkSQLTest项目名称上右键=》新建input文件夹=》在input文件夹上右键=》新建user.json。并输入如下内容:

{"age":20,"name":"qiaofeng"}

{"age":19,"name":"xuzhu"}

{"age":18,"name":"duanyu"}

5)在pom.xml文件中添加如下依赖

<dependencies>

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-sql_2.12</artifactId>

<version>3.0.0</version>

</dependency>

</dependencies>

6)代码实现

  1. package com.zhangjk.sparksql
  2.  
  3. import org.apache.spark.SparkConf
  4. import org.apache.spark.sql.{DataFrame, SparkSession}
  5.  
  6. object SparkSQL01_inpu {
  7.   def main(args: Array[String]): Unit = {
  8.     // 1 创建上下文环境配置对象
  9.     val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLTest")
  10.     // 2 创建SparkSession对象
  11.     val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
  12.     // 3 读取数据
  13.     val df: DataFrame = spark.read.json("input/user.json")
  14.     // 4 可视化
  15.     df.show()
  16.     // 5 释放资源
  17.     spark.stop()
  18.   }
  19. }

RDDDataFrame相互转换

1RDD转换为DataFrame

方式1:手动转换:RDD.toDF("列名1", "列名2")

方式2:通过样例类反射转换:UserRDD.map{ x=>User(x._1,x._2) }.toDF()

2DataFrame转换为RDD

DataFrame.rdd

3)在input/目录下准备user.txt并输入如下内容:

qiaofeng,20

xuzhu,19

duanyu,18

4)代码实现

  1. package com.zhangjk.sparksql
  2.  
  3. import org.apache.spark.rdd.RDD
  4. import org.apache.spark.{SparkConf, SparkContext}
  5. import org.apache.spark.sql.{DataFrame, Row, SparkSession}
  6.  
  7. object SparkSQL02_RDDAndDataFrame {
  8.  
  9.     def main(args: Array[String]): Unit = {
  10.  
  11.         //1.创建SparkConf并设置App名称
  12.         val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
  13.         //2.创建SparkContext,该对象是提交Spark App的入口
  14.         val sc: SparkContext = new SparkContext(conf)
  15.  
  16.         //3.1 获取数据
  17.         val LineRDD: RDD[String] = sc.textFile("input/user.txt")
  18.         //3.2 RDD准备完成
  19.         val userRDD: RDD[(String, Int)] = LineRDD.map {
  20.             line =>
  21.                 val fields = line.split(",")
  22.                 (fields(0), fields(1).trim.toInt)
  23.         }
  24.  
  25.         //4. 创建SparkSession对象
  26.         val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
  27.  
  28.         //5.1 RDDDataFrameDataSet转换必须要导的包
  29.         import spark.implicits._
  30.         //5.2 RDD转换为DataFrame(手动转)
  31.         userRDD.toDF("name","age").show
  32.  
  33.         //5.3 RDD转换为DataFrame(通过样例类反射转)
  34.         val userDataFrame: DataFrame = userRDD.map {
  35.             case (name, age) => User(name, age)
  36.         }.toDF()
  37.         userDataFrame.show()
  38.  
  39.         //5.4 DataFrame 转换为RDD
  40.         val uRDD: RDD[Row] = userDataFrame.rdd
  41.         uRDD.collect().foreach(println)
  42.  
  43.         //6.关闭连接
  44.         sc.stop()
  45.     }
  46. }
  47.  
  48. //样例类
  49. case class User(name:String, age:Long)

RDD与DataSet相互转换

1RDD转换为DataSet

RDD.map { x => User(x._1, x._2) }.toDS()

SparkSQL能够自动将包含有样例类的RDD转换成DataSet,样例类定义了table的结构,样例类属性通过反射变成了表的列名。样例类可以包含诸如Seq或者Array等复杂的结构。

2DataSet转换为RDD

DS.rdd

3)代码实现

  1. package com.zhangjk.sparksql
  2.  
  3. import org.apache.spark.rdd.RDD
  4. import org.apache.spark.sql.{Dataset, SparkSession}
  5. import org.apache.spark.{SparkConf, SparkContext}
  6.  
  7. object SparkSQL03_RDDAndDataSet {
  8.   def main(args: Array[String]): Unit = {
  9.  
  10.     //1.创建SparkConf并设置App名称
  11.     val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
  12.     //2.创建SparkContext,该对象是提交Spark App的入口
  13.     val sc: SparkContext = new SparkContext(conf)
  14.  
  15.     //3.1 获取数据
  16.     val lineRDD: RDD[String] = sc.textFile("input/user.txt")
  17.  
  18.     //4. 创建SparkSession对象
  19.     val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
  20.  
  21.     //5.1 RDDDataFrameDataSet转换必须要导的包
  22.     import spark.implicits._
  23.     //5.2 RDD转换为DataSet
  24.     val userDataSet: Dataset[User] = lineRDD.map {
  25.       line =>
  26.         val fields = line.split(",")
  27.         User(fields(0), fields(1).toInt)
  28.     }.toDS()
  29.     userDataSet.show()
  30.  
  31.     //5.3 DataSet转换为RDD
  32.     val userRDD: RDD[User] = userDataSet.rdd
  33.     userRDD.collect().foreach(println)
  34.  
  35.     //6.关闭连接
  36.     sc.stop()
  37.   }
  38. }
  39.  
  40. case class User(name:String,age:Long)

DataFrame与DataSet相互转换

1DataFrame转为DataSet

df.as[User]

2DataSet转为DataFrame

    ds.toDF

3)代码实现

  1. package com.zhangjk.sparksql
  2.  
  3. import org.apache.spark.SparkConf
  4. import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
  5.  
  6. object SparkSQL04_DataFrameAndDataSet {
  7.  
  8.     def main(args: Array[String]): Unit = {
  9.         // 1 创建上下文环境配置对象
  10.         val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLTest")
  11.         // 2 创建SparkSession对象
  12.         val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
  13.  
  14.         // 3 读取数据
  15.         val df: DataFrame = spark.read.json("input/user.json")
  16.  
  17.         //4.1 RDDDataFrameDataSet转换必须要导的包
  18.         import spark.implicits._
  19.         // 4.2 DataFrame 转换为DataSet
  20.         val userDataSet: Dataset[User] = df.as[User]
  21.         userDataSet.show()
  22.  
  23.         // 4.3 DataSet转换为DataFrame
  24.         val userDataFrame: DataFrame = userDataSet.toDF()
  25.         userDataFrame.show()
  26.  
  27.         // 5 释放资源
  28.         spark.stop()
  29.     }
  30. }
  31.  
  32. case class User(name: String,age: Long)

用户自定义函数

UDF

1UDF:一行进入,一行出

2)代码实现

  1. package com.zhangjk.sparksql
  2.  
  3. import org.apache.spark.SparkConf
  4. import org.apache.spark.sql.{DataFrame, SparkSession}
  5.  
  6. object SparkSQL05_UDF{
  7.  
  8.     def main(args: Array[String]): Unit = {
  9.  
  10.         // 1 创建上下文环境配置对象
  11.         val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLTest")
  12.         // 2 创建SparkSession对象
  13.         val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
  14.         // 3 读取数据
  15.         val df: DataFrame = spark.read.json("input/user.json")
  16.  
  17.         // 4 创建DataFrame临时视图
  18.         df.createOrReplaceTempView("user")
  19.           
  20.         // 5 注册UDF函数。功能:在数据前添加字符串"Name:"
  21.         spark.udf.register("addName", (x:String) => "Name:"+ x)
  22.  
  23.         // 6 调用自定义UDF函数
  24.         spark.sql("select addName(name) as newName, age from user").show()
  25.  
  26.         // 7 释放资源
  27.         spark.stop()
  28.     }
  29. }

UDAF

1UDAF:输入多行,返回一行。

2Spark3.x推荐使用extends Aggregator自定义UDAF,属于强类型的Dataset方式。

3Spark2.x使用extends UserDefinedAggregateFunction,属于弱类型的DataFrame

4)需求:实现求平均年龄

1)自定义聚合函数实现-强类型

  1. package com.zhangjk.sparksql
  2.  
  3. import org.apache.spark.SparkConf
  4. import org.apache.spark.sql.expressions.Aggregator
  5. import org.apache.spark.sql.{DataFrame, Encoder, Encoders, SparkSession, functions}
  6.  
  7. object SparkSQL06_UDAF {
  8.  
  9.     def main(args: Array[String]): Unit = {
  10.  
  11.         // 1 创建上下文环境配置对象
  12.         val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLTest")
  13.  
  14.         // 2 创建SparkSession对象
  15.         val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
  16.  
  17.         // 3 读取数据
  18.         val df: DataFrame = spark.read.json("input/user.json")
  19.  
  20.         // 4 创建DataFrame临时视图
  21.         df.createOrReplaceTempView("user")
  22.           
  23.         // 5 注册UDAF
  24.         spark.udf.register("myAvg", functions.udaf(new MyAvgUDAF()))
  25.  
  26.         // 6 调用自定义UDAF函数
  27.         spark.sql("select myAvg(age) from user").show()
  28.  
  29.         // 7 释放资源
  30.         spark.stop()
  31.     }
  32. }
  33.  
  34. //输入数据类型
  35. case class Buff(var sum: Longvar count: Long)
  36.  
  37. /**
  38.  * 1,20岁; 2,19岁; 3,18
  39.  * IN:聚合函数的输入类型:Long
  40.  * BUF
  41.  * OUT:聚合函数的输出类型:Double  (18+19+20) / 3
  42.  */
  43. class MyAvgUDAF extends Aggregator[LongBuffDouble] {
  44.  
  45.     // 初始化缓冲区
  46.     override def zero: Buff = Buff(0L0L)
  47.  
  48.     // 将输入的年龄和缓冲区的数据进行聚合
  49.     override def reduce(buff: Buff, age: Long): Buff = {
  50.         buff.sum = buff.sum + age
  51.         buff.count = buff.count + 1
  52.         buff
  53.     }
  54.  
  55.     // 多个缓冲区数据合并
  56.     override def merge(buff1: Buff, buff2: Buff): Buff = {
  57.         buff1.sum = buff1.sum + buff2.sum
  58.         buff1.count = buff1.count + buff2.count
  59.         buff1
  60.     }
  61.  
  62.     // 完成聚合操作,获取最终结果
  63.     override def finish(buff: Buff): Double = {
  64.         buff.sum.toDouble / buff.count
  65.     }
  66.  
  67.     // SparkSQL对传递的对象的序列化操作(编码)
  68.     // 自定义类型就是product   自带类型根据类型选择
  69.     override def bufferEncoder: Encoder[Buff] = Encoders.product
  70.  
  71.     override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
  72. }

2)自定义聚合函数实现-弱类型(已过时--了解)

  1. package com.zhangjk.sparksql
  2.  
  3. import org.apache.spark.SparkConf
  4. import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
  5. import org.apache.spark.sql._
  6. import org.apache.spark.sql.types.{DataType, DoubleType, LongType, StructField, StructType}
  7.  
  8. object SparkSQL07_UDAF{
  9.  
  10.     def main(args: Array[String]): Unit = {
  11.  
  12.         // 1 创建上下文环境配置对象
  13.         val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLTest")
  14.  
  15.         // 2 创建SparkSession对象
  16.         val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
  17.  
  18.         // 3 读取数据
  19.         val df: DataFrame = spark.read.json("input/user.json")
  20.  
  21.         // 4 注册UDAF
  22.         spark.udf.register("myAvg",new MyAvgUDAF())
  23.  
  24.         // 5 创建DataFrame临时视图
  25.         df.createOrReplaceTempView("user")
  26.  
  27.         // 6 调用自定义UDAF函数
  28.         spark.sql("select myAvg(age) from user").show()
  29.  
  30.         // 7 释放资源
  31.         spark.stop()
  32.     }
  33.  
  34.     class MyAvgUDAF extends UserDefinedAggregateFunction {
  35.  
  36.         // 聚合函数输入参数的数据类型:age(Long)
  37.         override def inputSchema: StructType = {
  38.             StructType(Array(
  39.                 StructField("age",LongType)
  40.             ))
  41.         }
  42.  
  43.         // 聚合函数缓冲区中值的数据类型(age,count)
  44.         override def bufferSchema: StructType = {
  45.             StructType(Array(
  46.                 StructField("sum",LongType),
  47.                 StructField("count",LongType)
  48.             ))
  49.         }
  50.  
  51.         // 函数返回值的数据类型
  52.         override def dataType: DataType = DoubleType
  53.  
  54.         // 稳定性:对于相同的输入是否一直返回相同的输出。
  55.         override def deterministic: Boolean = true
  56.  
  57.         // 函数缓冲区初始化
  58.         override def initialize(buffer: MutableAggregationBuffer): Unit = {
  59.             // 存年龄的总和
  60.             buffer.update(00L)
  61.  
  62.             // 存年龄的个数
  63.             buffer.update(10L)
  64.         }
  65.  
  66.         // 更新缓冲区中的数据
  67.         override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
  68.  
  69.             if (!input.isNullAt(0)) {
  70.                 buffer(0) = buffer.getLong(0) + input.getLong(0)
  71.                 buffer(1) = buffer.getLong(1) + 1
  72.             }
  73.         }
  74.  
  75.         // 合并缓冲区
  76.         override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
  77.             buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
  78.             buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  79.         }
  80.  
  81.         // 计算最终结果
  82.         override def evaluate(buffer: Row): Any = {
  83.             buffer.getLong(0).toDouble / buffer.getLong(1)
  84.         }
  85.     }
  86. }

UDTF(无)

输入一行,返回多行(这种方式在Hive中有);

SparkSQL中没有UDTFSpark中用flatMap即可实现该功能

SparkSQL数据的加载与保存

加载数据

1)加载数据通用方法

spark.read.load是加载数据的通用方法

2)代码实现

  1. package com.zhangjk.sparksql
  2.  
  3. import org.apache.spark.SparkConf
  4. import org.apache.spark.sql._
  5. import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
  6. import org.apache.spark.sql.types._
  7.  
  8. object SparkSQL08_Load{
  9.  
  10.     def main(args: Array[String]): Unit = {
  11.  
  12.         // 1 创建上下文环境配置对象
  13.         val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLTest")
  14.         // 2 创建SparkSession对象
  15.         val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
  16.  
  17.         // 3.1 spark.read直接读取数据:csv   format   jdbc   json   load   option
  18.         // options   orc   parquet   schema   table   text   textFile
  19.         // 注意:加载数据的相关参数需写到上述方法中,
  20.         // 如:textFile需传入加载数据的路径,jdbc需传入JDBC相关参数。
  21.         spark.read.json("input/user.json").show()
  22.  
  23.         // 3.2 format指定加载数据类型
  24.         // spark.read.format("…")[.option("…")].load("…")
  25.         // format("…"):指定加载的数据类型,包括"csv""jdbc""json""orc""parquet""textFile"
  26.         // load("…"):在"csv""jdbc""json""orc""parquet""textFile"格式下需要传入加载数据路径
  27.         // option("…"):在"jdbc"格式下需要传入JDBC相应参数,urluserpassworddbtable
  28.         spark.read.format("json").load ("input/user.json").show
  29.  
  30.         // 4 释放资源
  31.         spark.stop()
  32.     }
  33. }

保存数据

1)保存数据通用方法

df.write.save是保存数据的通用方法

2)代码实现

  1. package com.zhangjk.sparksql
  2.  
  3. import org.apache.spark.SparkConf
  4. import org.apache.spark.sql._
  5.  
  6. object SparkSQL09_Save{
  7.  
  8.   def main(args: Array[String]): Unit = {
  9.  
  10.     // 1 创建上下文环境配置对象
  11.     val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLTest")
  12.  
  13.     // 2 创建SparkSession对象
  14.     val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
  15.  
  16.     // 3 获取数据
  17.     val df: DataFrame = spark.read.json("input/user.json")
  18.  
  19.     // 4.1 df.write.保存数据:csv  jdbc   json  orc   parquet textFile… …
  20.     // 注意:保存数据的相关参数需写到上述方法中。如:textFile需传入加载数据的路径,JDBC需传入JDBC相关参数。
  21.     // 默认保存为parquet文件(可以修改conf.set("spark.sql.sources.default","json")
  22.     df.write.save("output")
  23.  
  24.     // 默认读取文件parquet
  25.     spark.read.load("output").show()
  26.  
  27.     // 4.2 format指定保存数据类型
  28.     // df.write.format("…")[.option("…")].save("…")
  29.     // format("…"):指定保存的数据类型,包括"csv""jdbc""json""orc""parquet""textFile"
  30.     // save ("…"):在"csv""orc""parquet""textFile"格式下需要传入保存数据的路径。
  31.     // option("…"):在"jdbc"格式下需要传入JDBC相应参数,urluserpassworddbtable
  32.     df.write.format("json").save("output2")
  33.  
  34.     // 4.3 可以指定为保存格式,直接保存,不需要再调用save
  35.     df.write.json("output1")
  36.  
  37.     // 4.4 如果文件已经存在则追加
  38.     df.write.mode("append").json("output2")
  39.  
  40.     // 如果文件已经存在则忽略
  41.     df.write.mode("ignore").json("output2")
  42.  
  43.     // 如果文件已经存在则覆盖
  44.     df.write.mode("overwrite").json("output2")
  45.  
  46.     // 默认default:如果文件已经存在则抛出异常
  47.     // path file:/E:/ideaProject2/SparkSQLTest/output2 already exists.;
  48.     df.write.mode("error").json("output2")
  49.  
  50.     // 5 释放资源
  51.     spark.stop()
  52.   }
  53. }
  54.  

与MySQL交互

1)导入依赖

<dependency>

<groupId>mysql</groupId>

<artifactId>mysql-connector-java</artifactId>

<version>5.1.27</version>

</dependency>

2)从MySQL读数据

  1. import org.apache.spark.SparkConf
  2. import org.apache.spark.sql._
  3.  
  4. object SparkSQL10_MySQL_Read{
  5.  
  6.     def main(args: Array[String]): Unit = {
  7.  
  8.         // 1 创建上下文环境配置对象
  9.         val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLTest")
  10.  
  11.         // 2 创建SparkSession对象
  12.         val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
  13.  
  14.         // 3.1 通用的load方法读取
  15.         val df: DataFrame = spark.read.format("jdbc")
  16.             .option("url""jdbc:mysql://hadoop102:3306/gmall")
  17.             .option("driver""com.mysql.jdbc.Driver")
  18.             .option("user""root")
  19.             .option("password""000000")
  20.             .option("dbtable""user_info")
  21.             .load()
  22.  
  23.         // 3.2 创建视图
  24.         df.createOrReplaceTempView("user")
  25.  
  26.         // 3.3 查询想要的数据
  27.         spark.sql("select id, name from user").show()
  28.  
  29.         // 4 释放资源
  30.         spark.stop()
  31.     }
  32. }

3)向MySQL写数据

  1. import org.apache.spark.SparkConf
  2. import org.apache.spark.rdd.RDD
  3. import org.apache.spark.sql._
  4.  
  5. object SparkSQL11_MySQL_Write {
  6.  
  7.     def main(args: Array[String]): Unit = {
  8.  
  9.         // 1 创建上下文环境配置对象
  10.         val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLTest")
  11.  
  12.         // 2 创建SparkSession对象
  13.         val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
  14.  
  15.         // 3 准备数据
  16.         // 注意:id是主键,不能和MySQL数据库中的id重复
  17.         val rdd: RDD[User] = spark.sparkContext.makeRDD(List(User(3000"zhangsan"), User(3001"lisi")))
  18.  
  19.         val ds: Dataset[User] = rdd.toDS
  20.  
  21.         // 4 MySQL中写入数据
  22.         ds.write
  23.             .format("jdbc")
  24.             .option("url""jdbc:mysql://hadoop102:3306/gmall")
  25.             .option("user""root")
  26.             .option("password""000000")
  27.             .option("dbtable""user_info")
  28.             .mode(SaveMode.Append)
  29.             .save()
  30.  
  31.         // 5 放资源
  32.         spark.stop()
  33.     }
  34.  
  35.     case class User(id: Int, name: String)
  36. }

与Hive交互

SparkSQL可以采用内嵌Hive,也可以采用外部Hive企业开发中,通常采用外部Hive

内嵌Hive应用

内嵌Hive,元数据存储在Derby数据库。

1)如果使用Spark内嵌的Hive,则什么都不用做,直接使用即可。

[bin/spark-shell

 

scala> spark.sql("show tables").show

注意:执行完后,发现多了$SPARK_HOME/metastore_dbderby.log,用于存储元数据

2)创建一个数据库

scala> spark.sql("create table user(id int, name string)")

注意:执行完后,发现多了$SPARK_HOME/spark-warehouse/user,用于存储数据库数据

3)查看数据库

scala> spark.sql("show tables").show

4)向表中插入数据

scala> spark.sql("insert into user values(1,'zs')")

5)查询数据

scala> spark.sql("select * from user").show

注意:然而在实际使用中,几乎没有任何人会使用内置的Hive,因为元数据存储在derby数据库,不支持多客户端访问。

3.4.2 外部Hive应用

如果Spark要接管Hive外部已经部署好的Hive,需要通过以下几个步骤。

0)为了说明内嵌Hive和外部Hive区别:删除内嵌Hivemetastore_dbspark-warehouse

rm -rf metastore_db/ spark-warehouse/

1)确定原有Hive是正常工作的

sbin/start-dfs.sh

sbin/start-yarn.sh

bin/hive

2)需要把hive-site.xml拷贝到sparkconf/目录下

cp hive-site.xml /opt/module/spark-local/conf/

3)如果以前hive-site.xml文件中,配置过Tez相关信息,注释掉(不是必须)

4)把MySQL的驱动copySparkjars/目录下

cp mysql-connector-java-5.1.48.jar /opt/module/spark-local/jars/

5)需要提前启动hive服务,/opt/module/hive/bin/hiveservices.sh start(不是必须)

6)如果访问不到HDFS,则需把core-site.xmlhdfs-site.xml拷贝到conf/目录(不是必须)

7)启动 spark-shell

bin/spark-shell

8)查询表

scala> spark.sql("show tables").show

9)创建一个数据库

scala> spark.sql("create table user(id int, name string)")

10)向表中插入数据

scala> spark.sql("insert into user values(1,'zs')")

11)查询数据

scala> spark.sql("select * from user").show

3.4.3 运行Spark SQL CLI

Spark SQL CLI可以很方便的在本地运行Hive元数据服务以及从命令行执行查询任务。在Spark目录下执行如下命令启动Spark SQL CLI,直接执行SQL语句,类似Hive窗口。

bin/spark-sql

 

spark-sql (default)> show tables;

IDEA操作Hive

1)添加依赖

<dependencies>

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-sql_2.12</artifactId>

<version>3.0.0</version>

</dependency>

 

<dependency>

<groupId>mysql</groupId>

<artifactId>mysql-connector-java</artifactId>

<version>5.1.27</version>

</dependency>

 

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-hive_2.12</artifactId>

<version>3.0.0</version>

</dependency>

</dependencies>

2)拷贝hive-site.xmlresources目录(如果需要操作Hadoop,需要拷贝hdfs-site.xmlcore-site.xmlyarn-site.xml

3)代码实现

  1. import org.apache.spark.SparkConf
  2. import org.apache.spark.sql._
  3.  
  4. object SparkSQL12_Hive {
  5.  
  6.     def main(args: Array[String]): Unit = {
  7.  
  8.         // 设置访问用户名(权限问题)
  9.         System.setProperty("HADOOP_USER_NAME","hadoop")
  10.  
  11.         // 1 创建上下文环境配置对象
  12.         val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLTest")
  13.         // 2 创建SparkSession对象
  14.         val spark: SparkSession = SparkSession.builder().enableHiveSupport().config(conf).getOrCreate()
  15.  
  16.         import spark.implicits._
  17.  
  18.         // 3 连接外部Hive,并进行操作
  19.         spark.sql("show tables").show()
  20.         spark.sql("create table user3(id int, name string)")
  21.         spark.sql("insert into user3 values(1,'zs')")
  22.         spark.sql("select * from user3").show
  23.  
  24.         // 4 释放资源
  25.         spark.stop()
  26.     }
  27. }

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM