SparkSQL


Spark SQL

Spark SQL是Spark用來處理結構化數據的一個模塊,它提供了2個編程抽象:DataFrame和DataSet,並且作為分布式SQL查詢引擎的作用。

Hive SQL是轉換成MapReduce然后提交到集群上執行,大大簡化了編寫MapReduc的程序的復雜性,由於MapReduce這種計算模型執行效率比較慢。所有Spark SQL的應運而生,它是將Spark SQL轉換成RDD,然后提交到集群執行,執行效率非常快!

 SparkSession

在spark2.0中,引入SparkSession(作為DataSet和DataFrame API的切入點)作為Spark最新的SQL查詢起始點,實質上是SQLContext和HiveContext的組合(未來可能還會加上StreamingContext); 為了向后兼容,所以在SQLContext和HiveContext上可用的API在SparkSession上同樣是可以使用的。

SparkSession內部封裝了sparkContext、SparkConf、SQLContext,所以計算實際上是由sparkContext完成的。

 ---- 為用戶提供一個統一的切入點使用Spark 各項功能

 ---- 允許用戶通過它調用 DataFrame 和 Dataset 相關 API 來編寫程序

 --- 與 Spark 交互之時不需要顯示的創建 SparkConf, SparkContext 以及 SQlContext,這些對象已經封閉在 SparkSession 中

DataFrame

在Spark中,DataFrame是一種以RDD為基礎的分布式數據集,類似於傳統數據庫中的二維表格。DataFrame與RDD的主要區別在於,前者帶有schema元信息,即DataFrame所表示的二維表數據集的每一列都帶有名稱和類型。這使得Spark SQL得以洞察更多的結構信息,從而對藏於DataFrame背后的數據源以及作用於DataFrame之上的變換進行了針對性的優化,最終達到大幅提升運行時效率的目標。反觀RDD,由於無從得知所存數據元素的具體內部結構,Spark Core只能在stage層面進行簡單、通用的流水線優化。

           

創建

在Spark SQL中SparkSession是創建DataFrame和執行SQL的入口,創建DataFrame有三種方式:

  通過Spark的數據源進行創建;從一個存在的RDD進行轉換;還可以從Hive Table進行查詢返回。

讀取json文件創建DataFrame

spark讀取json按行讀取;只要一行符合json的格式即可;
scala> val rdd = spark.read.json("/opt/module/spark/spark-local/examples/src/main/resources/people.json") rdd: org.apache.spark.sql.DataFrame = [age: bigint, name: string] scala> rdd.show +----+-------+ | age| name| +----+-------+ |null|Michael| | 30| Andy| | 19| Justin| +----+-------+
① SQL風格語法
##轉化成sql去執行

scala> rdd.createTempView("user") //view是table的查詢結果,只能查不能改 scala> spark.sql("select * from user").show +----+-------+ | age| name| +----+-------+ |null|Michael| | 30| Andy| | 19| Justin| +----+-------+ scala> spark.sql("select * from user where age is not null").show +---+------+ |age| name| +---+------+ | 30| Andy| | 19|Justin| +---+------+

注意:普通臨時view是Session范圍內的,如果想應用范圍內有效,可以使用全局臨時表。使用全局臨時表時需要全路徑訪問,如:global_temp.people

scala> rdd.createGlobalTempView("emp")  //提升為全局
scala> spark.sql("select * from user where age is not null").show
+---+------+
|age|  name|
+---+------+
| 30|  Andy|
| 19|Justin|
+---+------+

scala> spark.sql("select * from emp where age is not null").show    //sql默認從當前session中查找,所以查詢時需要加上global_temp
org.apache.spark.sql.AnalysisException: Table or view not found: emp; line 1 pos 14
scala> spark.sql("select * from global_temp.emp where age is not null").show
+---+------+
|age|  name|
+---+------+
| 30|  Andy|
| 19|Justin|
+---+------+

 ② 以面向對象方式訪問;DSL風格語法 模仿面向對象的方式

scala> rdd.printSchema
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)


scala> rdd.select("age").show
+----+
| age|
+----+
|null|
|  30|
|  19|
+----+


scala> rdd.select($"age"+1).show
+---------+
|(age + 1)|
+---------+
|     null|
|       31|
|       20|
+---------+

代碼方式:

方式一:通過 case class 創建 DataFrames(反射)

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
object TestSparkSql {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("SparkSql").setMaster("local[*]")
    val sc: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
    // 將本地的數據讀入 RDD, 並將 RDD 與 case class 關聯
    val peopleRdd = sc.sparkContext.textFile("file:\\F:\\Input\\people.txt")
      .map(line => People(line.split(",")(0),line.split(",")(1).trim.toInt))
    import sc.implicits._
    // 將RDD 轉換成 DataFrames
    val df: DataFrame = peopleRdd.toDF
    //將DataFrames創建成一個臨時的視圖
    df.createOrReplaceTempView("people")
    sc.sql("select * from people").show() //使用SQL語句進行查詢
    sc.stop()
  }
}
//定義case class,相當於表結構
case class People(var name: String, var age: Int)

說明:

① textFile默認是從hdfs讀取文件; 本地文件讀取 sc.textFile("路徑"),在路徑前面加上file:// 表示從本地文件系統讀

② textFile可直接讀取多個文件夾(嵌套)下的多個數據文件,如上邊路徑可寫成  "file:\\F:\\Input"  讀取這個目錄下多個文件

方式二:通過 structType 創建 DataFrames(編程接口),測試代碼如下

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}
object TestSparkSql {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("SparkSql").setMaster("local[*]")
    val sc: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
    // 將本地的數據讀入 RDD
    val peopleRdd = sc.sparkContext.textFile("file:\\F:\\Input")
    // 將 RDD 數據映射成 Row,需要 import org.apache.spark.sql.Row
    import org.apache.spark.sql.Row
    val rowRDD: RDD[Row] = peopleRdd.map(line => {
      val fields = line.split(",")
      Row(fields(0), fields(1).trim.toInt)
    })
    val structType: StructType = StructType(
      //字段名,字段類型,是否可以為空
      StructField("name", StringType, true) ::
        StructField("age", IntegerType, true) :: Nil
    )
    //將DataFrames創建成一個臨時的視圖
    val df: DataFrame = sc.createDataFrame(rowRDD,structType)
    df.createTempView("people")
    sc.sql("select * from people").show() //使用SQL語句進行查詢
    sc.stop()
  }
}
View Code

                      

 方式三:讀取json文件

people.json 必須是在一行:

{"name":"swenna","age":18}
{"name": "kk","age":20}
//讀取json數據
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
object TestSparkSql {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("SparkSql").setMaster("local[*]")
    val sc: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
    // 將本地的數據讀入 RDD
    val df: DataFrame = sc.read.json("file:\\F:\\Input\\people.json")

    //將DataFrames創建成一個臨時的視圖
    df.createOrReplaceTempView("people")
    sc.sql("select * from people").show() //使用SQL語句進行查詢
    sc.stop()
  }
}

               

RDD轉成DF

注意:如果需要RDD與DF或者DS之間操作,那么都需要引入 import spark.implicits._  【spark不是包名,而是sparkSession對象的名稱】

前置條件:導入隱式轉換並創建一個RDD

scala> import spark.implicits._  spark對象中的隱式轉換規則,而不是導入包名
import spark.implicits._
scala
> val df = rdd.toDF("id", "name") df: org.apache.spark.sql.DataFrame = [id: bigint, name: string] scala> df.show +----+-------+ | id| name| +----+-------+ |null|Michael| | 30| Andy| | 19| Justin| +----+-------+ scala> df.createTempView("Student") scala> spark.sql("select * from student").show

 

scala> val x = sc.makeRDD(List(("a",1), ("b",4), ("c", 3)))

scala> x.collect
res36: Array[(String, Int)] = Array((a,1), (b,4), (c,3))

scala> x.toDF("name", "count")
res37: org.apache.spark.sql.DataFrame = [name: string, count: int]

scala> val y = x.toDF("name", "count")
y: org.apache.spark.sql.DataFrame = [name: string, count: int]

scala> y.show
+----+-----+
|name|count|
+----+-----+
| a | 1|
| b | 4|
| c | 3|
+----+-----+

DF--->RDD  直接調用rdd即可

scala> y.rdd.collect
res46: Array[org.apache.spark.sql.Row] = Array([a,1], [b,4], [c,3])
scala> df.rdd.collect
res49: Array[org.apache.spark.sql.Row] = Array([null,Michael], [30,Andy], [19,Justin])

RDD轉換為DataSet

SparkSQL能夠自動將包含有case類的RDD轉換成DataFrame,case類定義了table的結構,case類屬性通過反射變成了表的列名。Case類可以包含諸如Seqs或者Array等復雜的結構。     DataSet是具有強類型的數據集合,需要提供對應的類型信息。

scala> case class People(age: BigInt, name: String)
defined class People
scala> rdd.collect
res77: Array[org.apache.spark.sql.Row] = Array([null,Michael], [30,Andy], [19,Justin])
scala> val ds = rdd.as[People]
ds: org.apache.spark.sql.Dataset[People] = [age: bigint, name: string]
scala> ds.collect
res31: Array[People] = Array(People(null,Michael), People(30,Andy), People(19,Justin))

 

scala> case class Person(name: String, age: Long)
defined class Person

scala> val caseclassDS = Seq(Person("kris", 20)).toDS()
caseclassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]

scala> caseclassDS.show
+----+---+
|name|age|
+----+---+
|kris| 20|
+----+---+
scala> caseclassDS.collect
res51: Array[Person] = Array(Person(kris,20))

通過textFile方法創建rdd並轉DS

scala> val textFileRDD = sc.textFile("/opt/module/spark/spark-local/examples/src/main/resources/people.txt")
scala> textFileRDD.collect
res78: Array[String] = Array(Michael, 29, Andy, 30, Justin, 19)
scala> case class Person(name: String, age: Long)
defined class Person

scala> textFileRDD.map(x=>{val rddMap = x.split(","); Person(rddMap(0), rddMap(1).trim.toInt)}).toDS
res80: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]

DS ----> RDD 調用rdd方法即可

scala> val DS = Seq(Person("Andy", 32)).toDS()  用這種方式可創建一個DataSet
DS: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]

scala> ds.collect
res76: Array[People] = Array(People(null,Michael), People(30,Andy), People(19,Justin))
scala> ds.rdd.collect
res75: Array[People] = Array(People(null,Michael), People(30,Andy), People(19,Justin))

DF ---> DS

spark.read.json(“ path ”)即是DataFrame類型; 

scala> df.collect
res72: Array[org.apache.spark.sql.Row] = Array([null,Michael], [30,Andy], [19,Justin])
scala> case class Student(id: BigInt, name: String)
defined class Student
scala> df.as[Student]
res69: org.apache.spark.sql.Dataset[Student] = [id: bigint, name: string]

DS-->DF

這種方法就是在給出每一列的類型后,使用as方法,轉成Dataset,這在數據類型是DataFrame又需要針對各個字段處理時極為方便。在使用一些特殊的操作時,一定要加上 import spark.implicits._ 不然toDF、toDS無法使用。

scala> ds.collect
res73: Array[People] = Array(People(null,Michael), People(30,Andy), People(19,Justin))

scala> ds.toDF
res74: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

 三者的共性

(1)RDD、DataFrame、Dataset全都是spark平台下的分布式彈性數據集,為處理超大型數據提供便利;

(2)三者都有惰性機制,在進行創建、轉換,如map方法時,不會立即執行,只有在遇到Action如foreach時,三者才會開始遍歷運算;

(3)三者有許多共同的函數,如filter,排序等;

(4)在對DataFrame和Dataset進行操作許多操作都需要這個包:import spark.implicits._(在創建好SparkSession對象后盡量直接導入)

互相轉化

RDD關心數據,DataFrame關心結構,DataSet關心類型;

  ① 將RDD轉換為DataFrame,需要增加結構信息,所以調用toDF方法,需要增加結構;

  ② 將RDD轉換為DataSet,需要增加結構和類型信息,所以需要轉換為指定類型后,調用toDS方法;

  ③ 將DataFrame轉換為DataSet時,因為已經包含結構信息,只有增加類型信息就可以,所以調用as[類型]

  ④因為DF中本身包含數據,所以轉換為RDD時,直接調用rdd即可;

  ⑤因為DS中本身包含數據,所以轉換為RDD時,直接調用rdd即可;

  ⑥因為DS本身包含數據結構信息,所以轉換為DF時,直接調用toDF即可

三者的區別

聯系:RDD、DataFrame、DataSet三者的聯系是都是spark當中的一種數據類型,RDD是SparkCore當中的,DataFrame和DataSet都是SparkSql中的,它倆底層都基於RDD實現的;

區別:RDD 優點: ①編譯時類型安全 ;②面向對象的編程風格 ; ③直接通過類名點的方式來操作數據; 缺點是通信or IO操作都需要序列化和反序列化的性能開銷 ,比較耗費性能; GC的性能開銷 ,頻繁的創建和銷毀對象, 勢必會增加GC;

DataFrame引入了schema和off-heap堆外內存不會頻繁GC,減少了內存的開銷; 缺點是類型不安全;

DataSet結合了它倆的優點並且把缺點給屏蔽掉了;

 

1. RDD: ① RDD一般和spark mlib同時使用; ② RDD不支持sparksql操作

2. DataFrame:

  1)與RDD和Dataset不同,DataFrame每一行的類型固定為Row,每一列的值沒法直接訪問,只有通過解析才能獲取各個字段的值,

testDF.foreach{
  line =>
    val col1=line.getAs[String]("col1")
    val col2=line.getAs[String]("col2")
}

  2)DataFrame與Dataset一般不與spark mlib同時使用

  3)DataFrame與Dataset均支持sparksql的操作,比如select,groupby之類,還能注冊臨時表/視窗,進行sql語句操作,如:dataDF.createOrReplaceTempView("tmp")

    spark.sql("select  ROW,DATE from tmp where DATE is not null order by DATE").show(100,false)

  4)DataFrame與Dataset支持一些特別方便的保存方式,比如保存成csv,可以帶上表頭,這樣每一列的字段名一目了然

val saveoptions = Map("header" -> "true", "delimiter" -> "\t", "path" -> "hdfs://hadoop102:9000/test")  //保存
datawDF.write.format("com.atguigu.spark.csv").mode(SaveMode.Overwrite).options(saveoptions).save()  //讀取

val options = Map("header" -> "true", "delimiter" -> "\t", "path" -> "hdfs://hadoop102:9000/test")
val datarDF= spark.read.options(options).format("com.atguigu.spark.csv").load()
利用這樣的保存方式,可以方便的獲得字段名和列的對應,而且分隔符(delimiter)可以自由指定。

3. Dataset:

  1)Dataset和DataFrame擁有完全相同的成員函數,區別只是每一行的數據類型不同。

  2)DataFrame也可以叫Dataset[Row],每一行的類型是Row,不解析,每一行究竟有哪些字段,各個字段又是什么類型都無從得知,只能用上面提到的getAS方法或者模式匹配拿出特定字段。而Dataset中,每一行是什么類型是不一定的,在自定義了case class之后可以很自由的獲得每一行的信息

case class Coltest(col1:String,col2:Int)extends Serializable //定義字段名和類型
/**
 rdd
 ("a", 1)
 ("b", 1)
 ("a", 1)
**/
val test: Dataset[Coltest]=rdd.map{line=>
      Coltest(line._1,line._2)
    }.toDS
test.map{
      line=>
        println(line.col1)
        println(line.col2)
    }

可以看出,Dataset在需要訪問列中的某個字段時是非常方便的,然而,如果要寫一些適配性很強的函數時,如果使用Dataset,行的類型又不確定,可能是各種case class,無法實現適配,這時候用DataFrame即Dataset[Row]就能比較好的解決問題

IDEA創建SparkSQL程序

object TestSparkSql {
  def main(args: Array[String]): Unit = {
    //創建配置對象
    val conf: SparkConf = new SparkConf().setAppName("SQL").setMaster("local[*]")
    //創建環境對象
    val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    //導入隱式轉換
    import sparkSession.implicits._

    //執行操作
    // TODO 創建DataFrame
    val df: DataFrame = sparkSession.read.json("input/input.json")
    df.createTempView("user")
    sparkSession.sql("select * from user")
    //df.show()
    // TODO 創建DataSet
    val ds: Dataset[Employ] = Seq(Employ("jing", 18)).toDS()
    //ds.show()
    // TODO 將DataFrame轉換為DataSet
    val dfToDs: Dataset[Employ] = df.as[Employ]
    dfToDs.foreach(x => {
      println(x.name + "\t" + x.age)
    })
    //TODO 將RDD轉換為DataSet
    val rdd: RDD[(String, Int)] = sparkSession.sparkContext.makeRDD(Array(("aa", 19)))
    val employRdd: RDD[Employ] = rdd.map {
      case (name, age) => Employ(name, age)
    }
    //employRdd.toDS().show()

    // TODO 將RDD轉換為DataFrame
    //rdd.toDF().show()
    val rddToDf: DataFrame = sparkSession.sparkContext.makeRDD(Array(("kris", 18))).toDF("username", "age")

    //TODO 將DataFrame轉換為RDD[Row]
    df.rdd.foreach(row => {
      println(row.getLong(0)+ "," + row.getString(1))
    })

    // TODO 將DataSet轉換為RDD[類型]
    val dsToRdd: RDD[Employ] = df.as[Employ].rdd
    sparkSession.stop()
  }
}
case class Employ(name: String, age: BigInt)

用戶自定義函數

 Spark SQL數據的加載與保存

通用加載/保存方法 load和save

通用的讀寫方法是  sparkSql只讀這parquet file這種類型的文件;  否則要改變它的文件類型需要加.format 
加上format("json");輸出也是這個類型

scala>val df = spark.read.load("/opt/module/spark/spark-local/examples/src/main/resources/users.parquet").show

scala>df.select("name", " color").write.save("user.parquet") //保存數據
java.lang.RuntimeException: file:/opt/module/spark/spark-local/examples/src/main/resources/people.json is not a Parquet file. 
用load讀取json數據
scala> spark.read.format("json").load("/opt/module/spark/spark-local/examples/src/main/resources/people.json").show

df.write.format("json").save("/..")

spark.read.format("json").mode("overwrite").save("/..json")

MySQL  Spark之讀取MySQL數據的方式

Spark SQL可以通過JDBC從關系型數據庫中讀取數據的方式創建DataFrame,通過對DataFrame一系列的計算后,還可以將數據再寫回關系型數據庫中。

可在啟動shell時指定相關的數據庫驅動路徑,或者將相關的數據庫驅動放到spark的類路徑下。

[kris@hadoop101 jars]$ cp /opt/software/mysql-connector-java-5.1.27/mysql-connector-java-5.1.27-bin.jar ./

scala> val connectionProperties = new java.util.Properties()
connectionProperties: java.util.Properties = {}

scala> connectionProperties.put("user", "root")
res0: Object = null

scala> connectionProperties.put("password", "123456")
res1: Object = null

scala> val jdbcDF2 = spark.read.jdbc("jdbc:mysql://hadoop101:3306/rdd", "test", connectionProperties)
jdbcDF2: org.apache.spark.sql.DataFrame = [id: int, name: string]

scala> jdbcDF2.show
+---+-------+
| id|   name|
+---+-------+
|  1| Google|
|  2|  Baidu|
|  3|    Ali|
|  4|Tencent|
|  5| Amazon|
+---+-------+

jdbcDF2.write.mode("append").jdbc("jdbc:mysql://hadoop101:3306/rdd", "test", connectionProperties)


scala> val rdd = sc.makeRDD(Array((6, "FaceBook")))
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[4] at makeRDD at <console>:24

scala> rdd.toDF("id", "name")
res5: org.apache.spark.sql.DataFrame = [id: int, name: string]

scala> val df = rdd.toDF("id", "name")
df: org.apache.spark.sql.DataFrame = [id: int, name: string]

scala> df.show
+---+--------+
| id|    name|
+---+--------+
|  6|FaceBook|
+---+--------+
scala> df.write.mode("append").jdbc("jdbc:mysql://hadoop101:3306/rdd", "test", connectionProperties)
scala> jdbcDF2.show
+---+--------+
| id|    name|
+---+--------+
|  1|  Google|
|  2|   Baidu|
|  3|     Ali|
|  4| Tencent|
|  5|  Amazon|
|  6|FaceBook|
+---+--------+

代碼

import java.util.Properties
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

object TestSparkSql {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("SparkSql").setMaster("local[*]")
    val sc: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
    //method01(sc)
    method02(sc)
    //method03(sc)
  }
  /** * 方式一:不指定查詢條件
    * 所有的數據由RDD的一個分區處理,如果你這個表數據量很大,表的所有數據都是由RDD的一個分區處理,很可能會出現OOM
    * @param sc
    */
  def method01(sc: SparkSession): Unit = {
    // 將本地的數據讀入 RDD
    val url = "jdbc:mysql://hadoop101/company?"
    val table = "staff"
    val prop = new Properties()
    prop.setProperty("user", "root")
    prop.setProperty("password", "123456")
    //需要傳入Mysql的URL、表名、properties(連接數據庫的用戶名密碼)
    val df: DataFrame = sc.read.jdbc(url, table, prop)
    println(df.count()) //3
    println(df.rdd.partitions.size) //1
    df.createOrReplaceTempView("staff")
    sc.sql("select * from staff where id <=2").show()
    //df.show()
    sc.stop()
  }
  /** * 方式二:指定數據庫字段的范圍
    * 通過lowerBound和upperBound 指定分區的范圍
    * 通過columnName 指定分區的列(只支持整形)
    * 通過numPartitions 指定分區數量 (不宜過大)
    * 說明:將表的數據分布到RDD的幾個分區中,分區的數量由numPartitions參數決定,在理想情況下,每個分區處理相同數量的數據,我們在使用的時候不建議將這個值設置的比較大,因為這可能導致數據庫掛掉!這個函數的缺點就是只能使用整形數據字段作為分區關鍵字。
    * @param sc
    */
  def method02(sc: SparkSession): Unit = {
    val lowerBound = 1
    val upperBound = 100000
    val numPartitions = 5
    val url = "jdbc:mysql://hadoop101/company?user=root&password=123456"
    val prop = new Properties()
    val df: DataFrame = sc.read.jdbc(url, "staff", "id", lowerBound, upperBound,numPartitions,prop)

    df.show()
    println(df.count())
    println(df.rdd.partitions.length) //5個分區
  }

  /** * 方式三:根據任意字段進行分區
    * 通過predicates將數據根據score分為2個區
    * 基於前面兩種方法的限制,Spark還提供了根據任意字段進行分區的方法;rdd的分區數量就等於predicates.length
     * @param sc
    */
  def method03(sc: SparkSession) = {
    val predicates = Array[String]("id <=2", "id > 1 and id < 3") //2個分區
    val url = "jdbc:mysql://hadoop101/company?user=root&password=123456"
    val prop = new Properties()
    val df: DataFrame = sc.read.jdbc(url,"staff",predicates,prop)
    println(df.count()) //3
    println(df.rdd.partitions.length) //2
    df.show()
  }

}


方式四: 通過load獲取,和方式二類似
options函數支持url、driver、dbtable、partitionColumn、lowerBound、upperBound以及numPartitions選項,與方法二的參數一致。
其內部實現原理部分和方法二大體一致。同時load方法還支持json、orc等數據源的讀取。
val df: DataFrame = sc.read.format("jdbc").options(Map ("url" -> url, "dbtable" -> "staff")).load() 加載條件查詢后的數據,報錯: Every derived table must have its own alias,這句話的意思是說每個派生出來的表都必須有一個自己的別名,加了一個沒有別名即可
val df: DataFrame = sc.read.format("jdbc").options(Map ("url" -> url, "dbtable" -> "(select s1.id,s2.name,s1.age from stu1 s1 join stu2 s2 on s1.id = s2.id ) stu")).load()

 

Hive   Spark之HiveSupport連接(spark-shell和IDEA)

 spark.sparkContext.setLogLevel("WARN") //設置日志輸出級別

Apache Hive是Hadoop上的SQL引擎,Spark SQL編譯時可以包含Hive支持,也可以不包含。包含Hive支持的Spark SQL可以支持Hive表訪問、UDF(用戶自定義函數)以及Hive查詢語言(HQL)等。spark-shell默認是Hive支持的;代碼中是默認不支持的,需要手動指定(加一個參數即可)。

如果要使用內嵌的Hive,什么都不用做,直接用就可以了。

可以修改其數據倉庫地址,參數為:--conf spark.sql.warehouse.dir=./wear

scala> spark.sql("create table emp(name String, age Int)").show
19/04/11 01:10:17 WARN HiveMetaStore: Location: file:/opt/module/spark/spark-local/spark-warehouse/emp specified for non-external table:emp

scala> spark.sql("load data local inpath '/opt/module/spark/spark-local/examples/src/main/resources/people.txt' into table emp").show

scala> spark.sql("show tables").show
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
| default|      emp|      false|
+--------+---------+-----------+
scala> spark.sql("select * from emp").show

/opt/module/spark/spark-local/spark-warehouse/emp
[kris@hadoop101 emp]$ ll
-rwxr-xr-x. 1 kris kris 32 4月  11 01:10 people.txt

外部Hive應用

[kris@hadoop101 spark-local]$ rm -rf metastore_db/ spark-warehouse/

[kris@hadoop101 conf]$ cp hive-site.xml /opt/module/spark/spark-local/conf/

[kris@hadoop101 spark-local]$ bin/spark-shell 
scala> spark.sql("show tables").show
+--------+--------------------+-----------+
|database|           tableName|isTemporary|
+--------+--------------------+-----------+
| default|            bigtable|      false|
| default|            business|      false|
| default|                dept|      false|
| default|      dept_partition|      false|
| default|     dept_partition2|      false|
| default|     dept_partitions|      false|
| default|                 emp|      false|
...

[kris@hadoop101 spark-local]$ bin/spark-sql 
log4j:WARN No appenders could be found for logger (org.apache.hadoop.util.Shell).
log4j:WARN Please initialize the log4j system properly.
spark-sql (default)> show tables;

代碼中操作Hive

log4j.properties

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
View Code

 拷貝Hadoop中core-site.xml、hdfs-site.xml,Hive中hive-site.xml三個文件到resources中(也可以只拷貝hive-site.xml),集群環境把hive的配置文件要發到$SPARK_HOME/conf目錄下;

Maven所依賴的jar包:

    <dependencies>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.27</version>
        </dependency>
        <!--spark操作Hive所需引入的包 spark版本-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>


    </dependencies>
View Code
val sparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
支持hive

 測試:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object TestSparkSql {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("SparkSql").setMaster("local[*]")
    val sc: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
    sc.sql("show tables").show()
    sc.stop()
  }
}

              

 

SparkSQL 的元數據

1.1元數據的狀態

SparkSQL 的元數據的狀態有兩種:

1、in_memory,用完了元數據也就丟了

2、hive , 通過hive去保存的,也就是說,hive的元數據存在哪兒,它的元數據也就存在哪兒。

換句話說,SparkSQL的數據倉庫在建立在Hive之上實現的。我們要用SparkSQL去構建數據倉庫的時候,必須依賴於Hive。

2.2Spark-SQL腳本

如果用戶直接運行bin/spark-sql命令。會導致我們的元數據有兩種狀態:

1、in-memory狀態:如果SPARK-HOME/conf目錄下沒有放置hive-site.xml文件,元數據的狀態就是in-memory

2、hive狀態:如果我們在SPARK-HOME/conf目錄下放置了,hive-site.xml文件,那么默認情況下,spark-sql的元數據的狀態就是hive.

 

伴生對象相當於static,可直接類名.
給類起別名,相當於屬性使用type ..

spark.sql("select age, addName(name) from user").show

scala> case class tbStock(ordernumber:String,locationid:String,dateid:String) extends Serializable
scala> val tbStockRdd = spark.sparkContext.textFile("/opt/module/datas/sparkData/tbStock.txt")
tbStockRdd: org.apache.spark.rdd.RDD[String] = /opt/module/datas/sparkData/tbStock.txt MapPartitionsRDD[30] at textFile at <console>:23
scala> val tbStockDS = tbStockRdd.map(_.split("\t")).map(x => tbStock(x(0), x(1), x(2))).toDS
tbStockDS: org.apache.spark.sql.Dataset[tbStock] = [ordernumber: string, locationid: string ... 1 more field]

scala> tbStockDS.show
+-----------+----------+----------+
|ordernumber|locationid|    dateid|
+-----------+----------+----------+
|      lj111|        jd| 2018-3-13|
|      lj112|        jd| 2018-2-13|
|      lj113|        jd| 2019-1-13|
|      lj114|        jd| 2019-3-13|
|      lj115|        jd| 2018-9-13|
|      lj116|        jd|2018-11-13|
|      lj117|        jd|2017-12-13|
|      lj118|        jd| 2017-5-13|
+-----------+----------+----------+

scala> case class tbStockDetail(ordernumber:String, rownum:Int, itemid:String, number:Int, price:Double, amount:Double) extends Serializable
defined class tbStockDetail
scala> val tbStockDetailRdd = spark.sparkContext.textFile("/opt/module/datas/sparkData/tbStockDetail.txt")
tbStockDetailRdd: org.apache.spark.rdd.RDD[String] = /opt/module/datas/sparkData/tbStockDetail.txt MapPartitionsRDD[43] at textFile at <console>:23
scala> val tbStockDetailDS = tbStockDetailRdd.map(_.split("\t")).map(x => tbStockDetail(x(0), x(1).trim().toInt, x(2), x(3).trim().toInt, x(4).trim().toDouble,x(5).trim().toDouble)).toDS
tbStockDetailDS: org.apache.spark.sql.Dataset[tbStockDetail] = [ordernumber: string, rownum: int ... 4 more fields]

scala> tbStockDetailDS.show
+-----------+------+------+------+-----+------+
|ordernumber|rownum|itemid|number|price|amount|
+-----------+------+------+------+-----+------+
|      lj111|    12|item11|    10|100.0| 300.0|
|      lj112|    12|item12|    10|100.0| 200.0|
|      lj113|    12|item13|    10|100.0| 300.0|
|      lj114|    12|item14|    10|100.0| 100.0|
|      lj115|    12|item15|    10|100.0| 300.0|
|      lj116|    12|item16|    10|100.0| 700.0|
|      lj117|    12|item17|    10|100.0| 600.0|
|      lj118|    12|item18|    10|100.0| 500.0|
+-----------+------+------+------+-----+------+

 

tbstock、tbstockdetail--amount 、tbdate
計算所有訂單中每年的銷售單數、銷售總額
三個表連接后以count(distinct a.ordernumber)計銷售單數,sum(b.amount)計銷售總額
select 
    theyear, 
    count(tbstock.ordernumber), 
    sum(tbstockdetail.amount) 
from tbstock join tbstockdetail on tbstock.ordernumber = tbstockdetail.ordernumber 
             join tbdate on tbdate.dateid = tbstock.dateid 
    group by tbdate.theyear 
    order by tbdate.theyear;


統計每年最大金額訂單的銷售額:
統計每個訂單一共有多少銷售額
select 
    a.dateid, 
    a.ordernumber, 
    sum(b.amount) sumAmount
from tbstock a join tbstockdetail b on a.ordernumber = b.ordernumber
    group by a.dateid, a.ordernumber

select 
    theyear, 
    max(c.sumAmount) sumOfAmount
from tbdate join (select a.dateid, a.ordernumber, sum(b.amount) sumAmount
from tbstock a join tbstockdetail b on a.ordernumber = b.ordernumber
    group by a.dateid, a.ordernumber)c on tbdate.dateid = c.dateid
    group by tbdate.theyear order by tbdate.theyear desc

計算所有訂單中每年最暢銷貨品
目標:統計每年最暢銷貨品(哪個貨品銷售額amount在當年最高,哪個就是最暢銷貨品)
1求出每年每個貨品的銷售額
每年 tbdate.theyear 
貨品tbstockdetail.itemid
銷售額amount在當年最高 
select tbdate.theyear, tbstockdetail.itemid, sum(tbstockdetail.amount) sumAmount
from tbdate join tbstock on tbdate.dateid = tbstock.dateid
join tbstockdetail on tbstockdetail.ordernumber = tbstock.ordernumber
group by tbdate.theyear, tbstockdetail.itemid

2在第一步的基礎上,統計每年 所有 貨品中的最大金額
select aa.theyear, max(sumAmount) maxAmount
    from (
select tbdate.theyear, tbstockdetail.itemid, sum(tbstockdetail.amount) sumAmount
    from tbdate 
    join tbstock on tbdate.dateid = tbstock.dateid
    join tbstockdetail on tbstockdetail.ordernumber = tbstock.ordernumber
        group by tbdate.theyear, tbstockdetail.itemid)aa
        group by aa.theyear

用最大銷售額和統計好的每個貨品的銷售額join,以及用年join,集合得到最暢銷貨品那一行信息
每年每個貨品的銷售額 join 每年所有貨品中的最大金額
select distinct e.theyear, e.itemid, f.maxAmount
    from (
select tbdate.theyear, tbstockdetail.itemid, sum(tbstockdetail.amount) sumAmount
    from tbdate 
        join tbstock on tbdate.dateid = tbstock.dateid
        join tbstockdetail on tbstockdetail.ordernumber = tbstock.ordernumber
    group by tbdate.theyear, tbstockdetail.itemid)e join (select aa.theyear, max(sumAmount) maxAmount
    from (
select tbdate.theyear, tbstockdetail.itemid, sum(tbstockdetail.amount) sumAmount
    from tbdate join tbstock on tbdate.dateid = tbstock.dateid
        join tbstockdetail on tbstockdetail.ordernumber = tbstock.ordernumber
    group by tbdate.theyear, tbstockdetail.itemid)aa
    group by aa.theyear)f on e.theyear = f.theyear and e.sumAmount = f.maxAmount
    order by e.theyear

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM