spark 三種數據集的關系(一)


 

Catalyst Optimizer:

 

Dataset

數據集僅可用Scala或Java。但是,我們提供了以下上下文來更好地理解Spark 2.0的方向數據集是在2015年作為Apache Spark 1.6版本的一部分引入的。datasets的目標是提供一個類型安全的編程接口。

這允許開發人員使用具有編譯時類型安全性的半結構化數據(如JSON或鍵值對)進行工作(也就是說,生產應用程序在運行之前可以檢查錯誤)。

Python不實現Dataset API的部分原因是Python不是一種類型安全的語言。同樣重要的是,數據集API包含高級領域特定的語言操作,如sum()、avg()、join()和group()。

后一個特性意味着您具有傳統Spark RDDs的靈活性,但是代碼也更容易表達、讀取和編寫。

 

從下圖中可以看出,DataFrame和Dataset都屬於作為Apache Spark的一部分引入的新Dataset API2.0:

DataFrame和Dataset api的統一有可能創建打破向后兼容性的更改。

從下圖中可以看出,DataFrame和Dataset都屬於作為Apache Spark的一部分引入的新Dataset API2.0:DataFrame和Dataset api的統一有可能創建打破向后兼容性的更改。

這是Apache Spark 2.0成為主要版本的主要原因之一(最小化任何中斷的更改)。從下圖中可以看出,DataFrame和Dataset都屬於作為Apache Spark的一部分引入的新Dataset API2.0:

如前所述,Dataset API提供了一個類型安全的、面向對象的編程接口。數據集可以通過將表達式和數據字段公開給查詢接口和

的快速內存編碼來利用。但是,隨着DataFrame和Dataset現在作為Apache Spark 2.0的一部分統一起來,DataFrame現在是Dataset非類型化API的別名。

更具體地說:

DataFrame = Dataset[T]

 

DataFrame是什么?

在Spark中,DataFrame是一種以RDD為基礎的分布式數據集,類似於傳統數據庫中的二維表格。DataFrame與RDD的主要區別在於,前者帶有schema元信息,即DataFrame所表示的二維表數據集的每一列都帶有名稱和類型。這使得Spark SQL得以洞察更多的結構信息,從而對藏於DataFrame背后的數據源以及作用於DataFrame之上的變換進行了針對性的優化,最終達到大幅提升運行時效率的目標。反觀RDD,由於無從得知所存數據元素的具體內部結構,Spark Core只能在stage層面進行簡單、通用的流水線優化。

 

創建DataFrame

在Spark SQL中,開發者可以非常便捷地將各種內、外部的單機、分布式數據轉換為DataFrame。以下Python示例代碼充分體現了Spark SQL 1.3.0中DataFrame數據源的豐富多樣和簡單易用:

# 從Hive中的users表構造DataFrame
users = sqlContext.table("users")

# 加載S3上的JSON文件
logs = sqlContext.load("s3n://path/to/data.json", "json")

# 加載HDFS上的Parquet文件
clicks = sqlContext.load("hdfs://path/to/data.parquet", "parquet")

# 通過JDBC訪問MySQL
comments = sqlContext.jdbc("jdbc:mysql://localhost/comments", "user")

# 將普通RDD轉變為DataFrame
rdd = sparkContext.textFile("article.txt") \
                  .flatMap(lambda line: line.split()) \
                  .map(lambda word: (word, 1)) \
                  .reduceByKey(lambda a, b: a + b) \
wordCounts = sqlContext.createDataFrame(rdd, ["word", "count"])

# 將本地數據容器轉變為DataFrame
data = [("Alice", 21), ("Bob", 24)]
people = sqlContext.createDataFrame(data, ["name", "age"])

# 將Pandas DataFrame轉變為Spark DataFrame(Python API特有功能)
sparkDF = sqlContext.createDataFrame(pandasDF)

使用DataFrame

和Pandas類似,Spark DataFrame也提供了一整套用於操縱數據的DSL。

這些DSL在語義上與SQL關系查詢非常相近(這也是Spark SQL能夠為DataFrame提供無縫支持的重要原因之一)。以下是一組用戶數據分析示例:

# 創建一個只包含"年輕"用戶的DataFrame
young = users.filter(users.age < 21)

# 也可以使用Pandas風格的語法
young = users[users.age < 21]

# 將所有人的年齡加1
young.select(young.name, young.age + 1)

# 統計年輕用戶中各性別人數
young.groupBy("gender").count()

# 將所有年輕用戶與另一個名為logs的DataFrame聯接起來
young.join(logs, logs.userId == users.userId, "left_outer")

 除DSL以外,我們當然也可以像以往一樣,用SQL來處理DataFrame:

young.registerTempTable("young")
sqlContext.sql("SELECT count(*) FROM young")

最后,當數據分析邏輯編寫完畢后,我們便可以將最終結果保存下來或展現出來:

# 追加至HDFS上的Parquet文件
young.save(path="hdfs://path/to/data.parquet",
           source="parquet",
           mode="append")
 
# 覆寫S3上的JSON文件
young.save(path="s3n://path/to/data.json",
           source="json",
           mode="append")
 
# 保存為SQL表
young.saveAsTable(tableName="young", source="parquet" mode="overwrite")
 
# 轉換為Pandas DataFrame(Python API特有功能)
pandasDF = young.toPandas()
 
# 以表格形式打印輸出
young.show()</span>

 

1.幕后英雄:Spark SQL查詢優化器與代碼生成

正如RDD的各種變換實際上只是在構造RDD DAG,DataFrame的各種變換同樣也是lazy的。它們並不直接求出計算結果,而是將各種變換組裝成與RDD DAG類似的邏輯查詢計划。如前所述,由於DataFrame帶有schema元信息,Spark SQL的查詢優化器得以洞察數據和計算的精細結構,從而施行具有很強針對性的優化。隨后,經過優化的邏輯執行計划被翻譯為物理執行計划,並最終落實為RDD DAG。

 

 

 

這樣做的好處體現在幾個方面:

1. 用戶可以用更少的申明式代碼闡明計算邏輯,物理執行路徑則交由Spark SQL自行挑選。一方面降低了開發成本,一方面也降低了使用門檻——很多情況下,即便新手寫出了較為低效的查詢,Spark SQL也可以通過過濾條件下推、列剪枝等策略予以有效優化。這是RDD API所不具備的。

2. Spark SQL可以動態地為物理執行計划中的表達式生成JVM字節碼,進一步實現歸避虛函數調用開銷、削減對象分配次數等底層優化,使得最終的查詢執行性能可以與手寫代碼的性能相媲美。

3. 對於PySpark而言,采用DataFrame編程時只需要構造體積小巧的邏輯執行計划,物理執行全部由JVM端負責,Python解釋器和JVM間大量不必要的跨進程通訊得以免除。如上圖所示,一組簡單的對一千萬整數對做聚合的測試中,PySpark中DataFrame API的性能輕松勝出RDD API近五倍。此外,今后Spark SQL在Scala端對查詢優化器的所有性能改進,PySpark都可以免費獲益。

2.外部數據源API增強

 

 

 

數據寫入支持

在Spark 1.2.0中,外部數據源API只能將外部數據源中的數據讀入Spark,而無法將計算結果寫回數據源;同時,通過數據源引入並注冊的表只能是臨時表,相關元信息無法持久化。在1.3.0中,我們提供了完整的數據寫入支持,從而補全了多數據源互操作的最后一塊重要拼圖。前文示例中Hive、Parquet、JSON、Pandas等多種數據源間的任意轉換,正是這一增強的直接成果。

站在Spark SQL外部數據源開發者的角度,數據寫入支持的API主要包括:

2.1. 數據源表元數據持久化

CREATE [TEMPORARY] TABLE [IF NOT EXISTS] 
 <table-name> [(col-name data-type [, ...)] 
 USING <source> [OPTIONS ...] 
 [AS <select-query>]

由此,注冊自外部數據的SQL表既可以是臨時表,也可以被持久化至Hive metastore。需要持久化支持的外部數據源,除了需要繼承原有的RelationProvider以外,還需繼承CreatableRelationProvider。

2.2. InsertableRelation

支持數據寫入的外部數據源的relation類,還需繼承trait InsertableRelation,並在insert方法中實現數據插入邏輯。

Spark 1.3.0中內置的JSON和Parquet數據源都已實現上述API,可以作為開發外部數據源的參考示例。

 

3.統一的load/save API

在Spark 1.2.0中,要想將SchemaRDD中的結果保存下來,便捷的選擇並不多。常用的一些包括:

rdd.saveAsParquetFile(...)
rdd.saveAsTextFile(...)
rdd.toJSON.saveAsTextFile(...)
rdd.saveAsTable(...)
....

可見,不同的數據輸出方式,采用的API也不盡相同。更令人頭疼的是,我們缺乏一個靈活擴展新的數據寫入格式的方式。

針對這一問題,1.3.0統一了load/save API,讓用戶按需自由選擇外部數據源。這套API包括:

1.SQLContext.table
#從SQL表中加載DataFrame。

2.SQLContext.load
#從指定的外部數據源加載DataFrame。

3.SQLContext.createExternalTable
#將指定位置的數據保存為外部SQL表,元信息存入Hive metastore,並返回包含相應數據的DataFrame。

4.DataFrame.save
#將DataFrame寫入指定的外部數據源。

5.DataFrame.saveAsTable
#將DataFrame保存為SQL表,元信息存入Hive metastore,同時將數據寫入指定位置。

4.Parquet數據源增強

Spark SQL從一開始便內置支持Parquet這一高效的列式存儲格式。在開放外部數據源API之后,原有的Parquet支持也正在逐漸轉向外部數據源。1.3.0中,Parquet外部數據源的能力得到了顯著增強。主要包括schema合並和自動分區處理。

1.Schema合並

與ProtocolBuffer和Thrift類似,Parquet也允許用戶在定義好schema之后隨時間推移逐漸添加新的列,只要不修改原有列的元信息,新舊schema仍然可以兼容。這一特性使得用戶可以隨時按需添加新的數據列,而無需操心數據遷移。

2.分區信息發現

按目錄對同一張表中的數據分區存儲,是Hive等系統采用的一種常見的數據存儲方式。新的Parquet數據源可以自動根據目錄結構發現和推演分區信息。

3.分區剪枝

分區實際上提供了一種粗粒度的索引。當查詢條件中僅涉及部分分區時,通過分區剪枝跳過不必要掃描的分區目錄,可以大幅提升查詢性能。

以下Scala代碼示例統一展示了1.3.0中Parquet數據源的這幾個能力(Scala代碼片段):

// 創建兩個簡單的DataFrame,將之存入兩個獨立的分區目錄
val df1 = (1 to 5).map(i => (i, i * 2)).toDF("single", "double")
df1.save("data/test_table/key=1", "parquet", SaveMode.Append)
val df2
= (6 to 10).map(i => (i, i * 2)).toDF("single", "double") df2.save("data/test_table/key=2", "parquet", SaveMode.Append)
// 在另一個DataFrame中引入一個新的列,並存入另一個分區目錄 val df3 = (11 to 15).map(i => (i, i * 3)).toDF("single", "triple") df3.save("data/test_table/key=3", "parquet", SaveMode.Append)
// 一次性讀入整個分區表的數據 val df4 = sqlContext.load("data/test_table", "parquet") // 按分區進行查詢,並展示結果 val df5 = df4.filter($"key" >= 2)
df5.show()

這段代碼的執行結果為: 

6  12   null 2 
7  14   null 2 
8  16   null 2 
9  18   null 2 
10 20   null 2 
11 null 33   3 
12 null 36   3 
13 null 39   3 
14 null 42   3 
15 null 45   3

可見,Parquet數據源自動從文件路徑中發現了key這個分區列,並且正確合並了兩個不相同但相容的schema。值得注意的是,在最后的查詢中查詢條件跳過了key=1這個分區。Spark SQL的查詢優化器會根據這個查詢條件將該分區目錄剪掉,完全不掃描該目錄中的數據,從而提升查詢性能。

總體來說

  • schema : RDD每一行的數據, 結構都是一樣的. 這個結構就存儲在schema中. Spark通過schame就能夠讀懂數據, 因此在通信和IO時就只需要序列化和反序列化數據, 而結構的部分就可以省略了.
  • off-heap : 意味着JVM堆以外的內存, 這些內存直接受操作系統管理(而不是JVM)。Spark能夠以二進制的形式序列化數據(不包括結構)到off-heap中, 當要操作數據時, 就直接操作off-heap內存. 由於Spark理解schema, 所以知道該如何操作.

off-heap就像地盤, schema就像地圖, Spark有地圖又有自己地盤了, 就可以自己說了算了, 不再受JVM的限制, 也就不再收GC的困擾了.

通過schema和off-heap, DataFrame解決了RDD的缺點, 但是卻丟了RDD的優點. DataFrame不是類型安全的, API也不是面向對象風格的.所以我們后來在spark2.0引入的dataset。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM