常見應用 - SparkSql 之 DSL(1)


 
         
package com.zhangxiaofan.test

import org.apache.spark._
import org.apache.spark.sql.SparkSession

/**
* Sample employee record; `SparkSQLDSLDemo` turns a collection of these into the person DataFrame.
*
* @param name   person's name
* @param age    age value; NOTE(review): presumably years, not enforced anywhere
* @param sex    sex code; the sample data only uses "M" or "F"
* @param salary salary amount (currency/units not stated)
* @param deptNo department number, used as the join key against `Dept.deptNo`
*
* @author ZhangXiaoFan
* @create 2020-10-20 9:58
*/
case class Person2(name: String, age: Int, sex: String, salary: Int, deptNo: Int)
/**
* Sample department record; `SparkSQLDSLDemo` turns a collection of these into the dept DataFrame.
*
* @param deptNo   department number (join key shared with `Person2.deptNo`)
* @param deptName department name
*/
case class Dept(deptNo: Int, deptName: String)

/**
 * Walk-through of the Spark SQL DataFrame DSL: `select`, `where`/`filter`,
 * `groupBy`/`agg` and `join`, run against small in-memory sample data.
 *
 * Each demo prints its result with `show()`.
 */
object SparkSQLDSLDemo {

  /**
   * Entry point: builds a local SparkSession, runs every DSL demo, then stops the session.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // 1. Create the Spark context; "local[*]" runs locally using all available cores.
    val conf: SparkConf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("SparkSQLDSLDemo")
    val spark = SparkSession.builder()
      .config(conf)
      .getOrCreate()

    // Stop the session even if a demo throws, so the local Spark context is always released.
    try {
      import spark.implicits._
      import org.apache.spark.sql.functions._

      // 2. Build the sample data directly in memory.
      val personRdd = spark.sparkContext.parallelize(Array(
        Person2("張三", 21, "M", 1235, 1),
        Person2("李四", 20, "F", 1235, 1),
        Person2("王五", 26, "M", 1235, 1),
        Person2("小明", 25, "F", 1225, 1),
        Person2("小花", 24, "F", 1425, 1),
        Person2("小華", 23, "M", 1215, 1),
        Person2("gerry", 22, "F", 1415, 2),
        Person2("tom", 21, "F", 1855, 2),
        Person2("lili", 20, "F", 1455, 2),
        Person2("莉莉", 18, "M", 1635, 2)
      ))
      val deptRdd = spark.sparkContext.parallelize(Array(
        Dept(1, "部門1"),
        Dept(2, "部門2")
      ))

      val personDataFrame = personRdd.toDF()
      val deptDataFrame = deptRdd.toDF()

      // Define a UDF for use in DSL expressions (it is not registered for SQL strings):
      // "M" -> 0, "F" -> 1, anything else -> -1 (case-insensitive).
      // Option(...) maps a null `sex` value to -1 instead of throwing an NPE on toUpperCase.
      val sexToNum = udf((sex: String) =>
        Option(sex).map(_.toUpperCase) match {
          case Some("M") => 0
          case Some("F") => 1
          case _         => -1
        }
      )

      // select
      println("----select-----")
      deptDataFrame.select(col("deptNo"), col("deptName")).show()
      // `as` and `alias` are equivalent ways to rename a column expression.
      deptDataFrame.select((col("deptNo") + 1).as("deptNo"), col("deptName")).show()
      deptDataFrame.select((col("deptNo") + 1).alias("deptNo"), col("deptName")).show()
      personDataFrame
        .select(col("name"), col("age"), col("sex"), sexToNum(col("sex")).as("sex_num"))
        .show()

      // where/filter
      println("------where/filter-------")
      personDataFrame.where(col("age") > 22).show()
      personDataFrame
        .where(col("age") > 20 && col("sex") === "M" && col("deptNo") === 1)
        .show()
      // `equalTo` is the method-name spelling of `===`.
      personDataFrame
        .where(col("age") > 20 || col("sex") === "M" || col("deptNo").equalTo(1))
        .show()

      // group by
      println("------group by-------")
      personDataFrame
        .groupBy(col("sex"))
        .agg(sum(col("salary")).as("sum_salary"), avg(col("salary")).as("avg_salary"))
        .show()

      println("----------join--------------------")
      // Join on an explicit column condition: both `deptNo` columns are kept in the result.
      personDataFrame
        .join(deptDataFrame, personDataFrame.col("deptNo") === deptDataFrame.col("deptNo"), "inner")
        .show()
      // Inner join on a shared column name: `deptNo` appears only once in the result.
      personDataFrame.join(deptDataFrame, "deptNo").show()
      // Same shared-column join as a left outer join.
      personDataFrame
        .join(deptDataFrame, Seq("deptNo"), "left_outer")
        .show()
    } finally {
      spark.stop()
    }
  }
}

  

 
         
         
       


免責聲明!

本站轉載的文章為個人學習借鑒使用，本站對版權不負任何法律責任。如果侵犯了您的隱私權益，請聯繫本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM