1.1 什么是spark SQL

​ Spark SQL是Spark用來處理結構化數據的一個模塊,它提供了一個編程抽象叫做DataFrame並且作為分布式SQL查詢引擎的作用。類似於hive的作用。

1.2 spark SQL的特點

1、容易集成:安裝Spark的時候,已經集成好了。不需要單獨安裝。
2、統一的數據訪問方式:JDBC、JSON、Hive、parquet文件(一種列式存儲文件,是SparkSQL默認的數據源,hive中也支持)
3、完全兼容Hive。可以將Hive中的數據,直接讀取到Spark SQL中處理。
一般在生產中,基本都是使用hive做數據倉庫存儲數據,然后用spark從hive讀取數據進行處理。
4、支持標准的數據連接:JDBC、ODBC
5、計算效率比基於mr的hive高,而且hive2.x版本中,hive建議使用spark作為執行引擎

二、spark SQL基本原理

2.1 DataFrame和DataSet基本概念

2.1.1 DataFrame

DataFrame是組織成命名列的數據集。它在概念上等同於關系數據庫中的表,里面有表的結構以及數據,但在底層具有更豐富的優化。DataFrames可以從各種來源構建,
例如:
結構化數據文件
hive中的表
外部數據庫或現有RDDs
DataFrame API支持的語言有Scala,Java,Python和R。

​ 比起RDD,DataFrame多了數據的結構信息,即schema。RDD是分布式的 Java對象的集合。DataFrame是分布式的Row對象的集合。DataFrame除了提供了比RDD更豐富的算子以外,更重要的特點是提升執行效率、減少數據讀取以及執行計划的優化。

2.1.2 DataSet

Dataset是一個分布式的數據收集器。這是在Spark1.6之后新加的一個接口,兼顧了RDD的優點(強類型,可以使用功能強大的lambda)以及Spark SQL的執行器高效性的優點。所以可以把DataFrames看成是一種特殊的Datasets,即:Dataset(Row)

2.2 創建DataFrame的方式

2.2.1 SparkSession對象

​ Apache Spark 2.0引入了SparkSession,其為用戶提供了一個統一的切入點來使用Spark的各項功能,並且允許用戶通過它調用DataFrame和Dataset相關API來編寫Spark程序。最重要的是,它減少了用戶需要了解的一些概念,使得我們可以很容易地與Spark交互。
​ 在2.0版本之前,與Spark交互之前必須先創建SparkConf和SparkContext。然而在Spark 2.0中,我們可以通過SparkSession來實現同樣的功能,而不需要顯式地創建SparkConf, SparkContext 以及 SQLContext,因為這些對象已經封裝在SparkSession中。
​ 要注意一點,在我用的這個spark版本中,直接使用new SQLContext() 來創建SQLContext對象,會顯示該方式已經被棄用了(IDEA中會顯示已棄用),建議使用SparkSession來獲取SQLContext對象。

2.2.2 通過case class樣本類

這種方式在scala中比較常用,因為case class是scala的特色

/**
表 t_stu 的結構為:
id name age
*/

object CreateDF {
  def main(args: Array[String]): Unit = {
    //這是最新的獲取SQLContext對象的方式
    //2、創建SparkSession對象,設置master,appname
    val spark = SparkSession.builder().master("local").appName("createDF case class").getOrCreate()
    //3、通過spark獲取sparkContext對象,讀取數據
    val lines = spark.sparkContext.textFile("G:\\test\\t_stu.txt").map(_.split(","))

    //4、將數據映射到case class中,也就是數據映射到表的對應字段中
    val tb = lines.map(t=>emp(t(0).toInt,t(1),t(2).toInt))
    //這里必須要加上隱式轉換,否則無法調用 toDF 函數
    import spark.sqlContext.implicits._

    //5、生成df
    val df2 = tb.toDF()

    //相當於select name from t_stu
    df1.select($"name").show()

    //關閉spark對象
    spark.stop()
  }
}

/*1、定義case class,每個屬性對應表中的字段名以及類型
     一般生產中為了方便,會全部定義為string類型,然后有需要的時候
     才根據實際情況將string轉為需要的類型
   這一步相當於定義表的結構
*/
case class emp(id:Int,name:String,age:Int)

總結步驟為:

1、定義case class,用來表結構 2、創建sparkSession對象,用來讀取數據 3、將rdd中的數據和case class映射 4、調用 toDF 函數將rdd轉為 DataFrame

2.2.3 通過StructType類

這種方式java比較常用

package SparkSQLExer

import org.apache
import org.apache.spark
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}

/**
  * 創建dataschema方式2:
  * 通過spark session對象創建,表結構通過StructType創建
  */
object CreateDF02 {
  def main(args: Array[String]): Unit = {
    val sparkS = SparkSession.builder().master("local").appName("create schema").getOrCreate()

    //1、通過StructType創建表結構schema,里面表的每個字段使用 StructField定義
    val tbSchema = StructType(List(
        StructField("id",DataTypes.IntegerType),
        StructField("name",DataTypes.StringType),
        StructField("age",DataTypes.IntegerType)
      ))

    //2、讀取數據
    var lines = sparkS.sparkContext.textFile("G:\\test\\t_stu.txt").map(_.split(","))

    //3、將數據映射為ROW對象
    val rdd1 = lines.map(t=>Row(t(0).toInt,t(1),t(2).toInt))

    //4、創建表結構和表數據映射,返回的就是df
    val df2 = sparkS.createDataFrame(rdd1, tbSchema)

    //打印表結構
    df2.printSchema()

    sparkS.stop()

  }

}

總結步驟為:

1、通過StructType創建表結構schema,里面表的每個字段使用 StructField定義 2、通過sparkSession.sparkContext讀取數據 3、將數據映射格式為Row對象 4、將StructType和數據Row對象映射,返回df

2.2.4 使用json等有表格式的文件類型

package SparkSQLExer

import org.apache.spark.sql.SparkSession

/**
  * 創建df方式3:通過有格式的文件直接導入數據以及表結構,比如json格式的文件
  * 返回的直接就是一個DF
  */
object CreateDF03 {
  def main(args: Array[String]): Unit = {
    val sparkS = SparkSession.builder().master("local").appName("create df through json").getOrCreate()

    //讀取json方式1:
    val jsonrdd1= sparkS.read.json("path")

    //讀取json方式2:
    val jsonrdd1= sparkS.read.format("json").load("path")

    sparkS.stop()
  }
}

這種方式比較簡單,就是直接讀取json文件而已
sparkS.read.xxxx讀取任意文件時,返回的都是DF對象

2.3 操作DataFrame

2.3.1 DSL語句

DSL語句其實就是將sql語句的一些操作轉為類似函數的方式去調用,比如:

df1.select("name").show

例子:

為了方便,直接在spark-shell里操作了,
spark-shell --master spark://bigdata121:7077 1、打印表結構 scala> df1.printSchema root |-- empno: integer (nullable = true) |-- ename: string (nullable = true) |-- job: string (nullable = true) |-- mgr: integer (nullable = true) |-- hiredate: string (nullable = true) |-- sal: integer (nullable = true) |-- comm: integer (nullable = true) |-- deptno: integer (nullable = true) 2、顯示當前df的表數據或者查詢結果的數據 scala> df1.show +-----+------+---------+----+----------+----+----+------+ |empno| ename| job| mgr| hiredate| sal|comm|deptno| +-----+------+---------+----+----------+----+----+------+ | 7369| SMITH| CLERK|7902|1980/12/17| 800| 0| 20| | 7499| ALLEN| SALESMAN|7698| 1981/2/20|1600| 300| 30| | 7521| WARD| SALESMAN|7698| 1981/2/22|1250| 500| 30| | 7566| JONES| MANAGER|7839| 1981/4/2|2975| 0| 20| | 7654|MARTIN| SALESMAN|7698| 1981/9/28|1250|1400| 30| | 7698| BLAKE| MANAGER|7839| 1981/5/1|2850| 0| 30| | 7782| CLARK| MANAGER|7839| 1981/6/9|2450| 0| 10| | 7788| SCOTT| ANALYST|7566| 1987/4/19|3000| 0| 20| | 7839| KING|PRESIDENT|7839|1981/11/17|5000| 0| 10| | 7844|TURNER| SALESMAN|7698| 1981/9/8|1500| 0| 30| | 7876| ADAMS| CLERK|7788| 1987/5/23|1100| 0| 20| | 7900| JAMES| CLERK|7698| 1981/12/3| 950| 0| 30| | 7902| FORD| ANALYST|7566| 1981/12/3|3000| 0| 20| | 7934|MILLER| CLERK|7782| 1982/1/23|1300| 0| 10| +-----+------+---------+----+----------+----+----+------+ 3、執行select, 相當於select xxx form xxx where xxx scala> df1.select("ename","sal").where("sal>2000").show +------+----+ | ename| sal| +------+----+ | SMITH| 800| | ALLEN|1600| | WARD|1250| | JONES|2975| |MARTIN|1250| | BLAKE|2850| | CLARK|2450| | SCOTT|3000| | KING|5000| |TURNER|1500| | ADAMS|1100| | JAMES| 950| | FORD|3000| |MILLER|1300| +------+----+ 4、對某些列進行操作 對某個指定進行操作時,需要加上$符號,然后后面才能操作 $代表 取出來以后,再做一些操作。 注意:這個 $ 的用法在ideal中無法正常使用,解決方法下面說 scala> df1.select($"ename",$"sal",$"sal"+100).show +------+----+-----------+ | ename| sal|(sal + 100)| +------+----+-----------+ | SMITH| 800| 900| | ALLEN|1600| 1700| | WARD|1250| 1350| | JONES|2975| 3075| |MARTIN|1250| 1350| | BLAKE|2850| 2950| | CLARK|2450| 2550| | SCOTT|3000| 3100| | KING|5000| 5100| |TURNER|1500| 1600| | ADAMS|1100| 1200| | JAMES| 950| 1050| | FORD|3000| 3100| |MILLER|1300| 1400| +------+----+-----------+ 5、過濾行 scala> df1.filter($"sal">2000).show +-----+-----+---------+----+----------+----+----+------+ |empno|ename| job| mgr| hiredate| sal|comm|deptno| +-----+-----+---------+----+----------+----+----+------+ | 7566|JONES| MANAGER|7839| 1981/4/2|2975| 0| 20| | 7698|BLAKE| MANAGER|7839| 1981/5/1|2850| 0| 30| | 7782|CLARK| MANAGER|7839| 1981/6/9|2450| 0| 10| | 7788|SCOTT| ANALYST|7566| 1987/4/19|3000| 0| 20| | 7839| KING|PRESIDENT|7839|1981/11/17|5000| 0| 10| | 7902| FORD| ANALYST|7566| 1981/12/3|3000| 0| 20| +-----+-----+---------+----+----------+----+----+------+ 6、分組以及計數 scala> df1.groupBy($"deptno").count.show +------+-----+ |deptno|count| +------+-----+ | 20| 5| | 10| 3| | 30| 6| +------+-----+

上面說到在ide中 select($"name")中無法正常使用,解決方法為:

在該語句之前加上這么一句:
import spark.sqlContext.implicits._ 主要還是因為類型的問題,加上隱式轉換就好了

2.3.2 sql語句

df對象不能直接執行sql。需要生成一個視圖,再執行SQL。
需要指定創建的視圖名稱,后面視圖名稱就相當於表名。
視圖后面還會細說,這里先有個概念
例子:

val spark = SparkSession.builder().master("local").appName("createDF case class").getOrCreate()
。。。。。。。。。。。。。。
//通過df對象創建臨時視圖。視圖名就相當於表名
df1.createOrReplaceTempView("emp")

//通過sparksession對象執行執行
spark.sql("select * from emp").show spark.sql("select * from emp where sal > 2000").show spark.sql("select deptno,count(1) from emp group by deptno").show //可以創建多個視圖,不沖突 df1.createOrReplaceTempView("emp12345") spark.sql("select e.deptno from emp12345 e").show

2.3.3 多表查詢

scala> case class Dept(deptno:Int,dname:String,loc:String) defined class Dept scala> val lines = sc.textFile("/usr/local/tmp_files/dept.csv").map(_.split(",")) lines: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[68] at map at <console>:24 scala> val allDept = lines.map(x=>Dept(x(0).toInt,x(1),x(2))) allDept: org.apache.spark.rdd.RDD[Dept] = MapPartitionsRDD[69] at map at <console>:28 scala> val df2 = allDept.toDF df2: org.apache.spark.sql.DataFrame = [deptno: int, dname: string ... 1 more field] scala> df2.create createGlobalTempView createOrReplaceTempView createTempView scala> df2.createOrReplaceTempView("dept") scala> spark.sql("select dname,ename from emp12345,dept where emp12345.deptno=dept.deptno").show +----------+------+ | dname| ename| +----------+------+ | RESEARCH| SMITH| | RESEARCH| JONES| | RESEARCH| SCOTT| | RESEARCH| ADAMS| | RESEARCH| FORD| |ACCOUNTING| CLARK| |ACCOUNTING| KING| |ACCOUNTING|MILLER| | SALES| ALLEN| | SALES| WARD| | SALES|MARTIN| | SALES| BLAKE| | SALES|TURNER| | SALES| JAMES| +----------+------+

2.4 創建DataSet

2.4.1 通過case class

和DataFrame類似,只是把 toDF改為調用 toDS 方法

package SparkSQLExer

import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}

object CreateDS {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("createDF case class").getOrCreate()
    val lines = spark.sparkContext.textFile("G:\\test\\t_stu.txt").map(_.split(","))
    val tb = lines.map(t=>emp1(t(0).toInt,t(1),t(2).toInt))
    import spark.sqlContext.implicits._
    val df1 = tb.toDS()
    df1.select($"name")

  }
}

case class emp1(id:Int,name:String,age:Int)

2.4.2 通過序列Seq類對象

package SparkSQLExer

import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}

object CreateDS {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("createDF case class").getOrCreate()

    //創建一個序列對象,里面都是emp1對象,映射的數據,然后直接toDS轉為DataSet
    val ds1 = Seq(emp1(1,"king",20)).toDS()
    ds1.printSchema()

  }
}

case class emp1(id:Int,name:String,age:Int)

2.4.3 使用json格式文件

定義case class case class Person(name:String,age:BigInt) 使用JSON數據生成DataFrame val df = spark.read.format("json").load("/usr/local/tmp_files/people.json") 將DataFrame轉換成DataSet df.as[Person].show df.as[Person] 是一個 DataSet as[T]中的泛型需要是一個case class類,用於映射表頭

2.5 操作DataSet

DataSet支持的算子其實就是rdd和DataFrame算子的結合。

使用emp.json 生成DataFrame
val empDF = spark.read.json("/usr/local/tmp_files/emp.json") scala> empDF.show +----+------+-----+------+----------+---------+----+----+ |comm|deptno|empno| ename| hiredate| job| mgr| sal| +----+------+-----+------+----------+---------+----+----+ | | 20| 7369| SMITH|1980/12/17| CLERK|7902| 800| | 300| 30| 7499| ALLEN| 1981/2/20| SALESMAN|7698|1600| | 500| 30| 7521| WARD| 1981/2/22| SALESMAN|7698|1250| | | 20| 7566| JONES| 1981/4/2| MANAGER|7839|2975| |1400| 30| 7654|MARTIN| 1981/9/28| SALESMAN|7698|1250| | | 30| 7698| BLAKE| 1981/5/1| MANAGER|7839|2850| | | 10| 7782| CLARK| 1981/6/9| MANAGER|7839|2450| | | 20| 7788| SCOTT| 1987/4/19| ANALYST|7566|3000| | | 10| 7839| KING|1981/11/17|PRESIDENT| |5000| | 0| 30| 7844|TURNER| 1981/9/8| SALESMAN|7698|1500| | | 20| 7876| ADAMS| 1987/5/23| CLERK|7788|1100| | | 30| 7900| JAMES| 1981/12/3| CLERK|7698| 950| | | 20| 7902| FORD| 1981/12/3| ANALYST|7566|3000| | | 10| 7934|MILLER| 1982/1/23| CLERK|7782|1300| +----+------+-----+------+----------+---------+----+----+ scala> empDF.where($"sal" >= 3000).show +----+------+-----+-----+----------+---------+----+----+ |comm|deptno|empno|ename| hiredate| job| mgr| sal| +----+------+-----+-----+----------+---------+----+----+ | | 20| 7788|SCOTT| 1987/4/19| ANALYST|7566|3000| | | 10| 7839| KING|1981/11/17|PRESIDENT| |5000| | | 20| 7902| FORD| 1981/12/3| ANALYST|7566|3000| +----+------+-----+-----+----------+---------+----+----+ #### empDF 轉換成 DataSet 需要 case class scala> case class Emp(empno:BigInt,ename:String,job:String,mgr:String,hiredate:String,sal:BigInt,comm:String,deptno:BigInt) defined class Emp scala> val empDS = empDF.as[Emp] empDS: org.apache.spark.sql.Dataset[Emp] = [comm: string, deptno: bigint ... 6 more fields] scala> empDS.filter(_.sal > 3000).show +----+------+-----+-----+----------+---------+---+----+ |comm|deptno|empno|ename| hiredate| job|mgr| sal| +----+------+-----+-----+----------+---------+---+----+ | | 10| 7839| KING|1981/11/17|PRESIDENT| |5000| +----+------+-----+-----+----------+---------+---+----+ scala> empDS.filter(_.deptno == 10).show +----+------+-----+------+----------+---------+----+----+ |comm|deptno|empno| ename| hiredate| job| mgr| sal| +----+------+-----+------+----------+---------+----+----+ | | 10| 7782| CLARK| 1981/6/9| MANAGER|7839|2450| | | 10| 7839| KING|1981/11/17|PRESIDENT| |5000| | | 10| 7934|MILLER| 1982/1/23| CLERK|7782|1300| +----+------+-----+------+----------+---------+----+----+

多表查詢:

1、創建部門表 scala> val deptRDD = sc.textFile("/usr/local/tmp_files/dept.csv").map(_.split(",")) deptRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[154] at map at <console>:24 scala> case class Dept(deptno:Int,dname:String,loc:String) defined class Dept scala> val deptDS = deptRDD.map(x=>Dept(x(0).toInt,x(1),x(2))).toDS deptDS: org.apache.spark.sql.Dataset[Dept] = [deptno: int, dname: string ... 1 more field] scala> deptDS.show +------+----------+--------+ |deptno| dname| loc| +------+----------+--------+ | 10|ACCOUNTING|NEW YORK| | 20| RESEARCH| DALLAS| | 30| SALES| CHICAGO| | 40|OPERATIONS| BOSTON| +------+----------+--------+ 2、員工表 同上 empDS empDS.join(deptDS,"deptno").where(xxxx) 連接兩個表,通過deptno字段 empDS.joinWith(deptDS,deptDS("deptno")===empDS("deptno")) 這個用於連接的字段名稱不一樣的情況

2.6 視圖view

​ 如果想使用標准的sql語句來操作df或者ds對象時,必須先給df或者ds對象創建視圖,然后通過SparkSession對象的sql函數來對相應的視圖進行操作才可以。那么視圖是什么?
​ 視圖是一個虛表,不存儲數據,可以當做是表的一個訪問鏈接。視圖有兩種類型:
普通視圖:也叫本地視圖,只在當前session會話中有效
全局視圖:在全部session中都有效,全局視圖創建在指定命名空間中:global_temp 類似於一個庫
操作說明:

val spark = SparkSession.builder().master("local").appName("createDF case class").getOrCreate() val empDF = spark.read.json("/usr/local/tmp_files/emp.json") 創建本地視圖: empDF.createOrReplaceTempView(視圖名),視圖存在就會重新創建 empDF.createTempView(視圖名),如果視圖存在就不會創建 創建全局視圖: empDF.createGlobalTempView(視圖名) 對視圖執行sql操作,這里視圖名就類似於表名 spark.sql("xxxxx") 例子: empDF.createOrReplaceTempView("emp") spark.sql("select * from emp").show 注意,只要創建了視圖,那么就可以通過sparksession對象在任意一個類中操作視圖,也就是表。這個特性很好用,當我們要操作一些表時,可以一開始就讀取成df,然后創建成視圖,那么就可以在任意一個地方查詢表了。

2.7 數據源

通過SparkSession對象可以讀取不同格式的數據源:

val spark = SparkSession.builder().master("local").appName("createDF case class").getOrCreate()

下面都用上面的spark代稱SparkSession。

2.7.1 SparkSession讀取數據的方式

1、load spark.read.load(path):讀取指定路徑的文件,要求文件存儲格式為Parquet文件 2、format spark.read.format("格式").load(path) :指定讀取其他格式的文件,如json 例子: spark.read.format("json").load(path) 3、直接讀取其他格式文件 spark.read.格式名(路徑),這是上面2中的一個簡寫方式,例子: spark.read.json(路徑) json格式文件 spark.read.text(路徑) 讀取文本文件 注意:這些方式返回的都是 DataFrame 對象

2.7.2 SparkSession保存數據的方式

可以將DataFrame 對象寫入到指定格式的文件中,假設有個DataFrame 對象為df1.

1、save df1.write.save(路徑) 他會將文件保存到這個目錄下,文件名spark隨機生成的,所以使用上面的讀取方式的時候,直接指定讀取目錄即可,不用指定文件名。輸出的文件格式為 Parquet。可以直接指定hdfs的路徑,否則就存儲到本地 如: df1.write.save("/test") spark.read.load("/test") 2、直接指定格式存儲 df1.write.json(路徑) 這樣就會以json格式保存文件,生成的文件名的情況和上面類似 3、指定保存模式 如果沒有指定保存模式,輸出路徑存在的情況下,就會報錯 df1.write.mode("append").json(路徑) mode("append") 就表示文件存在時就追加 mode("overwrite") 表示覆蓋舊數據 4、保存為表 df1.write.saveAsTable(表名) 會保存在當前目錄的spark-warehouse 目錄下 5、format df1.write.format(格式).save() 使用指定特定格式的方式來輸出保存數據,比如保存到MongoDB數據庫中

2.7.3 Parquet格式

​ 這種一種列式存儲格式,具體原理可以看看之前hive的文章。這種格式是默認的存儲格式,使用load和save時默認的格式,操作方式很像前面說的,這里不重復。這里要講的是Parquet的一個特殊的功能,支持schema(表結構)的合並。例子:

scala> val df1 = sc.makeRDD(1 to 5).map(i=>(i,i*2)).toDF("single","double") df1: org.apache.spark.sql.DataFrame = [single: int, double: int] scala> df1.show +------+------+ |single|double| +------+------+ | 1| 2| | 2| 4| | 3| 6| | 4| 8| | 5| 10| +------+------+ scala> sc.makeRDD(1 to 5) res8: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[26] at makeRDD at <console>:25 scala> sc.makeRDD(1 to 5).collect res9: Array[Int] = Array(1, 2, 3, 4, 5) //導出表1 scala> df1.write.parquet("/usr/local/tmp_files/test_table/key=1") scala> val df2 = sc.makeRDD(6 to 10).map(i=>(i,i*3)).toDF("single","triple") df2: org.apache.spark.sql.DataFrame = [single: int, triple: int] scala> df2.show +------+------+ |single|triple| +------+------+ | 6| 18| | 7| 21| | 8| 24| | 9| 27| | 10| 30| +------+------+ //導出表2 scala> df2.write.parquet("/usr/local/tmp_files/test_table/key=2") scala> val df3 = spark.read.parquet("/usr/local/tmp_files/test_table") df3: org.apache.spark.sql.DataFrame = [single: int, double: int ... 1 more field] //直接讀取會丟失字段 scala> df3.show +------+------+---+ |single|double|key| +------+------+---+ | 8| null| 2| | 9| null| 2| | 10| null| 2| | 3| 6| 1| | 4| 8| 1| | 5| 10| 1| | 6| null| 2| | 7| null| 2| | 1| 2| 1| | 2| 4| 1| +------+------+---+ //加上option,指定"mergeSchema"為true,就可以合並 scala> val df3 = spark.read.option("mergeSchema",true).parquet("/usr/local/tmp_files/test_table") df3: org.apache.spark.sql.DataFrame = [single: int, double: int ... 2 more fields] scala> df3.show +------+------+------+---+ |single|double|triple|key| +------+------+------+---+ | 8| null| 24| 2| | 9| null| 27| 2| | 10| null| 30| 2| | 3| 6| null| 1| | 4| 8| null| 1| | 5| 10| null| 1| | 6| null| 18| 2| | 7| null| 21| 2| | 1| 2| null| 1| | 2| 4| null| 1| +------+------+------+---+ 補充問題:key 是什么?必須用key嘛? key是不同表的一個區分字段,在合並的時候,會作為合並之后的表的一個字段,並且值等於key=xx 中設置的值 如果目錄下,兩個表的目錄名不一樣,是無法合並的,合並字段名可以任意, 如:一個是key ,一個是 test 這兩個無法合並,必須統一key或者test

2.7.4 json文件

這種一種帶表格式字段的文件,例子:

scala> val peopleDF = spark.read.json("/usr/local/tmp_files/people.json") peopleDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string] scala> peopleDF.printSchema() root |-- age: long (nullable = true) |-- name: string (nullable = true) scala> peopleDF.createOrReplaceTempView("people") scala> spark.sql("select * from people where age=19") res25: org.apache.spark.sql.DataFrame = [age: bigint, name: string] scala> spark.sql("select * from people where age=19").show +---+------+ |age| name| +---+------+ | 19|Justin| +---+------+ scala> spark.sql("select age,count(1) from people group by age").show +----+--------+ | age|count(1)| +----+--------+ | 19| 1| |null| 1| | 30| 1| +----+--------+

2.7.5 JDBC 連接

df對象支持通過jdbc連接數據庫,寫入數據到數據庫,或者從數據庫讀取數據。
例子:
1、通過jdbc 從mysql讀取數據:

使用 format(xx).option()的方式指定連接數據庫的一些參數,比如用戶名密碼,使用的連接驅動等

import java.util.Properties

import org.apache.spark.sql.SparkSession

object ConnMysql {
  def main(args: Array[String]): Unit = {
    val sparkS = SparkSession.builder().appName("spark sql conn mysql").master("local").getOrCreate()
    //連接mysql方式1:
    //創建properties配置對象,用於存放連接mysql的參數
    val mysqlConn = new Properties()
    mysqlConn.setProperty("user","root")
    mysqlConn.setProperty("password","wjt86912572")
    //使用jdbc連接,指定連接字符串,表名,以及其他連接參數,並返回對應的dataframe
    val mysqlDF1 = sparkS.read.jdbc("jdbc:mysql://bigdata121:3306/test?serverTimezone=UTC&characterEncoding=utf-8","customer",mysqlConn)
    mysqlDF1.printSchema()
    mysqlDF1.show()
    mysqlDF1.createTempView("customer")
    sparkS.sql("select * from customer limit 2").show()

    //連接mysql方式2,這種方式比較常用:
    val mysqlConn2 = sparkS.read.format("jdbc")
      .option("url","jdbc:mysql://bigdata121:3306/test?serverTimezone=UTC&characterEncoding=utf-8")              
      .option("user","root")
      .option("password","wjt86912572")
      .option("driver","com.mysql.jdbc.Driver")
      .option("dbtable","customer").load()

    mysqlConn2.printSchema()
  }
}

這是兩種連接讀取數據的方式。

2、jdbc寫入數據到mysql

和讀取類似,只不過換成了write操作

import java.util.Properties

import org.apache.spark.sql.SparkSession

object WriteToMysql {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("write to mysql").master("local").getOrCreate()

    val df1 = spark.read.text("G:\\test\\t_stu.json")

    //方式1:
    df1.write.format("jdbc")
      .option("url","jdbc:mysql://bigdata121:3306/test?serverTimezone=UTC&characterEncoding=utf-8")
      .option("user","root")
      .option("password","wjt86912572")
      .option("driver","com.mysql.jdbc.Driver")
      .option("dbtable","customer").save()

    //方式2:
    val mysqlConn = new Properties()
    mysqlConn.setProperty("user","root")
    mysqlConn.setProperty("password","wjt86912572")
    df1.write.jdbc("jdbc:mysql://bigdata121:3306/test?serverTimezone=UTC&characterEncoding=utf-8","customer",mysqlConn)

  }

}

必須要保證df的表格式和寫入的mysql的表格式一樣,字段名也要一樣

2.7.6 hive

1、通過jdbc連接hive
方式和普通jdbc類似,例如:

import java.util.Properties

import org.apache.spark.sql.SparkSession

/**
  * 連接hive的情況有兩種:
  * 1、如果是直接在ideal中運行spark程序的話,則必須在程序中指定jdbc連接的hiveserver的地址
  * 且hiveserver必須以后台服務的形式暴露10000端口出來.這種方式是直接通過jdbc連接hive
  *
  * 2、如果程序是打包到spark集群中運行的話,一般spark集群的conf目錄下,已經有hive client
  * 的配置文件,就會直接啟動hive client來連接hive。這時不需要啟動hiveserver服務。
  * 這種方式是通過hive client連接hive
  */
object ConnHive {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("spark sql conn mysql").master("local").getOrCreate()
    val properties = new Properties()
    properties.setProperty("user","")
    properties.setProperty("password","")

    val hiveDF = spark.read.jdbc("jdbc:hive2://bigdata121:10000/default","customer",properties)
    hiveDF.printSchema()
    spark.stop()
  }
}

這種方式要注意一點:
hiveserver必須以后台服務的形式暴露10000端口出來.這種方式是直接通過jdbc連接hive。

2、通過hive client連接hive
​ 這種方式一般用在生產中,因為任務一般都是通過spark-submit提交到集群中運行,這時候就會直接通過hive client來連接hive,不會通過jdbc來連接了。
​ 要注意:需要在spark的節點上都配置上hive client,然后將hive-site.xml配置文件拷貝到 spark的conf目錄下。同時需要將hadoop的core-site.xml hdfs-site.xml也拷貝過去。另外一方面,因為要使用hive client,所以hive server那邊,一般都要配置metastore server,具體配置看hive的文章。
​ 這樣在spark集群中的程序就可以直接使用

spark.sql("xxxx").show 這樣的操作,默認就會從hive中讀取對應的表進行操作。不用另外做任何連接hive 的操作

或者直接到 spark-shell中,也是可以直接使用 上面的方式操作hive的表
例如:

import org.apache.spark.sql.SparkSession object ConnHive02 { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().appName("spark sql conn hive").getOrCreate() spark.sql("select * from customer").show() } } 這樣直接操作的就是 hive 的表了

2.8 小案例--讀取hive數據分析結果寫入mysql

import java.util.Properties

import org.apache.spark.sql.SparkSession

object HiveToMysql {
  def main(args: Array[String]): Unit = {
    //直接通過spark集群中的hive client連接hive,不需要jdbc以及hive server
    val spark = SparkSession.builder().appName("hive to mysql").enableHiveSupport().getOrCreate()
    val resultDF = spark.sql("select * from default.customer")

    //一般中間寫的處理邏輯都是處理從hive讀取的數據,處理完成后寫入到mysql

    val mysqlConn = new Properties()
    mysqlConn.setProperty("user","root")
    mysqlConn.setProperty("password","wjt86912572")
    //通過jdbc寫入mysql
  resultDF.write.mode("append").jdbc("jdbc:mysql://bigdata121:3306/test?serverTimezone=UTC&characterEncoding=utf-8", "customer", mysqlConn)

    spark.stop()
  }

}

三、性能優化

3.1 內存中緩存數據

先啟動個spark-shell

spark-shell --master spark://bigdata121:7077 要在spark-shell中操作mysql,所以記得自己找個 mysql-connector的jar放到spark的jars目錄下

例子:

創建df,從mysql讀取表
scala> val mysqDF = spark.read.format("jdbc").option("url","jdbc:mysql://bigdata121:3306/test?serverTimezone=UTC&characterEncoding=utf-8").option("user","root").option("password","wjt86912572").option("driver","com.mysql.jdbc.Driver").option("dbtable","customer").load() mysqDF: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field] scala> mysqDF.show +---+------+--------------------+ | id| name| last_mod| +---+------+--------------------+ | 1| neil|2019-07-20 17:09:...| | 2| jack|2019-07-20 17:09:...| | 3|martin|2019-07-20 17:09:...| | 4| tony|2019-07-20 17:09:...| | 5| eric|2019-07-20 17:09:...| | 6| king|2019-07-20 17:42:...| | 7| tao|2019-07-20 17:45:...| +---+------+--------------------+ 必須注冊成一張表,才可以緩存。 scala> mysqDF.registerTempTable("customer") warning: there was one deprecation warning; re-run with -deprecation for details 標識這張表可以被緩存,但是現在數據並沒有直接緩存 scala> spark.sqlContext.cacheTable("customer") 第一次查詢表,從mysql讀取數據,並緩存到內存中 scala> spark.sql("select * from customer").show +---+------+--------------------+ | id| name| last_mod| +---+------+--------------------+ | 1| neil|2019-07-20 17:09:...| | 2| jack|2019-07-20 17:09:...| | 3|martin|2019-07-20 17:09:...| | 4| tony|2019-07-20 17:09:...| | 5| eric|2019-07-20 17:09:...| | 6| king|2019-07-20 17:42:...| | 7| tao|2019-07-20 17:45:...| +---+------+--------------------+ 這一次查詢從內存中返回 scala> spark.sql("select * from customer").show +---+------+--------------------+ | id| name| last_mod| +---+------+--------------------+ | 1| neil|2019-07-20 17:09:...| | 2| jack|2019-07-20 17:09:...| | 3|martin|2019-07-20 17:09:...| | 4| tony|2019-07-20 17:09:...| | 5| eric|2019-07-20 17:09:...| | 6| king|2019-07-20 17:42:...| | 7| tao|2019-07-20 17:45:...| +---+------+--------------------+ 清空緩存 scala> spark.sqlContext.clearCache

3.2 調優相關參數

將數據緩存到內存中的相關優化參數
   spark.sql.inMemoryColumnarStorage.compressed  默認為 true  Spark SQL 將會基於統計信息自動地為每一列選擇一種壓縮編碼方式。  spark.sql.inMemoryColumnarStorage.batchSize  默認值:10000  緩存批處理大小。緩存數據時, 較大的批處理大小可以提高內存利用率和壓縮率,但同時也會帶來 OOM(Out Of Memory)的風險。 其他性能相關的配置選項(不過不推薦手動修改,可能在后續版本自動的自適應修改)  spark.sql.files.maxPartitionBytes  默認值:128 MB  讀取文件時單個分區可容納的最大字節數  spark.sql.files.openCostInBytes  默認值:4M  打開文件的估算成本, 按照同一時間能夠掃描的字節數來測量。當往一個分區寫入多個文件的時候會使用。高估更好, 這樣的話小文件分區將比大文件分區更快 (先被調度)。 spark.sql.autoBroadcastJoinThreshold  默認值:10M  用於配置一個表在執行 join 操作時能夠廣播給所有 worker 節點的最大字節大小。通過將這個值設置為 -1 可以禁用廣播。注意,當前數據統計僅支持已經運行了 ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan 命令的 Hive Metastore 表。 spark.sql.shuffle.partitions  默認值:200  用於配置 join 或聚合操作混洗(shuffle)數據時使用的分區數。