Spark的核心RDD(Resilient Distributed Datasets彈性分布式數據集)


Spark的核心RDD(Resilient Distributed Datasets彈性分布式數據集)

 

鋪墊

  1. 在hadoop中一個獨立的計算,例如在一個迭代過程中,除可復制的文件系統(HDFS)外沒有提供其他存儲的概念,這就導致在網絡上進行數據復制而增加了大量的消耗,而對於兩個的MapReduce作業之間數據共享只有一個辦法,就是將其寫到一個穩定的外部存儲系統,如分布式文件系統。這會引入數據備份、磁盤I/O以及序列化,這些都會引起大量的開銷,從而占據大部分的應用執行時間。所以我們發現如果在計算過程中如能共享數據,那將會降低集群開銷同時還能減少任務執行時間。
  2. 而 spark中的RDDs讓用戶可以直接控制數據的共享。RDD具有可容錯和並行數據結構特征,可以指定數據存儲到硬盤還是內存、控制數據的分區方法並在數據集上進行種類豐富的操作。
  3. 目前提出的基於集群的內存存儲抽象,比如分布式共享內存(Distributed Shared Memory|DSM),鍵-值存儲(Key-Value|Nosql),數據庫(RDBMS)等提供了一個對內部狀態基於細粒度更新的接口(例如,表格里面的單元)。而這樣設計,提供容錯性的方法:在主機之間復制數據,或者對各主機的更新情況做日志記錄。但這兩種方法對於數據密集型的任務來說代價很高,因為它們需要在帶寬遠低於內存的集群網絡間拷貝大量的數據,同時還將產生大量的存儲開銷。但RDD提供一種基於粗粒度變換(如 map,filter,join)的接口,該接口會將相同的操作應用到多個數據集上。這使得他們可以通過記錄用來創建數據集的變換(lineage),而不需存儲真正的數據,進而達到高效的容錯性。當一個RDD的某個分區丟失的時候,RDD記錄有足夠的信息記錄其如何通過其他的RDD進行計算,且只需重新計算該分區。因此,丟失的數據可以被很快的恢復,而不需要昂貴的復制代價。

主角

首先我們來思考一個問題吧:Spark的計算模型是如何做到並行的呢?如果你有一箱香蕉,讓三個人拿回家吃完(好吧,我承認我愛吃香蕉,哈哈),如果不拆箱子就會很麻煩對吧,哈哈,一個箱子嘛,當然只有一個人才能抱走了。這時候智商正常的人都知道要把箱子打開,倒出來香蕉,分別拿三個小箱子重新裝起來,然后,各自抱回家去啃吧。 
Spark和很多其他分布式計算系統都借用了這種思想來實現並行:把一個超大的數據集,切分成N個小堆,找M個執行器(M < N),各自拿一塊或多塊數據慢慢玩,玩出結果了再收集在一起,這就算執行完啦。那么Spark做了一項工作就是:凡是能夠被我算的,都是要符合我的要求的,所以spark無論處理什么數據先整成一個擁有多個分塊的數據集再說,這個數據集就叫RDD。 
好了,那現在就詳細介紹下RDD吧

1.概念 
RDD(Resilient Distributed Datasets,彈性分布式數據集)是一個分區的只讀記錄的集合。RDD只能通過在穩定的存儲器或其他RDD的數據上的確定性操作來創建。我們把這些操作稱作變換以區別其他類型的操作。例如 map,filter和join。 
RDD在任何時候都不需要被”物化”(進行實際的變換並最終寫入穩定的存儲器上)。實際上,一個RDD有足夠的信息描述着其如何從其他穩定的存儲器上的數據生成。它有一個強大的特性:從本質上說,若RDD失效且不能重建,程序將不能引用該RDD。而用戶可以控制RDD的其他兩個方面:持久化和分區。用戶可以選擇重用哪個RDD,並為其制定存儲策略(比如,內存存儲)。也可以讓RDD中的數據根據記錄的key分布到集群的多個機器。 這對位置優化來說是有用的,比如可用來保證兩個要jion的數據集都使用了相同的哈希分區方式。

2.spark 編程接口 
對編程人員通過對穩定存儲上的數據進行變換操作(如map和filter).而得到一個或多個RDD。然后可以調用這些RDD的actions(動作)類的操作。這類操作的目是返回一個值或是將數據導入到存儲系統中。動作類的操作如count(返回數據集的元素數),collect(返回元素本身的集合)和save(輸出數據集到存儲系統)。spark直到RDD第一次調用一個動作時才真正計算RDD。 
還可以調用RDD的persist(持久化)方法來表明該RDD在后續操作中還會用到。默認情況下,spark會將調用過persist的RDD存在內存中。但若內存不足,也可以將其寫入到硬盤上。通過指定persist函數中的參數,用戶也可以請求其他持久化策略(如Tachyon)並通過標記來進行persist,比如僅存儲到硬盤上或是在各機器之間復制一份。最后,用戶可以在每個RDD上設定一個持久化的優先級來指定內存中的哪些數據應該被優先寫入到磁盤。 
PS: 
緩存有個緩存管理器,spark里被稱作blockmanager。注意,這里還有一個誤區是,很多初學的同學認為調用了cache或者persist的那一刻就是在緩存了,這是完全不對的,真正的緩存執行指揮在action被觸發。

說了一大堆枯燥的理論,我用一個例子來解釋下吧: 
現在數據存儲在hdfs上,而數據格式以“;”作為每行數據的分割:

"age";"job";"marital";"education";"default";"balance";"housing";"loan" 30;"unemployed";"married";"primary";"no";1787;"no";"no" 33;"services";"married";"secondary";"no";4789;"yes";"yes"

scala代碼如下:

 //1.定義了以一個HDFS文件(由數行文本組成)為基礎的RDD val lines = sc.textFile("/data/spark/bank/bank.csv")  //2.因為首行是文件的標題,我們想把首行去掉,返回新RDD是withoutTitleLines val withoutTitleLines = lines.filter(!_.contains("age"))  //3.將每行數據以;分割下,返回名字是lineOfData的新RDD val lineOfData = withoutTitleLines.map(_.split(";"))  //4.將lineOfData緩存到內存到,並設置緩存名稱是lineOfData lineOfData.setName("lineOfData") lineOfData.persist  //5.獲取大於30歲的數據,返回新RDD是gtThirtyYearsData val gtThirtyYearsData = lineOfData.filter(line => line(0).toInt > 30)  //到此,集群上還沒有工作被執行。但是,用戶現在已經可以在動作(action)中使用RDD。  //計算大於30歲的有多少人 gtThirtyYearsData.count  //返回結果是3027

OK,我現在要解釋兩個概念NO.1 什么是lineage?,NO.2 transformations 和 actions是什么? 
lineage
這里寫圖片描述

在上面查詢大於30歲人查詢里,我們最開始得出去掉標題行所對應的RDD lines,即為withTitleLines,接着對withTitleLines進行map操作分割每行數據內容,之后再次進行過濾age大於30歲的人、最后進行count(統計所有記錄)。Spark的調度器會對最后的那個兩個變換操作流水線化,並發送一組任務給那些保存了lineOfData對應的緩存分區的節點。另外,如果lineOfData的某個分區丟失,Spark將只在該分區對應的那些行上執行原來的split操作即可恢復該分區。 
所以在spark計算時,當前RDD不可用時,可以根據父RDD重新計算當前RDD數據,但如果父RDD不可用時,可以可以父RDD的父RDD重新計算父RDD。

transformations 和 actions

transformations操作理解成一種惰性操作,它只是定義了一個新的RDD,而不是立即計算它。相反,actions操作則是立即計算,並返回結果給程序,或者將結果寫入到外存儲中。

下面我以示例解釋下:

這里寫圖片描述

先簡單介紹這些吧,稍后文章我會詳細介紹每個方法的使用,感興趣可以看spark官方文檔

3.RDDs接口5個特性

這里寫圖片描述

簡單概括為:一組分區,他們是數據集的最小分片;一組 依賴關系,指向其父RDD;一個函數,基於父RDD進行計算;以及划分策略和數據位置的元數據。例如:一個表現HDFS文件的RDD將文件的每個文件塊表示為一個分區,並且知道每個文件塊的位置信息。同時,對RDD進行map操作后具有相同的划分。當計算其元素時,將map函數應用於父RDD的數據。

4.RDDs依賴關系

  1. 在spark中如何表示RDD之間的依賴關系分為兩類: 
    窄依賴:每個父RDD的分區都至多被一個子RDD的分區使用,即為OneToOneDependecies; 
    寬依賴:多個子RDD的分區依賴一個父RDD的分區,即為OneToManyDependecies。 
    例如,map操作是一種窄依賴,而join操作是一種寬依賴(除非父RDD已經基於Hash策略被划分過了)
  2. 詳細介紹: 
    首先,窄依賴允許在單個集群節點上流水線式執行,這個節點可以計算所有父級分區。例如,可以逐個元素地依次執行filter操作和map操作。相反,寬依賴需要所有的父RDD數據可用並且數據已經通過類MapReduce的操作shuffle完成。 
    其次,在窄依賴中,節點失敗后的恢復更加高效。因為只有丟失的父級分區需要重新計算,並且這些丟失的父級分區可以並行地在不同節點上重新計算。與此相反,在寬依賴的繼承關系中,單個失敗的節點可能導致一個RDD的所有先祖RDD中的一些分區丟失,導致計算的重新執行。 
    對於hdfs:HDFS文件作為輸入RDD。對於這些RDD,partitions代表文件中每個文件塊的分區(包含文件塊在每個分區對象中的偏移量),preferredLocations表示文件塊所在的節點,而iterator讀取這些文件塊。 
    對於map:在任何一個RDD上調用map操作將返回一個MappedRDD對象。這個對象與其父對象具有相同的分區以及首選地點(preferredLocations),但在其迭代方法(iterator)中,傳遞給map的函數會應用到父對象記錄。 
    再一個經典的RDDs依賴圖吧 
    這里寫圖片描述

5.作業調度

當用戶對一個RDD執行action(如count 或save)操作時, 調度器會根據該RDD的lineage,來構建一個由若干階段(stage) 組成的一個DAG(有向無環圖)以執行程序,如下圖所示。 
每個stage都包含盡可能多的連續的窄依賴型轉換。各個階段之間的分界則是寬依賴所需的shuffle操作,或者是DAG中一個經由該分區能更快到達父RDD的已計算分區。之后,調度器運行多個任務來計算各個階段所缺失的分區,直到最終得出目標RDD。 
調度器向各機器的任務分配采用延時調度機制並根據數據存儲位置(本地性)來確定。若一個任務需要處理的某個分區剛好存儲在某個節點的內存中,則該任務會分配給那個節點。否則,如果一個任務處理的某個分區,該分區含有的RDD提供較佳的位置(例如,一個HDFS文件),我們把該任務分配到這些位置。 
“對應寬依賴類的操作 {比如 shuffle依賴),會將中間記錄物理化到保存父分區的節點上。這和MapReduce物化Map的輸出類似,能簡化數據的故障恢復過程。 
對於執行失敗的任務,只要它對應stage的父類信息仍然可用,它便會在其他節點上重新執行。如果某些stage變為不可用(例如,因為shuffle在map階段的某個輸出丟失了),則重新提交相應的任務以並行計算丟失的分區。 
若某個任務執行緩慢 (即”落后者”straggler),系統則會在其他節點上執行該任務的拷貝,這與MapReduce做法類似,並取最先得到的結果作為最終的結果。 
這里寫圖片描述 
實線圓角方框標識的是RDD。陰影背景的矩形是分區,若已存於內存中則用黑色背景標識。RDD G 上一個action的執行將會以寬依賴為分區來構建各個stage,對各stage內部的窄依賴則前后連接構成流水線。在本例中,stage 1 的輸出已經存在RAM中,所以直接執行 stage 2 ,然后stage 3。

6.內存管理

Spark提供了三種對持久化RDD的存儲策略:未序列化Java對象存於內存中、序列化后的數據存於內存及磁盤存儲。第一個選項的性能表現是最優秀的,因為可以直接訪問在JAVA虛擬機內存里的RDD對象。在空間有限的情況下,第二種方式可以讓用戶采用比JAVA對象圖更有效的內存組織方式,代價是降低了性能。第三種策略適用於RDD太大難以存儲在內存的情形,但每次重新計算該RDD會帶來額外的資源開銷。

對於有限可用內存,Spark使用以RDD為對象的LRU回收算法來進行管理。當計算得到一個新的RDD分區,但卻沒有足夠空間來存儲它時,系統會從最近最少使用的RDD中回收其一個分區的空間。除非該RDD便是新分區對應的RDD,這種情況下,Spark會將舊的分區繼續保留在內存,防止同一個RDD的分區被循環調入調出。因為大部分的操作會在一個RDD的所有分區上進行,那么很有可能已經存在內存中的分區將會被再次使用。

7.檢查點支持(checkpoint) 
雖然lineage可用於錯誤后RDD的恢復,但對於很長的lineage的RDD來說,這樣的恢復耗時較長。因此,將某些RDD進行檢查點操作(Checkpoint)保存到穩定存儲上,是有幫助的。 
通常情況下,對於包含寬依賴的長血統的RDD設置檢查點操作是非常有用的,在這種情況下,集群中某個節點的故障會使得從各個父RDD得出某些數據丟失,這時就需要完全重算。相反,對於那些窄依賴於穩定存儲上數據的RDD來說,對其進行檢查點操作就不是有必要的。如果一個節點發生故障,RDD在該節點中丟失的分區數據可以通過並行的方式從其他節點中重新計算出來,計算成本只是復制整個RDD的很小一部分。 
Spark當前提供了為RDD設置檢查點(用一個REPLICATE標志來持久化)操作的API,讓用戶自行決定需要為哪些數據設置檢查點操作。 
最后,由於RDD的只讀特性使得比常用的共享內存更容易做checkpoint,因為不需要關心一致性的問題,RDD的寫出可在后台進行,而不需要程序暫停或進行分布式快照。


序幕

好了,講了一大堆RDD理論上概念,現在,問問自己什么是RDD呢?我用最簡單幾句話概括下吧。 
RDD是spark的核心,也是整個spark的架構基礎,RDD是彈性分布式集合(Resilient Distributed Datasets)的簡稱,是分布式只讀且已分區集合對象。這些集合是彈性的,如果數據集一部分丟失,則可以對它們進行重建。具有自動容錯、位置感知調度和可伸縮性,而容錯性是最難實現的,大多數分布式數據集的容錯性有兩種方式:數據檢查點和記錄數據的更新。對於大規模數據分析系統,數據檢查點操作成本高,主要原因是大規模數據在服務器之間的傳輸帶來的各方面的問題,相比記錄數據的更新,RDD也只支持粗粒度的轉換,也就是記錄如何從其他RDD轉換而來(即lineage),以便恢復丟失的分區。 
簡而言之,特性如下: 
1. 數據結構不可變 
2. 支持跨集群的分布式數據操作 
3. 可對數據記錄按key進行分區 
4. 提供了粗粒度的轉換操作 
5. 數據存儲在內存中,保證了低延遲性


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM