歡迎大家關注我的公眾號,“互聯網西門二少”,我將繼續輸出我的技術干貨~

該部分分為兩篇,分別介紹RDD與Dataset/DataFrame:
二、DataSet/DataFrame
該篇主要介紹DataSet與DataFrame。
一、生成DataFrame
1.1.通過case class構造DataFrame
package com.personal.test
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
object DataFrameTest {
case class Person(id: Int, name: String, age: Int)
def main(args: Array[String]): Unit = {
val dataPath = "hdfs://192.168.60.164:9000/user/name/input"
val spark = SparkSession
.builder()
.appName("DataFrameTest")
.getOrCreate()
import spark.implicits._
val caseClassDF = Seq(Person(1, "lily", 18), Person(2, "lucy", 20)).toDF("id", "name", "age")
println("=========== show: ============")
caseClassDF.show()
println("=========== schema: ===========")
caseClassDF.printSchema()
spark.stop()
}
}
這里通過“import spark.implicits._”使用了隱式Encoder,否則無法調用“.toDF()”,這里的spark為上面定義的sparkSession變量,並不是“import org.apache.spark”,注意不要混淆。
運行結果:

可以看到,我們將兩個Person實例封裝為DataFrame實例,之后便可以像操作表/視圖一樣來對其進行其他處理了。
1.2.通過數值構造DataFrame
package com.personal.test
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
object DataFrameTest {
def main(args: Array[String]): Unit = {
val dataPath = "hdfs://192.168.60.164:9000/user/name/input"
val spark = SparkSession
.builder()
.appName("DataFrameTest")
.getOrCreate()
import spark.implicits._
val valueDF = Seq(1, 2, 3).toDF("id")
println("=========== show: ============")
valueDF.show()
println("=========== schema: ===========")
valueDF.printSchema()
spark.stop()
}
}
通過這個和上面的例子可以看到,我們可以通過Seq將任何值類型對象轉換為DataFrame對象,Seq類似於Java的List。
運行結果:

1.3.通過SparkSession讀取數據
package com.personal.test
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
object DataFrameTest {
case class Person(id: Int, name: String, age: Int)
def main(args: Array[String]): Unit = {
val dataPath = "hdfs://192.168.60.164:9000/user/name/input"
val spark = SparkSession
.builder()
.appName("DataFrameTest")
.getOrCreate()
// 默認將整行定義為value: String, 可以為空
val hdfsDF = spark.read.text(dataPath)
println("=========== show: ============")
hdfsDF.show()
println("=========== schema: ===========")
hdfsDF.printSchema()
val personEncoder: Encoder[Person] = Encoders.product
val personDF = hdfsDF.map(row => {
//val value = row.getAs[String]("value")
val value = row.getString(0)
val fields = value.trim.split(",")
Person(Integer.parseInt(fields(0)), fields(1), Integer.parseInt(fields(2)))
})(personEncoder).toDF()
println("=========== show: ============")
personDF.show()
println("=========== schema: ===========")
personDF.printSchema()
spark.stop()
}
}
運行結果:

可以看到,“spark.read.text(dataPath)”默認將文件中的一行定義為String類型的value字段,可以通過get(0)、getString(0)或getAs[String]("value")來獲取value的內容。
這里我們沒有使用“import spark.implicits._”將Person隱式Encoder,而是通過“val personEncoder: Encoder[Person] = Encoders.product”顯式定義一個Encoder[Person]類型的變量,並調用“hdfsDF.map(...)(personEncoder)”來顯式Encoder,並在map之后調用".toDF"將map的結果轉換為DataFrame,map的結果為DataSet類型,所以DataSet可以直接調用“.toDF”轉換為DataFrame。如果使用“import spark.implicits._” ,就不需要定義“personEncoder”變量,也不需要為map的最后一個參數賦值。
1.4.通過RDD轉換為DataFrame
package com.personal.test
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
object DataFrameTest {
case class Person(id: Int, name: String, age: Int)
def main(args: Array[String]): Unit = {
val dataPath = "hdfs://192.168.60.164:9000/user/name/input"
val spark = SparkSession
.builder()
.appName("DataFrameTest")
.getOrCreate()
import spark.implicits._
val rddDF = spark.sparkContext.textFile(dataPath)
.map(row => row.split(","))
.map(fields => Person(Integer.parseInt(fields(0)), fields(1), Integer.parseInt(fields(2))))
.toDF("id", "name", "age")
println("=========== show: ============")
rddDF.show()
println("=========== schema: ===========")
rddDF.printSchema()
spark.stop()
}
}
可以看到,RDD轉換為DataFrame與通過Seq生成DataFrame一樣,都需要“import spark.implicits._”,否則無法調用“.toDF”。
運行結果:

二、生成DataSet
2.1.通過case class構造DataSet
package com.personal.test
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
object DataSetTest {
case class Person(id: Int, name: String, age: Int)
def main(args: Array[String]): Unit = {
val dataPath = "hdfs://192.168.60.164:9000/user/name/input"
val spark = SparkSession
.builder()
.appName("DataSetTest")
.getOrCreate()
import spark.implicits._
val caseClassDs = Seq(Person(1, "lily", 18), Person(2, "lucy", 20))
.toDS()
println("=========== show: ============")
caseClassDs.show()
println("=========== schema: ===========")
caseClassDs.printSchema()
spark.stop()
}
}
可以看到,類似於1.1,只需要將“.toDF”換為“.toDS”即可得到DataSet類型的結果。
運行結果:

可以看到其結構等於DF。
2.2.通過數值構造DataSet
package com.personal.test
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
object DataSetTest {
case class Person(id: Int, name: String, age: Int)
def main(args: Array[String]): Unit = {
val dataPath = "hdfs://192.168.60.164:9000/user/name/input"
val spark = SparkSession
.builder()
.appName("DataSetTest")
.getOrCreate()
import spark.implicits._
val valueDs = Seq(1, 2, 3).toDS()
println("=========== show: ============")
valueDs.show()
println("=========== schema: ===========")
valueDs.printSchema()
spark.stop()
}
}
運行結果:

可以看到,其結構類似於DF,但是列名默認為value。
2.3.通過SparkSession讀取數據
package com.personal.test
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
object DataSetTest {
case class Person(id: Int, name: String, age: Int)
def main(args: Array[String]): Unit = {
val dataPath = "hdfs://192.168.60.164:9000/user/name/input"
val spark = SparkSession
.builder()
.appName("DataSetTest")
.getOrCreate()
// 默認將整行定義為value: String, 可以為空
val hdfsDF = spark.read.text(dataPath)
println("=========== show: ============")
hdfsDF.show()
println("=========== schema: ===========")
hdfsDF.printSchema()
val personEncoder: Encoder[Person] = Encoders.product
val personDs = hdfsDF.map(row => {
val value = row.getAs[String]("value")
//val value = row.getString(0)
val fields = value.trim.split(",")
Person(Integer.parseInt(fields(0)), fields(1), Integer.parseInt(fields(2)))
})(personEncoder)
println("=========== show: ============")
personDs.show()
println("=========== schema: ===========")
personDs.printSchema()
spark.stop()
}
}
通過1.3與2.3可以看到,SparkSession讀取文件(SparkSession.read.*)得到的為DataFrame,DataFrame經過map、filter等操作后得到的為DataSet,DataSet又可以通過“.toDF”轉換為DataFrame,這也印證了官網對DataFrame的定義:
type DataFrame = DataSet[Row]
2.4.通過RDD轉換為DataSet
package com.personal.test
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
object DataSetTest {
case class Person(id: Int, name: String, age: Int)
def main(args: Array[String]): Unit = {
val dataPath = "hdfs://192.168.60.164:9000/user/name/input"
val spark = SparkSession
.builder()
.appName("DataSetTest")
.getOrCreate()
import spark.implicits._
val rddDS = spark.sparkContext.textFile(dataPath)
.map(row => row.split(","))
.map(fields => Person(Integer.parseInt(fields(0)), fields(1), Integer.parseInt(fields(2))))
.toDS()
println("=========== show: ============")
rddDS.show()
println("=========== schema: ===========")
rddDS.printSchema()
spark.stop()
}
}
類似於1.4,只需要將“.toDF”替換為“.toDS”即可得到DataSet類型的結果。
通過以上(一)、(二)內容我們看到了如何將文件、數值、對象、RDD轉換為DataSet/DataFrame,以及DataSet與DataFrame之間如何互轉,如何隱式/顯式使用Encoder。
接下來介紹基於DataSet/DataFrame可以進行哪些操作。
三、基於DataSet/DataFrame的操作
除了上例中用到的map,還有filter、count、first、foreach、reduce等同樣可以基於RDD進行的操作,此外,還有幾個特殊操作:
3.1. select
select可以接受一個或多個參數,用於從DataSet/DataFrame中獲取指定字段。
3.2. createOrReplaceTempView
createOrReplaceTempView用於將DataSet/DataFrame的數據臨時保存為視圖,方便后續使用SparkSession.sql()進行操作,Session結束時生命周期結束。
3.3. printSchema
如(一)(二)中示例所示,printSchema用於打印DataSet/DataFrame數據集的樹形結構定義。
3.4. withColumnRenamed
withColumnRenamed用於對列重命名,類似於sql語句“select a as aa, b as bb from table”中的as。
3.5. join
join用於按指定的join表達式與join類型(默認為inner join)將另一個DataSet/DataFrame與當前DataSet/DataFrame合並。
這里舉一個不完整的例子,用以演示基於DataSet/DataFrame的操作。
val exposureLogEncoder: Encoder[ExposureLog] = Encoders.product
val exposureLogTupleEncoder: Encoder[Tuple1[ExposureLog]] = Encoders.product
// 計數器
val dataCounter = spark.sparkContext.longAccumulator("dataCounter")
val legalDataCounter = spark.sparkContext.longAccumulator("legalDataCounter")
val illegalAvCounter = spark.sparkContext.longAccumulator("illegalAvCounter")
val illegalKvCounter = spark.sparkContext.longAccumulator("illegalKvCounter")
val illegalReportkeyCounter = spark.sparkContext.longAccumulator("illegalReportkeyCounter")
// 1.曝光日志: select id,ei,av,ui,kv from mta_t_boss_v1_5362
val exposureLogDF = HiveUtil.readTableByPartition(tdw, exposure, spark, partition)
if (exposureLogDF == null) {
System.exit(2)
}
val exposureLogDS = exposureLogDF.select("id", "ei", "ei", "av", "ui", "kv", "log_time")
.filter(row => {
dataCounter.add(1)
val av = row.getAs[String]("av")
if (av == null
|| av.trim.startsWith("1.6.2")
|| av.compareTo("0.9.5")<0) {
illegalAvCounter.add(1)
false
}
else true
})
.filter(row => {
val kv = row.getAs[String]("kv")
if (kv == null || kv.trim.length == 0) {
illegalKvCounter.add(1)
false
}
else true
})
.map(row =>parseExposure(row))(exposureLogTupleEncoder)
.filter(exposure => {
val obj = exposure._1
if (obj == null) {
illegalReportkeyCounter.add(1)
false
}
else {
legalDataCounter.add(1)
true
}
})
.map(row => row._1)(exposureLogEncoder)
val resultCount = exposureLogDS.persist().count()
println(s"Data count: ${dataCounter.value}")
println(s"Legal data count: ${legalDataCounter.value}")
println(s"Result count: ${resultCount}")
println(s"Illegal av count: ${illegalAvCounter.value}")
println(s"Illegal kv count: ${illegalKvCounter.value}")
println(s"Illegal reportKey count: ${illegalReportkeyCounter.value}")
// 1.save log info to hdfs
exposureLogDS.persist().map(exposure => exposure.toString())(Encoders.STRING)
.write.mode(SaveMode.Overwrite).text(logSavePath)
println(s"[INFO] save log to ${logSavePath} success.")
exposureLogDS.persist().select("sign", "channel").createOrReplaceTempView("log")
val sql = "select sign, channel, count(*) as data_count " +
"from log " +
"group by sign, channel"
val aggDF = ss.sql(sql)
aggDF.printSchema()
// 2.save log statistics info to hdfs
aggDF.map(row => row.mkString(","))(Encoders.STRING)
.repartition(1)
.write.mode(SaveMode.Overwrite)
.text(logStatisticsInfoSavePath)
println(s"[INFO] save statistics info to ${logStatisticsInfoSavePath} success.")
注:HiveUtil.readTableByPartition()為自定義函數,用於從hive中讀取指定數據庫/表/分區的數據,結果為DataFrame類型。
上例從hive中讀取數據后,使用select獲取指定字段,然后使用filter根據指定字段進行非法數據過濾,之后再調用map進行數據預處理、解析等工作,之后在調用filter進行空數據過濾,最后使用map對Tuple1拆箱。之后將處理結果通過map構造為字符串並保存到hdfs,同時使用createOrReplaceTempView創建臨時視圖,再通過SparkSession.sql對視圖進行聚合操作,以統計sign,channel緯度的記錄數,之后使用printSchema打印sql操作后數據集的schema結構,最后將聚合后的統計信息通過map構造為字符串保存到hdfs。可以看到日常數據處理過程中會經常遇到如上例一般的需求。
另外,提一下,使用Accumulator的時候要保證只執行一次action操作,否則需要執行cache或者persist來保證計數器不重復計數,如上例中重復使用了exposureLogDS,如果不執行persist/cache會導致計數器重復計數。
另外,注意例中第一個map返回的結構為Tuple1[ExposureLog],之所以將ExposureLog又包了一層,是因為“Product type is represented as a row, and the entire row can not be null in Spark SQL like normal databases”,所以如果需要返回null對象,就需要對其裝箱,使返回值為非空對象,再在后續流程(如最后一個filter 、map)中拆箱。
其他操作可以參考官網API:
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset

