Spark計算模型RDD


RDD彈性分布式數據集

RDD概述

  RDD(Resilient Distributed Dataset)叫做彈性分布式數據集,是Spark中最基本的數據抽象,它代表一個不可變、可分區、里面的元素可並行計算的集合。RDD具有數據流模型的特點:自動容錯、位置感知性調度和可伸縮性。RDD允許用戶在執行多個查詢時顯式地將數據緩存在內存中,后續的查詢能夠重用這些數據,這極大地提升了查詢速度。

  Resilient:RDD中的數據可以存儲在內存中或者磁盤中。 

  Distributed:RDD中的數據是分布式存儲的,可用於分布式計算。

  Dataset:一個數據集合,用於存放數據的。

 

(1)    傳統的MapReduce雖然具有自動容錯、平衡負載和可拓展性的優點,但是其最大缺點是采用非循環式的數據流模型,使得在迭代計算中要進行大量的磁盤IO操作。RDD正是解決這一缺點的抽象方法。

(2)     RDD是Spark提供的最重要的抽象的概念,它是一種具有容錯機制的特殊集合,可以分布在集群的節點上,以函數式編程來操作集合,進行各種並行操作。可以把RDD的結果數據進行緩存,方便進行多次重用,避免重復計算。

 

RDD是一種具有容錯性、基於內存計算的抽象方法,RDD是Spark Core的底層核心,Spark則是這個抽象方法的實現。

RDD五大屬性

  * - A list of partitions
    一個rdd有多個分區
  * - A function for computing each split
    作用在每一個分區中函數
  * - A list of dependencies on other RDDs
    一個rdd會依賴於很多其他RDD,這里就涉及到rdd的依賴關系
  * - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
    (可選項) 針對於kv類型的rdd才會有分區函數(必須要產生shuffle),分區函數就決定了數據會流入到子rdd的那些分區中
  * - Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
    (可選項)一個列表,存儲每個Partition的優先位置.數據的本地性,數據位置最優(spark在進行任務分配的時候,會優先考慮存有數據的worker節點來進行任務計算)
    一個文件有3個block塊
    block1-----》node1
    block2-----》node2
    block3-----》node3
           node4
           node5

 

創建RDD

  • 1、通過sparkContext調用parallelize,從一個已經存在的scala集合構建

    • sc.parallelize(List(1,2,3,4,5))

  • 2、讀取外部數據源

    • val rdd1=sc.textFile("/words.txt")

  • 3、從一個已經存在rdd經過對應的算子操作生成新的rdd

    • val rdd2=rdd1.flatMap(_.split(" "))

 

RDD編程API

RDD的算子分類

  Transformation(轉換):根據數據集創建一個新的數據集,計算后返回一個新RDD;例如:一個rdd進行map操作后生了一個新的rdd

  Action(動作):對rdd結果計算后返回一個數值value給驅動程序;

  例如:collect算子將數據集的所有元素收集完成返回給驅動程序。

Transformation

  RDD中的所有轉換都是延遲加載的,也就是說,它們並不會直接計算結果。相反的,它們只是記住這些應用到基礎數據集(例如一個文件)上的轉換動作。只有當發生一個要求返回結果給Driver的動作時,這些轉換才會真正運行。這種設計讓Spark更加有效率地運行。

常用的Transformation:

轉換

含義

map(func)

返回一個新的RDD,該RDD由每一個輸入元素經過func函數轉換后組成

filter(func)

返回一個新的RDD,該RDD由經過func函數計算后返回值為true的輸入元素組成

flatMap(func)

類似於map,但是每一個輸入元素可以被映射為0或多個輸出元素(所以func應該返回一個序列,而不是單一元素)

mapPartitions(func)

類似於map,但獨立地在RDD的每一個分片上運行,因此在類型為T的RDD上運行時,func的函數類型必須是Iterator[T] => Iterator[U]

mapPartitionsWithIndex(func)

類似於mapPartitions,但func帶有一個整數參數表示分片的索引值,因此在類型為T的RDD上運行時,func的函數類型必須是

(Int, Interator[T]) => Iterator[U]

union(otherDataset)

對源RDD和參數RDD求並集后返回一個新的RDD

intersection(otherDataset)

對源RDD和參數RDD求交集后返回一個新的RDD

distinct([numTasks]))

對源RDD進行去重后返回一個新的RDD

groupByKey([numTasks])

在一個(K,V)的RDD上調用,返回一個(K, Iterator[V])的RDD

reduceByKey(func, [numTasks])

在一個(K,V)的RDD上調用,返回一個(K,V)的RDD,使用指定的reduce函數,將相同key的值聚合到一起,與groupByKey類似,reduce任務的個數可以通過第二個可選的參數來設置

sortByKey([ascending], [numTasks])

在一個(K,V)的RDD上調用,K必須實現Ordered接口,返回一個按照key進行排序的(K,V)的RDD

sortBy(func,[ascending], [numTasks])

與sortByKey類似,但是更靈活

join(otherDataset, [numTasks])

在類型為(K,V)和(K,W)的RDD上調用,返回一個相同key對應的所有元素對在一起的(K,(V,W))的RDD

cogroup(otherDataset, [numTasks])

在類型為(K,V)和(K,W)的RDD上調用,返回一個(K,(Iterable<V>,Iterable<W>))類型的RDD

coalesce(numPartitions)

減少 RDD 的分區數到指定值。

repartition(numPartitions)

重新給 RDD 分區

repartitionAndSortWithinPartitions(partitioner)

 

重新給 RDD 分區,並且每個分區內以記錄的 key 排序

 

Action

  觸發整個任務真正的運行

動作

含義

reduce(func)

reduce將RDD中元素前兩個傳給輸入函數,產生一個新的return值,新產生的return值與RDD中下一個元素(第三個元素)組成兩個元素,再被傳給輸入函數,直到最后只有一個值為止。

collect()

在驅動程序中,以數組的形式返回數據集的所有元素

count()

返回RDD的元素個數

first()

返回RDD的第一個元素(類似於take(1))

take(n)

返回一個由數據集的前n個元素組成的數組

takeOrdered(n, [ordering])

返回自然順序或者自定義順序的前 n 個元素

saveAsTextFile(path)

將數據集的元素以textfile的形式保存到HDFS文件系統或者其他支持的文件系統,對於每個元素,Spark將會調用toString方法,將它裝換為文件中的文本

saveAsSequenceFile(path) 

將數據集中的元素以Hadoop sequencefile的格式保存到指定的目錄下,可以使HDFS或者其他Hadoop支持的文件系統。

saveAsObjectFile(path) 

將數據集的元素,以 Java 序列化的方式保存到指定的目錄下

countByKey()

針對(K,V)類型的RDD,返回一個(K,Int)的map,表示每一個key對應的元素個數。

foreach(func)

在數據集的每一個元素上,運行函數func

foreachPartition(func)

在數據集的每一個分區上,運行函數func

 

RDD常用的算子操作

  Spark Rdd的所有算子操作,請見《sparkRDD函數詳解》https://www.cnblogs.com/jifengblog/p/9369258.html

  啟動spark-shell 進行測試:

spark-shell --master spark://node1:7077 

 

練習1:map、filter

//通過並行化生成rdd

val rdd1 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10))

//對rdd1里的每一個元素乘2然后排序

val rdd2 = rdd1.map(_ * 2).sortBy(x => x, true)

//過濾出大於等於5的元素

val rdd3 = rdd2.filter(_ >= 5)

//將元素以數組的方式在客戶端顯示

rdd3.collect

 

練習2:flatMap

val rdd1 = sc.parallelize(Array("a b c", "d e f", "h i j"))

//將rdd1里面的每一個元素先切分在壓平

val rdd2 = rdd1.flatMap(_.split(" "))

rdd2.collect

 

練習3:交集、並集

val rdd1 = sc.parallelize(List(5, 6, 4, 3))

val rdd2 = sc.parallelize(List(1, 2, 3, 4))

//求並集

val rdd3 = rdd1.union(rdd2)

//求交集

val rdd4 = rdd1.intersection(rdd2)

//去重

rdd3.distinct.collect

rdd4.collect

 

練習4:join、groupByKey

val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))

val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))

//求join

val rdd3 = rdd1.join(rdd2)

rdd3.collect

//求並集

val rdd4 = rdd1 union rdd2

rdd4.collect

//按key進行分組

val rdd5=rdd4.groupByKey

rdd5.collect

 

練習5:cogroup

val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))

val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("jim", 2)))

//cogroup

val rdd3 = rdd1.cogroup(rdd2)

//注意cogroup與groupByKey的區別

rdd3.collect

 

練習6:reduce

val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5))

//reduce聚合

val rdd2 = rdd1.reduce(_ + _)

rdd2.collect

 

練習7:reduceByKey、sortByKey

val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2),  ("shuke", 1)))

val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5)))

val rdd3 = rdd1.union(rdd2)

//按key進行聚合

val rdd4 = rdd3.reduceByKey(_ + _)

rdd4.collect

//按value的降序排序

val rdd5 = rdd4.map(t => (t._2, t._1)).sortByKey(false).map(t => (t._2, t._1))

rdd5.collect

 

練習8:repartition、coalesce

val rdd1 = sc.parallelize(1 to 10,3)

//利用repartition改變rdd1分區數

//減少分區

rdd1.repartition(2).partitions.size

//增加分區

rdd1.repartition(4).partitions.size

//利用coalesce改變rdd1分區數

//減少分區

rdd1.coalesce(2).partitions.size

 

  注意:repartition可以增加和減少rdd中的分區數,coalesce只能減少rdd分區數,增加rdd分區數不會生效。

 

RDD的依賴關系

  RDD和它依賴的父RDD的關系有兩種不同的類型,即窄依賴(narrow dependency)寬依賴(wide dependency)

窄依賴

  窄依賴指的是每一個父RDD的Partition最多被子RDD的一個Partition使用

  總結:窄依賴我們形象的比喻為獨生子女

寬依賴

  寬依賴指的是多個子RDD的Partition會依賴同一個父RDD的Partition

  總結:寬依賴我們形象的比喻為超生

Lineage(血統)

  RDD只支持粗粒度轉換,即只記錄單個塊上執行的單個操作。將創建RDD的一系列Lineage(即血統)記錄下來,以便恢復丟失的分區。RDD的Lineage會記錄RDD的元數據信息和轉換行為,當該RDD的部分分區數據丟失時,它可以根據這些信息來重新運算和恢復丟失的數據分區

 

RDD的緩存

  Spark速度非常快的原因之一,就是在不同操作中可以在內存中持久化或者緩存數據集。當持久化某個RDD后,每一個節點都將把計算分區結果保存在內存中,對此RDD或衍生出的RDD進行的其他動作中重用。這使得后續的動作變得更加迅速。

  RDD通過persist方法或cache方法可以將前面的計算結果緩存,但是並不是這兩個方法被調用時立即緩存,而是觸發后面的action時,該RDD將會被緩存在計算節點的內存中,並供后面重用。

  通過查看源碼發現cache最終也是調用了persist方法默認的存儲級別都是僅在內存存儲一份,Spark的存儲級別還有好多種,存儲級別在object StorageLevel中定義的。

 

 

DAG的生成

  DAG(Directed Acyclic Graph)叫做有向無環圖,原始的RDD通過一系列的轉換就形成了DAG,根據RDD之間依賴關系的不同將DAG划分成不同的Stage(調度階段)。對於窄依賴,partition的轉換處理在一個Stage中完成計算。對於寬依賴,由於有Shuffle的存在,只能在parent RDD處理完成后,才能開始接下來的計算,因此寬依賴是划分Stage的依據

 

Spark任務調度

 

  各個RDD之間存在着依賴關系,這些依賴關系就形成有向無環圖DAG,DAGScheduler對這些依賴關系形成的DAG進行Stage划分,划分的規則很簡單,從后往前回溯,遇到窄依賴加入本stage,遇見寬依賴進行Stage切分。完成了Stage的划分。DAGScheduler基於每個Stage生成TaskSet,並將TaskSet提交給TaskScheduler。TaskScheduler 負責具體的task調度,最后在Worker節點上啟動task。

DAGScheduler

(1)DAGScheduler對DAG有向無環圖進行Stage划分。

(2)記錄哪個RDD或者 Stage 輸出被物化(緩存),通常在一個復雜的shuffle之后,通常物化一下(cache、persist),方便之后的計算。

(3)重新提交shuffle輸出丟失的stage(stage內部計算出錯)給TaskScheduler

(4)將 Taskset 傳給底層調度器

  a)– spark-cluster TaskScheduler

  b)– yarn-cluster YarnClusterScheduler

  c)– yarn-client YarnClientClusterScheduler

 

TaskScheduler

(1)為每一個TaskSet構建一個TaskSetManager 實例管理這個TaskSet 的生命周期

(2)數據本地性決定每個Task最佳位置

(3)提交 taskset( 一組task) 到集群運行並監控

(4)推測執行,碰到計算緩慢任務需要放到別的節點上重試

(5)重新提交Shuffle輸出丟失的Stage給DAGScheduler

 

RDD容錯機制之checkpoint

 checkpoint是什么

1)Spark 在生產環境下經常會面臨transformation的RDD非常多(例如一個Job中包含1萬個RDD)或者具體transformation的RDD本身計算特別復雜或者耗時(例如計算時長超過1個小時),這個時候就要考慮對計算結果數據持久化保存;

2)Spark是擅長多步驟迭代的,同時擅長基於Job的復用,這個時候如果能夠對曾經計算的過程產生的數據進行復用,就可以極大的提升效率;

3)如果采用persist把數據放在內存中,雖然是快速的,但是也是最不可靠的;如果把數據放在磁盤上,也不是完全可靠的!例如磁盤會損壞,系統管理員可能清空磁盤

4)Checkpoint的產生就是為了相對而言更加可靠的持久化數據,在Checkpoint的時候可以指定把數據放在本地,並且是多副本的方式,但是在生產環境下是放在HDFS上,這就天然的借助了HDFS高容錯、高可靠的特征來完成了最大化的可靠的持久化數據的方式;

  假如進行一個1萬個算子操作,在9000個算子的時候persist,數據還是有可能丟失的,但是如果checkpoint,數據丟失的概率幾乎為0。

 

checkpoint原理機制

(1) RDD使用cache機制從內存中讀取數據,如果數據沒有讀到,會使用checkpoint機制讀取數據。此時如果沒有checkpoint機制,那么就需要找到父RDD重新計算數據了,因此checkpoint是個很重要的容錯機制。checkpoint就是對於一個RDD chain(鏈)如果后面需要反復使用某些中間結果RDD,可能因為一些故障導致該中間數據丟失,那么就可以針對該RDD啟動checkpoint機制,使用checkpoint首先需要調用sparkContext的setCheckpointDir方法,設置一個容錯文件系統目錄,比如hdfs,然后對RDD調用checkpoint方法。之后在RDD所處的job運行結束后,會啟動一個單獨的job來將checkpoint過的數據寫入之前設置的文件系統持久化,進行高可用。所以后面的計算在使用該RDD時,如果數據丟失了,但是還是可以從它的checkpoint中讀取數據,不需要重新計算。

(2) persist或者cache與checkpoint的區別在於,前者持久化只是將數據保存在BlockManager中但是其lineage是不變的,但是后者checkpoint執行完后,rdd已經沒有依賴RDD,只有一個checkpointRDD,checkpoint之后,RDD的lineage就改變了persist或者cache持久化的數據丟失的可能性更大,因為可能磁盤或內存被清理,但是checkpoint的數據通常保存到hdfs上,放在了高容錯文件系統。

 

總結:

  • checkpoint就是提供了一個相對而言更加可靠的持久化數據方式

  • checkpoint使用

    • sc.setCheckpointDir("hdfs文件目錄")

    • rdd1.checkpoint

    • 后面也需要一個action算子操作,才會觸發checkpoint數據寫入到HDFS

  • cache/persist/checkpoint

    • 區別

      • 三者都可以對數據進行持久化保存

      • cache、persist他們不會改變rdd的血統

      • checkpoint會改變血統

  • 當前一個rdd數據丟失了,如何得到

    • 首先看一下你有沒有設置cache,如果有,直接從內存中獲取得到,如果沒有,接下來看一下你有沒有設置checkpoint操作,如果有,直接獲取得到,如果也沒有,那么這個時候只能夠通過lineage血統重新計算恢復。

 

 

Spark運行架構

 Spark運行基本流程

 

 

1)      構建Spark Application的運行環境(啟動SparkContext),SparkContext向資源管理器(可以是Standalone、Mesos或YARN)注冊並申請運行Executor資源;

2)     資源管理器分配Executor資源並啟動Executor,Executor運行情況將隨着心跳發送到資源管理器上;

3)    SparkContext構建成DAG圖,將DAG圖分解成Stage,並把Taskset發送給Task Scheduler。Executor向SparkContext申請Task,Task Scheduler將Task發放給Executor運行同時SparkContext將應用程序代碼發放給Executor。

4)    Task在Executor上運行,運行完畢釋放所有資源。

 

Spark運行架構特點

  • 每個Application獲取專屬的executor進程,該進程在Application期間一直駐留,並以多線程方式運行tasks。
  • Spark任務與資源管理器無關,只要能夠獲取executor進程,並能保持相互通信就可以了。
  • 提交SparkContext的Client應該靠近Worker節點(運行Executor的節點),最好是在同一個Rack里,因為Spark程序運行過程中SparkContext和Executor之間有大量的信息交換;如果想在遠程集群中運行,最好使用RPC將SparkContext提交給集群,不要遠離Worker運行SparkContext。
  • Task采用了數據本地性和推測執行的優化機制。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM