Sparksql的介紹以及常見操作


撰寫本文的目的:對於sparksql,網上有大量的詳細文檔,本人針對常用的操作進行一個整理,當然有大多數都是從其他地方搬過來的,包括官方文檔以及其他網友的一些分享,一來是通過此次整理加強自己的記憶,二來如果有幸幫到某位網友,那是本人莫大的榮幸,先感謝您的閱讀,廢話不多說,進入正文:

    下文所涉及到的相關軟件版本分別為:

    spark版本:v2.2.0

    hive  :  v1.2.1

    hadoop :  v2.7.6

前言:

    Spark sql是spark處理結構化數據的一個模塊,它的前身是shark,與基礎的spark rdd不同,spark sql提供了結構化數據及計算結果等信息的接口,在內部,spark sql使用這個額外的信息去執行額外的優化,有幾種方式可以跟spark sql進行交互,包括sql和dataset api,使用相同的執行引擎進行計算的時候,無論是使用哪一種計算引擎都可以一快速的計算。

Dataset and DataFrames

  RDD:在spark剛開始的時候,引入RDD(彈性分布式數據集)

    優點:

      1)編譯時類型安全,編譯時就能檢查出類型錯誤

      2)面向對象的編程分格,直接通過類名點的方式來操作數據

      例如:idAge.filter(_.age > "") //編譯時直接報錯

         idAgeRDDPerson.filter(_.age > 25) //直接操作一個個的person對象

    缺點:

      1)序列化和反序列化的性能開銷,無論是集群間的通信還是IO操作,都需要對對象的結果和數據進行序列化和反序列化

      2)GC的性能開銷,頻繁的創建和銷毀對象,勢必會增加GC

  DataFrame:spark1.3的時候引入了DataFrmae,是一個列方式組織的分布式數據集

    優點:

      1)引入了Schema,包含了一ROW位單位的每行數據的列信息,spark通過Schema就能夠讀懂數據,因此在通信和IO時就只需要序列化和反序列化數據,而結構的部分就可以省略了;

      2)off-heap:spark能夠以二進制的形式序列化數據(不包括結構)到off-heap(堆外內存),當要操作數據時,就直接操作off-heap內存,off-heap類似於地盤,schema類似於地圖,Spark有了地圖又有了自己地盤了,就可以自己說了算,不再受JVM的限制,也就不再受GC的困擾了,通過Schema和off-heap,DataFrame克服了RDD的缺點。對比RDD提升了計算效率,減少了數據的讀取,底層計算優化

      3)引入了新的引擎:Tungsten

      4)引入了新的語法解析框架:Catalyst

    缺點:

      DataFrame客服了RDD 的缺點,但是丟失了RDD的優點,DataFrame不是類型安全的,API也不是面向對象分格的。

      1)API不是面向對象的

        idAgeDF.filter(idAgeDF.col("age") > 22)

      2)DataFrame不是編譯時類型安全的,下面這種情況下不會報錯

        idAgeDF.filter(idAgeDF.col("age") > "")

  DataSet:到spark1.6的時候引入了DataSet,Encoder分布式數據集,是一個被添加的新接口,它提供了RDD 的優點(強類型化,能夠使用強大的lambda函數)

 /**
 * @groupname basic Basic Dataset functions
 * @groupname action Actions
 * @groupname untypedrel Untyped transformations
 * @groupname typedrel Typed transformations
 *
 * @since 1.6.0
 */
@InterfaceStability.Stable
class Dataset[T] private[sql](
    @transient val sparkSession: SparkSession,
    @DeveloperApi @InterfaceStability.Unstable @transient val queryExecution: QueryExecution,
    encoder: Encoder[T])
  extends Serializable {

 

       DataSet是一個類,其中包含了三個參數:

      1. SparkSession:環境信息
      2. QueryExecution:包含數據和執行邏輯
      3. Encoder:數據結構編碼信息(包含序列化,schema,數據類型)

    核心:Encoder

    優點:

      1)一個DataSet可以從JVM對象來構造並且使用轉換功能(map,flatMap,filter...)

      2)編譯時的類型安全檢查。性能極大地提升,內存使用極大降低,減少GC。極大地較少網絡數據的傳輸、極大地減少scala和java之間代碼的差異性

      3)DataFrame每一行對應一個Row。而DataSet的定義更加寬松,每一個record對應了一個任意的類型。DataFrame只是DataSet的一種特例:type DataFrame = Dataset[Row]

      4)不同的Row是一個泛華的無類型的JVM object,Dataset是有一系列的強類型的JVM object組成的,Scala的case class或者java class定義。因此Dataset可以在編譯時進行類型檢查

      5)Dataset一Catalyst邏輯執行計划表示,並且數據以編碼的二進制形式被存儲,不需要反序列化就可以執行sorting,shuffle等操作

      6)Dataset創建需要一個顯示的Encoder,把對象序列化為二進制

      7)在scala API中,DataFrame僅僅是一個DataSet[Row]類型的別名,然而,在Java API中,用戶需要使用Dataset<Row> 去代表一個DataFrame。

sparkSession

  Spark2.0中開始引入了SparkSession的概念,它為用戶提供了一個統一的切入點來使用Spark的各項功能,包括SQLContext和HiveContext的組合(未來可能會加上StreamingContext),用戶不但可以使用DataFrame和Dataset的各種API,大大降低了spark的學習難度

   創建sparkSession:

scala:

import org.apache.spark.sql.SparkSession
val ssc = new SparkSession
    .Builder()
    .appName("name_in_webUI") //這里的這個名字隨便起,只要是自己能認識,最終這個是要顯示在weiUI界面的
    .enableHiveSupport()  //如果需要訪問hive,這一步不能少,如果只是讀取本地文件這一句可以省去
    .getOrCreate()  
//創建之后可以設置運行參數
ssc.conf.set("spark.sql.shuffle.partitions",4)
ssc.conf.set("spark.executor.memory","2g")

java:

import org.apache.spark.sql.SparkSession;
SparkSession spark
= SparkSession .builder() .appName("name_in_webUI") .getOrCreate();
spark.conf().set("spark.sql.shuffle.partitions",4);
spark.conf().set("spark.executor.memory","2g");

    Row

    Row是一個泛華的無類型的JVM object

    Row的訪問方式:

import org.apache.spark.sql.Row

val row = Row(1,'asd',3.3)
//Row的訪問方式
row(0)
row(1)
row(2)

row.getInt(0)
row.getString(1)
row.getDouble(2)

row.getAs[Int](0)
row.getAs[String](1)
row.getAs[Double](2)

    DataFrame:

    DataFrame即是帶有schema信息的RDD,spark直接可通過Schema就可以讀懂信息。

     schema:

      DataFrame中提供了詳細的數據信息,從而使得sparkSql可以清楚的知道數據集中包含了哪些列,每列的名稱和類型是什么?DataFrame的結構信息即為schema。

    schema的定義方式:

    import org.apache.spark.sql.types._

    1、來自官網文檔

val schema1 = StructType( StructField("name", StringType, false) :: 
                          StructField("age",  IntegerType, false) :: 
                          StructField("height", IntegerType, false) ::  Nil)

val schema2 = StructType( Seq(StructField("name", StringType, false),
                              StructField("age",  IntegerType, false),
                              StructField("height", IntegerType, false)))

val schema3 = StructType( List(StructField("name", StringType, false),
                               StructField("age",  IntegerType, false),
                               StructField("height", IntegerType, false)))

    2、來自spark源碼

val schema4 = (new StructType).
add(StructField("name", StringType, false)).
add(StructField("age",  IntegerType, false)).
add(StructField("height", IntegerType, false))

val schema5 = (new StructType).
add("name", StringType, true, "comment1").
add("age", IntegerType, false, "comment2").
add("height", IntegerType, true, "comment3")

    3、最便捷的方式

val schema6 = (new StructType).
add("name", "string", false).
add("age", "integer", false).
add("height", "integer", false)

RDD、DataFrame和DataSet的共性與區別

共性:

  1. 三者都是spark平台下的分布式彈性數據集,為處理超大型數據提供便利
  2. 三者都有惰性機制。在創建時、轉換時(如map)不會立即執行,只有在遇到action算子的時候(比如foreach),才開始進行觸發計算。極端情況下,如果代碼中只有創建、轉換,但是沒有在后面的action中使用對應的結果,在執行時會被跳過。
  3. 三者都有partition的概念,都有緩存(cache)的操作,還可以進行檢查點操作(checkpoint)
  4. 三者都有許多共同的函數(如map、filter,sorted等等)。
  5. 在對DataFrame和DataSet操作的時候,大多數情況下需要引入隱式轉換(ssc.implicits._)

不同:

  DataFrame:

    DataFrame是DataSet的特例,也就是說DataSet[Row]的別名

    DataFrame = RDD + schema

    1. DataFrame的每一行的固定類型為Row,只有通過解析才能獲得各個字段的值
    2. DataFrame與DataSet通常與spark ml同時使用
    3. DataFrame與DataSet均支持sparkSql操作,比如select,groupby等,也可以注冊成臨時表,進行sql語句操作
    4. DataFrame與DateSet支持一些方便的保存方式,比如csv,可以帶上表頭,這樣每一列的字段名就可以一目了然

  DataSet:

    DataSet = RDD + case class

    1. DataSet與DataFrame擁有相同的成員函數,區別只是只是每一行的數據類型不同。
    2. DataSet的每一行都是case class,在自定義case class之后可以很方便的獲取每一行的信息

DataFrame和DataSet的基本操作

   DataFrame和DataSet的創建

 

 

 

 

     DataFrame

  1、集合轉DataFrame

val ssc = SparkSession().Builder.master("test").appName("test").getOrCreate
val seq1 = Seq(("Jack", 28, 184), ("Tom", 10, 144), ("Andy", 16, 165))
val df1 = ssc.createDataFrame(seq1).withColumnRenamed("_1", "name1").
          withColumnRenamed("_2", "age1").withColumnRenamed("_3", "height1")
df1.orderBy(desc("age1")).show(10)
import ssc.implicit._
val df2 = ssc.createDataFrame(seq1).toDF("name", "age", "height") 

 

 

   2、RDD轉DataFrame

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
val arr = Array(("Jack", 28, 184), ("Tom", 10, 144), ("Andy", 16, 165))
val rdd1 = sc.makeRDD(arr).map(f=>Row(f._1, f._2, f._3))
val schema = StructType( StructField("name", StringType, false) :: 
  StructField("age",  IntegerType, false) :: 
  StructField("height", IntegerType, false) ::  Nil)
// false:說明該字段不允許為null true:說明該字段可以為null val rddToDF = spark.createDataFrame(rdd1, schema) rddToDF.orderBy(desc("name")).show(false)

  DataSet

  1、由range生成DataSet

val numDS = spark.range(5,100,5)
numDS.orderBy(desc("id")).show(5)
numDS.describe().show

  

 

 

   2、由集合生成DS

case class Person(name:String, age:Int, height:Int)
val seq1 = Seq(Person("Jack", 28, 184), Person("Tom", 10, 144), Person("Andy", 16, 165))
val spark:SparkSession = SparkSession.Builder....
val ds1 = spark.createDataset(seq1)
ds1.show
val seq2 = Seq(("Jack", 28, 184), ("Tom", 10, 144), ("Andy", 16, 165))
val ds2 = spark.createDataset(seq2)
ds2.show

 

 

   3、由RDD進行轉換

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

val arr = Array(("Jack", 28, 184), ("Tom", 10, 144), ("Andy", 16, 165))
val rdd2 = sc.makeRDD(arr).map(f=>Row(f._1, f._2, f._3))
val rdd3 = sc.makeRDD(arr).map(f=>Person(r._1,f._2,f._3)) val ds3 = sc.createDataset(rdd2)
val ds4 = rdd3.toDS() ds3.show(10)

 

 

 通過SparkSession讀取文件

import org.apache.spark.sql.types._
val schema2 = StructType( StructField("name", StringType, false) :: 
                          StructField("age",  IntegerType, false) :: 
                          StructField("height", IntegerType, false) ::  Nil)
val df7 = ssc.read.options(Map(("delimiter", ","), ("header", "false"))).schema(schema2).csv("file:///home/spark/t01.csv") // 讀取本地文件
df7.show()

DataSet的基礎函數

 

 

 

import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.types._
case class Person(name:String, age:Int, height:Int)
spark.sparkContext.setCheckpointDir("hdfs://node1:8020/checkpoint")
// 1 DataSet存儲類型
val seq1 = Seq(Person("Jack", 28, 184), Person("Tom", 10, 144), Person("Andy", 16, 165))
val ds1 = spark.createDataset(seq1)
ds1.show()
ds1.checkpoint()
ds1.cache()
ds1.persist(StorageLevel.MEMORY_ONLY)
ds1.count()
ds1.show()
ds1.unpersist(true)

// 2 DataSet結構屬性
ds1.columns
ds1.dtypes
ds1.explain()
ds1.col("name")
ds1.printSchema		// 常用
// 3 DataSet rdd數據互轉
val rdd1 = ds1.rdd
val ds2 = rdd1.toDS()
ds2.show()
val df2 = rdd1.toDF()
df2.show()

// 4 Dataset保存文件
ds1.select("name", "age", "height").write.format("csv").save("data/sql1/my01.csv")
// 讀取保存的文件
val schema2 = StructType( StructField("name", StringType, false) :: 
                          StructField("age",  IntegerType, false) :: 
                          StructField("height", IntegerType, false) ::  Nil)

val out = spark.read.
  options(Map(("delimiter", ","), ("header", "false"))). 
  schema(schema2).csv("data/sql1/*")
out.show(10)

DataSet的Action操作

 

 

 

// 1 顯示數據集
val seq1 = Seq(Person("Jack", 28, 184), Person("Tom", 10, 144), Person("Andy", 16, 165))
val ds1 = spark.createDataset(seq1)
// 缺省顯示20行
ds1.show()
// 顯示2行
ds1.show(2)
// 顯示20行,不截斷字符
ds1.show(20, false)

// 2 獲取數據集
// collect返回的是數組
val c1 = ds1.collect()
// collectAsList返回的是List
val c2 = ds1.collectAsList()
val h1 = ds1.head()
val h2 = ds1.head(3)
val f1 = ds1.first()
val f2 = ds1.take(2)
val t2 = ds1.takeAsList(2)
ds.limit(10).show		// 取10行數據生成新的DataSet

// 3 統計數據集
ds1.count()
// 返回全部列的統計(count、mean、stddev、min、max)
ds1.describe().show
// 返回指定列的統計(count、mean、stddev、min、max)
ds1.describe("age").show
ds1.describe("age", "height").show

// 4 聚集
ds1.reduce{ (f1, f2) => Person("sum", f1.age+f2.age, f1.height+f2.height) }

  

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM