Spark DataFrame及RDD與DataSet轉換成DataFrame


Spark DataFrame及RDD與DataSet轉換成DataFrame

一、什么是DataFrame

        DataFrame和RDD一樣,也是Spark的一種彈性分布式數據集,它是一個由列組成的數據集,概念上等同於關系型數據庫中的一張表。DataFrame可以從非常寬泛的數據源中的構建,比如結構化的數據文件,Hive中的表,外部數據庫,或者已經創建好的RDDs等等。在Scala和Java中,DataFrame由行數據集表示。在Scala API中,DataFrame 可以簡單看成DataSer[Row],而在Java API中,使用DataSet<Row>表示DataFrame。有人肯定會問,已經有了彈性分布式數據集RDD,為什么還要引入DataFrame呢?因為在Spark中,我們可以像在關系型數據庫中使用SQL操作數據庫表一樣,使用Spark SQL操作DataFrame。這讓熟悉關系型數據庫SQL人員也能輕松掌握。

 

二、創建DataFrame

        首先導入Spark Core、Spark SQL、和Hadoop Client,pom.xml文件如下:

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.leboop</groupId> <artifactId>mahout</artifactId> <version>1.0-SNAPSHOT</version> <properties> <!-- scala版本號 --> <scala.version>2.11</scala.version> <!-- spark版本號 --> <spark.version>2.3.0</spark.version> <!-- hadoop版本 --> <hadoop.version>2.7.3</hadoop.version> </properties> <dependencies> <!-- spark core--> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_${scala.version}</artifactId> <version>${spark.version}</version> </dependency> <!-- spark sql --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_${scala.version}</artifactId> <version>${spark.version}</version> </dependency> <!-- Hadoop客戶端,用於操作HDFS --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> </dependency> </dependencies> </project>

 

下面使用三種方式創建DataFrame

itemdata.data文件數據格式如下:

0162381440670851711,4,7.0
0162381440670851711,11,4.0
0162381440670851711,32,1.0
0162381440670851711,176,27.0
0162381440670851711,183,11.0
0162381440670851711,184,5.0
0162381440670851711,207,9.0
0162381440670851711,256,3.0
0162381440670851711,258,4.0

第一列是user_id,第二列是item_id,第三列是score。

1、以csv格式讀取文件

步驟:

(1)使用SparkSesstion創建Spark SQL的切入點spark,

(2)spark以csv格式讀取HDFS文件系統/input/mahout-demo/目錄下的文件itemdata.data,

讀取的結果就是是DataFrame數據集,程序如下:

package com.leboop.rdd import org.apache.spark.sql.SparkSession /** * DataFrame創建Demo */ object DataFrameDemo { def main(args: Array[String]): Unit = { //創建Spark SQL的切入點(RDD的切入點是SparkContext) val spark = SparkSession.builder().appName("spark-sql-demo").master("local").getOrCreate() //以csv格式讀取文件,直接生成DataFrame數據集 val dataDF = spark.read.csv("hdfs://192.168.189.21:8020/input/mahout-demo/itemdata.data") //打印前三行 dataDF.show(3) } } 

運行結果如下:

+-------------------+---+---+
|                _c0|_c1|_c2|
+-------------------+---+---+
|0162381440670851711|  4|7.0|
|0162381440670851711| 11|4.0|
|0162381440670851711| 32|1.0|
+-------------------+---+---+
only showing top 3 rows

默認DataFrame的列名為_cn(n=0,1,2.……)。后面我們會說明如何指定列名

 

2、以一般格式讀取文件

        同樣使用創建好的spark切入點,調用textFile函數讀取文件itemdata.data,生成的是DataSet數據集,數據文件的每一行內容作為整體是DataSet的一列,首先需要map算子將數據按“,”分詞,得到三個元素的數組作為整體,再次使用map將數組編程具有三列的DataSet,最后使用toDF()函數將其轉換成DataFrame數據集,這里我們在toDF()函數中指明了每個列的列名。程序如下:

package com.leboop.rdd import org.apache.spark.sql.SparkSession /** * DataFrame創建Demo */ object DataFrameDemo { def main(args: Array[String]): Unit = { //創建Spark SQL的切入點(RDD的切入點是SparkContext) val spark = SparkSession.builder().appName("spark-sql-demo").master("local").getOrCreate() //spark讀取一般文件,生成的是DataSet數據集 val dataDataSet = spark.read.textFile("hdfs://192.168.189.21:8020/input/mahout-demo/itemdata.data") //使用toDF()函數將DataSet轉換成DataFrame import spark.implicits._ val dataDF2 = dataDataSet.map(x=>x.split(",")).map(x=>(x(0),x(1),x(2))).toDF("user_id","item_id","score") //打印前三行 dataDF2.show(3) } } 

程序運行結果:

+-------------------+-------+-----+
|            user_id|item_id|score|
+-------------------+-------+-----+
|0162381440670851711|      4|  7.0|
|0162381440670851711|     11|  4.0|
|0162381440670851711|     32|  1.0|
+-------------------+-------+-----+
only showing top 3 rows

 

3、以SparkContext作為切入點讀取文件

        Spark SQL的切入點SparkSesstion封裝了Spark Core的切入點SparkContext,所以SparkContext對象可以由SparkSesstion創建。SparkContext讀取文件生成的是RDD數據集,轉換如下:

package com.leboop.rdd import org.apache.spark.sql.SparkSession /** * DataFrame創建Demo */ object DataFrameDemo { def main(args: Array[String]): Unit = { //創建Spark SQL的切入點(RDD的切入點是SparkContext) val spark = SparkSession.builder().appName("spark-sql-demo").master("local").getOrCreate() val rdd = spark.sparkContext.textFile("hdfs://192.168.189.21:8020/input/mahout-demo/itemdata.data") val dataDF3 = rdd.map(x=>x.split(",")).map(x=>(x(0),x(1),x(2))).toDF() dataDF3.show(3) } } 

程序運行結果

+-------------------+---+---+
|                 _1| _2| _3|
+-------------------+---+---+
|0162381440670851711|  4|7.0|
|0162381440670851711| 11|4.0|
|0162381440670851711| 32|1.0|
+-------------------+---+---+
only showing top 3 rows

DataFrame的列名默認為_n(n=1,2,3,……)。

 

4、指定列名

通過定義case class Data實體類,為DataFrame每個列指定列名,代碼如下:

package com.leboop.rdd import org.apache.spark.sql.SparkSession /** * DataFrame創建Demo */ case class Data(user_id:String,item_id:String,score:Double) object DataFrameDemo { def main(args: Array[String]): Unit = { //創建Spark SQL的切入點(RDD的切入點是SparkContext) val spark = SparkSession.builder().appName("spark-sql-demo").master("local").getOrCreate() //以csv格式讀取文件,直接生成DataFrame數據集 val dataDF = spark.read.csv("hdfs://192.168.189.21:8020/input/mahout-demo/itemdata.data").toDF("user_id","item_id","score") //打印前三行 dataDF.show(3) //spark讀取一般文件,生成的是DataSet數據集 val dataDataSet = spark.read.textFile("hdfs://192.168.189.21:8020/input/mahout-demo/itemdata.data") //使用toDF()函數將DataSet轉換成DataFrame import spark.implicits._ val dataDF2 = dataDataSet.map(x=>x.split(",")).map(x=>Data(x(0),x(1),x(2).toDouble)).toDF() //打印前三行 dataDF2.show(3) val rdd = spark.sparkContext.textFile("hdfs://192.168.189.21:8020/input/mahout-demo/itemdata.data") val dataDF3 = rdd.map(x=>x.split(",")).map(x=>Data(x(0),x(1),x(2).toDouble)).toDF() dataDF3.show(3) } } 

三種方式程序運行結果都是:

+-------------------+-------+-----+
|            user_id|item_id|score|
+-------------------+-------+-----+
|0162381440670851711|      4|  7.0|
|0162381440670851711|     11|  4.0|
|0162381440670851711|     32|  1.0|
+-------------------+-------+-----+
only showing top 3 rows

注:關於切入點的問題,其實需要搞清楚Spark的組成,它主要有五個部分組成:

(1)Spark Core

Spark核心部分,操作RDD數據集

(2)Spark SQL

像關系型數據一樣,使用SQL操作DataFrame數據集

(3)Spark Streaming

實時數據流處理

(4)Spark MLlib

機器學習算法庫

(5)GraphX

圖計算

 

三、Spark SQL操作DataFrame

1、創建臨時視圖

基本步驟:

(1)創建DataFrame

(2)由DataFrame創建臨時視圖

(3)寫Spark SQL操作臨時視圖

代碼如下:

package com.leboop.rdd import org.apache.spark.sql.SparkSession /** * DataFrame創建Demo */ case class Data(user_id:String,item_id:String,score:Double) object DataFrameDemo { def main(args: Array[String]): Unit = { //創建Spark SQL的切入點(RDD的切入點是SparkContext) val spark = SparkSession.builder().appName("spark-sql-demo").master("local").getOrCreate() //以csv格式讀取文件,直接生成DataFrame數據集 val dataDF = spark.read.csv("hdfs://192.168.189.21:8020/input/mahout-demo/itemdata.data").toDF("user_id","item_id","score") //創建臨時視圖 dataDF.createOrReplaceTempView("data") //sql查詢 spark.sql("select * from data where score>10").show(3) } } 

我們創建DataFrame后,創建了一個名為data的臨時視圖(SparkSession關閉后,臨時視圖立即會失效),然后寫了Spark SQL查詢評分大於10分的所有行,並打印前3行。運行結果如下:

+-------------------+-------+-----+
|            user_id|item_id|score|
+-------------------+-------+-----+
|0162381440670851711|    176| 27.0|
|0162381440670851711|    183| 11.0|
|0162381440670851711|    259| 16.0|
+-------------------+-------+-----+
only showing top 3 rows

 

2、創建全局臨時視圖

package com.leboop.rdd import org.apache.spark.sql.SparkSession /** * DataFrame創建Demo */ case class Data(user_id:String,item_id:String,score:Double) object DataFrameDemo { def main(args: Array[String]): Unit = { //創建Spark SQL的切入點(RDD的切入點是SparkContext) val spark = SparkSession.builder().appName("spark-sql-demo").master("local").getOrCreate() //以csv格式讀取文件,直接生成DataFrame數據集 val dataDF = spark.read.csv("hdfs://192.168.189.21:8020/input/mahout-demo/itemdata.data").toDF("user_id","item_id","score") //創建全局臨時視圖 dataDF.createGlobalTempView("data") //sql查詢 spark.sql("select * from global_temp.data where score>10").show(3) } } 

首先我們創建了一張全局臨時視圖,然后使用Spark SQL查詢視圖。注意全局臨時視圖存放在系統隱藏的數據庫global_temp中,訪問data臨時視圖時需要使用global_temp.data。全局臨時視圖,在一個Spark Sesstion關閉后,其他的Spark Sesstion可以繼續使用,知道Spark應用被關閉。

程序運行結果

+-------------------+-------+-----+
|            user_id|item_id|score|
+-------------------+-------+-----+
|0162381440670851711|    176| 27.0|
|0162381440670851711|    183| 11.0|
|0162381440670851711|    259| 16.0|
+-------------------+-------+-----+
only showing top 3 rows

 

3、不創建臨時視圖

如下:

package com.leboop.rdd import org.apache.spark.sql.SparkSession /** * DataFrame創建Demo */ case class Data(user_id:String,item_id:String,score:Double) object DataFrameDemo { def main(args: Array[String]): Unit = { //創建Spark SQL的切入點(RDD的切入點是SparkContext) val spark = SparkSession.builder().appName("spark-sql-demo").master("local").getOrCreate() //以csv格式讀取文件,直接生成DataFrame數據集 val dataDF = spark.read.csv("hdfs://192.168.189.21:8020/input/mahout-demo/itemdata.data").toDF("user_id","item_id","score") //打印dataDF的數據結構 dataDF.printSchema() //查詢user_id列 dataDF.select("user_id","score").show(3) //過濾出評分大於10的所有數據 dataDF.filter("score>10").show(3) //查詢user_id,score,score加上10 起別名score dataDF.selectExpr("user_id","score","score+10 as score").show(3) //使用$符號查詢,需要隱士轉換 import spark.implicits._ dataDF.select($"user_id",$"score",$"score"+10).show(3) } } 

程序運行結果如下:

root
 |-- user_id: string (nullable = true)
 |-- item_id: string (nullable = true)
 |-- score: string (nullable = true)



+-------------------+-----+
|            user_id|score|
+-------------------+-----+
|0162381440670851711|  7.0|
|0162381440670851711|  4.0|
|0162381440670851711|  1.0|
+-------------------+-----+
only showing top 3 rows



+-------------------+-------+-----+
|            user_id|item_id|score|
+-------------------+-------+-----+
|0162381440670851711|    176| 27.0|
|0162381440670851711|    183| 11.0|
|0162381440670851711|    259| 16.0|
+-------------------+-------+-----+
only showing top 3 rows




+-------------------+-----+-----+
|            user_id|score|score|
+-------------------+-----+-----+
|0162381440670851711|  7.0| 17.0|
|0162381440670851711|  4.0| 14.0|
|0162381440670851711|  1.0| 11.0|
+-------------------+-----+-----+
only showing top 3 rows




+-------------------+-----+------------+
|            user_id|score|(score + 10)|
+-------------------+-----+------------+
|0162381440670851711|  7.0|        17.0|
|0162381440670851711|  4.0|        14.0|
|0162381440670851711|  1.0|        11.0|
+-------------------+-----+------------+
only showing top 3 rows


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM