從官方的文檔我們可以知道,Spark的部署方式有很多種:local、Standalone、Mesos、YARN.....不同部署方式的后台處理進程是不一樣的,但是如果我們從代碼的角度來看,其實流程都差不多。
從代碼中,我們可以得知其實Spark的部署方式其實比官方文檔中介紹的還要多,這里我來列舉一下:
1、local:這種方式是在本地啟動一個線程來運行作業;
2、local[N]:也是本地模式,但是啟動了N個線程;
3、local[*]:還是本地模式,但是用了系統中所有的核;
4、local[N,M]:這里有兩個參數,第一個代表的是用到的核個數;第二個參數代表的是容許該作業失敗M次。上面的幾種模式沒有指定M參數,其默認值都是1;
5、local-cluster[N, cores, memory]:本地偽集群模式,參數的含義我就不說了,看名字就知道;式;
6、spark:// :這是用到了
Spark的Standalone模
7、(mesos|zk)://:這是Mesos模式;
8、yarn-standalone\yarn-cluster\yarn-client:這是YARN模式。前面兩種代表的是集群模式;后面代表的是客戶端模式;
9、simr://:這種你就不知道了吧?simr其實是Spark In MapReduce的縮寫。我們知道MapReduce 1中是沒有YARN的,如果你在MapReduce 1中使用Spark,那么就用這種模式吧。
總體來說,上面列出的各種部署方式運行的流程大致一樣:都是從SparkContext切入,在SparkContext的初始化過程中主要做了以下幾件事:
1、根據SparkConf創建SparkEnv
02 |
private [spark] val env = SparkEnv.create( |
05 |
conf.get( "spark.driver.host" ), |
06 |
conf.get( "spark.driver.port" ).toInt, |
09 |
listenerBus = listenerBus) |
2、初始化executor的環境變量executorEnvs
這個步驟代碼太多了,我就不貼出來。
3、創建TaskScheduler
2 |
private [spark] var taskScheduler = SparkContext.createTaskScheduler( this , master) |
4、創建DAGScheduler
1 |
@ volatile private [spark] var dagScheduler : DAGScheduler = _ |
3 |
dagScheduler = new DAGScheduler( this ) |
5 |
case e : Exception = > throw |
6 |
new SparkException( "DAGScheduler |
7 |
cannot be initialized due to %s" .format(e.getMessage)) |
5、啟動TaskScheduler
那么,DAGScheduler和TaskScheduler都是什么?
DAGScheduler稱為作業調度,它基於Stage的高層調度模塊的實現,它為每個Job的Stages計算DAG,記錄哪些RDD和Stage的輸出已經實物化,然后找到最小的調度方式來運行這個Job。然后以Task Sets的形式提交給底層的任務調度模塊來具體執行。
TaskScheduler稱為任務調度。它是低層次的task調度接口,目前僅僅被TaskSchedulerImpl實現。這個接口可以以插件的形式應用在不同的task調度器中。每個TaskScheduler只給一個SparkContext調度task,這些調度器接受來自DAGScheduler中的每個stage提交的tasks,並負責將這些tasks提交給cluster運行。如果提交失敗了,它將會重試;並處理stragglers。所有的事件都返回到DAGScheduler中。
在創建DAGScheduler的時候,程序已經將taskScheduler作為參數傳進去了,代碼如下:
01 |
def this (sc : SparkContext, taskScheduler : TaskScheduler) = { |
06 |
sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], |
07 |
sc.env.blockManager.master, |
11 |
def this (sc : SparkContext) = this (sc, sc.taskScheduler) |
也就是DAGScheduler封裝了TaskScheduler。TaskScheduler中有兩個比較重要的方法:
2 |
def submitTasks(taskSet : TaskSet) : Unit |
5 |
def cancelTasks(stageId : Int, interruptThread : Boolean) |
這些方法在DAGScheduler中被調用,而TaskSchedulerImpl實現了TaskScheduler,為各種調度模式提供了任務調度接口,在TaskSchedulerImpl中還實現了resourceOffers和statusUpdate兩個接口給Backend調用,用於提供調度資源和更新任務狀態。
在YARN模式中,還提供了YarnClusterScheduler類,他只是簡單地繼承TaskSchedulerImpl類,主要重寫了getRackForHost(hostPort: String)和postStartHook() 方法。繼承圖如下:
如果想及時了解Spark、Hadoop或者Hbase相關的文章,歡迎關注微信公共帳號:
iteblog_hadoop
在下篇文章中,我將介紹上面九種部署模式涉及到的各種類及其之間的關系。歡迎關注本博客!這里先列出下篇文章用到的類圖
在《Spark源碼分析:多種部署方式之間的區別與聯系(1)》我們談到了SparkContext的初始化過程會做好幾件事情(這里就不再列出,可以去《Spark源碼分析:多種部署方式之間的區別與聯系(1)》查看),其中做了一件重要的事情就是創建TaskScheduler。
2 |
private [spark] var taskScheduler = < span class = "wp_keywordlink_affiliate" >< a href = "http://www.iteblog.com/archives/tag/spark" title = "" target = "_blank" data-original-title = "View all posts in Spark" > Spark < /a >< /span > Context.createTaskScheduler( this , master) |
在createTaskScheduler方法中,會根據用戶傳進來的master URL分別初始化不同的SchedulerBackend和ExecutorBackend。而且從代碼中我們可以看到master URL多大九種格式。但是在代碼中SchedulerBackend的種類可沒九種,只有五種;而ExecutorBackend只有三種,我們先來看看這些SchedulerBackend和ExecutorBackend的類繼承關系:
每一個Application都對應了一個SchedulerBackend和多個ExecutorBackend。下面我們分別介紹各種運行模式所涉及到的類
1、Local模式
local模式出了偽集群模式(local-cluster),所有的local都是用到了LocalBackend和TaskSchedulerImpl類。LocalBackend接收來自TaskSchedulerImpl的receiveOffers()調用,並根據運行Application傳進來的CPU核生成WorkerOffer,並調用scheduler.resourceOffers(offers)生成Task,最后通過 executor.launchTask來執行這些Task。
2、Standalone
Standalone模式使用SparkDeploySchedulerBackend和TaskSchedulerImpl,SparkDeploySchedulerBackend是繼承自CoarseGrainedSchedulerBackend類,並重寫了其中的一些方法。
CoarseGrainedSchedulerBackend是一個粗粒度的資源調度類,在Spark job運行的整個期間,它會保存所有的Executor,在task運行完的時候,並不釋放該Executor,也不向Scheduler申請一個新的Executor。Executor的啟動方式有很多中,需要根據Application提交的Master URL進行判斷。在CoarseGrainedSchedulerBackend中封裝了一個DriverActor類,它接受Executor注冊(RegisterExecutor)、狀態更新(StatusUpdate)、響應Scheduler的ReviveOffers請求、殺死Task等等。
在本模式中將會啟動一個或者多個CoarseGrainedExecutorBackend。具體是通過AppClient類向Master請求注冊Application。當注冊成功之后,Master會向Client進行反饋,並調用schedule啟動Driver和CoarseGrainedExecutorBackend,啟動的Executor會向DriverActor進行注冊。然后CoarseGrainedExecutorBackend通過aunchTask方法啟動已經提交的Task。
3、yarn-cluster
yarn-cluster集群模式涉及到的類有YarnClusterScheduler和YarnClusterSchedulerBackend。YarnClusterSchedulerBackend同樣是繼承自CoarseGrainedSchedulerBackend。而YarnClusterScheduler繼承自TaskSchedulerImpl,它只是簡單地對TaskSchedulerImpl進行封裝,並重寫了getRackForHost和postStartHook方法。
Client類通過YarnClient在Hadoop集群上啟動一個Container,並在其中運行ApplicationMaster,並通過Yarn提供的接口在集群中啟動多個Container用於運行CoarseGrainedExecutorBackend,並向CoarseGrainedSchedulerBackend中的DriverActor進行注冊。
4、yarn-client
yarn-cluster集群模式涉及到的類有YarnClientClusterScheduler和YarnClientSchedulerBackend。YarnClientClusterScheduler繼承自TaskSchedulerImpl,並對其中的getRackForHost方法進行了重寫。
Yarn-client模式下,會在集群外面啟動一個ExecutorLauncher來作為driver,並想集群申請Container,來啟動CoarseGrainedExecutorBackend,並向CoarseGrainedSchedulerBackend中的DriverActor進行注冊。
5、Mesos
Mesos模式調度方式有兩種:粗粒度和細粒度。粗粒度涉及到的類有CoarseMesosSchedulerBackend和TaskSchedulerImpl類;而細粒度涉及到的類有MesosSchedulerBackend和TaskSchedulerImpl類。CoarseMesosSchedulerBackend和 MesosSchedulerBackend都繼承了MScheduler(其實是Mesos的Scheduler),便於注冊到Mesos資源調度的框架中。選擇哪種模式可以通過spark.mesos.coarse參數配置。默認的是MesosSchedulerBackend。
上面涉及到Spark的許多部署模式,究竟哪種模式好這個很難說,需要根據你的需求,如果你只是測試Spark Application,你可以選擇local模式。而如果你數據量不是很多,Standalone 是個不錯的選擇。當你需要統一管理集群資源(Hadoop、Spark等)那么你可以選擇Yarn,但是這樣維護成本就會變高。
yarn-cluster和yarn-client模式內部實現還是有很大的區別。如果你需要用於生產環境,那么請選擇yarn-cluster;而如果你僅僅是Debug程序,可以選擇yarn-client。