Spark新特性(DataFrame/DataSet、Structured Streaming和Spark Session)


spark 新特性主要增加DataFrame/DataSet、Structured Streaming和Spark Session

1. DataFrame/DataSet主要替換之前的RDD,主要優勢在執行效率、集群間通信、執行優化和GC開銷比RDD有優勢。

2. Structured Streaming大部分場景替換之前的Streaming,比之前的優勢集中中簡潔的模型、一致的API、卓越的性能和Event Time的支持

3. SparkSession的概念,它為用戶提供了一個統一的切入點來使用Spark的各項功能,SparkConf、SparkContext和SQLContext都已經被封裝在SparkSession當中。

 

1. DataFrame、Dataset

1.1 RDD與DataFrame、Dataset對比

 

RDD

Dataset/DataFrame

執行效率

創建大量臨時對象,對GC造成壓力,優化需要對Spark運行時機制有一定的了解,門檻較高

利用Spark SQL引擎,自動優化

集群間的通信

集群間的通信,還是IO操作都需要對對象的結構和數據進行序列化和反序列化

當序列化數據時,Encoder產生字節碼與off-heap進行交互,能夠達到按需訪問數據的效果,而不用反序列化整個對象

執行優化

 

需要用戶自己優化(join之后又做了一次filter操作。如果原封不動地執行這個執行計划,最終的執行效率是不高的。因為join是一個代價較大的操作)

自動優化,能把能將filter下推到 join下方

GC開銷

頻繁的創建和銷毀對象,GC開銷高

引入off-heap:能使用JVM堆以外的內存,off-heap就像地盤,schema就像地圖,Spark有地圖又有自己地盤了,就可以自己說了算了,不再受JVM的限制,也就不再收GC的困擾了

 

 

 

1.2 快速理解Spark Dataset

case class People(id:Long, name:String, age:Int)

sc.makeRDD(seq(People(1, "zhangshan", 23),People(2, "lisi", 35)))

 

 

RDD中的兩行數據

People(id =1, name=“zhangshan”,age=23)

People(id =1, name=“lisi”,age=35)

 

DataFrame的數據

id: bigint

name:String

age:bigint

1

zhangshan

23

2

lisi

35

 

DataFrame比RDD多了一個表頭信息(Schema),像一張表了,DataFrame還配套了新的操作數據的方法,DataFrame API(如df.select())和SQL(select id, name from xx_table where ...),通過DataFrame API或SQL處理數據,會自動經過Spark 優化器(Catalyst)的優化,即使你寫的程序或SQL不高效,也可以運行的很快。

DataSet的數據。

Value:People[id:bingint, name:String,age:bigint]

People(id =1, name=“zhangshan”,age=23)

People(id =1, name=“lisi”,age=35)

 

相對於RDD,Dataset提供了強類型支持,也是在RDD的每行數據加了類型約束。使用Dataset API的程序,會經過Spark SQL的優化器進行優化,相比DataFrame,Dataset提供了編譯時類型檢查。

RDD轉換DataFrame后不可逆,但RDD轉換Dataset是可逆的

1.3 實踐

創建RDD

image.png 

通過RDD創建DataFrame,再通過DataFrame轉換成RDD,發現RDD的類型變成了Row類型

image.png 

image.png 

通過RDD創建Dataset,再通過Dataset轉換為RDD,發現RDD還是原始類型

image.png 

經典的wordCount舉例

使用Dataset API做轉換操作

image.png 

使用SQL進行單詞統計

image.png 

使用SQL進行排名分析

image.png 

 

2. Structured Streaming

Structured Streaming 是一個可拓展,容錯的,基於Spark SQL執行引擎的流處理引擎。使用小量的靜態數據模擬流處理。伴隨流數據的到來,Spark SQL引擎會逐漸連續處理數據並且更新結果到最終的Table中。你可以在Spark SQL上引擎上使用DataSet/DataFrame API處理流數據的聚集,事件窗口,和流與批次的連接操作等。最后Structured Streaming 系統快速,穩定,端到端的恰好一次保證,支持容錯的處理。

2.1 Structured Streaming的Spark優勢

Ø 簡潔的模型。Structured Streaming的模型很簡潔,易於理解。用戶可以直接把一個流想象成是無限增長的表格。

Ø 一致的API。由於和Spark SQL共用大部分API,對Spark SQL熟悉的用戶很容易上手,代碼也十分簡潔。同時批處理和流處理程序還可以共用代碼,不需要開發兩套不同的代碼,顯著提高了開發效率。

Ø 卓越的性能。Structured Streaming在與Spark SQL共用API的同時,也直接使用了Spark SQL的Catalyst優化器和Tungsten,數據處理性能十分出色。此外,Structured Streaming還可以直接從未來Spark SQL的各種性能優化中受益。

Ø Event Time的支持,Stream-Stream Join(2.3.0新增的功能),毫秒級延遲(2.3.0即將加入的Continuous Processing)

 

2.2 Structured Streaming介紹

Structured Streaming則是在Spark 2.0加入的經過重新設計的全新流式引擎。它的模型十分簡潔,易於理解。一個流的數據源從邏輯上來說就是一個不斷增長的動態表格,隨着時間的推移,新數據被持續不斷地添加到表格的末尾。用戶可以使用Dataset/DataFrame或者SQL來對這個動態數據源進行實時查詢。每次查詢在邏輯上就是對當前的表格內容執行一次SQL查詢。如何執行查詢則是由用戶通過觸發器(Trigger)來設定。用戶既可以設定定期執行,也可以讓查詢盡可能快地執行,從而達到實時的效果。一個流的輸出有多種模式,既可以是基於整個輸入執行查詢后的完整結果,也可以選擇只輸出與上次查詢相比的差異,或者就是簡單地追加最新的結果。這個模型對於熟悉SQL的用戶來說很容易掌握,對流的查詢跟查詢一個表格幾乎完全一樣。

image.png 

對輸入的查詢將生成 “Result Table” (結果表)。每個 trigger interval (觸發間隔)(例如,每 1 秒),新 row (行)將附加到 Input Table ,最終更新 Result Table 。無論何時更新 result table ,我們都希望將 changed result rows (更改的結果行)寫入 external sink (外部接收器)。

2.3 Structured Streaming實踐

創建一個 streaming DataFrame ,從監聽 localhost:9999 的服務器上接收的 text data (文本數據),並且將 DataFrame 轉換以計算 word counts 。

Structured Streaming 里,outputMode,現在有complete,append,update 三種,現在的版本只實現了前面兩種。

complete,每次計算完成后,你都能拿到全量的計算結果。

append,每次計算完成后,你能拿到增量的計算結果。

image.png
輸入:image.png

Complete模式結果

image.png 

 


Append 模式結果

image.png 

3. SparkSession

SparkSession的概念,它為用戶提供了一個統一的切入點來使用Spark的各項功能,用戶不但可以使用DataFrame和Dataset的各種API,學習Spark2的難度也會大大降低。

SparkConf、SparkContext和SQLContext都已經被封裝在SparkSession當中


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM