Spark DataFrame及RDD与DataSet转换成DataFrame


Spark DataFrame及RDD与DataSet转换成DataFrame

一、什么是DataFrame

        DataFrame和RDD一样,也是Spark的一种弹性分布式数据集,它是一个由列组成的数据集,概念上等同于关系型数据库中的一张表。DataFrame可以从非常宽泛的数据源中的构建,比如结构化的数据文件,Hive中的表,外部数据库,或者已经创建好的RDDs等等。在Scala和Java中,DataFrame由行数据集表示。在Scala API中,DataFrame 可以简单看成DataSer[Row],而在Java API中,使用DataSet<Row>表示DataFrame。有人肯定会问,已经有了弹性分布式数据集RDD,为什么还要引入DataFrame呢?因为在Spark中,我们可以像在关系型数据库中使用SQL操作数据库表一样,使用Spark SQL操作DataFrame。这让熟悉关系型数据库SQL人员也能轻松掌握。

 

二、创建DataFrame

        首先导入Spark Core、Spark SQL、和Hadoop Client,pom.xml文件如下:

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.leboop</groupId> <artifactId>mahout</artifactId> <version>1.0-SNAPSHOT</version> <properties> <!-- scala版本号 --> <scala.version>2.11</scala.version> <!-- spark版本号 --> <spark.version>2.3.0</spark.version> <!-- hadoop版本 --> <hadoop.version>2.7.3</hadoop.version> </properties> <dependencies> <!-- spark core--> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_${scala.version}</artifactId> <version>${spark.version}</version> </dependency> <!-- spark sql --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_${scala.version}</artifactId> <version>${spark.version}</version> </dependency> <!-- Hadoop客户端,用于操作HDFS --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> </dependency> </dependencies> </project>

 

下面使用三种方式创建DataFrame

itemdata.data文件数据格式如下:

0162381440670851711,4,7.0
0162381440670851711,11,4.0
0162381440670851711,32,1.0
0162381440670851711,176,27.0
0162381440670851711,183,11.0
0162381440670851711,184,5.0
0162381440670851711,207,9.0
0162381440670851711,256,3.0
0162381440670851711,258,4.0

第一列是user_id,第二列是item_id,第三列是score。

1、以csv格式读取文件

步骤:

(1)使用SparkSesstion创建Spark SQL的切入点spark,

(2)spark以csv格式读取HDFS文件系统/input/mahout-demo/目录下的文件itemdata.data,

读取的结果就是是DataFrame数据集,程序如下:

package com.leboop.rdd import org.apache.spark.sql.SparkSession /** * DataFrame创建Demo */ object DataFrameDemo { def main(args: Array[String]): Unit = { //创建Spark SQL的切入点(RDD的切入点是SparkContext) val spark = SparkSession.builder().appName("spark-sql-demo").master("local").getOrCreate() //以csv格式读取文件,直接生成DataFrame数据集 val dataDF = spark.read.csv("hdfs://192.168.189.21:8020/input/mahout-demo/itemdata.data") //打印前三行 dataDF.show(3) } } 

运行结果如下:

+-------------------+---+---+
|                _c0|_c1|_c2|
+-------------------+---+---+
|0162381440670851711|  4|7.0|
|0162381440670851711| 11|4.0|
|0162381440670851711| 32|1.0|
+-------------------+---+---+
only showing top 3 rows

默认DataFrame的列名为_cn(n=0,1,2.……)。后面我们会说明如何指定列名

 

2、以一般格式读取文件

        同样使用创建好的spark切入点,调用textFile函数读取文件itemdata.data,生成的是DataSet数据集,数据文件的每一行内容作为整体是DataSet的一列,首先需要map算子将数据按“,”分词,得到三个元素的数组作为整体,再次使用map将数组编程具有三列的DataSet,最后使用toDF()函数将其转换成DataFrame数据集,这里我们在toDF()函数中指明了每个列的列名。程序如下:

package com.leboop.rdd import org.apache.spark.sql.SparkSession /** * DataFrame创建Demo */ object DataFrameDemo { def main(args: Array[String]): Unit = { //创建Spark SQL的切入点(RDD的切入点是SparkContext) val spark = SparkSession.builder().appName("spark-sql-demo").master("local").getOrCreate() //spark读取一般文件,生成的是DataSet数据集 val dataDataSet = spark.read.textFile("hdfs://192.168.189.21:8020/input/mahout-demo/itemdata.data") //使用toDF()函数将DataSet转换成DataFrame import spark.implicits._ val dataDF2 = dataDataSet.map(x=>x.split(",")).map(x=>(x(0),x(1),x(2))).toDF("user_id","item_id","score") //打印前三行 dataDF2.show(3) } } 

程序运行结果:

+-------------------+-------+-----+
|            user_id|item_id|score|
+-------------------+-------+-----+
|0162381440670851711|      4|  7.0|
|0162381440670851711|     11|  4.0|
|0162381440670851711|     32|  1.0|
+-------------------+-------+-----+
only showing top 3 rows

 

3、以SparkContext作为切入点读取文件

        Spark SQL的切入点SparkSesstion封装了Spark Core的切入点SparkContext,所以SparkContext对象可以由SparkSesstion创建。SparkContext读取文件生成的是RDD数据集,转换如下:

package com.leboop.rdd import org.apache.spark.sql.SparkSession /** * DataFrame创建Demo */ object DataFrameDemo { def main(args: Array[String]): Unit = { //创建Spark SQL的切入点(RDD的切入点是SparkContext) val spark = SparkSession.builder().appName("spark-sql-demo").master("local").getOrCreate() val rdd = spark.sparkContext.textFile("hdfs://192.168.189.21:8020/input/mahout-demo/itemdata.data") val dataDF3 = rdd.map(x=>x.split(",")).map(x=>(x(0),x(1),x(2))).toDF() dataDF3.show(3) } } 

程序运行结果

+-------------------+---+---+
|                 _1| _2| _3|
+-------------------+---+---+
|0162381440670851711|  4|7.0|
|0162381440670851711| 11|4.0|
|0162381440670851711| 32|1.0|
+-------------------+---+---+
only showing top 3 rows

DataFrame的列名默认为_n(n=1,2,3,……)。

 

4、指定列名

通过定义case class Data实体类,为DataFrame每个列指定列名,代码如下:

package com.leboop.rdd import org.apache.spark.sql.SparkSession /** * DataFrame创建Demo */ case class Data(user_id:String,item_id:String,score:Double) object DataFrameDemo { def main(args: Array[String]): Unit = { //创建Spark SQL的切入点(RDD的切入点是SparkContext) val spark = SparkSession.builder().appName("spark-sql-demo").master("local").getOrCreate() //以csv格式读取文件,直接生成DataFrame数据集 val dataDF = spark.read.csv("hdfs://192.168.189.21:8020/input/mahout-demo/itemdata.data").toDF("user_id","item_id","score") //打印前三行 dataDF.show(3) //spark读取一般文件,生成的是DataSet数据集 val dataDataSet = spark.read.textFile("hdfs://192.168.189.21:8020/input/mahout-demo/itemdata.data") //使用toDF()函数将DataSet转换成DataFrame import spark.implicits._ val dataDF2 = dataDataSet.map(x=>x.split(",")).map(x=>Data(x(0),x(1),x(2).toDouble)).toDF() //打印前三行 dataDF2.show(3) val rdd = spark.sparkContext.textFile("hdfs://192.168.189.21:8020/input/mahout-demo/itemdata.data") val dataDF3 = rdd.map(x=>x.split(",")).map(x=>Data(x(0),x(1),x(2).toDouble)).toDF() dataDF3.show(3) } } 

三种方式程序运行结果都是:

+-------------------+-------+-----+
|            user_id|item_id|score|
+-------------------+-------+-----+
|0162381440670851711|      4|  7.0|
|0162381440670851711|     11|  4.0|
|0162381440670851711|     32|  1.0|
+-------------------+-------+-----+
only showing top 3 rows

注:关于切入点的问题,其实需要搞清楚Spark的组成,它主要有五个部分组成:

(1)Spark Core

Spark核心部分,操作RDD数据集

(2)Spark SQL

像关系型数据一样,使用SQL操作DataFrame数据集

(3)Spark Streaming

实时数据流处理

(4)Spark MLlib

机器学习算法库

(5)GraphX

图计算

 

三、Spark SQL操作DataFrame

1、创建临时视图

基本步骤:

(1)创建DataFrame

(2)由DataFrame创建临时视图

(3)写Spark SQL操作临时视图

代码如下:

package com.leboop.rdd import org.apache.spark.sql.SparkSession /** * DataFrame创建Demo */ case class Data(user_id:String,item_id:String,score:Double) object DataFrameDemo { def main(args: Array[String]): Unit = { //创建Spark SQL的切入点(RDD的切入点是SparkContext) val spark = SparkSession.builder().appName("spark-sql-demo").master("local").getOrCreate() //以csv格式读取文件,直接生成DataFrame数据集 val dataDF = spark.read.csv("hdfs://192.168.189.21:8020/input/mahout-demo/itemdata.data").toDF("user_id","item_id","score") //创建临时视图 dataDF.createOrReplaceTempView("data") //sql查询 spark.sql("select * from data where score>10").show(3) } } 

我们创建DataFrame后,创建了一个名为data的临时视图(SparkSession关闭后,临时视图立即会失效),然后写了Spark SQL查询评分大于10分的所有行,并打印前3行。运行结果如下:

+-------------------+-------+-----+
|            user_id|item_id|score|
+-------------------+-------+-----+
|0162381440670851711|    176| 27.0|
|0162381440670851711|    183| 11.0|
|0162381440670851711|    259| 16.0|
+-------------------+-------+-----+
only showing top 3 rows

 

2、创建全局临时视图

package com.leboop.rdd import org.apache.spark.sql.SparkSession /** * DataFrame创建Demo */ case class Data(user_id:String,item_id:String,score:Double) object DataFrameDemo { def main(args: Array[String]): Unit = { //创建Spark SQL的切入点(RDD的切入点是SparkContext) val spark = SparkSession.builder().appName("spark-sql-demo").master("local").getOrCreate() //以csv格式读取文件,直接生成DataFrame数据集 val dataDF = spark.read.csv("hdfs://192.168.189.21:8020/input/mahout-demo/itemdata.data").toDF("user_id","item_id","score") //创建全局临时视图 dataDF.createGlobalTempView("data") //sql查询 spark.sql("select * from global_temp.data where score>10").show(3) } } 

首先我们创建了一张全局临时视图,然后使用Spark SQL查询视图。注意全局临时视图存放在系统隐藏的数据库global_temp中,访问data临时视图时需要使用global_temp.data。全局临时视图,在一个Spark Sesstion关闭后,其他的Spark Sesstion可以继续使用,知道Spark应用被关闭。

程序运行结果

+-------------------+-------+-----+
|            user_id|item_id|score|
+-------------------+-------+-----+
|0162381440670851711|    176| 27.0|
|0162381440670851711|    183| 11.0|
|0162381440670851711|    259| 16.0|
+-------------------+-------+-----+
only showing top 3 rows

 

3、不创建临时视图

如下:

package com.leboop.rdd import org.apache.spark.sql.SparkSession /** * DataFrame创建Demo */ case class Data(user_id:String,item_id:String,score:Double) object DataFrameDemo { def main(args: Array[String]): Unit = { //创建Spark SQL的切入点(RDD的切入点是SparkContext) val spark = SparkSession.builder().appName("spark-sql-demo").master("local").getOrCreate() //以csv格式读取文件,直接生成DataFrame数据集 val dataDF = spark.read.csv("hdfs://192.168.189.21:8020/input/mahout-demo/itemdata.data").toDF("user_id","item_id","score") //打印dataDF的数据结构 dataDF.printSchema() //查询user_id列 dataDF.select("user_id","score").show(3) //过滤出评分大于10的所有数据 dataDF.filter("score>10").show(3) //查询user_id,score,score加上10 起别名score dataDF.selectExpr("user_id","score","score+10 as score").show(3) //使用$符号查询,需要隐士转换 import spark.implicits._ dataDF.select($"user_id",$"score",$"score"+10).show(3) } } 

程序运行结果如下:

root
 |-- user_id: string (nullable = true)
 |-- item_id: string (nullable = true)
 |-- score: string (nullable = true)



+-------------------+-----+
|            user_id|score|
+-------------------+-----+
|0162381440670851711|  7.0|
|0162381440670851711|  4.0|
|0162381440670851711|  1.0|
+-------------------+-----+
only showing top 3 rows



+-------------------+-------+-----+
|            user_id|item_id|score|
+-------------------+-------+-----+
|0162381440670851711|    176| 27.0|
|0162381440670851711|    183| 11.0|
|0162381440670851711|    259| 16.0|
+-------------------+-------+-----+
only showing top 3 rows




+-------------------+-----+-----+
|            user_id|score|score|
+-------------------+-----+-----+
|0162381440670851711|  7.0| 17.0|
|0162381440670851711|  4.0| 14.0|
|0162381440670851711|  1.0| 11.0|
+-------------------+-----+-----+
only showing top 3 rows




+-------------------+-----+------------+
|            user_id|score|(score + 10)|
+-------------------+-----+------------+
|0162381440670851711|  7.0|        17.0|
|0162381440670851711|  4.0|        14.0|
|0162381440670851711|  1.0|        11.0|
+-------------------+-----+------------+
only showing top 3 rows


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM