spark2.0系列《一》—— RDD VS. DataFrame VS. DataSet


  雖說,spark我也不陌生,之前一直用python跑的spark,基本的core和SQL操作用的也是比較熟練。但是這一切的基礎都是在RDD上進行操作,即使是進行SQL操作也是將利用SpaekContext類中的textFile方法讀取txt文件返回RDD對象,然后使用SQLContext實例化載利用函數createDataFrame將格式化后的數據轉化為dataFrame或者利用createDataset將數據轉換為dataset。真不是一般的麻煩。。。話不多說,比如以下python代碼示例:

 1 # -*-coding:utf-8-*-
 2 # Created by wuying on 2017/3/28
 3 
 4 from pyspark.sql import Row
 5 from pyspark import SparkContext
 6 from pyspark.sql import SQLContext
 7 from pyspark.sql.functions import *
 8 
 9 
10 def create_df(sqlContext, raw_data):
11     """
12     :param row_data: original data
13     :return: data frame
14     """
15     lineLists = raw_data.map(lambda x: x.split(','))
16    //篩選部分有用的數據字段作為表頭
17     row_data = lineLists.map(lambda x: Row(
18     recordCode = x[0],
19     logicCode  = x[1],
20     deviceCode = x[2],
21     compId     = x[2][:3],
22     siteId     = x[2][:6],
23     transType  = x[4],
24     cardTime   = x[8],
25     compName   = x[12],
26     siteName   = x[13],
27     carCode    = x[14]
28     )
29     )
30     SZT_df = sqlContext.createDataFrame(row_data)
31     SZT_df.registerTempTable("SZT_df")
32 
33     return SZT_df
34 
35 
36 if __name__ == '__main__':
37     # Create DataFrame
38     # Load data from hdfs
39     inputFile = "P_GJGD_SZT_20170101"  //數據來源於地鐵打卡
40     sc = SparkContext(master="local[*]", appName="AppTest", pyFiles=["prepared.py"])
41     raw_data = sc.textFile(inputFile)
42     sqlContext = SQLContext(sc)
43     SZT_df = create_df(sqlContext, raw_data)
44     print SZT_df.dtypes

  1、RDD,英文全稱是“Resilient Distributed Dataset”,即彈性分布式數據集,聽起來高大上的名字,簡而言之就是大數據案例下的一種數據對象,RDD這個API在spark1.0中就已經存在,因此比較老的版本的tutorial中用的都是RDD作為原始數據處理對象,而在spark-shell中已經實例化好的sc對象一般通過加載數據產生的RDD這個對象的基礎上進行數據分析。當然,打草稿情況(未接觸企業級系統)下RDD API還是足夠我們對一般的數據進行轉換,清洗以及計數,里面有較為豐富的函數可以調用,比如常用的map, filter, groupBy等等,具體實現見pyspark。所以,這個RDD的簡單安全且易於理解使得很多人都是用RDD打開spark這個高大上之神器的大門(包括我~~)。

  首先,它不好操作,以我目前的知識水平而言,我寧願選dataFrame。因為dataFrame方便且高速,比如SQL語句,自從用了SQL,再也不想一步步map,一步步filter了。其次,據說,RDD無論是在集群上執行任務還是存儲到硬盤上。它都會默認使用java對象序列化(提高數據操作的性能),而序列化單個java和scala對象的開銷過大,並且需要將數據及其結構在各節點之間傳輸,而生成和銷毀個別對象需要進行垃圾收集這期間的開銷也非常大。

  2、DataFrame。說到dataFrame,我就想到R和pandas(python)中常用的數據框架就是dataFrame,估計后來spark的設計者從R和pandas這個兩個數據科學語言中的數據dataFrame中吸取靈感,不同的是dataFrame是從底層出發為大數據應用設計出的RDD的拓展,因此它具有RDD所不具有的幾個特性(Spark 1.3以后):

  • 處理數據能力從千字節到PB量級不等
  • 支持各種數據格式和存儲系統
  • 通過SPARK SQL Catalyst優化器進行高效率優化和代碼生成
  • 通過SPARK對所有大數據工具基礎架構進行無縫集成
  • 提供Python,Scala,Java 和R的api

  簡而言之,我們可以將dataFrame當作是關系數據庫中表或者是R或者Python中的dataFrame數據結構。實際上,有了dataFrame我們相當於spark可以管理數據視圖,以后傳輸數據只要在各個節點穿數據數據而不需要傳數據結構,這種方式比java序列化有效的多。

  直接上個scala代碼瞅瞅:

 1 package cn.sibat.metro
 2 import org.apache.spark.sql.SparkSession
 3 
 4 /**
 5   * Created by wing1995 on 2017/4/20
 6   */
 7 
 8 object Test {
 9   def main(args: Array[String]) = {
10     val spark = SparkSession
11         .builder()
12         .config("spark.sql.warehouse.dir", "file:/file:E:/bus")
13         .appName("Spark SQL Test")
14         .master("local[*]")
15         .getOrCreate()
16 
17     import spark.implicits._
18 
19     val df = spark.sparkContext
20       .textFile("E:\\trafficDataAnalysis\\SZTDataCheck\\testData.txt")
21       .map(_.split(","))
22       .map(line => SZT(line(0), line(1), line(2), line(2).substring(0, 3), line(2).substring(0, 6), line(4), line(8), line(12), line(13), line(14)))
23       .toDF()
24     df.show()
25     df.printSchema()
26   }
27 }
28 
29 case class SZT(recordCode: String, logicCode: String, terminalCode: String, compId: String, siteId: String,
30                transType: String, cardTime: String, compName: String, siteName: String, vehicleCode: String
31               )

  代碼真是清新可人啊,直接SparkSession實例化然后再怎么轉其他格式,怎么讀其他數據都可以。。。

  3、Dataset(Spark 1.6)

  跟DataFrame很像,不是很熟悉,貌似是為了兼容SCALA中的RDD和JAVA的面向對象而設計,事實證明Scala在Spark中的優勢是java取代不了的,即使java8已經做出不少改進。然而,Scala作為原生態語言,仍然是Spark使用者的主流。所以,接下來的博客陸續以Scala為主。

  個人是比較喜歡簡潔而有趣的Scala,為數據科學而設計!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM