SparkSQL(二)——基本操作


SparkSession新的起點

在老的版本中,SparkSQL提供兩種SQL查詢起始點:一個叫SQLContext,用於Spark自己提供的SQL查詢;一個叫HiveContext,用於連接Hive的查詢。

SparkSession是Spark最新的SQL查詢起始點,實質上是SQLContext和HiveContext的組合,所以在SQLContext和HiveContext上可用的API在SparkSession上同樣是可以使用的。SparkSession內部封裝了sparkContext,所以計算實際上是由sparkContext或者HiveContext完成的。

 

spark session的api如下:http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.SparkSession

 

 

DataFrame基本操作

 

創建

Spark SQL中SparkSession是創建DataFrame和執行SQL的入口,創建DataFrame有三種方式:通過Spark的數據源進行創建;從一個存在的RDD進行轉換;還可以從Hive Table進行查詢返回。

1)通過spark的數據源創建

查看SparkSession支持哪些文件格式創建dataframe(在spark shell中,spark.read.+tab)

csv   format   jdbc   json   load   option   options   orc   parquet   schema   table   text   textFile

以json格式為例:

{"name":"zhangsan","age":20}

{"name":"lisi","age":21}

 

{"name":"wangwu","age":22}

scala> spark.read.json("file:///home/chxy/spark/user.json")
res2: org.apache.spark.sql.DataFrame = [age: bigint, name: string]              

它可以自動地判斷出數據的字段和字段類型 

 

2)從一個存在的RDD中進行轉換

注意:如果需要RDD與DF或者DS之間操作,那么都需要引入 import spark.implicits._

(1)手動轉換 

//首先引入隱式轉換
scala> import spark.implicits._ import spark.implicits._

//創建一個RDD
scala> def rdd = spark.sparkContext.makeRDD(List(("zhangsan",21),("lisi",22),("wangwu",23)))
rdd: org.apache.spark.rdd.RDD[(String, Int)]

//手動指定dataframe的數據結構
scala> val dataframe = rdd.toDF("name","age")
dataframe: org.apache.spark.sql.DataFrame = [name: string, age: int]

(2)通過case類來轉換

首先創建樣例類

scala>  case class People(name:String, age:Int)
defined class People

將rdd中的數據轉換為樣例類的實例,rdd中的數據類型變為People

scala> val peopleRdd = rdd.map{ d => {People(d._1,d._2)}}
peopleRdd: org.apache.spark.rdd.RDD[People] = MapPartitionsRDD[3] at map at 

將peopleRdd轉換為dataframe,此時無需指定數據結構,spark可以直接將含有case類的RDD轉換為DataFrame

scala> val peopleDataframe = peopleRdd.toDF
peopleDataframe: org.apache.spark.sql.DataFrame = [name: string, age: int]

 

 

將dataframe轉換為rdd

scala> peopleDataframe.rdd
res3: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[7] at rdd at <console>:32

注意:轉換后的數據類型已經不是People,而是Row,也就是行,它無法還原出原來的數據類型。

 

3)從hive查詢的tab中反饋(???)

 

基本操作

api如下:http://spark.apache.org/docs/1.6.3/api/java/org/apache/spark/sql/DataFrame.html

查看數據 

scala> dataframe.show()
+--------+---+
|    name|age|
+--------+---+
|zhangsan| 21|
|    lisi| 22|
|  wangwu| 23|
+--------+---+

創建臨時視圖

scala> dataframe.createTempView("user")

從臨時視圖查詢數據

//從臨時視圖返回的數據會組成一個新的DataFrame
scala> spark.sql("select * from user") res8: org.apache.spark.sql.DataFrame = [name: string, age: int] scala> spark.sql("select * from user").show +--------+---+ | name|age| +--------+---+ |zhangsan| 21| | lisi| 22| | wangwu| 23| +--------+---+

scala> spark.sql("select name from user").show
+--------+
|    name|
+--------+
|zhangsan|
|    lisi|
|  wangwu|
+--------+

創建一個全局臨時視圖

scala> dataframe.createGlobalTempView("emp")

訪問該全局臨時視圖

scala> spark.sql("select * from global_temp.emp").show
+--------+---+
|    name|age|
+--------+---+
|zhangsan| 21|
|    lisi| 22|
|  wangwu| 23|
+--------+---+

 

臨時表是Session范圍內的,Session退出后,表就失效了。如果想應用范圍內有效,可以使用全局表。注意使用全局表時需要全路徑訪問,如:global_temp.emp

在另一個session范圍內訪問該視圖:

scala> spark.newSession.sql("select * from global_temp.emp").show
+--------+---+
|    name|age|
+--------+---+
|zhangsan| 21|
|    lisi| 22|
|  wangwu| 23|
+--------+---+

 

 

注意:

1)視圖一旦定義則不可修改的;

2)session的概念:

廣義:連接狀態,比如一次通信。

狹義:內存中的一塊存儲空間

 

 

 

 

DataSet

Dataset是具有強類型的數據集合,需要提供對應的類型信息

 

 

創建

創建一個樣例類

scala>  case class People(name:String, age:Int)
defined class People

創建DataSet(直接從Seq中創建)

scala> val peopleDataset = Seq(People("zhangsan",20),People("lisi",21),People("wangwu",22)).toDS()
peopleDataset: org.apache.spark.sql.Dataset[People] = [name: string, age: int]

 

RDD轉換為DataSet

SparkSQL能夠自動將包含有case類的RDD轉換成DataSet

直接從peopleRdd開始演示:

scala> peopleRdd
res10: org.apache.spark.rdd.RDD[People] = MapPartitionsRDD[3] at map at <console>

//RDD中的People case類直接可以映射為DataSet的類型
scala> peopleRdd.toDS
res11: org.apache.spark.sql.Dataset[People] = [name: string, age: int]

 

DataSet轉換成RDD

直接調用rdd方法,而且可以保留RDD的case類的類型

scala> res11.rdd
res12: org.apache.spark.rdd.RDD[People] = MapPartitionsRDD[9]

 

DataFrame與DataSet的互轉

DataFrame轉換成DataSet:DataFrame有結構,但是沒有類型,DataSet既有結構也有類型,因此只需要加上類型

scala> peopleDataframe.as[People]
res14: org.apache.spark.sql.Dataset[People] = [name: string, age: int]

DataSet轉換成DataFrame:同樣的道理,只需要忽略類型

scala> peopleDataset.toDF
res15: org.apache.spark.sql.DataFrame = [name: string, age: int]

 

RDD DataFrame,DataSet三者之間的互轉總結如下:

 

重要補充:

1.增刪改查,四大sql常用操作,增、刪、改是否被dataframel所支持呢?

首先從文件創建一個dataframe,並創建臨時視圖:

scala> val userDF = spark.read.json("file:///home/chxy/spark/user.json")
userDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]            

scala> userDF.createTempView("userView")

執行插入操作,拋出異常:

scala> spark.sql("insert into userView values('sasa',24)")
org.apache.hadoop.fs.ParentNotDirectoryException: Parent path is not a directory: file:/home/chxy/spark/user.json

org.apache.hadoop.fs.ParentNotDirectoryException.這個異常是由hdfs文件系統拋出的。很容易理解,因為hdfs天生不支持文件的插入操作。對於增加和刪除操作,因該會得到相同的結果。

執行更新操作,拋出異常:

spark.sql("update userView set name = 'sasa' where id = 1")
org.apache.spark.sql.catalyst.parser.ParseException:
mismatched input 'update' expecting {'(', 'SELECT', 'FROM', 'ADD', 'DESC', 'WITH', 'VALUES', 'CREATE', 'TABLE', 'INSERT', 'DELETE', 'DESCRIBE', 'EXPLAIN', 'SHOW', 'USE', 'DROP', 'ALTER', 'MAP', 'SET', 'RESET', 'START', 'COMMIT', 'ROLLBACK', 'REDUCE', 'REFRESH', 'CLEAR', 'CACHE', 'UNCACHE', 'DFS', 'TRUNCATE', 'ANALYZE', 'LIST', 'REVOKE', 'GRANT', 'LOCK', 'UNLOCK', 'MSCK', 'EXPORT', 'IMPORT', 'LOAD'}(line 1, pos 0)

sparksql不支持update

執行刪除操作,拋出異常:

 spark.sql("delete from user where age = 20")
org.apache.spark.sql.catalyst.parser.ParseException:
Operation not allowed: delete from(line 1, pos 0)

== SQL ==
delete from user where age = 20
^^^

  at org.apache.spark.sql.catalyst.parser.ParserUtils$.operationNotAllowed(ParserUtils.scala:39)

該操作不被允許。

 

2.關於視圖:

視圖在driver端是不可見的

scala> userView
<console>:24: error: not found: value userView
       userView
       ^

如何刪除一個視圖

spark.sql("drop table userView")

 

3.關於dataset與dataframe中的算子如何使用

以map算子為例:

package sparksql

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}


object Demo1 {

  def main(args: Array[String]): Unit = {
    //創建SparkConf()並設置App名稱
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config("spark.some.config.option", "some-value")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    val raw: RDD[(String, Int)] = spark.sparkContext.makeRDD(List(("zhangsan", 21), ("lisi", 22), ("wangwu", 23)))
//創建dataframe
    val df: DataFrame = raw.toDF("name", "age")

    df.show()
//調用map方法,數據數據類型是:Row(col1,col2...coln)
    df.map{
      case Row(col1:String,col2:Int)=>
        println(col1);println(col2)
        col1
      case _=>
        ""
    }.show()

//同RDD,會生成一個新的DataFrame
spark.stop() } }

 

 

dataset:

package sparksql

import org.apache.spark.sql.{Dataset, SparkSession}


object Demo2 {

  case class People(name:String, age:Int)//聲明case類

  def main(args: Array[String]): Unit = {
    //創建SparkConf()並設置App名稱
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config("spark.some.config.option", "some-value")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    val peopleDataset = Seq(People("zhangsan",20),People("lisi",21),People("wangwu",22)).toDS()//創建dataset

    val newDataset: Dataset[String] = peopleDataset.map {
      case People(name: String, age: Int) =>
        println(name)
        name
      case _ =>
        ""
    }

    newDataset.show()

    spark.stop()
  }
}

 

 

遇到的坑:

如果把case類的定義放在main方法中,會報錯:Seq沒有toDS這個方法,參考了這篇博文https://blog.csdn.net/chuyouyinghe/article/details/81189131,將case類的定義轉移到了main方法之外。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM