詳細解讀Spark的數據分析引擎:Spark SQL


 詳細解讀Spark的數據分析引擎:Spark SQL

一、spark SQL:類似於Hive,是一種數據分析引擎

什么是spark SQL?

spark SQL只能處理結構化數據

底層依賴RDD,把sql語句轉換成一個個RDD,運行在不同的worker上

特點:

1、容易集成:SQL語句

2、對不同的數據源提供統一的訪問方式:DataFrame 用DataFrame屏蔽數據源的差別

3、兼容Hive

大綱:

詳細解讀Spark的數據分析引擎:Spark SQL

核心概念:DataFrame(看作表):就是表,是Spark SQL對結構化數據的抽象集合

表現形式:RDD

表=表結構+數據

DataFrame=schema+RDD

DataSet(新API接口 看作表)

如何創建DataFrame?

1、方式一:通過case class創建DataFrame

創建表結構

case class EMP(empno:Int,ename:String,job:String,mgr:String,hiredata:String,sal:Int,comm:String,deptno:Int)

導入emp.csv文件並指定分隔符

val lines = sc.textFile("/root/temp/emp.csv").map(_.split(","))

lines.collect

將表結構和數據關聯起來

val allEmp = lines.map(x=>Emp(x(0).toInt,x(1),x(2),x(3),x(4),x(5).toInt,x(6),x(7).toInt))

創建DataFrame:

val empDF = allEmp.toDF

操作DataFrame:

empDF.show:展示DataFrame

empDF.printSchema:打印DataFrame的表結構

2、方式二:通過SparkSession.createDataFrame()創建DataFrame

什么是spark session?

從spark2.0以后提供了統一訪問spark各個模塊的對象:spark session

創建表結構:用StructType類

import org.apache.spark.sql

import org.apache.spark.sql.types._

val myschema = StructType(List(empno:Int,ename:String,job:String,mgr:String,hiredata:String,sal:Int,comm:String,deptno:Int))

導入emp.csv文件並指定分隔符

val lines = sc.textFile("/root/temp/emp.csv").map(_.split(","))

將表結構和數據關聯起來,把讀入的數據emp.csv映射成一行,這里沒有帶表結構

import.org.apache.spark.sql._

val rowRDD = lines.map(x=>Row(x(0).toInt,x(1),x(2),x(3),x(4),x(5).toInt,x(6),x(7).toInt))

通過SparkSession.createDataFrame()創建表

val df = spark.createDataFrame(rowRDD,myschema)

3、方式三:直接讀取一個具有格式的數據文件作為DataFrame(json文件)

val peopleDF = spark.read.json("/root/training/")

4、操作DataFrame:DSL語句和SQL語句

DSL語句:empDF.show

empDF.printSchema

查詢所有員工的信息:df.show

查詢所有員工的姓名:df.select("ename").show

或者df.select($"ename").show

查詢員工信息:姓名 薪水 薪水+100

df.select($"ename",$"sal",$"sal"+100).show

查詢工資大於2000的員工

df.filter("sal">2000).show

分組:

df.groupBy("deptno").count.show

SQL語句:需要將DataFrame注冊成一張臨時視圖

df.createOrReplaceTempView("emp")

spark.sql("select * from emp").show

spark.sql("select * from emp where deptno=10").show

5、臨時視圖:2種

1、只在當前會話中有效:臨時視圖 df.createOrReplaceTempView("emp")

2、在全局范圍內都有效:全局臨時視圖 df.createGlobalTempView("empG")

例:在當前會話中

spark.sql("select * from emp").show

spark.sql("select * from global_temp.empG").show

例:在新的會話中

spark.newSession.sal("select * from emp").show

spark.newSession.sal("select * from global_temp.empG").show

詳細解讀Spark的數據分析引擎:Spark SQL

二、使用數據源:

1、load函數加載數據源和save函數保存數據源

load函數默認的數據源是parquet文件

json函數默認的數據源是json文件

val usersDF = spark.read.load("/root/training/spakr-2.1.0-bin-hadoop2.7/examples/")

usersDF.select("name","favorite_color").show

usersDF.select("name","favorite_color").write.save("/root/temp/result")

2、Parquet文件:是sparkSQL load函數默認加載的數據源,按列存儲的文件

如何把其他文件格式轉換成parquet文件?

例:json文件---->parquet文件

val empJSON = spark.read.json("/root/temp/emp.json") #直接讀取一個具有格式的數據文件作為DataFrame

empJSON.write.parquet("/root/temp/empparquet") #/empparquet目錄不能事先存在

或者empJSON.wirte.mode("overwrite").parquet("/root/temp/result") #/result目錄可以事先存在

功能:支持Schema的合並

第一個文件:val df1 = sc.makeRDD(1 to 5).map(i=>(i,i*2)).toDF("single","double")

df1.write.parquet("/root/temp/test_table/key=1")

第二個文件:val df2 = sc.makeRD(6 to 10).map(i=>(i,i*3)).toDF("single","triple")

df2.write.parquet("/root/temp/test_table/key=2")

合並兩個文件:val df3 = spark.read.option("mergeSchema","true").parquet("/root/temp/test_table")

3、json文件:

spark.read.json("/root/training/spark-2.1.0-bin-hadoop-2.7/examples/src/main/resources/people.json")

spark.read.format("json").load("/root/training/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources/people.json")

4、RDBMS:需要把RDBMS的驅動加入到spark shell中

spark.read.format("jdbc").option("url","jdbc:oracle:thin:@192.168.182.11:1521/orcl.example.com").option("dbtable","scott.emp").option("user","scott").option("password","tiger").load

或使用Properties類

import java.util.Properties

val prop = new Properties()

prop.setProperty("user","scott")

prop.setProperty("password","tiger")

val oracleDF1 = spark.read.jdbc("jdbc:oracle:thin:@192.168.182.11:1521/orcl")

作者:李金澤AllenLi,清華大學碩士研究生,研究方向:大數據和人工智能


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM