Spark DataSet


1.DataSet相關概念

Dataset是一個分布式的數據集。Dataset是Spark 1.6開始新引入的一個接口,它結合了RDD API的很多優點(包括強類型,支持lambda表達式等),以及Spark SQL的優點(優化后的執行引擎)。Dataset可以通過JVM對象來構造,然后通過transformation類算子(map,flatMap,filter等)來進行操作。Scala和Java的API中支持Dataset,但是Python不支持Dataset API。不過因為Python語言本身的天然動態特性,Dataset API的不少feature本身就已經具備了(比如可以通過row.columnName來直接獲取某一行的某個字段)。R語言的情況跟Python也很類似。

Dataframe就是按列組織的Dataset。在邏輯概念上,可以大概認為Dataframe等同於關系型數據庫中的表,或者是Python/R語言中的data frame,但是在底層做了大量的優化。Dataframe可以通過很多方式來構造:比如結構化的數據文件,Hive表,數據庫,已有的RDD。Scala,Java,Python,R等語言都支持Dataframe。在Scala API中,Dataframe就是Dataset[Row]的類型別名。在Java中,需要使用Dataset<Row>來代表一個Dataframe。

2.DataSet操作

  • collect:將分布式存儲在集群上的分布式數據集(比如dataset),中的所有數據都獲取到driver端來
  • first:獲取數據集中的第一條數據
  • persist()/cache():持久化,如果要對一個dataset重復計算兩次的話,那么建議先對這個dataset進行持久化再進行操作,避免重復計算
  • createTempView("employee")
  • explain():答應執行計划,dataframe/dataset,比如執行了一個sql語句獲取的dataframe,實際上內部包含一個logical plan,邏輯執行計划,設計執行的時候,首先會通過底層的catalyst optimizer,生成物理執行計划,比如說會做一些優化,比如push filter,還會通過whole-stage code generation技術去自動化生成代碼,提升執行性能
  • DataSet.write.save:將數據保存到指定目錄
  • printSchema():打印結構
  • 將DataFrame轉化為DataSet
 
case class Employee(name: String, age: Long, depId: Long, gender: String, salary: Long)
val employeeDS=employee.as[Employee]
  • coalesce和repartition:都是用來重新定義分區的,區別在於:coalesce,只能用於減少分區數量,而且可以選擇不發生shuffle,repartiton,可以增加分區,也可以減少分區,必須會發生shuffle,相當於是進行了一次重分區操作
  • distinct和dropDuplicates:都是用來進行去重的,distinct,是根據每一條數據,進行完整內容的比對和去重, dropDuplicates,可以根據指定的字段進行去重
1 val employeeDistinct=employeeDS.distinct()
2  employeeDistinct.show()
3  val employeeDropDup=employeeDS.dropDuplicates(Seq("name"))
4  employeeDropDup.show()
  • except:獲取在當前dataset中有,但是在另外一個dataset中沒有的元素
  • filter:根據我們自己的邏輯,如果返回true,那么就保留該元素,否則就過濾掉該元素
  • intersect:獲取兩個數據集的交集
employeeDS.except(employeeDS2).show()
employeeDS.intersect(employeeDS2).show()
employeeDS.filter(employee=>employee.age>35).show()
  • map:將數據集中的每條數據都做一個映射,返回一條新數據
  • flatMap:數據集中的每條數據都可以返回多條數據
  • mapPartitions:一次性對一個partition中的數據進行處理
 1 employeeDS.map(employee=>(
 2   employee.name,employee.salary,employee.salary+1000
 3 )).show()
 4 employeeDS.flatMap(employee=>Seq(
 5     (employee.name,employee.salary,employee.salary+1000),
 6     (employee.name,employee.salary,employee.salary+2000)
 7     )).show()
 8 employeeDS.mapPartitions(employee=>{
 9   val result=scala.collection.mutable.ArrayBuffer[(String,Long,Long)]()
10   while(employee.hasNext){
11     var temp=employee.next()
12     result += ((temp.name,temp.salary,temp.salary+5000))
13   }
14   result.iterator
15 }).show()
  • joinWith,兩個DataSet關聯,指定關聯條件
1 employee.joinWith(department, $"deptId" === $"id").show()
  • sort:排序
1 employeeDS.sort($"salary".desc).show()
  • randomSplit/sample
1 val employeeDSArr=employeeDS.randomSplit(Array(3,10,20))
2 employeeDSArr.foreach(ds=>ds.show())
3 employeeDS.sample(false, 0.3).show()
  • groupBy/agg/avg/sum/max/min/count/countDistinct
1 employee
2     .join(department, $"depId" === $"id")  
3     .groupBy(department("name"))
4     .agg(avg(employee("salary")), sum(employee("salary")), max(employee("salary")), min(employee("salary")), count(employee("name")), countDistinct(employee("name")))
5     .show()
  • collect_list/collect_set:collect_list就是將一個分組內,指定字段的值都收集到一起,不去重,collect_set,同上,但是唯一的區別是,會去重
1 /**
2 [1,WrappedArray(Leo, Jack),WrappedArray(Jack, Leo)]
3 [3,WrappedArray(Tom, Kattie),WrappedArray(Tom, Kattie)]
4 [2,WrappedArray(Marry, Jen, Jen),WrappedArray(Marry, Jen)]
5 */
6      employee.groupBy(employee("depId"))
7      .agg(collect_list(employee("name")),collect_set(employee("name")))
8      .collect()
9      .foreach(println)


鏈接:https://www.jianshu.com/p/f017716187b3


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM