Spark 學習(八) SparkSQL簡介


一,Spark SQL概述

  1.1 什么是Spark SQL

  1.2 為什么學Spark SQL

二,DataFrames

  2.1 什么是DataFrames

  2.2 創建DataFrames

三,DataFrame常用操作

  3.1 DSL風格語法

  3.2 SQL風格語法

四,SparkSQL編程實例

  4.1 前期准備

  4.2 通過反射推斷Schema

  4.3 通過StructType直接指定Schema

  4.4 操作DataFrameAPI的形式進行數據操作

    4.5 Spark2.X的sql實現方式

  4.6 SparkDataSet

 

 

 

 

 

正文

一,Spark SQL概述

  1.1 什么是Spark SQL

  

  

  Spark SQL是Spark用來處理結構化數據的一個模塊,它提供了一個編程抽象叫做DataFrame並且作為分布式SQL查詢引擎的作用。

  1.2 為什么學Spark SQL

  我們已經學習了Hive,它是將Hive SQL轉換成MapReduce然后提交到集群上執行,大大簡化了編寫MapReduce的程序的復雜性,由於MapReduce這種計算模型執行效率比較慢。所有Spark SQL的應運而生,它是將Spark SQL轉換成RDD,然后提交到集群執行,執行效率非常快!

  1.易整合

  

  2.統一的數據訪問方式

  

  3.兼容Hive

  

  4.標准的數據連接

  

二,DataFrames

  2.1 什么是DataFrames

  與RDD類似,DataFrame也是一個分布式數據容器。然而DataFrame更像傳統數據庫的二維表格,除了數據以外,還記錄數據的結構信息,即schema。同時,與Hive類似,DataFrame也支持嵌套數據類型(struct、array和map)。從API易用性的角度上 看,DataFrame API提供的是一套高層的關系操作,比函數式的RDD API要更加友好,門檻更低。由於與R和Pandas的DataFrame類似,Spark DataFrame很好地繼承了傳統單機數據分析的開發體驗。

·  

  2.2 創建DataFrames

//1.在本地創建一個文件,有三列,分別是id、name、age,用空格分隔,然后上傳到hdfs上
hdfs dfs -put person.txt /

//2.在spark shell執行下面命令,讀取數據,將每一行的數據使用列分隔符分割
val lineRDD = sc.textFile("hdfs://node1.xiaoniu.com:9000/person.txt").map(_.split(" "))

//3.定義case class(相當於表的schema)
case class Person(id:Int, name:String, age:Int)

//4.將RDD和case class關聯
val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))

//5.將RDD轉換成DataFrame
val personDF = personRDD.toDF

//6.對DataFrame進行處理
personDF.show

三,DataFrame常用操作

  3.1 DSL風格語法

//查看DataFrame中的內容
personDF.show

//查看DataFrame部分列中的內容
personDF.select(personDF.col("name")).show
personDF.select(col("name"), col("age")).show
personDF.select("name").show

//打印DataFrame的Schema信息
personDF.printSchema

//查詢所有的name和age,並將age+1
personDF.select(col("id"), col("name"), col("age") + 1).show
personDF.select(personDF("id"), personDF("name"), personDF("age") + 1).show


//過濾age大於等於18的
personDF.filter(col("age") >= 18).show


//按年齡進行分組並統計相同年齡的人數
personDF.groupBy("age").count().show()

  3.2 SQL風格語法

//如果想使用SQL風格的語法,需要將DataFrame注冊成表
personDF.registerTempTable("t_person")

//查詢年齡最大的前兩名
sqlContext.sql("select * from t_person order by age desc limit 2").show


//顯示表的Schema信息
sqlContext.sql("desc t_person").show

四,SparkSQL編程實例

  4.1 前期准備

  前面我們學習了如何在Spark Shell中使用SQL完成查詢,現在我們來實現在自定義的程序中編寫Spark SQL查詢程序。首先在maven項目的pom.xml中添加Spark SQL的依賴

<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>${spark.version}</version>
</dependency>

  4.2 通過反射推斷Schema

  創建一個object如下:

package cn.edu360.spark06

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

object spark1XDemo1 {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("spark1XDemo1").setMaster("local[2]")
        val sc = new SparkContext(conf)
        //將SparkContext包裝進而增強
        val sQLContext = new SQLContext(sc)
        val lines: RDD[String] = sc.textFile("hdfs://hd1:9000/sparkSQL")
        val boyRDD: RDD[Boy] = lines.map(line => {
            val fields: Array[String] = line.split(",")
            val id: Long = fields(0).toLong
            val name: String = fields(1)
            val age: Int = fields(2).toInt
            val fv: Double = fields(3).toDouble
            Boy(id, name, age, fv)
        }
        )
        //該RDD裝的是Boy類型的數據,有了shcma信息,但是還是一個RDD
        //將RDD轉換成DataFrame
        //導入隱式轉換
        import sQLContext.implicits._
        val bdf: DataFrame = boyRDD.toDF

        //變成DF后就可以使用兩種API進行編程了
        //把DataFrame先注冊臨時表
        bdf.registerTempTable("t_boy")
        //書寫SQL(SQL方法應其實是Transformation)
        val result: DataFrame = sQLContext.sql("select * from t_boy order by fv desc, age asc")

        //查看結果(觸發Action)
        result.show()
        sc.stop()
    }
}
// 若要將這些字段解析后進行結構化的描述信息
// 這里進行表的描述
case class Boy(id: Long, name: String, age: Int, fv: Double)

  4.3 通過StructType直接指定Schema

  創建一個object為:

package cn.edu360.spark06

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.types._
import org.apache.spark.{SparkConf, SparkContext}

object spark1XDemo2 {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("spark1XDemo2").setMaster("local[2]")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
        val lines: RDD[String] = sc.textFile("hdfs://hd1:9000/sparkSQL/")

        //將數據進行整理
        val rowRDD: RDD[Row] = lines.map(line => {
            val fields: Array[String] = line.split(",")
            val id: Long = fields(0).toLong
            val name: String = fields(1)
            val age: Int = fields(2).toInt
            val fv: Double = fields(3).toDouble
            // 生成ROWRDD
            Row(id, name, age, fv)
        })
        //結果類型,其實就是表頭,用於描述DataFrame
        val scme = StructType(List(
            StructField("id", LongType, true),
            StructField("name", StringType, true),
            StructField("age", IntegerType, true),
            StructField("fv", DoubleType, true)
        ))

        //將RowRDD關聯schema
        val bdf: DataFrame = sqlContext.createDataFrame(rowRDD, scme)
        bdf.registerTempTable("t_boy")
        val result: DataFrame = sqlContext.sql("select * from t_boy order by fv desc, age asc")
        result.show()
        sc.stop()
    }
}

  4.4 操作DataFrameAPI的形式進行數據操作

package cn.edu360.spark06

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

object spark1DataFrameOperate {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("spark1DataFrameOperate").setMaster("local[2]")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
        val lines: RDD[String] = sc.textFile("hdfs://hd1:9000/sparkSQL/")
        val rowRDD: RDD[Row] = lines.map(line => {
            val fields: Array[String] = line.split(",")
            val id: Long = fields(0).toLong
            val name: String = fields(1)
            val age: Int = fields(2).toInt
            val fv: Double = fields(3).toDouble
            // 生成ROWRDD
            Row(id, name, age, fv)
        })
        //結果類型,其實就是表頭,用於描述DataFrame
        val scme = StructType(List(
            StructField("id", LongType, true),
            StructField("name", StringType, true),
            StructField("age", IntegerType, true),
            StructField("fv", DoubleType, true)
        ))

        val bdf: DataFrame = sqlContext.createDataFrame(rowRDD, scme)

        //不使用SQL的方式,就不用注冊臨時表了
        val df1: DataFrame = bdf.select("name", "age", "fv")
        df1.show()
        
        // 操作數據的函數需要導入
        import sqlContext.implicits._
        val df2: Any = bdf.orderBy($"fv" desc, $"age" asc)
    }
}

  4.5 Spark2.X的sql實現方式

package cn.edu360.spark06

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}


object spark2XDemo1 {
    def main(args: Array[String]): Unit = {
        //spark2.x SQL的編程API(SparkSession)
        //是spark2.x SQL執行的入口
        val sparkSession: SparkSession = SparkSession.builder()
            .appName("SQLTest1")
            .master("local[2]")
            .getOrCreate()
        //創建RDD
        val lines: RDD[String] = sparkSession.sparkContext.textFile("hdfs://hd1:9000/sparkSQL")
        val rowRDD: RDD[Row] = lines.map(line => {
            val fields = line.split(",")
            val id = fields(0).toLong
            val name = fields(1)
            val age = fields(2).toInt
            val fv = fields(3).toDouble
            Row(id, name, age, fv)
        })
        val scm = StructType(List(
            StructField("id", LongType, true),
            StructField("name", StringType, true),
            StructField("age", IntegerType, true),
            StructField("fv", DoubleType, true)
        ))
        val df: DataFrame = sparkSession.createDataFrame(rowRDD, scm)

        df.registerTempTable("t_boy")
        val result: DataFrame = sparkSession.sql("select * from t_boy")

        val result2: DataFrame = df.select("name", "age", "fv")

        import sparkSession.implicits._
        val result3: Dataset[Row] = df.orderBy($"fv" desc, $"age" asc)
        result.show()
        result2.show()
        result3.show()
        sparkSession.stop()
    }

}

  4.6 SparkDataSet

  Dataset分布式數據集,是對RDD的進一步封裝,是更加智能的RDD, 在這進行wordcount實例:
package cn.edu360.spark06

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object sparkDateSet1 {
    def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession.builder()
            .appName("sparkDateSet1")
            .master("local[2]")
            .getOrCreate()
        val lines: Dataset[String] = spark.read.textFile("hdfs://hd1:9000/wordcount/input/")

        import spark.implicits._
        val words: Dataset[String] = lines.flatMap(_.split(" "))

        // 通過操作Dataset進行數據操作
        val result: DataFrame = words.select("value")
        result.show()
        
        // 注冊視圖操作SQL形式
        words.createTempView("v_wc")
        val result2: DataFrame = spark.sql("select value, count(*) from v_wc group by value")
        result2.show()
        spark.stop()
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM