Spark源碼分析:多種部署方式之間的區別與聯系(轉)


原文鏈接:Spark源碼分析:多種部署方式之間的區別與聯系(1)

從官方的文檔我們可以知道,Spark的部署方式有很多種:local、Standalone、Mesos、YARN.....不同部署方式的后台處理進程是不一樣的,但是如果我們從代碼的角度來看,其實流程都差不多。
  從代碼中,我們可以得知其實Spark的部署方式其實比官方文檔中介紹的還要多,這里我來列舉一下:

  1、local:這種方式是在本地啟動一個線程來運行作業;
  2、local[N]:也是本地模式,但是啟動了N個線程;
  3、local[*]:還是本地模式,但是用了系統中所有的核;
  4、local[N,M]:這里有兩個參數,第一個代表的是用到的核個數;第二個參數代表的是容許該作業失敗M次。上面的幾種模式沒有指定M參數,其默認值都是1;
  5、local-cluster[N, cores, memory]:本地偽集群模式,參數的含義我就不說了,看名字就知道;式;
  6、spark:// :這是用到了 Spark的Standalone模
  7、(mesos|zk)://:這是Mesos模式;
  8、yarn-standalone\yarn-cluster\yarn-client:這是YARN模式。前面兩種代表的是集群模式;后面代表的是客戶端模式;
  9、simr://:這種你就不知道了吧?simr其實是Spark In MapReduce的縮寫。我們知道MapReduce 1中是沒有YARN的,如果你在MapReduce 1中使用Spark,那么就用這種模式吧。

  總體來說,上面列出的各種部署方式運行的流程大致一樣:都是從SparkContext切入,在SparkContext的初始化過程中主要做了以下幾件事:
  1、根據SparkConf創建SparkEnv

01 // Create the Spark execution environment (cache, map output tracker, etc)
02   private[spark] val env = SparkEnv.create(
03     conf,
04     "<driver>",
05     conf.get("spark.driver.host"),
06     conf.get("spark.driver.port").toInt,
07     isDriver = true,
08     isLocal = isLocal,
09     listenerBus = listenerBus)
10   SparkEnv.set(env)

  2、初始化executor的環境變量executorEnvs
  這個步驟代碼太多了,我就不貼出來。
  3、創建TaskScheduler

1 // Create and start the scheduler
2   private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master)

  4、創建DAGScheduler

1 @volatile private[spark] var dagScheduler: DAGScheduler = _
2   try {
3     dagScheduler = new DAGScheduler(this)
4   catch {
5     case e: Exception =throw
6       newSparkException("DAGScheduler
7                      cannot be initialized due to %s".format(e.getMessage))
8   }

  5、啟動TaskScheduler

1 // start TaskScheduler after taskScheduler
2 // sets DAGScheduler reference in DAGScheduler's
3   // constructor
4   taskScheduler.start()

  那么,DAGScheduler和TaskScheduler都是什么?
  DAGScheduler稱為作業調度,它基於Stage的高層調度模塊的實現,它為每個Job的Stages計算DAG,記錄哪些RDD和Stage的輸出已經實物化,然后找到最小的調度方式來運行這個Job。然后以Task Sets的形式提交給底層的任務調度模塊來具體執行。
  TaskScheduler稱為任務調度。它是低層次的task調度接口,目前僅僅被TaskSchedulerImpl實現。這個接口可以以插件的形式應用在不同的task調度器中。每個TaskScheduler只給一個SparkContext調度task,這些調度器接受來自DAGScheduler中的每個stage提交的tasks,並負責將這些tasks提交給cluster運行。如果提交失敗了,它將會重試;並處理stragglers。所有的事件都返回到DAGScheduler中。
  在創建DAGScheduler的時候,程序已經將taskScheduler作為參數傳進去了,代碼如下:

01 def this(sc: SparkContext, taskScheduler: TaskScheduler) = {
02     this(
03       sc,
04       taskScheduler,
05       sc.listenerBus,
06       sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
07       sc.env.blockManager.master,
08       sc.env)
09   }
10  
11   def this(sc: SparkContext) = this(sc, sc.taskScheduler)

也就是DAGScheduler封裝了TaskScheduler。TaskScheduler中有兩個比較重要的方法:

1 // Submit a sequence of tasks to run.
2 def submitTasks(taskSet: TaskSet): Unit
3  
4 // Cancel a stage.
5 def cancelTasks(stageId: Int, interruptThread: Boolean)

  這些方法在DAGScheduler中被調用,而TaskSchedulerImpl實現了TaskScheduler,為各種調度模式提供了任務調度接口,在TaskSchedulerImpl中還實現了resourceOffers和statusUpdate兩個接口給Backend調用,用於提供調度資源和更新任務狀態。
  在YARN模式中,還提供了YarnClusterScheduler類,他只是簡單地繼承TaskSchedulerImpl類,主要重寫了getRackForHost(hostPort: String)和postStartHook() 方法。繼承圖如下:


如果想及時了解Spark、Hadoop或者Hbase相關的文章,歡迎關注微信公共帳號: iteblog_hadoop

  

在下篇文章中,我將介紹上面九種部署模式涉及到的各種類及其之間的關系。歡迎關注本博客!這里先列出下篇文章用到的類圖

 



 

原文鏈接:Spark源碼分析:多種部署方式之間的區別與聯系(2)

 在《Spark源碼分析:多種部署方式之間的區別與聯系(1)》我們談到了SparkContext的初始化過程會做好幾件事情(這里就不再列出,可以去《Spark源碼分析:多種部署方式之間的區別與聯系(1)》查看),其中做了一件重要的事情就是創建TaskScheduler

1 // Create and start the scheduler
2   private[spark] var taskScheduler =<span class="wp_keywordlink_affiliate"><a href="http://www.iteblog.com/archives/tag/spark" title=""target="_blank"data-original-title="View all posts in Spark">Spark</a></span>Context.createTaskScheduler(this, master)

  在createTaskScheduler方法中,會根據用戶傳進來的master URL分別初始化不同的SchedulerBackend和ExecutorBackend。而且從代碼中我們可以看到master URL多大九種格式。但是在代碼中SchedulerBackend的種類可沒九種,只有五種;而ExecutorBackend只有三種,我們先來看看這些SchedulerBackend和ExecutorBackend的類繼承關系:

每一個Application都對應了一個SchedulerBackend和多個ExecutorBackend。下面我們分別介紹各種運行模式所涉及到的類

1、Local模式

  local模式出了偽集群模式(local-cluster),所有的local都是用到了LocalBackend和TaskSchedulerImpl類。LocalBackend接收來自TaskSchedulerImpl的receiveOffers()調用,並根據運行Application傳進來的CPU核生成WorkerOffer,並調用scheduler.resourceOffers(offers)生成Task,最后通過 executor.launchTask來執行這些Task。

2、Standalone

  Standalone模式使用SparkDeploySchedulerBackend和TaskSchedulerImpl,SparkDeploySchedulerBackend是繼承自CoarseGrainedSchedulerBackend類,並重寫了其中的一些方法。

  CoarseGrainedSchedulerBackend是一個粗粒度的資源調度類,在Spark job運行的整個期間,它會保存所有的Executor,在task運行完的時候,並不釋放該Executor,也不向Scheduler申請一個新的Executor。Executor的啟動方式有很多中,需要根據Application提交的Master URL進行判斷。在CoarseGrainedSchedulerBackend中封裝了一個DriverActor類,它接受Executor注冊(RegisterExecutor)、狀態更新(StatusUpdate)、響應Scheduler的ReviveOffers請求、殺死Task等等。

  在本模式中將會啟動一個或者多個CoarseGrainedExecutorBackend。具體是通過AppClient類向Master請求注冊Application。當注冊成功之后,Master會向Client進行反饋,並調用schedule啟動Driver和CoarseGrainedExecutorBackend,啟動的Executor會向DriverActor進行注冊。然后CoarseGrainedExecutorBackend通過aunchTask方法啟動已經提交的Task。

3、yarn-cluster

  yarn-cluster集群模式涉及到的類有YarnClusterScheduler和YarnClusterSchedulerBackend。YarnClusterSchedulerBackend同樣是繼承自CoarseGrainedSchedulerBackend。而YarnClusterScheduler繼承自TaskSchedulerImpl,它只是簡單地對TaskSchedulerImpl進行封裝,並重寫了getRackForHost和postStartHook方法。

  Client類通過YarnClient在Hadoop集群上啟動一個Container,並在其中運行ApplicationMaster,並通過Yarn提供的接口在集群中啟動多個Container用於運行CoarseGrainedExecutorBackend,並向CoarseGrainedSchedulerBackend中的DriverActor進行注冊。

  yarn-cluster模式作業從提交到運行的整個過程請參見本博客文章: 《Spark on YARN集群模式作業運行全過程分析》

4、yarn-client

  yarn-cluster集群模式涉及到的類有YarnClientClusterScheduler和YarnClientSchedulerBackend。YarnClientClusterScheduler繼承自TaskSchedulerImpl,並對其中的getRackForHost方法進行了重寫。
  Yarn-client模式下,會在集群外面啟動一個ExecutorLauncher來作為driver,並想集群申請Container,來啟動CoarseGrainedExecutorBackend,並向CoarseGrainedSchedulerBackend中的DriverActor進行注冊。

  yarn-client模式作業從提交到運行的整個過程請參見本博客文章: 《Spark on YARN客戶端模式作業運行全過程分析》

5、Mesos

  Mesos模式調度方式有兩種:粗粒度和細粒度。粗粒度涉及到的類有CoarseMesosSchedulerBackend和TaskSchedulerImpl類;而細粒度涉及到的類有MesosSchedulerBackend和TaskSchedulerImpl類。CoarseMesosSchedulerBackend和 MesosSchedulerBackend都繼承了MScheduler(其實是Mesos的Scheduler),便於注冊到Mesos資源調度的框架中。選擇哪種模式可以通過spark.mesos.coarse參數配置。默認的是MesosSchedulerBackend。

  上面涉及到Spark的許多部署模式,究竟哪種模式好這個很難說,需要根據你的需求,如果你只是測試Spark Application,你可以選擇local模式。而如果你數據量不是很多,Standalone 是個不錯的選擇。當你需要統一管理集群資源(Hadoop、Spark等)那么你可以選擇Yarn,但是這樣維護成本就會變高。
  yarn-cluster和yarn-client模式內部實現還是有很大的區別。如果你需要用於生產環境,那么請選擇yarn-cluster;而如果你僅僅是Debug程序,可以選擇yarn-client。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM