一、基本介紹
是什么?
快速,通用,可擴展的分布式計算引擎。
彈性分布式數據集RDD
RDD(Resilient Distributed Dataset)彈性分布式數據集,是Spark中最基本的數據(邏輯)抽象,它代表一個不可變、可分區、里面的元素可並行計算的集合。 RDD具有數據流模型的特點:自動容錯、位置感知性調度和可伸縮性。RDD允許用戶在執行多個查詢時顯式地將工作集緩存在內存中,后續的查詢能夠重用工作集,這極大地提升了查詢速度。
基本概念
基本流程
二、Hadoop和Spark的區別
Spark 是類Hadoop MapReduce的通用並行框架, 專門用於大數據量下的迭代式計算.是為了跟 Hadoop 配合而開發出來的,不是為了取代 Hadoop, Spark 運算比 Hadoop 的 MapReduce 框架快的原因是因為 Hadoop 在一次 MapReduce 運算之后,會將數據的運算結果從內存寫入到磁盤中,第二次 Mapredue 運算時在從磁盤中讀取數據,所以其瓶頸在2次運算間的多余 IO 消耗. Spark 則是將數據一直緩存在內存中,直到計算得到最后的結果,再將結果寫入到磁盤,所以多次運算的情況下, Spark 是比較快的. 其優化了迭代式工作負載。
Hadoop的局限 | Spark的改進 |
---|---|
抽象層次低,編碼難以上手。 | 通過使用RDD的統一抽象,實現數據處理邏輯的代碼非常簡潔。 |
只提供Map和Reduce兩個操作,欠缺表達力。 | 通過RDD提供了許多轉換和動作,實現了很多基本操作,如sort、join等。 |
一個job只有map和reduce兩個階段,復雜的程序需要大量的job來完成。且job之間的依賴關系需要應用開發者自行管理。 | 一個job可以包含多個RDD的轉換操作,只需要在調度時生成多個stage。一個stage中也可以包含多個map操作,只需要map操作所使用的RDD分區保持不變。 |
處理邏輯隱藏在代碼細節中,缺少整體邏輯視圖。 | RDD的轉換支持流式API,提供處理邏輯的整體視圖。 |
對迭代式數據的處理性能比較差,reduce與下一步map的中間結果只能存放在HDFS的文件系統中。 | 通過內存緩存數據,可大大提高迭代式計算的性能,內存不足時可溢寫到磁盤上。 |
reduce task需要等所有的map task全部執行完畢才能開始執行。 | 分區相同的轉換可以在一個task中以流水線的形式執行。只有分區不同的轉換需要shuffle操作。 |
時延高,只適合批數據處理,對交互式數據處理和實時數據處理支持不夠。 | 將流拆成小的batch,提供discretized stream處理流數據 |
三、RDD操作
兩種類型: transformation和action
Transformation
主要做的是就是將一個已有的RDD生成另外一個RDD。Transformation具有lazy特性(延遲加載)。
Transformation算子的代碼不會真正被執行。只有當我們的程序里面遇到一個action算子的時候,代碼才會真正的被執行。這種設計讓Spark更加有效率地運行。
常用的Transformation:
動作 | 說明 | 示例 |
---|---|---|
map(func) | 返回一個新的RDD,該RDD由每一個輸入元素經過func函數轉換后組成 (每一個輸入元素只能被映射為一個) | var rdd = sc.parallelize(List(“hello world”, “hello spark”, “hello hdfs”)) var rdd2 = rdd.map(x => x + “_1”) rdd2.foreach(println) |
filter(func) | 返回一個新的RDD,該RDD由經過func函數計算后返回值為true的輸入元素組成 | var rdd3 = rdd2.filter(x => x.contains(“world”)) rdd3.foreach(println) |
flatMap(func) | 類似於map,但是每一個輸入元素可以被映射為0或多個輸出元素(所以func應該返回一個序列,而不是單一元素) | var rdd4 = rdd2.flatMap(x => x.split(" ")) rdd4.foreach(println) |
sample(withReplacement, fraction, seed) | 根據fraction指定的比例對數據進行采樣,可以選擇是否使用隨機數進行替換,seed用於指定隨機數生成器種子 | |
groupByKey([numTasks]) | 在一個(K,V)的RDD上調用,返回一個(K, Iterator[V])的RDD | var rdd5 = rdd4.map(x => (x, 1)) var rdd6 = rdd5.groupByKey() rdd6.foreach(println) |
sample(withReplacement, fraction, seed) | 根據fraction指定的比例對數據進行采樣,可以選擇是否使用隨機數進行替換,seed用於指定隨機數生成器種子 | var rdd = sc.parallelize(1 to 10)rdd.sample(false,0.4).collect() rdd.sample(false,0.4, 9).collect() |
combineByKey | 合並相同的key的值 rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n) | jake 80.0 jake 90.0 jake 85.0 mike 86.0 mike 90 求分數的平均值 |
單Value類型算子補充:
1. mapPartitions: 將待處理的數據以分區為單位發送到計算節點進行處理;
2. mapPartintions: 將待處理的數據以分區為單位發送到計算節點進行處理 ;
3. glom: 將同一個分區的數據直接轉換為相同類型的內存數組進行處理,分區不變 ;
4. groupBy: 將數據根據指定的規則進行分組, 分區默認不變,但是數據會被打亂重新組合 ;
5. distinct: 將數據集中重復的數據去重 ;
6. coalesce: 根據數據量縮減分區,用於大數據集過濾后,提高小數據集的執行效率
當 spark 程序中,存在過多的小任務的時候,可以通過 coalesce 方法,收縮合並分區,減少
分區的個數,減小任務調度成本 ;
7. repartition: 該操作內部其實執行的是 coalesce 操作,參數 shuffle 的默認值為 true。
8. sortBy: 該操作用於排序數據。在排序之前,可以將數據通過 f 函數進行處理,之后按照 f 函數處理
的結果進行排序,默認為升序排列
雙Value類型算子補充:
1. intersection: 對源 RDD 和參數 RDD 求交集后返回一個新的 RDD
2. union: 對源 RDD 和參數 RDD 求並集后返回一個新的 RDD
3. subtract: 以一個 RDD 元素為主, 去除兩個 RDD 中重復元素,將其他元素保留下來
Action
觸發代碼的運行,我們一段spark代碼里面至少需要有一個action操作。
常用的Action:
動作 | 含義 | 示例 |
---|---|---|
reduce(func) | 通過func函數聚集RDD中的所有元素,可以實現,RDD中元素的累加,計數和其他類型的聚集操作 | var rdd = sc.parallelize(1 to 10) rdd.reduce((x, y) => x+y) |
reduceByKey(func) | 按key進行reduce,讓key合並 | wordcount示例: var rdd = sc.parallelize(List(“hello world”, “hello spark”, “hello hdfs”)) rdd.flatMap(x => x.split(" ")).map(x => (x,1)).reduceByKey((x,y) => x+y).collect() |
collect() | 在驅動程序中,以數組的形式返回數據集的所有元素 | |
count() | 返回RDD的元素個數 | |
first() | 返回RDD的第一個元素(類似於take(1)) | |
take(n) | 返回一個由數據集的前n個元素組成的數組 | |
saveAsTextFile(path) | 將數據集的元素以textfile的形式保存到HDFS文件系統或者其他支持的文件系統,對於每個元素,Spark將會調用toString方法,將它裝換為文件中的文本 | rdd.saveAsTextFile("/user/jd_ad/ads_platform/outergd/0124/demo2.csv") |
foreach(func) | 在數據集的每一個元素上,運行函數func進行更新。 | |
takeSample | 抽樣返回一個dateset中的num個元素 | var rdd = sc.parallelize(1 to 10) rdd.takeSample(false,10) |
四、Block與RDD生成過程
輸入可能以多個文件的形式存儲在HDFS上,每個File都包含了很多塊,稱為Block。
當Spark讀取這些文件作為輸入時,會根據具體數據格式對應的InputFormat進行解析,一般是將若干個Block合並成一個輸入分片,稱為InputSplit,注意InputSplit不能跨越文件。
隨后將為這些輸入分片生成具體的Task。InputSplit與Task是一一對應的關系。
隨后這些具體的Task每個都會被分配到集群上的某個節點的某個Executor去執行。
- 每個節點可以起一個或多個Executor。
- 每個Executor由若干core組成,每個Executor的每個core一次只能執行一個Task。
- 每個Task執行的結果就是生成了目標RDD的一個partiton。
注意: 這里的core是虛擬的core而不是機器的物理CPU核,可以理解為就是Executor的一個工作線程。
而 Task被執行的並發度 = Executor數目 * 每個Executor核數。
至於partition的數目:
- 對於數據讀入階段,例如sc.textFile,輸入文件被划分為多少InputSplit就會需要多少初始Task。
- 在Map階段partition數目保持不變。
- 在Reduce階段,RDD的聚合會觸發shuffle操作,聚合后的RDD的partition數目跟具體操作有關,例如repartition操作會聚合成指定分區數,還有一些算子是可配置的。
五、依賴關系與Stage划分
RDD之間有一系列的依賴關系,依賴關系又分為窄依賴和寬依賴。簡單的區分發,可以看一下父RDD中的數據是否進入不同的子RDD,如果只進入到一個子RDD則是窄依賴,否則就是寬依賴。如下圖
窄依賴( narrow dependencies )
- 子RDD 的每個分區依賴於常數個父分區(即與數據規模無關)
- 輸入輸出一對一的算子,且結果RDD 的分區結構不變,主要是map 、flatMap
- 輸入輸出一對一,但結果RDD 的分區結構發生了變化,如union 、coalesce
- 從輸入中選擇部分元素的算子,如filter 、distinct 、subtract 、sample
寬依賴( wide dependencies )
- 子RDD 的每個分區依賴於所有父RDD 分區
- 對單個RDD 基於key 進行重組和reduce ,如groupByKey 、reduceByKey ;
- 對兩個RDD 基於key 進行join 和重組,如join
Spark任務會根據RDD之間的依賴關系,形成一個DAG有向無環圖,DAG會提交給DAGScheduler,DAGScheduler會把DAG划分相互依賴的多個stage,划分stage的依據就是RDD之間的寬窄依賴。遇到寬依賴就划分stage,每個stage包含一個或多個task任務。然后將這些task以taskSet的形式提交給TaskScheduler運行。 stage是由一組並行的task組成。切割規則:從后往前,遇到寬依賴就切割stage,遇到窄依賴就將這個RDD加入該stage中。 如下圖
六、Spark流程
調度流程(粗粒度圖解)
- 1、DriverProgram即用戶提交的程序定義並創建了SparkContext的實例,SparkContext會根據RDD對象構建DAG圖,然后將作業(job)提交(runJob)給DAGScheduler。
- 2、DAGScheduler對作業的DAG圖進行切分成不同的stage[stage是根據shuffle為單位進行划分],每個stage都是任務的集合(taskset)並以taskset為單位提交(submitTasks)給TaskScheduler。
- 3、TaskScheduler通過TaskSetManager管理任務(task)並通過集群中的資源管理器(Cluster Manager)[standalone模式下是Master,yarn模式下是ResourceManager]把任務(task)發給集群中的Worker的Executor, 期間如果某個任務(task)失敗, TaskScheduler會重試,TaskScheduler發現某個任務(task)一直未運行完成,有可能在不同機器啟動一個推測執行任務(與原任務一樣),哪個任務(task)先運行完就用哪個任務(task)的結果。無論任務(task)運行成功或者失敗,TaskScheduler都會向DAGScheduler匯報當前狀態,如果某個stage運行失敗,TaskScheduler會通知DAGScheduler可能會重新提交任務。
- 4、Worker接收到的是任務(task),執行任務(task)的是進程中的線程,一個進程中可以有多個線程工作進而可以處理多個數據分片,執行任務(task)、讀取或存儲數據。
執行流程(細粒度圖解)
- 1、通過SparkSubmit提交job后,Client就開始構建 spark context,即 application 的運行環境(使用本地的Client類的main函數來創建spark context並初始化它)
- 2、yarn client提交任務,Driver在客戶端本地運行;yarn cluster提交任務的時候,Driver是運行在集群上
- 3、SparkContext連接到ClusterManager(Master),向資源管理器注冊並申請運行Executor的資源(內核和內存)
- 4、Master根據SparkContext提出的申請,根據worker的心跳報告,來決定到底在那個worker上啟動executor
- 5、Worker節點收到請求后會啟動executor
- 6、executor向SparkContext注冊,這樣driver就知道哪些executor運行該應用
- 7、SparkContext將Application代碼發送給executor(如果是standalone模式就是StandaloneExecutorBackend)
- 8、同時SparkContext解析Application代碼,構建DAG圖,提交給DAGScheduler進行分解成stage,stage被發送到TaskScheduler。
- 9、TaskScheduler負責將Task分配到相應的worker上,最后提交給executor執行
- 10、executor會建立Executor線程池,開始執行Task,並向SparkContext匯報,直到所有的task執行完成
- 11、所有Task完成后,SparkContext向Master注銷
七、spark在yarn上的兩種運行模式(yarn-client和yarn-cluster)
1、Yarn-Client
- 1.Spark Yarn Client向YARN的ResourceManager申請啟動Application Master。同時在SparkContent初始化中將創建DAGScheduler和TASKScheduler等,由於我們選擇的是Yarn-Client模式,程序會選擇YarnClientClusterScheduler和YarnClientSchedulerBackend;
- 2.ResourceManager收到請求后,在集群中選擇一個NodeManager,為該應用程序分配第一個Container,要求它在這個Container中啟動應用程序的ApplicationMaster,與YARN-Cluster區別的是在該ApplicationMaster不運行SparkContext,只與SparkContext進行聯系進行資源的分派;
- 3.Client中的SparkContext初始化完畢后,與ApplicationMaster建立通訊,向ResourceManager注冊,根據任務信息向ResourceManager申請資源(Container);
- 4.一旦ApplicationMaster申請到資源(也就是Container)后,便與對應的NodeManager通信,要求它在獲得的Container中啟動啟動CoarseGrainedExecutorBackend,CoarseGrainedExecutorBackend啟動后會向Client中的SparkContext注冊並申請Task;
- 5.Client中的SparkContext分配Task給CoarseGrainedExecutorBackend執行,CoarseGrainedExecutorBackend運行Task並向Driver匯報運行的狀態和進度,以讓Client隨時掌握各個任務的運行狀態,從而可以在任務失敗時重新啟動任務;
- 6.應用程序運行完成后,Client的SparkContext向ResourceManager申請注銷並關閉自己。
2、Yarn-Cluster(企業中主要使用)
- 1.Spark Yarn Client向YARN中提交應用程序,包括ApplicationMaster程序、啟動ApplicationMaster的命令、需要在Executor中運行的程序等;
- 2.ResourceManager收到請求后,在集群中選擇一個NodeManager,為該應用程序分配第一個Container,要求它在這個Container中啟動應用程序的ApplicationMaster,其中ApplicationMaster進行SparkContext等的初始化;
- 3.ApplicationMaster向ResourceManager注冊,這樣用戶可以直接通過ResourceManage查看應用程序的運行狀態,然后它將采用輪詢的方式通過RPC協議為各個任務申請資源,並監控它們的運行狀態直到運行結束;
- 4.一旦ApplicationMaster申請到資源(也就是Container)后,便與對應的NodeManager通信,要求它在獲得的Container中啟動啟動CoarseGrainedExecutorBackend,CoarseGrainedExecutorBackend啟動后會向ApplicationMaster中的SparkContext注冊並申請Task。這一點和Standalone模式一樣,只不過SparkContext在Spark Application中初始化時,使用CoarseGrainedSchedulerBackend配合YarnClusterScheduler進行任務的調度,其中YarnClusterScheduler只是對TaskSchedulerImpl的一個簡單包裝,增加了對Executor的等待邏輯等;
- 5.ApplicationMaster中的SparkContext分配Task給CoarseGrainedExecutorBackend執行,CoarseGrainedExecutorBackend運行Task並向ApplicationMaster匯報運行的狀態和進度,以讓ApplicationMaster隨時掌握各個任務的運行狀態,從而可以在任務失敗時重新啟動任務;
- 6.應用程序運行完成后,ApplicationMaster向ResourceManager申請注銷並關閉自己。
3、兩種模式區別
理解YARN-Client和YARN-Cluster深層次的區別之前先清楚一個概念:Application Master。在YARN中,每個Application實例都有一個ApplicationMaster進程,它是Application啟動的第一個容器。它負責和ResourceManager打交道並請求資源,獲取資源之后告訴NodeManager為其啟動Container。從深層次的含義講YARN-Cluster和YARN-Client模式的區別其實就是ApplicationMaster進程的區別
YARN-Cluster模式下,Driver運行在AM(Application Master)中,它負責向YARN申請資源,並監督作業的運行狀況。當用戶提交了作業之后,就可以關掉Client,作業會繼續在YARN上運行,因而YARN-Cluster模式不適合運行交互類型的作業
YARN-Client模式下,Application Master僅僅向YARN請求Executor,Client會和請求的Container通信來調度他們工作,也就是說Client不能離開
下圖是幾種模式下的比較:
八、MapReduce的Shuffle和Spark中的Shuffle區別和聯系
Spark在DAG調度階段會將一個Job划分為多個Stage,上游Stage做map工作,下游Stage做reduce工作,其本質上還是MapReduce計算框架。Shuffle是連接map和reduce之間的橋梁,它將map的輸出對應到reduce輸入中,這期間涉及到序列化反序列化、跨節點網絡IO以及磁盤讀寫IO等,所以說Shuffle是整個應用程序運行過程中非常昂貴的一個階段,理解Spark Shuffle原理有助於優化Spark應用程序。
注:
1.什么是大數據處理的Shuffle?
無論是Hadoop還是Spark,都要實現Shuffle。Shuffle描述數據從map tasks的輸出到reduce tasks輸入的這段過程。
2.為什么需要進行Shuffle呢?
map tasks的output向着reduce tasks的輸入input映射的時候,並非節點一一對應的,在節點A上做map任務的輸出結果,可能要分散跑到reduce節點A、B、C、D ,就好像shuffle的字面意思“洗牌”一樣,這些map的輸出數據要打散然后根據新的路由算法(比如對key進行某種hash算法),發送到不同的reduce節點上去。
MapReduce的Shuffle
MapReduce 是 sort-based,進入 combine() 和 reduce() 的 records 必須先partition、key對中間結果進行排序合並。這樣的好處在於 combine/reduce() 可以處理大規模的數據,因為其輸入數據可以通過外排得到(mapper 對每段數據先做排序,reducer 的 shuffle 對排好序的每段數據做歸並)。
Spark中的Shuffle
前面已經提到,在DAG調度的過程中,Stage階段的划分是根據是否有shuffle過程,也就是存在ShuffleDependency寬依賴的時候,需要進行shuffle,這時候會將作業job划分成多個Stage;
Spark的Shuffle實現大致如下圖所示,在DAG階段以shuffle為界,划分stage,上游stage做map task,每個map task將計算結果數據分成多份,每一份對應到下游stage的每個partition中,並將其臨時寫到磁盤,該過程叫做shuffle write;下游stage做reduce task,每個reduce task通過網絡拉取上游stage中所有map task的指定分區結果數據,該過程叫做shuffle read,最后完成reduce的業務邏輯。
下圖是spark shuffle實現的一個版本演進。
基於Hash的Shuffle實現
基於Sort的Shuffle實現(現在采用的機制)
九、spark中的持久化(cache()、persist()、checkpoint())
RDD持久化級別
持久化級別 | 含義解釋 |
---|---|
MEMORY_ONLY | 使用未序列化的Java對象格式,將數據保存在內存中。如果內存不夠存放所有的數據,則數據可能就不會進行持久化。那么下次對這個RDD執行算子操作時,那些沒有被持久化的數據,需要從源頭處重新計算一遍。這是默認的持久化策略,使用cache()方法時,實際就是使用的這種持久化策略。 |
DISK_ONLY | 使用未序列化的Java對象格式,將數據全部寫入磁盤文件中。 |
MEMORY_ONLY_SER | 基本含義同MEMORY_ONLY。唯一的區別是,會將RDD中的數據進行序列化,RDD的每個partition會被序列化成一個字節數組。這種方式更加節省內存,從而可以避免持久化的數據占用過多內存導致頻繁GC。 |
MEMORY_AND_DISK | 使用未序列化的Java對象格式,優先嘗試將數據保存在內存中。如果內存不夠存放所有的數據,會將數據寫入磁盤文件中,下次對這個RDD執行算子時,持久化在磁盤文件中的數據會被讀取出來使用。 |
MEMORY_AND_DISK_SER | 基本含義同MEMORY_AND_DISK。唯一的區別是,會將RDD中的數據進行序列化,RDD的每個partition會被序列化成一個字節數組。這種方式更加節省內存,從而可以避免持久化的數據占用過多內存導致頻繁GC。 |
- cache和persist都是用於將一個RDD進行緩存,這樣在之后使用的過程中就不需要重新計算,可以大大節省程序運行時間。
- cache和persist的區別:cache只有一個默認的緩存級別
MEMORY_ONLY
,而persist可以根據情況設置其它的緩存級別
。 - checkpoint接口是將RDD持久化到HDFS中,與persist的區別是checkpoint會切斷此RDD之前的依賴關系,而persist會保留依賴關系。
checkpoint的兩大作用:
一是spark程序長期駐留,過長的依賴會占用很多的系統資源,定期checkpoint可以有效的節省資源;
二是維護過長的依賴關系可能會出現問題,一旦spark程序運行失敗,RDD的容錯成本會很高。
(注:checkpoint執行前要先進行cache,避免兩次計算。)