Spark DataFrame及RDD與DataSet轉換成DataFrame
一、什么是DataFrame
DataFrame和RDD一樣,也是Spark的一種彈性分布式數據集,它是一個由列組成的數據集,概念上等同於關系型數據庫中的一張表。DataFrame可以從非常寬泛的數據源中的構建,比如結構化的數據文件,Hive中的表,外部數據庫,或者已經創建好的RDDs等等。在Scala和Java中,DataFrame由行數據集表示。在Scala API中,DataFrame 可以簡單看成DataSer[Row],而在Java API中,使用DataSet<Row>表示DataFrame。有人肯定會問,已經有了彈性分布式數據集RDD,為什么還要引入DataFrame呢?因為在Spark中,我們可以像在關系型數據庫中使用SQL操作數據庫表一樣,使用Spark SQL操作DataFrame。這讓熟悉關系型數據庫SQL人員也能輕松掌握。
二、創建DataFrame
首先導入Spark Core、Spark SQL、和Hadoop Client,pom.xml文件如下:
下面使用三種方式創建DataFrame
itemdata.data文件數據格式如下:
第一列是user_id,第二列是item_id,第三列是score。
1、以csv格式讀取文件
步驟:
(1)使用SparkSesstion創建Spark SQL的切入點spark,
(2)spark以csv格式讀取HDFS文件系統/input/mahout-demo/目錄下的文件itemdata.data,
讀取的結果就是是DataFrame數據集,程序如下:
運行結果如下:
默認DataFrame的列名為_cn(n=0,1,2.……)。后面我們會說明如何指定列名
2、以一般格式讀取文件
同樣使用創建好的spark切入點,調用textFile函數讀取文件itemdata.data,生成的是DataSet數據集,數據文件的每一行內容作為整體是DataSet的一列,首先需要map算子將數據按“,”分詞,得到三個元素的數組作為整體,再次使用map將數組編程具有三列的DataSet,最后使用toDF()函數將其轉換成DataFrame數據集,這里我們在toDF()函數中指明了每個列的列名。程序如下:
程序運行結果:
3、以SparkContext作為切入點讀取文件
Spark SQL的切入點SparkSesstion封裝了Spark Core的切入點SparkContext,所以SparkContext對象可以由SparkSesstion創建。SparkContext讀取文件生成的是RDD數據集,轉換如下:
程序運行結果
DataFrame的列名默認為_n(n=1,2,3,……)。
4、指定列名
通過定義case class Data實體類,為DataFrame每個列指定列名,代碼如下:
三種方式程序運行結果都是:
注:關於切入點的問題,其實需要搞清楚Spark的組成,它主要有五個部分組成:
(1)Spark Core
Spark核心部分,操作RDD數據集
(2)Spark SQL
像關系型數據一樣,使用SQL操作DataFrame數據集
(3)Spark Streaming
實時數據流處理
(4)Spark MLlib
機器學習算法庫
(5)GraphX
圖計算
三、Spark SQL操作DataFrame
1、創建臨時視圖
基本步驟:
(1)創建DataFrame
(2)由DataFrame創建臨時視圖
(3)寫Spark SQL操作臨時視圖
代碼如下:
我們創建DataFrame后,創建了一個名為data的臨時視圖(SparkSession關閉后,臨時視圖立即會失效),然后寫了Spark SQL查詢評分大於10分的所有行,並打印前3行。運行結果如下:
2、創建全局臨時視圖
首先我們創建了一張全局臨時視圖,然后使用Spark SQL查詢視圖。注意全局臨時視圖存放在系統隱藏的數據庫global_temp中,訪問data臨時視圖時需要使用global_temp.data。全局臨時視圖,在一個Spark Sesstion關閉后,其他的Spark Sesstion可以繼續使用,知道Spark應用被關閉。
程序運行結果
3、不創建臨時視圖
如下:
程序運行結果如下: