Requirement Overview
There are two main ways to convert an RDD into a DataFrame: using the reflection mechanism to infer the schema from a case class, and specifying the schema programmatically (StructType) and attaching it to the RDD.
Steps
1. Create a Maven project and add the dependencies
<properties>
    <scala.version>2.11.8</scala.version>
    <spark.version>2.2.0</spark.version>
</properties>
<dependencies>
    <!-- Scala standard library -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <!-- Spark core (RDD API) -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Spark SQL (SparkSession / DataFrame API) -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.5</version>
    </dependency>
</dependencies>
<build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
        <!-- Compile Java sources at the 1.8 language level -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
                <!-- <verbose>true</verbose> -->
            </configuration>
        </plugin>
        <!-- Compile Scala sources -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                    <configuration>
                        <args>
                            <arg>-dependencyfile</arg>
                            <arg>${project.build.directory}/.scala_dependencies</arg>
                        </args>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <!-- Build a fat JAR, stripping signature files to avoid SecurityExceptions -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.1.1</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass></mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
2. Method 1: Build the DataFrame via the reflection mechanism and a case class
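Both examples read a space-delimited text file, person.txt, with one person per line. The file contents are not shown in the original post; judging from the console output further down, the file presumably looks like this:

1 zhangsan 108
2 lisi 28
3 wangwu 58
4 zhaoliu 24
5 zhuqi 35
6 qiuba 28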
Code
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Define the Person case class; Spark infers the schema from its fields via reflection
case class Person(id: Int, name: String, age: Int)

object SparkDF {
  def main(args: Array[String]): Unit = {
    // Get a SparkSession
    val sparkSession: SparkSession = SparkSession.builder().appName("SparkDF").master("local[2]").config("spark.driver.host", "localhost").getOrCreate()
    // Get the SparkContext
    val sparkContext: SparkContext = sparkSession.sparkContext
    // Reduce log noise
    sparkContext.setLogLevel("WARN")
    // Read the file
    val fileRDD: RDD[String] = sparkContext.textFile("/Users/zhaozhuang/Desktop/4、Spark/Spark第三天/Spark第三天教案/資料/person.txt")
    // Split each line on the delimiter
    val splitRDD: RDD[Array[String]] = fileRDD.map(x => x.split(" "))
    // Map each record onto the Person case class
    val personRDD: RDD[Person] = splitRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
    // To convert personRDD into a DataFrame,
    // first import the implicit conversions
    import sparkSession.implicits._
    val personDF: DataFrame = personRDD.toDF()
    // todo: at this point the DataFrame has been built successfully via reflection

    /** Next, verify the DataFrame with both the DSL syntax and the SQL syntax */
    println("*******************************************DSL syntax test start********************************************")
    // Show the whole table; show() displays the first 20 rows by default
    personDF.show()
    // Select specific columns
    personDF.select($"name", $"age").show()
    // Print the schema
    personDF.printSchema()
    // Filter rows
    personDF.filter($"age" > 25).show()
    println("*******************************************DSL syntax test end********************************************")
    println("*******************************************SQL syntax test start********************************************")
    // Register the DataFrame as a table; there are three ways
    // 1. registerTempTable (deprecated)
    personDF.registerTempTable("person1")
    // 2. createTempView (fails if the view already exists)
    personDF.createTempView("person2")
    // 3. createOrReplaceTempView (recommended)
    personDF.createOrReplaceTempView("person3")
    // Print the data of the three tables
    sparkSession.sql("select * from person1").show()
    sparkSession.sql("select * from person2").show()
    sparkSession.sql("select * from person3").show()
    // A left join
    sparkSession.sql("select * from person1 p1 left join person2 p2 on p1.id = p2.id").show()
    println("*******************************************SQL syntax test end********************************************")
    sparkContext.stop()
    sparkSession.close()
  }
}
Console output
*******************************************DSL syntax test start********************************************
+---+--------+---+
| id| name|age|
+---+--------+---+
| 1|zhangsan|108|
| 2| lisi| 28|
| 3| wangwu| 58|
| 4| zhaoliu| 24|
| 5| zhuqi| 35|
| 6| qiuba| 28|
+---+--------+---+
+--------+---+
| name|age|
+--------+---+
|zhangsan|108|
| lisi| 28|
| wangwu| 58|
| zhaoliu| 24|
| zhuqi| 35|
| qiuba| 28|
+--------+---+
root
|-- id: integer (nullable = false)
|-- name: string (nullable = true)
|-- age: integer (nullable = false)
+---+--------+---+
| id| name|age|
+---+--------+---+
| 1|zhangsan|108|
| 2| lisi| 28|
| 3| wangwu| 58|
| 5| zhuqi| 35|
| 6| qiuba| 28|
+---+--------+---+
*******************************************DSL syntax test end********************************************
*******************************************SQL syntax test start********************************************
+---+--------+---+
| id| name|age|
+---+--------+---+
| 1|zhangsan|108|
| 2| lisi| 28|
| 3| wangwu| 58|
| 4| zhaoliu| 24|
| 5| zhuqi| 35|
| 6| qiuba| 28|
+---+--------+---+
+---+--------+---+
| id| name|age|
+---+--------+---+
| 1|zhangsan|108|
| 2| lisi| 28|
| 3| wangwu| 58|
| 4| zhaoliu| 24|
| 5| zhuqi| 35|
| 6| qiuba| 28|
+---+--------+---+
+---+--------+---+
| id| name|age|
+---+--------+---+
| 1|zhangsan|108|
| 2| lisi| 28|
| 3| wangwu| 58|
| 4| zhaoliu| 24|
| 5| zhuqi| 35|
| 6| qiuba| 28|
+---+--------+---+
+---+--------+---+---+--------+---+
| id| name|age| id| name|age|
+---+--------+---+---+--------+---+
| 1|zhangsan|108| 1|zhangsan|108|
| 6| qiuba| 28| 6| qiuba| 28|
| 3| wangwu| 58| 3| wangwu| 58|
| 5| zhuqi| 35| 5| zhuqi| 35|
| 4| zhaoliu| 24| 4| zhaoliu| 24|
| 2| lisi| 28| 2| lisi| 28|
+---+--------+---+---+--------+---+
*******************************************SQL syntax test end********************************************
Process finished with exit code 0
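As a side note, the reflection route also works without a case class: an RDD of tuples can be converted directly, with the column names passed to toDF explicitly. A minimal sketch, reusing splitRDD from the code above (personDF2 is an illustrative name, not from the original code):

// Tuples carry the column types; the column names are supplied to toDF
import sparkSession.implicits._
val personDF2: DataFrame = splitRDD
  .map(x => (x(0).toInt, x(1), x(2).toInt))
  .toDF("id", "name", "age")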
3. Method 2: Build the DataFrame with StructType and Row
Code
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object SparkDF2 {
  def main(args: Array[String]): Unit = {
    // Get a SparkSession
    val sparkSession: SparkSession = SparkSession.builder().appName("SparkDF2").master("local[2]").config("spark.driver.host", "localhost").getOrCreate()
    // Get the SparkContext
    val sparkContext: SparkContext = sparkSession.sparkContext
    // Reduce log noise
    sparkContext.setLogLevel("WARN")
    // Read the file
    val fileRDD: RDD[String] = sparkContext.textFile("/Users/zhaozhuang/Desktop/4、Spark/Spark第三天/Spark第三天教案/資料/person.txt")
    // Split each line on the delimiter
    val arrayRDD: RDD[Array[String]] = fileRDD.map(x => x.split(" "))
    // Convert arrayRDD into an RDD of Row objects
    val rowRDD: RDD[Row] = arrayRDD.map(x => Row(x(0).toInt, x(1), x(2).toInt))
    // Build the schema as a StructType
    val structType = new StructType().add("id", IntegerType).add("name", StringType).add("age", IntegerType)
    // createDataFrame takes two arguments: an RDD[Row] and a StructType
    val personDF: DataFrame = sparkSession.createDataFrame(rowRDD, structType)
    // From here, the DSL and SQL operations are the same as in Method 1
    personDF.printSchema()
    sparkContext.stop()
    sparkSession.close()
  }
}
Console output
root
|-- id: integer (nullable = true)
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
Process finished with exit code 0
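For reference, the same schema can also be declared up front as an explicit list of StructField objects, which makes the nullability of each column visible. A minimal equivalent sketch of the structType definition used above:

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Equivalent schema built from explicit fields;
// the third argument controls whether the column is nullable
val structType = StructType(Seq(
  StructField("id", IntegerType, nullable = true),
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)
))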