RDD原理與詳解


RDD詳解

原文連接 http://xiguada.org/spark_rdd/

RDD(Resilient Distributed Datasets彈性分布式數據集),是spark中最重要的概念,可以簡單的把RDD理解成一個提供了許多操作接口的數據集合,和一般數據集不同的是,其實際數據分布存儲於一批機器中(內存或磁盤中)。當然,RDD肯定不會這么簡單,它的功能還包括容錯、集合內的數據可以並行處理等。圖1是RDD類的視圖。

圖1

    一個簡單的例子

下面是一個實用scala語言編寫的spark應用(摘自Apache Spark 社區https://spark.apache.org/docs/latest/quick-start.html)。

/* SimpleApp.scala */

import org.apache.spark.SparkContext

import org.apache.spark.SparkContext._

import org.apache.spark.SparkConf

 

object SimpleApp {

def main(args: Array[String]) {

val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system

val conf = new SparkConf().setAppName("Simple Application") //設置程序名字

val sc = new SparkContext(conf)

val logData = sc.textFile(logFile, 2).cache() //加載文件為RDD,並緩存

val numAs = logData.filter(line => line.contains("a")).count()//包含a的行數

val numBs = logData.filter(line => line.contains("b")).count()//包含b的行數

println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))

}

}

    這個程序只是簡單的對輸入文件README.md包含'a'和'b'的行分別計數。當然如果你想運行這個程序,需要把YOUR_SPARK_HOME替換為Spark的安裝目錄。程序中定義了一個RDDlogData,並調用cache,把RDD數據緩存在內存中,這樣能防止重復加載文件。filterRDD提供的一種操作,它能過濾出符合條件的數據,countRDD提供的另一個操作,它能返回RDD數據集中的記錄條數。

RDD操作類型

    上述例子介紹了兩種RDD的操作:filter與count;事實上,RDD還提供了許多操作方法,如map,groupByKey,reduce等操作。RDD的操作類型分為兩類,轉換(transformations),它將根據原有的RDD創建一個新的RDD;行動(actions),對RDD操作后把結果返回給driver。例如,map是一個轉換,它把數據集中的每個元素經過一個方法處理后返回一個新的RDD;而reduce則是一個action,它收集RDD的所有數據后經過一些方法的處理,最后把結果返回給driver。

    RDD的所有轉換操作都是lazy模式,即Spark不會立刻計算結果,而只是簡單的記住所有對數據集的轉換操作。這些轉換只有遇到action操作的時候才會開始計算。這樣的設計使得Spark更加的高效,例如,對一個輸入數據做一次map操作后進行reduce操作,只有reduce的結果返回給driver,而不是把數據量更大的map操作后的數據集傳遞給driver。

    下面分別是transformations和action類型的操作。

Transformations類型的操作

Action類型的操作

更多RDD的操作描述和編程方法請參考社區文檔:https://spark.apache.org/docs/latest/programming-guide.html

 

RDD底層實現原理

RDD是一個分布式數據集,顧名思義,其數據應該分部存儲於多台機器上。事實上,每個RDD的數據都以Block的形式存儲於多台機器上,下圖是Spark的RDD存儲架構圖,其中每個Executor會啟動一個BlockManagerSlave,並管理一部分Block;而Block的元數據由Driver節點的BlockManagerMaster保存。BlockManagerSlave生成Block后向BlockManagerMaster注冊該Block,BlockManagerMaster管理RDD與Block的關系,當RDD不再需要存儲的時候,將向BlockManagerSlave發送指令刪除相應的Block。

圖2 RDD存儲原理

RDD cache的原理

RDD的轉換過程中,並不是每個RDD都會存儲,如果某個RDD會被重復使用,或者計算其代價很高,那么可以通過顯示調用RDD提供的cache()方法,把該RDD存儲下來。那RDD的cache是如何實現的呢?

RDD中提供的cache()方法只是簡單的把該RDD放到cache列表中。當RDD的iterator被調用時,通過CacheManager把RDD計算出來,並存儲到BlockManager中,下次獲取該RDD的數據時便可直接通過CacheManager從BlockManager讀出。

RDD dependency與DAG

    RDD提供了許多轉換操作,每個轉換操作都會生成新的RDD,這是新的RDD便依賴於原有的RDD,這種RDD之間的依賴關系最終形成了DAG(Directed Acyclic Graph)。

    RDD之間的依賴關系分為兩種,分別是NarrowDependency與ShuffleDependency,其中ShuffleDependency為子RDD的每個Partition都依賴於父RDD的所有Partition,而NarrowDependency則只依賴一個或部分的Partition。下圖的groupBy與join操作是ShuffleDependency,map和union是NarrowDependency。

圖3 RDD dependency

    

RDD partitioner與並行度

    每個RDD都有Partitioner屬性,它決定了該RDD如何分區,當然Partition的個數還將決定每個Stage的Task個數。當前Spark需要應用設置Stage的並行Task個數(配置項為:spark.default.parallelism),在未設置的情況下,子RDD會根據父RDD的Partition決定,如map操作下子RDD的Partition與父Partition完全一致,Union操作時子RDD的Partition個數為父Partition個數之和。

    如何設置spark.default.parallelism對用戶是一個挑戰,它會很大程度上決定Spark程序的性能。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM