SparkSession新的起点
在老的版本中,SparkSQL提供两种SQL查询起始点:一个叫SQLContext,用于Spark自己提供的SQL查询;一个叫HiveContext,用于连接Hive的查询。
SparkSession是Spark最新的SQL查询起始点,实质上是SQLContext和HiveContext的组合,所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。SparkSession内部封装了sparkContext,所以计算实际上是由sparkContext或者HiveContext完成的。
spark session的api如下:http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.SparkSession
DataFrame基本操作
创建
在Spark SQL中SparkSession是创建DataFrame和执行SQL的入口,创建DataFrame有三种方式:通过Spark的数据源进行创建;从一个存在的RDD进行转换;还可以从Hive Table进行查询返回。
1)通过spark的数据源创建
查看SparkSession支持哪些文件格式创建dataframe(在spark shell中,spark.read.+tab)
csv format jdbc json load option options orc parquet schema table text textFile
以json格式为例:
{"name":"zhangsan","age":20}
{"name":"lisi","age":21}
{"name":"wangwu","age":22}
scala> spark.read.json("file:///home/chxy/spark/user.json") res2: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
它可以自动地判断出数据的字段和字段类型
2)从一个存在的RDD中进行转换
注意:如果需要RDD与DF或者DS之间操作,那么都需要引入 import spark.implicits._
(1)手动转换
//首先引入隐式转换
scala> import spark.implicits._ import spark.implicits._
//创建一个RDD
scala> def rdd = spark.sparkContext.makeRDD(List(("zhangsan",21),("lisi",22),("wangwu",23)))
rdd: org.apache.spark.rdd.RDD[(String, Int)]
//手动指定dataframe的数据结构
scala> val dataframe = rdd.toDF("name","age")
dataframe: org.apache.spark.sql.DataFrame = [name: string, age: int]
(2)通过case类来转换
首先创建样例类
scala> case class People(name:String, age:Int) defined class People
将rdd中的数据转换为样例类的实例,rdd中的数据类型变为People
scala> val peopleRdd = rdd.map{ d => {People(d._1,d._2)}} peopleRdd: org.apache.spark.rdd.RDD[People] = MapPartitionsRDD[3] at map at
将peopleRdd转换为dataframe,此时无需指定数据结构,spark可以直接将含有case类的RDD转换为DataFrame
scala> val peopleDataframe = peopleRdd.toDF peopleDataframe: org.apache.spark.sql.DataFrame = [name: string, age: int]
将dataframe转换为rdd
scala> peopleDataframe.rdd res3: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[7] at rdd at <console>:32
注意:转换后的数据类型已经不是People,而是Row,也就是行,它无法还原出原来的数据类型。
3)从hive查询的tab中反馈(???)
基本操作
api如下:http://spark.apache.org/docs/1.6.3/api/java/org/apache/spark/sql/DataFrame.html
查看数据
scala> dataframe.show() +--------+---+ | name|age| +--------+---+ |zhangsan| 21| | lisi| 22| | wangwu| 23| +--------+---+
创建临时视图
scala> dataframe.createTempView("user")
从临时视图查询数据
//从临时视图返回的数据会组成一个新的DataFrame
scala> spark.sql("select * from user") res8: org.apache.spark.sql.DataFrame = [name: string, age: int] scala> spark.sql("select * from user").show +--------+---+ | name|age| +--------+---+ |zhangsan| 21| | lisi| 22| | wangwu| 23| +--------+---+
scala> spark.sql("select name from user").show
+--------+
| name|
+--------+
|zhangsan|
| lisi|
| wangwu|
+--------+
创建一个全局临时视图
scala> dataframe.createGlobalTempView("emp")
访问该全局临时视图
scala> spark.sql("select * from global_temp.emp").show +--------+---+ | name|age| +--------+---+ |zhangsan| 21| | lisi| 22| | wangwu| 23| +--------+---+
临时表是Session范围内的,Session退出后,表就失效了。如果想应用范围内有效,可以使用全局表。注意使用全局表时需要全路径访问,如:global_temp.emp
在另一个session范围内访问该视图:
scala> spark.newSession.sql("select * from global_temp.emp").show +--------+---+ | name|age| +--------+---+ |zhangsan| 21| | lisi| 22| | wangwu| 23| +--------+---+
注意:
1)视图一旦定义则不可修改的;
2)session的概念:
广义:连接状态,比如一次通信。
狭义:内存中的一块存储空间
DataSet
Dataset是具有强类型的数据集合,需要提供对应的类型信息。
创建
创建一个样例类
scala> case class People(name:String, age:Int) defined class People
创建DataSet(直接从Seq中创建)
scala> val peopleDataset = Seq(People("zhangsan",20),People("lisi",21),People("wangwu",22)).toDS() peopleDataset: org.apache.spark.sql.Dataset[People] = [name: string, age: int]
RDD转换为DataSet
SparkSQL能够自动将包含有case类的RDD转换成DataSet
直接从peopleRdd开始演示:
scala> peopleRdd res10: org.apache.spark.rdd.RDD[People] = MapPartitionsRDD[3] at map at <console>
//RDD中的People case类直接可以映射为DataSet的类型
scala> peopleRdd.toDS
res11: org.apache.spark.sql.Dataset[People] = [name: string, age: int]
DataSet转换成RDD
直接调用rdd方法,而且可以保留RDD的case类的类型
scala> res11.rdd res12: org.apache.spark.rdd.RDD[People] = MapPartitionsRDD[9]
DataFrame与DataSet的互转
DataFrame转换成DataSet:DataFrame有结构,但是没有类型,DataSet既有结构也有类型,因此只需要加上类型
scala> peopleDataframe.as[People] res14: org.apache.spark.sql.Dataset[People] = [name: string, age: int]
DataSet转换成DataFrame:同样的道理,只需要忽略类型
scala> peopleDataset.toDF res15: org.apache.spark.sql.DataFrame = [name: string, age: int]
RDD DataFrame,DataSet三者之间的互转总结如下:
重要补充:
1.增删改查,四大sql常用操作,增、删、改是否被dataframel所支持呢?
首先从文件创建一个dataframe,并创建临时视图:
scala> val userDF = spark.read.json("file:///home/chxy/spark/user.json") userDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string] scala> userDF.createTempView("userView")
执行插入操作,抛出异常:
scala> spark.sql("insert into userView values('sasa',24)") org.apache.hadoop.fs.ParentNotDirectoryException: Parent path is not a directory: file:/home/chxy/spark/user.json
org.apache.hadoop.fs.ParentNotDirectoryException.这个异常是由hdfs文件系统抛出的。很容易理解,因为hdfs天生不支持文件的插入操作。对于增加和删除操作,因该会得到相同的结果。
执行更新操作,抛出异常:
spark.sql("update userView set name = 'sasa' where id = 1") org.apache.spark.sql.catalyst.parser.ParseException: mismatched input 'update' expecting {'(', 'SELECT', 'FROM', 'ADD', 'DESC', 'WITH', 'VALUES', 'CREATE', 'TABLE', 'INSERT', 'DELETE', 'DESCRIBE', 'EXPLAIN', 'SHOW', 'USE', 'DROP', 'ALTER', 'MAP', 'SET', 'RESET', 'START', 'COMMIT', 'ROLLBACK', 'REDUCE', 'REFRESH', 'CLEAR', 'CACHE', 'UNCACHE', 'DFS', 'TRUNCATE', 'ANALYZE', 'LIST', 'REVOKE', 'GRANT', 'LOCK', 'UNLOCK', 'MSCK', 'EXPORT', 'IMPORT', 'LOAD'}(line 1, pos 0)
sparksql不支持update
执行删除操作,抛出异常:
spark.sql("delete from user where age = 20") org.apache.spark.sql.catalyst.parser.ParseException: Operation not allowed: delete from(line 1, pos 0) == SQL == delete from user where age = 20 ^^^ at org.apache.spark.sql.catalyst.parser.ParserUtils$.operationNotAllowed(ParserUtils.scala:39)
该操作不被允许。
2.关于视图:
视图在driver端是不可见的
scala> userView <console>:24: error: not found: value userView userView ^
如何删除一个视图
spark.sql("drop table userView")
3.关于dataset与dataframe中的算子如何使用
以map算子为例:
package sparksql import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Row, SparkSession} object Demo1 { def main(args: Array[String]): Unit = { //创建SparkConf()并设置App名称 val spark = SparkSession .builder() .appName("Spark SQL basic example") .config("spark.some.config.option", "some-value") .master("local[*]") .getOrCreate() import spark.implicits._ val raw: RDD[(String, Int)] = spark.sparkContext.makeRDD(List(("zhangsan", 21), ("lisi", 22), ("wangwu", 23))) //创建dataframe val df: DataFrame = raw.toDF("name", "age") df.show() //调用map方法,数据数据类型是:Row(col1,col2...coln) df.map{ case Row(col1:String,col2:Int)=> println(col1);println(col2) col1 case _=> "" }.show()
//同RDD,会生成一个新的DataFrame
spark.stop() } }
dataset:
package sparksql import org.apache.spark.sql.{Dataset, SparkSession} object Demo2 { case class People(name:String, age:Int)//声明case类 def main(args: Array[String]): Unit = { //创建SparkConf()并设置App名称 val spark = SparkSession .builder() .appName("Spark SQL basic example") .config("spark.some.config.option", "some-value") .master("local[*]") .getOrCreate() import spark.implicits._ val peopleDataset = Seq(People("zhangsan",20),People("lisi",21),People("wangwu",22)).toDS()//创建dataset val newDataset: Dataset[String] = peopleDataset.map { case People(name: String, age: Int) => println(name) name case _ => "" } newDataset.show() spark.stop() } }
遇到的坑:
如果把case类的定义放在main方法中,会报错:Seq没有toDS这个方法,参考了这篇博文https://blog.csdn.net/chuyouyinghe/article/details/81189131,将case类的定义转移到了main方法之外。