Differences and Connections Between RDD, DataFrame, and Dataset
Common points:
1) All three are Spark's resilient distributed datasets and are lightweight abstractions.
2) All are lazily evaluated: transformations are deferred until an action triggers computation.
3) All can be cached in memory, depending on available memory, to speed up repeated computation.
4) All have the concept of partitions.
5) They share many of the same operators, such as map, flatMap, and so on (a short sketch follows this list).
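A minimal sketch of these shared traits, assuming a local SparkSession; the object name LazinessSketch and the sample data are made up for illustration:

import org.apache.spark.sql.SparkSession

object LazinessSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("LazinessSketch").getOrCreate()
    import spark.implicits._

    // The same map/flatMap operators exist on RDD, DataFrame and Dataset,
    // and all of them are lazy: nothing runs until an action is called.
    val rdd = spark.sparkContext.parallelize(Seq("a b", "c d"))
    val words = rdd.flatMap(_.split(" ")).map(_.toUpperCase)   // no job runs yet
    println(words.getNumPartitions)                            // partitions exist on all three abstractions
    words.collect().foreach(println)                           // action: the job runs here

    val ds = Seq("a b", "c d").toDS()
    ds.flatMap(_.split(" ")).map(_.toUpperCase).show()         // same operators on a Dataset, also lazy until show()

    spark.stop()
  }
}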
Differences:
1) RDD does not support SQL.
2) Every row of a DataFrame is of type Row; fields cannot be accessed directly and must be parsed out of the Row first.
3) The row type of a Dataset is not fixed; after defining a case class you can access the fields of each row freely and with type safety.
4) Both DataFrame and Dataset support Spark SQL operations such as select and group by, and both can be registered as temporary tables/views so SQL statements can be run against them.
5) As you can see, a Dataset is very convenient when you need to access a particular field. However, when writing highly generic functions, the row type of a Dataset is undetermined (it could be any case class), so such adaptation is not possible; in that case a DataFrame, i.e. Dataset[Row], solves the problem nicely. The sketch below contrasts the two access styles.
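A small sketch contrasting field access on a DataFrame (untyped Row) and a Dataset (typed case class); the object name, Person case class and sample rows here are illustrative, not from the course code:

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object RowVsTypedSketch {
  case class Person(name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("RowVsTypedSketch").getOrCreate()
    import spark.implicits._

    val df: DataFrame = Seq(("PK", 30), ("Tom", 25)).toDF("name", "age")

    // DataFrame: each row is a generic Row, so fields must be pulled out by index or by name
    df.map(row => row.getAs[String]("name") + ":" + row.getAs[Int]("age")).show()

    // Dataset[Person]: each row is a Person, so fields are accessed directly and checked at compile time
    val ds: Dataset[Person] = df.as[Person]
    ds.map(p => p.name + ":" + p.age).show()

    spark.stop()
  }
}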
Conversions:
1) DataFrame/DataSet to RDD
- val rdd = DF/DS.rdd (a sketch follows the full code example below)
2) Dataset/RDD to DataFrame
- import spark.implicits._
- call toDF (each line of data is wrapped into a Row)
package com.imooc.bigdata.chapter04

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object InteroperatingRDDApp {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("DatasetApp").getOrCreate()

    //runInferSchema(spark)
    runProgrammaticSchema(spark)

    spark.stop()
  }

  /**
   * Approach 2: programmatically specifying the schema
   */
  def runProgrammaticSchema(spark: SparkSession): Unit = {
    import spark.implicits._

    // step1: convert the raw RDD[String] into an RDD[Row]
    val peopleRDD: RDD[String] = spark.sparkContext.textFile("E:\\06-work\\03-java\\01-JavaCodeDome\\SparkSqlCode\\sparksql-train\\data\\people.txt")
    val peopleRowRDD: RDD[Row] = peopleRDD.map(_.split(","))   // RDD
      .map(x => Row(x(0), x(1).trim.toInt))

    // step2: define the schema as a StructType
    val struct =
      StructType(
        StructField("name", StringType, true) ::
        StructField("age", IntegerType, false) :: Nil)

    // step3: apply the schema to the RDD[Row]
    val peopleDF: DataFrame = spark.createDataFrame(peopleRowRDD, struct)
    peopleDF.show()
  }

  /**
   * Approach 1: inferring the schema via reflection
   * 1) define a case class
   * 2) map over the RDD, turning each line into an instance of the case class
   */
  def runInferSchema(spark: SparkSession): Unit = {
    import spark.implicits._

    val peopleRDD: RDD[String] = spark.sparkContext.textFile("E:\\06-work\\03-java\\01-JavaCodeDome\\SparkSqlCode\\sparksql-train\\data\\people.txt")

    //TODO... RDD => DF
    val peopleDF: DataFrame = peopleRDD.map(_.split(","))      // RDD
      .map(x => People(x(0), x(1).trim.toInt))                 // RDD
      .toDF()
    //peopleDF.show(false)

    peopleDF.createOrReplaceTempView("people")
    val queryDF: DataFrame = spark.sql("select name, age from people where age between 19 and 29")
    //queryDF.show()

    //queryDF.map(x => "Name:" + x(0)).show()                  // access by index
    queryDF.map(x => "Name:" + x.getAs[String]("name")).show() // access by field name
  }

  case class People(name: String, age: Int)
}
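For conversion 1) above (DataFrame/Dataset to RDD), a minimal sketch; the object name DfToRddSketch and the in-memory sample data are illustrative assumptions rather than part of the course code:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}

object DfToRddSketch {
  case class People(name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("DfToRddSketch").getOrCreate()
    import spark.implicits._

    val df = Seq(People("PK", 30), People("Tom", 25)).toDF()
    val ds = df.as[People]

    // DataFrame.rdd yields an RDD[Row]; fields must be pulled out of each Row
    val rowRDD: RDD[Row] = df.rdd
    rowRDD.map(r => r.getAs[String]("name")).collect().foreach(println)

    // Dataset.rdd keeps the element type: RDD[People]
    val peopleRDD: RDD[People] = ds.rdd
    peopleRDD.map(_.name).collect().foreach(println)

    spark.stop()
  }
}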
3) RDD to Dataset
Wrap each row of the RDD in a case class, then call the toDS method (a sketch follows the code example below).
4) DataFrame to Dataset
Define a case class matching the Row's fields, then call as[CaseClass].
package com.imooc.bigdata.chapter04

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object DatasetApp {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("DatasetApp").getOrCreate()
    import spark.implicits._

    val ds: Dataset[Person] = Seq(Person("PK", "30")).toDS()
    //ds.show()

    val primitiveDS: Dataset[Int] = Seq(1, 2, 3).toDS()
    //primitiveDS.map(x => x + 1).collect().foreach(println)

    val peopleDF: DataFrame = spark.read.json("E:\\06-work\\03-java\\01-JavaCodeDome\\SparkSqlCode\\sparksql-train\\data\\people.json")
    val peopleDS: Dataset[Person] = peopleDF.as[Person]
    //peopleDS.show(false)

    // With a DataFrame, a misspelled column name ("anme") is only reported at runtime
    //peopleDF.select("anme").show()
    // With a Dataset, a misspelled field would already be reported at compile time
    peopleDS.map(x => x.name).show()

    spark.stop()
  }

  case class Person(name: String, age: String)
}
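For conversion 3) above (RDD to Dataset), a minimal sketch under the same "name,age" line format; the object name RddToDsSketch, the in-memory lines, and this Person case class (age as Int here) are illustrative assumptions:

import org.apache.spark.sql.{Dataset, SparkSession}

object RddToDsSketch {
  case class Person(name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("RddToDsSketch").getOrCreate()
    import spark.implicits._

    // Wrap each line of the RDD in the case class, then call toDS
    val lines = spark.sparkContext.parallelize(Seq("PK,30", "Tom,25"))
    val ds: Dataset[Person] = lines
      .map(_.split(","))
      .map(x => Person(x(0), x(1).trim.toInt))
      .toDS()

    ds.show()
    spark.stop()
  }
}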
Important note:
When using these operations, be sure to add import spark.implicits._, otherwise toDF and toDS will not be available.
