一、應用執行機制
一個應用的生命周期即,用戶提交自定義的作業之后,Spark框架進行處理的一系列過程。
在這個過程中,不同的時間段里,應用會被拆分為不同的形態來執行。
1、應用執行過程中的基本組件和形態
Driver:
運行在客戶端或者集群中,執行Application的main方法並創建SparkContext,調控整個應用的執行。
Application:
用戶自定義並提交的Spark程序。
Job:
一個Application可以包含多個Job,每個Job由Action操作觸發。
Stage:
比Job更小的單位,一個Job會根據RDD之間的依賴關系被划分為多個Stage,每個Stage中只存有RDD之間的窄依賴,即Transformation算子。
TaskSet:
每個Stage中包含的一組相同的Task。
Task:
最后被分發到Executor中執行的具體任務,執行Stage中包含的算子。
明確了一個應用的生命周期中會有哪些組件參與之后,再來看看用戶是怎么提交Spark程序的。
2、應用的兩種提交方式
Driver進程運行在客戶端(Client模式):
即用戶在客戶端直接運行程序。
程序的提交過程大致會經過以下階段:
- 用戶運行程序。
- 啟動Driver進行(包括DriverRunner和SchedulerBackend),並向集群的Master注冊。
- Driver在客戶端初始化DAGScheduler等組件。
- Woker節點向Master節點注冊並啟動Executor(包括ExecutorRunner和ExecutorBackend)。
- ExecutorBackend啟動后,向Driver內部的SchedulerBackend注冊,使得Driver可以找到計算節點。
- Driver中的DAGScheduler解析RDD生成Stage等操作。
- Driver將Task分配到各個Executor中並行執行。
Driver進程運行在集群中(某個Worker節點,Cluster模式):
即用戶將Spark程序提交給Master分配執行。
大致會經過一下流程:
- 用戶啟動客戶端,提交Spark程序給Master。
- Master針對每個應用分發給指定的Worker啟動Driver進行。
- Worker收到命令之后啟動Driver進程(即DriverRunner和其中的SchedulerBackend),並向Master注冊。
- Master指定其他Worker啟動Executor(即ExecutorRunner和其內部的ExecutorBackend)。
- ExecutorBackend向Driver中的SchedulerBackend注冊。
- Driver中的DAGScheduler解析RDD生產Stage等。
- Executor內部啟動線程池並行化執行Task。
可以看到,兩種程序的提交方式在處理過程中,僅僅是在哪個地方啟動Driver進程的區別而已。
為Client模式中時(使用Spark Shell直接執行的程序),Driver就在客戶端上。
為Cluster模式時(提交Spark程序到Master),Driver運行與集群中的某個Worker節點。
二、調度與任務分配模塊
Spark框架就像一個操作系統一樣,有着自己的作業調度策略,當集群運行在不同的模式下,調度不同級別的單位,使用的策略也是有所不同的。
1、Application之間的調度
當有多個用戶提交多個Spark程序時,Spark是如何調度這些應用並合理地分配資源呢?
Standalone模式下,默認使用FIFO,每個app會獨占所有資源
可以通過以下幾個參數調整集群相關的資源:
- spark.cores.max:調整app可以在整個集群中申請的CPU core數量
- spark.deploy.defaultCores:默認的CPU core數量
- spark.executor.memory:限制每個Executor可用的內存
在Mesos模式下,可以使用
- spark.mesos.coarse=true設置靜態配置資源的策略
- 使用mesos://URL且不配置spark.mesos.coarse=true(每個app會有獨立固定的內存分配,空閑時其他機器可以使用其資源)
在Yarn模式下,提交作業時可以使用
- 通過–num-executors控制分配多少個Executor給app
- –executor-memory和–executor-cores分別控制Executor的內存和CPU core
2、Application內部的Job調度機制
一個Application中,由各個Action觸發的多個Job之間也是存在調度關系的。
Action操作實現上是調用了SparkContext的runJob方法提交Job。
Spark中調度Job有兩種策略
FIFO:
- 第一個Job分配其所需的所有資源
- 第二個Job如果還有剩余資源的話就分配,否則等待
FAIR:
- 使用輪詢的方式調度Job
可以通過配置spark.scheduler.mode調整Job的調度方式
另外也可以配置調度池,具體參考官方文檔
或者參考conf/fairscheduler.xml.template文件。
3、Job中的Stage調度
Stage是由DAGScheduler組件生產的,在源碼中,有三個比較特殊的變量:
- waitingStages:存儲等待執行的Stages
- runningStages:存儲正在執行的Stages
- failedStages:存儲執行失敗的Stage
Spark會通過廣度優先遍歷找到最開始的Stage執行,若有父Stage沒有執行完則等待。
4、Stage中的Task調度
暫未了解。。。
三、I/O制度
Spark雖然是基於內存計算的框架,但是不可避免的也會接觸到一些存儲層,那么在和存儲層交互的時候,Spark做了哪些工作?
1、序列化
序列化的本質就是將對象轉換為字節流,可以理解為將鏈表中存儲的非連續空間的數據存儲轉化為連續空間存儲的數組中
Spark為什么要做序列化操作?
內存或磁盤中RDD會含有對象的存儲,而在節點間數據的傳輸時,序列化之后的數據可以節約空間和提高效率。
2、壓縮
壓縮是日常生活中的一個常見操作,好處顯而易見,節約空間,從而就可以獲得時間上的效率。
Spark中序列化之后的數據可以進行壓縮以減少空間開銷。
Spark支持兩種壓縮算法
- Snappy算法:高壓縮速度
- LZF算法:高壓縮比
在不同的場景中選擇不同的壓縮算法可以有效的提高程序運行的效率。
壓縮配置方式:
- 啟動前在spark-env.sh中設置:export SPARK_JAVA_OPTS=”-Dspark.broadcast.compress”
- 在應用程序中配置
conf.getBoolean(“spark.broadcast.compress,true”)
conf.set(“spark.broadcast.compress”,true)
3、塊管理
RDD從物理上看是一個元數據結構,記錄着Block和Node之間的映射關系。
存儲RDD是以Block塊為單位的,每個分區對應一個塊,PartitionID通過元數據信息可以映射到Block。
BlockManager管理和接口、塊讀寫流程、數據塊讀寫管理等細節待繼續深入了解。
四、通信模塊
Spark中使用Akka作為通信框架
- Actors是一組包含狀態和行為的對象
- 一個Actor接收到其他Actor的信息之后可以根據需求做出各種反應
- Client、Master、Worker等都是一個Actor
Spark各個組件的之間協調工作都是基於Akka機制來的,待深入了解的有:
- Client Actor通信代碼邏輯
- Master Actor通信代碼邏輯
- Worker Actor消息處理邏輯
五、容錯機制
之前講過,RDD之間的算子操作會形成DAG圖,RDD之間的依賴關系會形成Lineage。
要理解Lineage機制首先要明確兩種依賴的概念:
Shuffle Dependencies(寬依賴)
父分區可以被多個子分區所用
即多對多的關系Narrow Dependencies(窄依賴)
父分區最多被一個子分區所用
即一對一或者多對一的關系
當出現某個節點計算錯誤的時候,會順着RDD的操作順序往回走
一旦是Narrow Dependencies錯誤,重新計算父RDD分區即可,因為其不依賴其他節點
而如果Shuffle Dependencies錯誤,重算代價較高,因為一旦重新計算其依賴的父RDD分區,會造成冗余計算
這時候就需要人為的添加檢查點來提高容錯機制的執行效率
什么情況下需要加CheckPoint
- DAG中的Lineage過長,如果重算開銷太大,故在特定幾個Shuffle Dependencies上做CheckPoint是有價值的。
- Checkpoint會產生磁盤開銷,因為其就是將數據持久化到磁盤中,所以做檢查點的RDD最好是已經在內存中緩存了。
六、Shuffle機制
Shuffle的定義:對無規則的數據進行重組排序等過程
為什么要Shuffle:分布式計算中數據是分布在各個節點上計算的,而匯總統計等操作需要在所有數據上執行
Spark中Shuffle經歷的階段:
Shuffle Write
將各個節點數據寫入到指定分區
1、根據下一個Stage分區數分成相應的Bucket
2、將Bucket寫入磁盤
Shuffle Fetch
獲取各個分區發送的數據
1、在存儲有Shuffle數據節點的磁盤Fetch需要的數據
2、Fetch到本地之后進行自定義的聚集函數操作
最后記錄一下提交Spark作業的方法
在spark的bin目錄下
執行spark-submit腳本
./spark-submit \
–class 入口函數所在的類名全稱 \
–master spark master節點的地址(默認端口7077)\
–executor-memory 指定worker中Executor的內存 \
–total-executor-cores 100 \
jar文件所在的目錄 \