一、spark SQL:類似於Hive,是一種數據分析引擎
什么是spark SQL?
spark SQL只能處理結構化數據
底層依賴RDD,把sql語句轉換成一個個RDD,運行在不同的worker上
特點:
1、容易集成:SQL語句
2、對不同的數據源提供統一的訪問方式:DataFrame 用DataFrame屏蔽數據源的差別
3、兼容Hive
大綱:
核心概念:DataFrame(看作表):就是表,是Spark SQL對結構化數據的抽象集合
表現形式:RDD
表=表結構+數據
DataFrame=schema+RDD
DataSet(新API接口 看作表)
如何創建DataFrame?
1、方式一:通過case class創建DataFrame
創建表結構
case class EMP(empno:Int,ename:String,job:String,mgr:String,hiredata:String,sal:Int,comm:String,deptno:Int)
導入emp.csv文件並指定分隔符
val lines = sc.textFile("/root/temp/emp.csv").map(_.split(","))
lines.collect
將表結構和數據關聯起來
val allEmp = lines.map(x=>Emp(x(0).toInt,x(1),x(2),x(3),x(4),x(5).toInt,x(6),x(7).toInt))
創建DataFrame:
val empDF = allEmp.toDF
操作DataFrame:
empDF.show:展示DataFrame
empDF.printSchema:打印DataFrame的表結構
2、方式二:通過SparkSession.createDataFrame()創建DataFrame
什么是spark session?
從spark2.0以后提供了統一訪問spark各個模塊的對象:spark session
創建表結構:用StructType類
import org.apache.spark.sql
import org.apache.spark.sql.types._
val myschema = StructType(List(empno:Int,ename:String,job:String,mgr:String,hiredata:String,sal:Int,comm:String,deptno:Int))
導入emp.csv文件並指定分隔符
val lines = sc.textFile("/root/temp/emp.csv").map(_.split(","))
將表結構和數據關聯起來,把讀入的數據emp.csv映射成一行,這里沒有帶表結構
import.org.apache.spark.sql._
val rowRDD = lines.map(x=>Row(x(0).toInt,x(1),x(2),x(3),x(4),x(5).toInt,x(6),x(7).toInt))
通過SparkSession.createDataFrame()創建表
val df = spark.createDataFrame(rowRDD,myschema)
3、方式三:直接讀取一個具有格式的數據文件作為DataFrame(json文件)
val peopleDF = spark.read.json("/root/training/")
4、操作DataFrame:DSL語句和SQL語句
DSL語句:empDF.show
empDF.printSchema
查詢所有員工的信息:df.show
查詢所有員工的姓名:df.select("ename").show
或者df.select($"ename").show
查詢員工信息:姓名 薪水 薪水+100
df.select($"ename",$"sal",$"sal"+100).show
查詢工資大於2000的員工
df.filter("sal">2000).show
分組:
df.groupBy("deptno").count.show
SQL語句:需要將DataFrame注冊成一張臨時視圖
df.createOrReplaceTempView("emp")
spark.sql("select * from emp").show
spark.sql("select * from emp where deptno=10").show
5、臨時視圖:2種
1、只在當前會話中有效:臨時視圖 df.createOrReplaceTempView("emp")
2、在全局范圍內都有效:全局臨時視圖 df.createGlobalTempView("empG")
例:在當前會話中
spark.sql("select * from emp").show
spark.sql("select * from global_temp.empG").show
例:在新的會話中
spark.newSession.sal("select * from emp").show
spark.newSession.sal("select * from global_temp.empG").show
二、使用數據源:
1、load函數加載數據源和save函數保存數據源
load函數默認的數據源是parquet文件
json函數默認的數據源是json文件
val usersDF = spark.read.load("/root/training/spakr-2.1.0-bin-hadoop2.7/examples/")
usersDF.select("name","favorite_color").show
usersDF.select("name","favorite_color").write.save("/root/temp/result")
2、Parquet文件:是sparkSQL load函數默認加載的數據源,按列存儲的文件
如何把其他文件格式轉換成parquet文件?
例:json文件---->parquet文件
val empJSON = spark.read.json("/root/temp/emp.json") #直接讀取一個具有格式的數據文件作為DataFrame
empJSON.write.parquet("/root/temp/empparquet") #/empparquet目錄不能事先存在
或者empJSON.wirte.mode("overwrite").parquet("/root/temp/result") #/result目錄可以事先存在
功能:支持Schema的合並
第一個文件:val df1 = sc.makeRDD(1 to 5).map(i=>(i,i*2)).toDF("single","double")
df1.write.parquet("/root/temp/test_table/key=1")
第二個文件:val df2 = sc.makeRD(6 to 10).map(i=>(i,i*3)).toDF("single","triple")
df2.write.parquet("/root/temp/test_table/key=2")
合並兩個文件:val df3 = spark.read.option("mergeSchema","true").parquet("/root/temp/test_table")
3、json文件:
spark.read.json("/root/training/spark-2.1.0-bin-hadoop-2.7/examples/src/main/resources/people.json")
spark.read.format("json").load("/root/training/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources/people.json")
4、RDBMS:需要把RDBMS的驅動加入到spark shell中
spark.read.format("jdbc").option("url","jdbc:oracle:thin:@192.168.182.11:1521/orcl.example.com").option("dbtable","scott.emp").option("user","scott").option("password","tiger").load
或使用Properties類
import java.util.Properties
val prop = new Properties()
prop.setProperty("user","scott")
prop.setProperty("password","tiger")
val oracleDF1 = spark.read.jdbc("jdbc:oracle:thin:@192.168.182.11:1521/orcl")
作者:李金澤AllenLi,清華大學碩士研究生,研究方向:大數據和人工智能