Spark架構與作業執行流程簡介(scala版)


  在講spark之前,不得不詳細介紹一下RDD(Resilient Distributed Dataset),打開RDD的源碼,一開始的介紹如此:

 

字面意思就是彈性分布式數據集,是spark中最基本的數據抽象,它代表一個不可變可分區、里面的元素可並行計算的集合。

Resilient:彈性的,它表示的是數據可以保存在磁盤,也可以保存在內存中

Distributed:它的數據分布式存儲,並且可以做分布式的計算

Dataset:一個數據集,簡單的理解為集合,用於存放數據的

事實上,關於RDD有5個特性,同樣我們可以看看源碼是怎么介紹這5個特性的:

  1. A list of partitions (每個RDD都有一個分區列表)
  2. A function for computing each split (作用在每個分區上面的函數)
  3. A list of dependencies on other RDDs (一個RDD依賴其他多個RDD,這個特性很重要,rdd的容錯機制就是根據這個特性而來的)
  4. Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) [可選項:針對於 kv 鍵值對的RDD才具有該分區特性]
  5. Optionally, a list of preferred locations to compute each split on (e.g. block locations foran HDFS file) [可選項 : 數據本地性,數據最優,選擇盡量存儲在worker節點上的數據節點。]

那么我們又如何更好的去理解這5個特性呢?

1)一組分片(Partition),即數據集的基本組成單位。

對於RDD來說,每個分片都會被一個計算任務處理,並決定並行計算的粒度。用戶可  以在創建RDD時指定RDD的分片個數,如果沒有指定,那么就會采用默認值。默認值  就是程序所分配到的CPU Core的數目。

2)一個計算每個分區的函數。

Spark中RDD的計算是以分片為單位的,每個RDD都會實現compute函數以達到這個目的。compute函數會對迭代器進行復合,不需要保存每次計算的結果。

3)RDD之間的依賴關系。

RDD的每次轉換都會生成一個新的RDD,所以RDD之間就會形成類似於流水線一樣的前后依賴關系。在部分分區數據丟失時,Spark可以通過這個依賴關系重新計算丟失的分區數據,而不是對RDD的所有分區進行重新計算。

4)一個Partitioner,即RDD的分片函數。

當前Spark中實現了兩種類型的分片函數,一個是基於哈希的HashPartitioner,另外一個是基於范圍的RangePartitioner。只有對於於key-value的RDD,才會有Partitioner, 非key-value的RDD的Parititioner的值是None。Partitioner函數不但決定了RDD本身的分片數量,也決定了parent RDD Shuffle輸出時的分片數量。

5)一個列表,存儲存取每個Partition的優先位置(preferred location)。

對於一個HDFS文件來說,這個列表保存的就是每個Partition所在的塊的位置。按照“移動數據不如移動計算”的理念,Spark在進行任務調度的時候,會盡可能地將計算任務分配到其所要處理數據塊的存儲位置。

 

思考:為什么會產生RDD?

(1)    傳統的MapReduce雖然具有自動容錯、平衡負載和可拓展性的優點,但是其最大缺點是采用非循環式的數據流模型,使得在迭代計算式要進行大量的磁盤IO操作。RDD正是解決這一缺點的抽象方法

(2)     RDD是Spark提供的最重要的抽象的概念,它是一種有容錯機制的特殊集合,可以分布在集群的節點上,以函數式編操作集合的方式,進行各種並行操作。可以將RDD理 解為一個具有容錯機制的特殊集合,它提供了一種只讀、只能有已存在的RDD變換而來的共享內存,然后將所有數據都加載到內存中,方便進行多次重用。

 

    a. 他是分布式的,可以分布在多台機器上,進行計算。

              b. 他是彈性的,計算過程中內存不夠時它會和磁盤進行數據交換。

              c. 這些限制可以極大的降低自動容錯開銷

              d. 實質是一種更為通用的迭代並行計算框架,用戶可以顯示的控制計算的中間結果,然后將其自由運用於之后的計算。

(3)    RDD的容錯機制實現分布式數據集容錯方法有兩種:數據檢查點和記錄更新RDD

  采用記錄更新的方式:記錄所有更新點的成本很高。所以,RDD只支持粗  顆粒變換,即只記錄單個塊上執行的單個操作,然后創建某個RDD的變換序列(血統)存儲下來;變換序列指,每個RDD都包含了他是如何由其他RDD變換過來的以及如何重建某一塊數據的信息。因此RDD的容錯機制又稱“血統”容錯。 要實現這種“血統”容錯機制,最大的難題就是如何表達父RDD和子RDD之間的依賴關系。實際上依賴關系可以分兩種,窄依賴和寬依賴:窄依賴:子RDD中的每個數據塊只依賴於父RDD中對應的有限個固定的數據塊;寬依賴:子RDD中的一個數據塊可以依賴於父RDD中的所有數據塊。

(4)RDD內部的設計每個RDD都需要包含以下四個部分:

    a. 源數據分割后的數據塊,源代碼中的splits變量

              b. 關於“血統”的信息,源碼中的dependencies變量

              c. 一個計算函數(該RDD如何通過父RDD計算得到),源碼中的iterator(split)和compute函數

              d. 一些關於如何分塊和數據存放位置的元信息,如源碼中的partitioner和preferredLocations

思考:RDD在spark中的地位及作用

(1)    為什么會有Spark?

因為傳統的並行計算模型無法有效的解決迭代計算(iterative)和交互式計算(interactive);而Spark的使命便是解決這兩個問題,這也是他存在的價值和理由。

(2)    Spark如何解決迭代計算?

其主要實現思想就是RDD,把所有計算的數據保存在分布式的內存中。迭代計算通常情況下都是對同一個數據集做反復的迭代計算,數據在內存中將大大提升IO操作。這也是Spark涉及的核心:內存計算。

(3)    Spark如何實現交互式計算?

因為Spark是用scala語言實現的,Spark和scala能夠緊密的集成,所以Spark可以完美的運用scala的解釋器,使得其中的scala可以向操作本地集合對象一樣輕松操作分布式數據集。

(4)    Spark和RDD的關系?

可以理解為:RDD是一種具有容錯性基於內存的集群計算抽象方法,RDD是Spark Core的底層核心,Spark則是這個抽象方法的實現。

創建RDD 的方式

1)由一個已經存在的Scala集合創建。

val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))

2)由外部存儲系統的數據集創建,包括本地的文件系統,還有所有Hadoop支持的數據集,比如HDFS、Cassandra、HBase等

val rdd2 = sc.textFile("/words.txt")

3)通過已有的RDD經過算子轉換生成新的RDD

val rdd3=rdd2.flatMap(_.split(" "))

RDD編程API之RDD的算子分類

RDD的算子分類可以分為2種,TransformationAction類 。

Transformation:根據數據集創建一個新的數據集,計算后返回一個新RDD;例如:Map將數據的每個元素經過某個函數計算后,返回一個新的分布式數據集。

Action:對數據集計算后返回一個數值value給驅動程序;例如:collect將數據集的所有元素收集完成返回給程序。

 

關於Transformation:

RDD中的所有轉換都是延遲加載的,也就是說,它們並不會直接計算結果。相反的,它們只是記住這些應用到基礎數據集(例如一個文件)上的轉換動作。只有當發生一個要求返回結果給Driver的動作時,這些轉換才會真正運行。這種設計讓Spark更加有效率地運行。

常用的Transformation:

轉換

含義

map(func)

返回一個新的RDD,該RDD由每一個輸入元素經過func函數轉換后組成

filter(func)

返回一個新的RDD,該RDD由經過func函數計算后返回值為true的輸入元素組成

flatMap(func)

類似於map,但是每一個輸入元素可以被映射為0或多個輸出元素(所以func應該返回一個序列,而不是單一元素)

mapPartitions(func)

類似於map,但獨立地在RDD的每一個分片上運行,因此在類型為T的RDD上運行時,func的函數類型必須是Iterator[T] => Iterator[U]

mapPartitionsWithIndex(func)

類似於mapPartitions,但func帶有一個整數參數表示分片的索引值,因此在類型為T的RDD上運行時,func的函數類型必須是

(Int, Interator[T]) => Iterator[U]

sample(withReplacement, fraction, seed)

根據fraction指定的比例對數據進行采樣,可以選擇是否使用隨機數進行替換,seed用於指定隨機數生成器種子

union(otherDataset)

對源RDD和參數RDD求並集后返回一個新的RDD

intersection(otherDataset)

對源RDD和參數RDD求交集后返回一個新的RDD

distinct([numTasks]))

對源RDD進行去重后返回一個新的RDD

groupByKey([numTasks])

在一個(K,V)的RDD上調用,返回一個(K, Iterator[V])的RDD

reduceByKey(func, [numTasks])

在一個(K,V)的RDD上調用,返回一個(K,V)的RDD,使用指定的reduce函數,將相同key的值聚合到一起,與groupByKey類似,reduce任務的個數可以通過第二個可選的參數來設置

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])

 

sortByKey([ascending], [numTasks])

在一個(K,V)的RDD上調用,K必須實現Ordered接口,返回一個按照key進行排序的(K,V)的RDD

sortBy(func,[ascending], [numTasks])

與sortByKey類似,但是更靈活

join(otherDataset, [numTasks])

在類型為(K,V)和(K,W)的RDD上調用,返回一個相同key對應的所有元素對在一起的(K,(V,W))的RDD

cogroup(otherDataset, [numTasks])

在類型為(K,V)和(K,W)的RDD上調用,返回一個(K,(Iterable<V>,Iterable<W>))類型的RDD

cartesian(otherDataset)

笛卡爾積

pipe(command, [envVars])

對rdd進行管道操作

coalesce(numPartitions)

減少 RDD 的分區數到指定值。在過濾大量數據之后,可以執行此操作

repartition(numPartitions)

重新給 RDD 分區

repartitionAndSortWithinPartitions(partitioner)

重新給 RDD 分區,並且每個分區內以記錄的 key 排序

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

常用的Action:

動作

含義

reduce(func)

通過func函數聚集RDD中的所有元素,這個功能必須是課交換且可並聯的

collect()

在驅動程序中,以數組的形式返回數據集的所有元素

count()

返回RDD的元素個數

first()

返回RDD的第一個元素(類似於take(1))

take(n)

返回一個由數據集的前n個元素組成的數組

takeSample(withReplacement,num, [seed])

返回一個數組,該數組由從數據集中隨機采樣的num個元素組成,可以選擇是否用隨機數替換不足的部分,seed用於指定隨機數生成器種子

takeOrdered(n, [ordering])

返回自然順序或者自定義順序的前 n 個元素

saveAsTextFile(path)

將數據集的元素以textfile的形式保存到HDFS文件系統或者其他支持的文件系統,對於每個元素,Spark將會調用toString方法,將它裝換為文件中的文本

saveAsSequenceFile(path) 

將數據集中的元素以Hadoop sequencefile的格式保存到指定的目錄下,可以使HDFS或者其他Hadoop支持的文件系統。

saveAsObjectFile(path) 

將數據集的元素,以 Java 序列化的方式保存到指定的目錄下

countByKey()

針對(K,V)類型的RDD,返回一個(K,Int)的map,表示每一個key對應的元素個數。

foreach(func)

在數據集的每一個元素上,運行函數func進行更新。

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

RDD的依賴關系:

RDD和它依賴的父RDD(s)的關系有兩種不同的類型,即窄依賴(narrow dependency)和寬依賴(wide dependency)。

窄依賴:窄依賴指的是每一個父RDD的Partition最多被子RDD的一個Partition使用

總結:窄依賴我們形象的比喻為獨生子女

寬依賴:寬依賴指的是多個子RDD的Partition會依賴同一個父RDD的Partition

總結:寬依賴我們形象的比喻為超生

Lineage血統:

RDD只支持粗粒度轉換,即在大量記錄上執行的單個操作。將創建RDD的一系列Lineage(即血統)記錄下來,以便恢復丟失的分區。RDD的Lineage會記錄RDD的元數據信息和轉換行為,當該RDD的部分分區數據丟失時,它可以根據這些信息來重新運算和恢復丟失的數據分區。

RDD的緩存:

 

Spark速度非常快的原因之一,就是在不同操作中可以在內存中持久化或緩存個數據集。當持久化某個RDD后,每一個節點都將把計算的分片結果保存在內存中,並在對此RDD或衍生出的RDD進行的其他動作中重用。這使得后續的動作變得更加迅速。RDD相關的持久化和緩存,是Spark最重要的特征之一。可以說,緩存是Spark構建迭代式算法和快速交互式查詢的關鍵。  

RDD緩存方式:

RDD通過persist方法或cache方法可以將前面的計算結果緩存,但是並不是這兩個方法被調用時立即緩存,而是觸發后面的action時,該RDD將會被緩存在計算節點的內存中,並供后面重用。

通過查看源碼發現cache最終也是調用了persist方法,默認的存儲級別都是僅在內存存儲一份,Spark的存儲級別還有好多種,存儲級別在object StorageLevel中定義的。

緩存有可能丟失,或者存儲於內存的數據由於內存不足而被刪除,RDD的緩存容錯機制保證了即使緩存丟失也能保證計算的正確執行。通過基於RDD的一系列轉換,丟失的數據會被重算,由於RDD的各個Partition是相對獨立的,因此只需要計算丟失的部分即可,並不需要重算全部Partition。

什么是DAG:

DAG(Directed Acyclic Graph)叫做有向無環圖,原始的RDD通過一系列的轉換就就形成了DAG,根據RDD之間的依賴關系的不同將DAG划分成不同的Stage,對於窄依賴,partition的轉換處理在Stage中完成計算。對於寬依賴,由於有Shuffle的存在,只能在parent RDD處理完成后,才能開始接下來的計算,因此寬依賴是划分Stage的依據。

 

spark的任務調度流程圖:

各個RDD之間存在着依賴關系,這些依賴關系形成有向無環圖DAG,DAGScheduler對這些依賴關系形成的DAG進行Stage划分,划分的規則很簡單,從后往前回溯,遇到窄依賴加入本stage,遇見寬依賴進行Stage切分。完成了Stage的划分,DAGScheduler基於每個Stage生成TaskSet,並將TaskSet提交給TaskScheduler。TaskScheduler 負責具體的task調度,最后在Worker節點上啟動task。

DAGScheduler的功能 :

(1)DAGScheduler構建Stage

(2)記錄哪個RDD或者 Stage 輸出被物化(緩存),通常在一個復雜的shuffle之后,通常物化一下(cache、persist),方便之后的計算。

(3)重新提交shuffle輸出丟失的stage(stage內部計算出錯)

(4)將 Taskset 傳給底層調度器

   a)– spark-cluster TaskScheduler

   b)– yarn-cluster YarnClusterScheduler

   c)– yarn-client YarnClientClusterScheduler

TaskScheduler的 功能:

(1)為每一個TaskSet構建一個TaskSetManager 實例管理這個TaskSet 的生命周期

(2)數據本地性決定每個Task最佳位置

(3)提交 taskset( 一組task) 到集群運行並監控

(4)推測執行,碰到 straggle(計算緩慢)任務需要放到別的節點上重試

(5)重新提交Shuffle輸出丟失的Stage給DAGScheduler

RDD容錯機制之checkpoint

 

checkpoint是什么:

1)Spark 在生產環境下經常會面臨transformation的RDD非常多(例如一個Job中包含1萬個RDD)或者具體transformation的RDD本身計算特別復雜或者耗時(例如計算時長超過1個小時),這個時候就要考慮對計算結果數據的持久化;

2)Spark是擅長多步驟迭代的,同時擅長基於Job的復用,這個時候如果能夠對曾經計算的過程產生的數據進行復用,就可以極大的提升效率;

3)如果采用persist把數據放在內存中,雖然是快速的,但是也是最不可靠的;如果把數據放在磁盤上,也不是完全可靠的!例如磁盤會損壞,系統管理員可能清空磁盤

4)Checkpoint的產生就是為了相對而言更加可靠的持久化數據,在Checkpoint的時候可以指定把數據放在本地,並且是多副本的方式,但是在生產環境下是放在HDFS上,這就天然的借助了HDFS高容錯、高可靠的特征來完成了最大化的可靠的持久化數據的方式;

5)Checkpoint是為了最大程度保證絕對可靠的復用RDD計算數據的Spark高級功能,通過checkpoint我們通常把數據持久化到HDFS來保證數據最大程度的安全性;

6)Checkpoint就是針對整個RDD計算鏈條中特別需要數據持久化的環節(后面會反復使用當前環節的RDD)開始基於HDFS等的數據持久化復用策略,通過對RDD啟動checkpoint機制來實現容錯和高可用;由此當加入進行一個1萬個步驟,在9000個步驟的時候persist,數據還是有可能丟失的,但是如果checkpoint,數據丟失的概率幾乎為0。

checkpoint原理機制:

 

(1) RDD使用cache機制從內存中讀取數據,如果數據沒有讀到,會使用checkpoint機制讀取數據。此時如果沒有checkpoint機制,那么就需要找到父RDD重新計算數據了,因此checkpoint是個很重要的容錯機制。checkpoint就是對於一個RDD chain(鏈),如果中間某些中間結果RDD,后面需要反復使用該數據,可能因為一些故障導致該中間數據丟失,那么就可以針對該RDD啟動checkpoint機制,checkpoint,首先需要調用sparkContext的setCheckpoint方法,設置一個容錯文件系統目錄,比如hdfs,然后對RDD調用checkpoint方法。之后再RDD所處的job運行結束后,會啟動一個單獨的job,來將checkpoint過的數據寫入之前設置的文件系統持久化,進行高可用。所以后面的計算在使用該RDD時,如果數據丟失了,但是還是可以從它的checkpoint中讀取數據,不需要重新計算。

 

(2) persist或者cache與checkpoint的區別在於,前者持久化只是將數據保存在BlockManager中但是其lineage是不變的,但是后者checkpoint執行完后,rdd已經沒有依賴RDD,只有一個checkpointRDD,checkpoint之后,RDD的lineage就改變了。而且,持久化的數據丟失的可能性更大,因為可能磁盤或內存被清理,但是checkpoint的數據通常保存到hdfs上,放在了高容錯文件系統。

Spark運行架構

 

 

Spark運行基本流程參見下面示意圖:

1) 構建Spark Application的運行環境(啟動SparkContext),SparkContext向資源管理器(可以是Standalone、Mesos或YARN)注冊並申請運行Executor資源;

2) 資源管理器分配Executor資源並啟動StandaloneExecutorBackend,Executor運行情況將隨着心跳發送到資源管理器上;

3) SparkContext構建成DAG圖,將DAG圖分解成Stage,並把Taskset發送給Task Scheduler。Executor向SparkContext申請Task,Task Scheduler將Task發放給Executor運行同時SparkContext將應用程序代碼發放給Executor。

4) Task在Executor上運行,運行完畢釋放所有資源。

Spark運行架構特點:

  • 每個Application獲取專屬的executor進程,該進程在Application期間一直駐留,並以多線程方式運行tasks。這種Application隔離機制有其優勢的,無論是從調度角度看(每個Driver調度它自己的任務),還是從運行角度看(來自不同Application的Task運行在不同的JVM中)。當然,這也意味着Spark Application不能跨應用程序共享數據,除非將數據寫入到外部存儲系統。
  • Spark與資源管理器無關,只要能夠獲取executor進程,並能保持相互通信就可以了。

  • 提交SparkContext的Client應該靠近Worker節點(運行Executor的節點),最好是在同一個Rack里,因為Spark Application運行過程中SparkContext和Executor之間有大量的信息交換;如果想在遠程集群中運行,最好使用RPC將SparkContext提交給集群,不要遠離Worker運行SparkContext。

  • Task采用了數據本地性和推測執行的優化機制。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM