1.DataSet相關概念
Dataset是一個分布式的數據集。Dataset是Spark 1.6開始新引入的一個接口,它結合了RDD API的很多優點(包括強類型,支持lambda表達式等),以及Spark SQL的優點(優化后的執行引擎)。Dataset可以通過JVM對象來構造,然后通過transformation類算子(map,flatMap,filter等)來進行操作。Scala和Java的API中支持Dataset,但是Python不支持Dataset API。不過因為Python語言本身的天然動態特性,Dataset API的不少feature本身就已經具備了(比如可以通過row.columnName來直接獲取某一行的某個字段)。R語言的情況跟Python也很類似。
Dataframe就是按列組織的Dataset。在邏輯概念上,可以大概認為Dataframe等同於關系型數據庫中的表,或者是Python/R語言中的data frame,但是在底層做了大量的優化。Dataframe可以通過很多方式來構造:比如結構化的數據文件,Hive表,數據庫,已有的RDD。Scala,Java,Python,R等語言都支持Dataframe。在Scala API中,Dataframe就是Dataset[Row]的類型別名。在Java中,需要使用Dataset<Row>來代表一個Dataframe。
2.DataSet操作
- collect:將分布式存儲在集群上的分布式數據集(比如dataset),中的所有數據都獲取到driver端來
- first:獲取數據集中的第一條數據
- persist()/cache():持久化,如果要對一個dataset重復計算兩次的話,那么建議先對這個dataset進行持久化再進行操作,避免重復計算
- createTempView("employee")
- explain():答應執行計划,dataframe/dataset,比如執行了一個sql語句獲取的dataframe,實際上內部包含一個logical plan,邏輯執行計划,設計執行的時候,首先會通過底層的catalyst optimizer,生成物理執行計划,比如說會做一些優化,比如push filter,還會通過whole-stage code generation技術去自動化生成代碼,提升執行性能
- DataSet.write.save:將數據保存到指定目錄
- printSchema():打印結構
- 將DataFrame轉化為DataSet
case class Employee(name: String, age: Long, depId: Long, gender: String, salary: Long) val employeeDS=employee.as[Employee]
- coalesce和repartition:都是用來重新定義分區的,區別在於:coalesce,只能用於減少分區數量,而且可以選擇不發生shuffle,repartiton,可以增加分區,也可以減少分區,必須會發生shuffle,相當於是進行了一次重分區操作
- distinct和dropDuplicates:都是用來進行去重的,distinct,是根據每一條數據,進行完整內容的比對和去重, dropDuplicates,可以根據指定的字段進行去重
1 val employeeDistinct=employeeDS.distinct() 2 employeeDistinct.show() 3 val employeeDropDup=employeeDS.dropDuplicates(Seq("name")) 4 employeeDropDup.show()
- except:獲取在當前dataset中有,但是在另外一個dataset中沒有的元素
- filter:根據我們自己的邏輯,如果返回true,那么就保留該元素,否則就過濾掉該元素
- intersect:獲取兩個數據集的交集
employeeDS.except(employeeDS2).show()
employeeDS.intersect(employeeDS2).show()
employeeDS.filter(employee=>employee.age>35).show()
- map:將數據集中的每條數據都做一個映射,返回一條新數據
- flatMap:數據集中的每條數據都可以返回多條數據
- mapPartitions:一次性對一個partition中的數據進行處理
1 employeeDS.map(employee=>( 2 employee.name,employee.salary,employee.salary+1000 3 )).show() 4 employeeDS.flatMap(employee=>Seq( 5 (employee.name,employee.salary,employee.salary+1000), 6 (employee.name,employee.salary,employee.salary+2000) 7 )).show() 8 employeeDS.mapPartitions(employee=>{ 9 val result=scala.collection.mutable.ArrayBuffer[(String,Long,Long)]() 10 while(employee.hasNext){ 11 var temp=employee.next() 12 result += ((temp.name,temp.salary,temp.salary+5000)) 13 } 14 result.iterator 15 }).show()
- joinWith,兩個DataSet關聯,指定關聯條件
1 employee.joinWith(department, $"deptId" === $"id").show()
- sort:排序
1 employeeDS.sort($"salary".desc).show()
- randomSplit/sample
1 val employeeDSArr=employeeDS.randomSplit(Array(3,10,20)) 2 employeeDSArr.foreach(ds=>ds.show()) 3 employeeDS.sample(false, 0.3).show()
- groupBy/agg/avg/sum/max/min/count/countDistinct
1 employee 2 .join(department, $"depId" === $"id") 3 .groupBy(department("name")) 4 .agg(avg(employee("salary")), sum(employee("salary")), max(employee("salary")), min(employee("salary")), count(employee("name")), countDistinct(employee("name"))) 5 .show()
- collect_list/collect_set:collect_list就是將一個分組內,指定字段的值都收集到一起,不去重,collect_set,同上,但是唯一的區別是,會去重
1 /** 2 [1,WrappedArray(Leo, Jack),WrappedArray(Jack, Leo)] 3 [3,WrappedArray(Tom, Kattie),WrappedArray(Tom, Kattie)] 4 [2,WrappedArray(Marry, Jen, Jen),WrappedArray(Marry, Jen)] 5 */ 6 employee.groupBy(employee("depId")) 7 .agg(collect_list(employee("name")),collect_set(employee("name"))) 8 .collect() 9 .foreach(println)
鏈接:https://www.jianshu.com/p/f017716187b3
