SparkSQL常用API總結


SparkSQL常用API總結

讀取數據

  • 文件讀取

    #本地文件讀取
    	#創建SparkSession
        val spark=SparkSession.builder()
        .appName("RW")
        .master("local[6]")	#本地運行
        .getOrCreate()
    	
    	#隱式轉換導入,DataFrame、Dataset與序列集合之間的轉換
    	import spark.implicits._
    	
    	#Schema定義
    	val schema=types.StructType{
          List(
            StructField("id",IntegerType),
            StructField("year",IntegerType),
            StructField("month",IntegerType),
            StructField("day",IntegerType),
            StructField("hour",IntegerType),
            StructField("season",IntegerType),
            StructField("pm",DoubleType)
          )
        }
    
    	#文件讀取
        val df=spark.read
    	//.schema(schema)
    	.option("delimiter","\t")	#指定數據分隔符
        .option("header",value = true)	#將文件第一行作為header
        .option("inferSchema",value = true)	#自動推斷類型,也可以自己定義Schema
        .csv("dataset/BeijingPM.csv")
    
    
    
    #HDFS文件讀取
    	#文件讀取
    	val df=spark.read
    	//.schema(schema)
    	.option("delimiter","\t")	#指定數據分隔符
        .option("header",value = true)	#將文件第一行作為header
        .option("inferSchema",value = true)	#自動推斷類型,也可以自己定義Schema
        .csv("hdfs://host:port/dataset/BeijingPM.csv")
    
  • Mysql數據源

    val df=spark.read
    	.format("jdbc")
        .option("url","jdbc:mysql://192.168.2.136:3306/spark")
    	.option("user","root")
    	.option("password","root")
    	.option("dbtable","student")
    	.option("driver","com.mysql.jdbc.Driver")
    	.load()
    
  • Hive數據源

    #訪問Hive,必須在集群中運行
    val spark=SparkSession.builder()
          .appName("Hive")
          .enableHiveSupport()	#啟用Hive支持
          .config("hive.metastore.uris","thrift://node03:9083")		#通過thirft服務訪問hive
          .config("spark.sql.warehouse.dir","/dataset/hive")
          .getOrCreate()
    val schema=StructType{
          List(
            StructField("name",StringType),
            StructField("age",IntegerType),
            StructField("gpa",FloatType)
          )
        }
    val df=spark.read
        .option("delimiter","\t")
        .schema(schema)
        .csv("hdfs://node01:8020/dataset/studenttab10k")
    

數據寫入

  • 文件寫入

    #本地文件寫入
    df.write
          .partitionBy("year","month")	#按列名指定分區
          .mode(SaveMode.Overwrite)	#寫入模式,error、append、overwrite、ignore
          .csv("dataset/pm_partitions")
    #HDFS寫入
    df.write
          .mode(SaveMode.Append)
          .save("hdfs://192.168.2.135:8020/mr")
    
  • Mysql寫入

    df.write
    	.format("jdbc")
        .option("url","jdbc:mysql://192.168.2.136:3306/spark")
    	.option("user","root")
    	.option("password","root")
    	.option("dbtable","student")
    	.option("driver","com.mysql.jdbc.Driver")
    	.save()
    
  • Hive寫入

    #集群運行,同“數據寫入”代碼
    df.write
          .mode(SaveMode.Overwrite)
          .saveAsTable("spark1.student")
    

API操作

有類型轉換
  • 轉換操作

    #通過 flatMap 可以將一條數據轉為一個數組, 后再展開這個數組放入 Dataset
    import spark.implicits._
    val ds = Seq("hello world", "hello pc").toDS()
    ds.flatMap( _.split(" ") ).show()
    
    #map 可以將數據集中每條數據轉為另一種形式
    import spark.implicits._
    val ds = Seq(Person("zhangsan", 15), Person("lisi", 15)).toDS()
    ds.map( person => Person(person.name, person.age * 2) ).show()
    
    #mapPartitions 和 map 一樣, 但是 map 的處理單位是每條數據, mapPartitions 的處理單位是每個分區
    import spark.implicits._
    val ds = Seq(Person("zhangsan", 15), Person("lisi", 15)).toDS()
    ds.mapPartitions( iter => {
        val returnValue = iter.map(
          item => Person(item.name, item.age * 2)
        )
        returnValue
      } )
      .show()
    
    #map 和 mapPartitions 以及 transform 都是轉換, map 和 mapPartitions 是針對數據, 而 transform 是針對整個數據集, 這種方式最大的區別就是 transform 可以直接拿到 Dataset 進行操作
    import spark.implicits._
    val ds = spark.range(5)
    ds.transform( dataset => dataset.withColumn("doubled", 'id * 2) )
    
    #as[Type] 算子的主要作用是將弱類型的 Dataset 轉為強類型的 Dataset, 它有很多適用場景, 但是最常見的還是在讀取數據的時候, 因為 DataFrameReader 體系大部分情況下是將讀出來的數據轉換為 DataFrame 的形式, 如果后續需要使用 Dataset 的強類型 API, 則需要將 DataFrame 轉為 Dataset. 可以使用 as[Type] 算子完成這種操作
    import spark.implicits._
    
    val structType = StructType(
      Seq(
        StructField("name", StringType),
        StructField("age", IntegerType),
        StructField("gpa", FloatType)
      )
    )
    
    val sourceDF = spark.read
      .schema(structType)
      .option("delimiter", "\t")
      .csv("dataset/studenttab10k")
    
    val dataset = sourceDF.as[Student]
    dataset.show()
    
  • filter

    #filter 用來按照條件過濾數據集
    import spark.implicits._
    val ds = Seq(Person("zhangsan", 15), Person("lisi", 15)).toDS()
    ds.filter( person => person.name == "lisi" ).show()
    
  • groupByKey

    #grouByKey 算子的返回結果是 KeyValueGroupedDataset, 而不是一個 Dataset, 所以必須要先經過 KeyValueGroupedDataset 中的方法進行聚合, 再轉回 Dataset, 才能使用 Action 得出結果
    
    其實這也印證了分組后必須聚合的道理
    import spark.implicits._
    val ds = Seq(Person("zhangsan", 15), Person("zhangsan", 15), Person("lisi", 15)).toDS()
    ds.groupByKey( person => person.name ).count().show()
    
  • sample

    #sample 會隨機在 Dataset 中抽樣
    val ds = spark.range(15)
    ds.sample(withReplacement = false, fraction = 0.4).show()
    
  • 排序

    #orderBy 配合 Column 的 API, 可以實現正反序排列
    import spark.implicits._
    val ds = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS()
    ds.orderBy("age").show()
    ds.orderBy('age.desc).show()
    
    #其實 orderBy 是 sort 的別名, 所以它們所實現的功能是一樣的
    import spark.implicits._
    val ds = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS()
    ds.sort('age.desc).show()
    
  • 分區

    #減少分區, 此算子和 RDD 中的 coalesce 不同, Dataset 中的 coalesce 只能減少分區數, coalesce 會直接創建一個邏輯操作, 並且設置 Shuffle 為 false
    val ds = spark.range(15)
    ds.coalesce(1).explain(true)
    
    #repartitions 有兩個作用, 一個是重分區到特定的分區數, 另一個是按照某一列來分區, 類似於 SQL 中的 DISTRIBUTE BY
    val ds = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS()
    ds.repartition(4)
    ds.repartition('name)
    
  • 去重

    #使用 dropDuplicates 可以去掉某一些列中重復的行
    import spark.implicits._
    val ds = spark.createDataset(Seq(Person("zhangsan", 15), Person("zhangsan", 15), Person("lisi", 15)))
    ds.dropDuplicates("age").show()
    
    #當 dropDuplicates 中沒有傳入列名的時候, 其含義是根據所有列去重, dropDuplicates() 方法還有一個別名, 叫做 distinct
    import spark.implicits._
    val ds = spark.createDataset(Seq(Person("zhangsan", 15), Person("zhangsan", 15), Person("lisi", 15)))
    ds.distinct().show()
    
無類型轉換
  • 選擇

    #select 用來選擇某些列出現在結果集中
    import spark.implicits._
    val ds = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS()
    ds.select($"name").show()
    
    #在 SQL 語句中, 經常可以在 select 子句中使用 count(age), rand() 等函數, 在 selectExpr 中就可以使用這樣的 SQL 表達式, 同時使用 select 配合 expr 函數也可以做到類似的效果
    import spark.implicits._
    import org.apache.spark.sql.functions._
    val ds = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS()
    ds.selectExpr("count(age) as count").show()
    ds.selectExpr("rand() as random").show()
    ds.select(expr("count(age) as count")).show()
    
    #通過 Column 對象在 Dataset 中創建一個新的列或者修改原來的列
    import spark.implicits._
    import org.apache.spark.sql.functions._
    val ds = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS()
    ds.withColumn("random", expr("rand()")).show()
    
    #修改列名
    import spark.implicits._
    val ds = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS()
    ds.withColumnRenamed("name", "new_name").show()
    
  • drop

    #剪掉某個列
    import spark.implicits._
    val ds = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS()
    ds.drop('age).show()
    
  • groupBy

    #按照給定的行進行分組
    import spark.implicits._
    val ds = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS()
    ds.groupBy('name).count().show()
    
聚合
  • 一維聚合(groupBy)

    groupBy 算子會按照列將 Dataset 分組, 並返回一個 RelationalGroupedDataset 對象, 通過 RelationalGroupedDataset 可以對分組進行聚合
    
    Step 1: 加載實驗數據
    private val spark = SparkSession.builder()
        .master("local[6]")
        .appName("aggregation")
        .getOrCreate()
    
      import spark.implicits._
    
      private val schema = StructType(
        List(
          StructField("id", IntegerType),
          StructField("year", IntegerType),
          StructField("month", IntegerType),
          StructField("day", IntegerType),
          StructField("hour", IntegerType),
          StructField("season", IntegerType),
          StructField("pm", DoubleType)
        )
      )
    
      private val pmDF = spark.read
        .schema(schema)
        .option("header", value = true)
        .csv("dataset/pm_without_null.csv")
    Step 2: 使用 functions 函數進行聚合
    import org.apache.spark.sql.functions._
    
    val groupedDF: RelationalGroupedDataset = pmDF.groupBy('year)
    
    groupedDF.agg(avg('pm) as "pm_avg")
      .orderBy('pm_avg)
      .show()
    Step 3: 除了使用 functions 進行聚合, 還可以直接使用 RelationalGroupedDataset 的 API 進行聚合
    groupedDF.avg("pm")
      .orderBy('pm_avg)
      .show()
    
    groupedDF.max("pm")
      .orderBy('pm_avg)
      .show()
    
  • 多維聚合(rollup)

    rollup 操作符其實就是 groupBy 的一個擴展, rollup 會對傳入的列進行滾動 groupBy, groupBy 的次數為列數量 + 1, 最后一次是對整個數據集進行聚合
    例如:rollup("year","month"),他會進行groupBy("year","month")、groupBy("year")、groupBy(null)三次聚合,然后進行數據合並
    
    Step 1: 創建數據集
    import org.apache.spark.sql.functions._
    
    val sales = Seq(
      ("Beijing", 2016, 100),
      ("Beijing", 2017, 200),
      ("Shanghai", 2015, 50),
      ("Shanghai", 2016, 150),
      ("Guangzhou", 2017, 50)
    ).toDF("city", "year", "amount")
    Step 1: rollup 的操作
    sales.rollup("city", "year")
      .agg(sum("amount") as "amount")
      .sort($"city".desc_nulls_last, $"year".asc_nulls_last)
      .show()
    
    /**
      * 結果集:
      * +---------+----+------+
      * |     city|year|amount|
      * +---------+----+------+
      * | Shanghai|2015|    50| <-- 上海 2015 的小計
      * | Shanghai|2016|   150|
      * | Shanghai|null|   200| <-- 上海的總計
      * |Guangzhou|2017|    50|
      * |Guangzhou|null|    50|
      * |  Beijing|2016|   100|
      * |  Beijing|2017|   200|
      * |  Beijing|null|   300|
      * |     null|null|   550| <-- 整個數據集的總計
      * +---------+----+------+
      */
    Step 2: 如果使用基礎的 groupBy 如何實現效果?
    val cityAndYear = sales
      .groupBy("city", "year") // 按照 city 和 year 聚合
      .agg(sum("amount") as "amount")
    
    val city = sales
      .groupBy("city") // 按照 city 進行聚合
      .agg(sum("amount") as "amount")
      .select($"city", lit(null) as "year", $"amount")
    
    val all = sales
      .groupBy() // 全局聚合
      .agg(sum("amount") as "amount")
      .select(lit(null) as "city", lit(null) as "year", $"amount")
    
    cityAndYear
      .union(city)
      .union(all)
      .sort($"city".desc_nulls_last, $"year".asc_nulls_last)
      .show()
    
    /**
      * 統計結果:
      * +---------+----+------+
      * |     city|year|amount|
      * +---------+----+------+
      * | Shanghai|2015|    50|
      * | Shanghai|2016|   150|
      * | Shanghai|null|   200|
      * |Guangzhou|2017|    50|
      * |Guangzhou|null|    50|
      * |  Beijing|2016|   100|
      * |  Beijing|2017|   200|
      * |  Beijing|null|   300|
      * |     null|null|   550|
      * +---------+----+------+
      */
    很明顯可以看到, 在上述案例中, rollup 就相當於先按照 city, year 進行聚合, 后按照 city 進行聚合, 最后對整個數據集進行聚合, 在按照 city 聚合時, year 列值為 null, 聚合整個數據集的時候, 除了聚合列, 其它列值都為 null
    
  • 多維聚合(cube)

    #cube 的功能和 rollup 是一樣的, 但也有區別, 區別如下
    
    rollup(A, B).sum
    
    其結果集中會有三種數據形式: A B , A null, null null
    
    不知道大家發現沒, 結果集中沒有對 B 列的聚合結果
    
    cube(A, B).sum
    
    其結果集中會有四種數據形式: A B , A null , null null , null B 
    
    不知道大家發現沒, 比 rollup 的結果集中多了一個 null B , 也就是說, rollup 只會按照第一個列來進行組合聚合, 但是 cube 會將全部列組合聚合
    
連接
 /**
   * cross鏈接,笛卡爾積
   */
  @Test
  def crossJoin(): Unit ={
    person.crossJoin(cities)
      .where(person.col("cityId")===cities.col("id"))
      .show()
  }

  /**
   * 內連接
   */
  @Test
  def innerJoin(): Unit ={
    person.join(cities,person.col("cityId")===cities.col("id"),"inner")
      .show()
  }

  /**
   * 全外連接
   */
  @Test
  def fullouterJoin(): Unit ={
    person.join(cities,
      person.col("cityId")===cities.col("id")
    ,"full").show()
  }

  /**
   * 左外鏈接
   */
  @Test
  def leftJoin(): Unit ={
    person.join(cities,
      person.col("cityId")===cities.col("id"),
    "left").show()
    person.join(cities,
      person.col("cityId")===cities.col("id"),
    "right").show()
  }

  /**
   * semi     anti
   */
  @Test
  def semiAndAnti(): Unit ={
    person.join(cities,person.col("cityId")===cities.col("id"),
    "leftanti").show()

    person.join(cities,person.col("cityId")===cities.col("id"),
    "leftsemi").show()
  }

函數

UDF
#用戶可以將自定義函數,並將其注冊為SparkSql函數來使用
 /**
   * UDF
   */
  @Test
  def myudf(): Unit ={
    import org.apache.spark.sql.functions._
    val df=Seq("1","2","3").toDF("value")
    val toAppendStr1=udf(toAppendStr _)		#注冊語法,返回函數
    df.select(toAppendStr1('value)).show()
  }

  def toAppendStr(str:String): String ={
    "LIHAO_"+str
  }
窗口函數
#數據進行每組內操作,即分組后對組內數據進行操作
#語法,(作用函數) over (window) as "別名"
  @Test
  def firstSecond(): Unit = {
    val spark = SparkSession.builder()
      .appName("window")
      .master("local[6]")
      .getOrCreate()

    import spark.implicits._

    import org.apache.spark.sql.functions._

    val data = Seq(
      ("Thin", "Cell phone", 6000),
      ("Normal", "Tablet", 1500),
      ("Mini", "Tablet", 5500),
      ("Ultra thin", "Cell phone", 5000),
      ("Very thin", "Cell phone", 6000),
      ("Big", "Tablet", 2500),
      ("Bendable", "Cell phone", 3000),
      ("Foldable", "Cell phone", 3000),
      ("Pro", "Tablet", 4500),
      ("Pro2", "Tablet", 6500)
    )

    val source = data.toDF("product", "category", "revenue")

    val window=Window.partitionBy("category")
      .orderBy('revenue.desc)

    source.select('product,'category,'revenue,dense_rank() over window as "rank")
      .where('rank<=2)
      .select('product,'category,'revenue)
      .show()

    val meanWindow=Window.partitionBy("category")
        .orderBy('revenue.desc)
    

    source.select('product,'category,'revenue,(max('revenue) over(meanWindow)) -'revenue  as "mean" )
      .show()

  }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM