Spark學習(四) -- Spark作業提交


標簽(空格分隔): Spark


作業提交

先回顧一下WordCount的過程:

sc.textFile("README.rd").flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_)
  • 步驟一:val rawFile = sc.textFile("README.rd")
  • texyFile先生成HadoopRDD --> MappedRDD
  • 步驟二:val splittedText = rawFile.flatMap(line => line.split(" "))
  • flatMap將原來的MappedRDD --> FlatMappedRDD;
  • 步驟三:val wordCount = splittedText.map(word => (word, 1))
  • 將詞語生成相應的鍵值對,FlatMappedRDD -- > MappedRDD;
  • 步驟四:val reduceJob = wordCount.reduceByKey(_+_)
  • 其中,reduceByKey不是MappedRDD的方法。
  • Scala將MappedRDD隱式轉換為PairRDDFunctions
  • 步驟五:觸發執行reduceJob.foreach(println)
  • foreach會調用sc.runjob,從而生成Job並提交到Spark集群中運行。

ClosureCleaner的主要功能

當Scala在創建一個閉包時,需要先判定那些變量會被閉包所使用並將這些需要使用的變量存儲在閉包之內。但是有時會捕捉太多不必要的變量,造成帶寬浪費和資源浪費,ClosureCleaner則可以移除這些不必要的外部變量。

經常會遇到Task Not Serializable錯誤,產生無法序列化的原因就是在RDD的操作中引用了無法序列化的變量。

作業執行

作業的提交過程主要涉及Driver和Executor兩個節點。
在Driver中主要解決一下問題:

  • RDD依賴性分析,以生成DAG;
  • 根據RDD DAG將Job分割為多個Stage;
  • Stage一經確認,即生成相應的Task,將生成的Task分發到Executor執行。

此處輸入圖片的描述

(對於WordCount程序來說,一直到foreach()階段才會被提交,分析,執行!!)

依賴性分析及Stage划分

Spark中的RDD之間的依賴分為窄依賴和寬依賴。

  • 窄依賴是指父RDD的所有輸出都會被指定的子RDD使用,也就是輸出路徑是指定的;
  • 寬依賴是指父RDD的輸出由不同的子RDD使用,輸出路徑不固定。

此處輸入圖片的描述

將會導致窄依賴的Transformation有:

  • map
  • flatmap
  • filter
  • sample

將會導致寬依賴的Transformation有:

  • sortByKey
  • reduceByKey
  • groupByKey
  • cogroupByKey
  • join
  • cartensian

Scheduler會計算RDD之間的依賴關系,將擁有持續窄依賴的RDD歸並到同一個Stage中,而寬依賴則作為划分不同Stage的判斷標准。其中,handleJobSubmittedsubmitStage主要負責依賴性分析,對其處理邏輯做進一步的分析。

handleJobSubmitted -- 生成finalStage並產生ActiveJob

finalStage = new Stage(finalRDD, partitions.size, None, jobId, callSite); //生成finalStage
val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties) //根據finalStage產生ActiveJob

newStage -- 創建一個新的Stage

private def newStage(rdd:RDD[_], numTasks:Int, shuffleDep:Option[shuffleDependency[_,_,_]], jobId:Int, callSite:CallSite) : Stage = {
    val stage = new Stage(id,rdd, numTasks, shuffleDep, getParentStages(rdd, jobId), jobId, callSite)
}
//參數含義:id -- Stage的序號,數字越大,優先級越高
//rdd:Rdd[_] -- 歸屬本Stage的最后一個rdd
//numTasks -- 創建的Task數目,等於父rdd的輸出Partition的數目
//parents -- 父Stage列表

也就是說,在創建Stage的時候,已經清楚該Stage需要從多少不同的Partition讀入數據,並寫出到多少個不同的Partition中,即輸入與輸出的個數已經明確。

submitStage -- 遞歸完成所依賴的Stage然后提交

  1. 所依賴的Stage是否都已經完成,如果沒有則先執行所依賴的Stage;

  2. 如果所依賴的Stage已經完成,則提交自身所處的Stage。

    private def submitStage(stage: Stage) {
    val jobId = activeJobForStage(stage)
    if(jobId.isDefined) {
    ....
    //依次處理所依賴的沒有完成的Stage
    } else {
    abortStage(stage, "No active job for stage " + stage.id) //提交自身的Stage
    }
    }

getMissingParentStage -- 通過圖的遍歷,找出依賴的所有父Stage

private def getMissingParentStage(stage: Stage) : List[Stage] = {
    val missing = new HashSet[Stage]
    val visited = new HashSet[RDD[_]]
}

Stage的划分是如何確定的呢? -- 重要的判斷依據是是否存在ShuffleDependency,如果有則創建一個新的Stage。
如何判斷是否存在ShuffleDependency呢? -- 取決於RDD的轉換。ShuffledRDD, CoGroupedRDD, SubtractedRDD都會返回ShuffleDependency

getDependencies -- 對於所創建的RDD,明確其Dependency類型

override def getDependencies: Seq[Dependency[_]] = {
    List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine))
}

Stage划分完畢就會明確以下內容:
1) 產生的Stage需要從多少個Partition中讀取數據;
2) 產生的Stage會生成多少個Partition -- 決定需要產生多少不同的Task;
3) 產生的Stage是否屬於ShuffleMap類型 -- 決定生成的Task類型。

Spark中共分2種不同的Task:ShuffleMap和ResultTask。

Actor Model和Akka -- 消息交互機制

在作業提交及執行期間,Spark會產生大量的消息交互,那么這些信息如何進行交互的呢?

Actor Model

  • Actor Model最適合用於解決並發編程問題。
  • 每個Actor都是一個獨立的個體,它們之間沒有任何繼承關系,所有的交互通過消息傳遞完成;
  • 每個Actor的行為只有3種:消息接收;消息處理;消息發送;
  • 為啥不適用共享內存的方式來進行信息交互呢?
  • 共享內存會導致並發問題,為了解決狀態不一致,要引入鎖,對鎖的申請處理不好又容易形成死鎖,同時性能會下降!

HelloWorld in Akka:

import akka.actor.Actor
import akka.actor.ActorSystem
import akka.actor.Props

class HelloActor extends Actor {
    def receive = {
        case "hello" => println("hello back at you")
        case _       => println("huh?")
    }
}

object Main extends App {
    val system = ActorSystem("HelloSystem")
    //default Actor constructor
    val helloActor = System.actorOf(Props[HelloActor], name = "helloactor")
    helloActor ! "hello"
    helloActor ! "dias"
}

注意:

  1. 首先要創建一個Actor;
  2. 消息發送要使用!
  3. Actor中必須實現receive函數來處理接收到的消息。

任務創建和分發

  • Spark將由Executor執行的Task分為ShuffleMapTask(Map)ResultTask(Reduce)兩種;
  • 每個Stage生成Task的時候,根據Stage中的isShuffleMap標記確定Task的類型,如果標記為True則創建shuffleMapTask,否則創建ResultTask
  • submitMissingTasks負責創建新的Task(根據isShuffleMap標志來確定是哪種Task,然后確定Stage的輸出和輸出Partition);
  • 一旦任務任務類型及任務個數確定后,由Executor啟動相應的線程來執行;

makeOffers -- 處理DriverActor接收到的消息信號

TaskschedulerImpl發送ReviveOffers消息給DriverActor,DriverActor接收到消息后,調用makeOffers處理消息;

def makeOffers() {
    launchTasks(scheduler.resourceOffers(
    executorHost.toArray.map{case(id, host) => new WorkerOffer(id, host, freeCores(id))}))
}

makeOffers的處理邏輯為:

  1. 找到空閑的Executor,分發的策略是隨機分發,盡可能的將任務平攤到每個Executor;
  2. 如果有空閑額Executor,就將任務列表中的部分任務利用launchTasks發送給指定的Executor。

resourceOffers -- 任務分發

SchedulerBackend -- 將新創建的Task分發給Executor

LaunchTasks -- 發送指令

TaskDescription -- 完成序列化

任務執行

  • LaunchTask消息被Executor接收,Executor會使用launchTask對給消息進行處理;
  • 如果Executor沒有被注冊到Driver,即使接收到launchTask指令,也不會做任何處理。

launchTask

//CoarseGrainedSchedulerBackend.launchTasks
def launchTasks(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
    val tr = new TaskRunner(context, taskId, serializedTask)
    runningTasks.put(taskId, tr)
    threadPool.execute(tr)
}

TaskRunner -- 反序列化

updateDependencies -- 解決依賴性問題

Shuffle Task

TaskRunner會啟動一個新的線程,如何在run中調用用戶自己定義的處理函數呢?作用於RDD上的Operation是如何真正起作用的呢?

TaskRunner.run
       |_Task.run
            |_Task.runTask
                    |_RDD.iterator
                            |_RDD.computeOrReadCheckpoint
                                    |_RDD.compute

Reduce Task

Task在執行的時候,會產生大量的數據交互,這些數據可以分成3種不同的類型:
1)狀態相關,如StatusUpdate;
2)中間結果;
3)計算相關的數據Metrics Data.

ShuffleMapTask和ResultTask返回的結果有什么不同:

  • ShuffleMapTask需要返回MapStatus,而ResultTask只需要告知是否已經成功完成執行;
  • ScheduleBack接收到Executor發送過來的StatusUpdate;
  • ScheduleBackend接收到StatusUpdate之后:如果任務已經成功處理,則將其從監視列表中刪除。如果整個作業都完成,將占用的資源釋放;
  • TaskSchedulerImpl將當前順利完成的任務放入完成隊列,同時取出下一個等待運行的Task;
  • DAGSchedule中的handleTaskCompletion,會針對ResultTask和ShuffleMapTask區別對待結果:
  • 如果ResultTask執行成功,DAGSchedule會發出TaskSucced來通知對整個作業執行情況感興趣的監聽者

Checkpoint和Cache -- 存儲中間結果

出於容錯性及效率方面的考慮,有時需要將中間結果進行持久化保存,可以方便后面再次利用到該RDD時不需要重新計算。

中間結果的存儲有兩種方式:Checkpoint 和 Cache

  • Checkpoint將計算結果寫入到HDFS文件系統中,但不會保存RDD Lineage;
  • Checkpoint有兩種類型:Data Checkepoint 和 Metadata Checkpoint;
  • Cache則將數據緩存到內存,如果內存不足時寫入到磁盤,同時將Lineage也保存下來。

WebUI和Metrics -- 可視化觀察工具

當用戶在使用Spark時,無論對Spark Cluster的運行情況還是Spark Application運行時的一些細節,希望能夠可視化的觀察。

WebUI

瀏覽器輸入:http://localhost:8080

Http Server是如何啟動的,網頁中顯示的數據是從哪里得到的?

1) Spark用到的Http Server是Jetty,用Java編寫,能夠嵌入到用戶程序中執行,不用想Tomcat或JBoss那樣需要自己獨立的JVM進程。
2) SparkUI在SparkContext初始化時創建。

//Initial the spark UI, registering all asociated listeners
private[spark] val ui = new SparkUI(this)
ui.bind() //bind()函數真正啟動JettyServer

3) SparkListener持續監聽Stage和Task相關事件的發生,並進行數據更新(典型的觀察者設計模式)。

Metrics

測量模塊是不可或缺的,通過測量數據來感知系統的運行情況。在Spark中,由MetricsSystem來擔任這個任務。

  • Instance:表示誰在使用MetricSystem -- Master,Worker,Executor,Client Driver;
  • Source:表示數據源;
  • Sinks:數據目的地:
  • ConsoleSink -- 輸出到控制台;
  • CSVSink -- 定期保存為CSV文件;
  • JmxSink -- 注冊到Jmx;
  • MetricsServlet -- 在SparkUI中添加MetricsServlet,以查看Task運行時的測量數據;
  • GraphiteSink -- 發送給Grapgite以對整個系統進行監控。

存儲機制

在WordCount程序中,在JobTracker提交之后,被DAGScheduler分為兩個Stage:ShuffleMapTask和ResultTask。ShuffleMapTask的輸出數據是ResultTask的輸入。

ShuffleMapTask.runTask ---|   |-->ShuffledRDD.compute ---|
                          |   |                          |
                          V-Store                        V-Store    

那么問題來了,ShuffleMapTask的計算結果是如何被ResultTask獲得的呢?
1)ShuffleMapTask將計算的狀態(不是具體的計算數值)包裝為MapStatus返回給DAGScheduler;
2)DAGScheduler將MapStatus保存到MapOutputTrackerMaster中;
3)ResultTask在調用ShuffledRDD時會利用BlockStoreShuffleFetcher中的fetch方法獲取數據:
a. 首先要咨詢MapOutputTrackerMaster所要獲取數據的location;
b. 根據返回的結果調用BlockManager.getMultiple獲取到真正的數據。

此處輸入圖片的描述

其中,MapStatus的結構如上圖所示,由blockmanager_id 和 byteSize構成,blockmanager_id表示計算的中間結果數據實際存儲在哪個BlockManager,byteSize表示不同reduceid所要讀取的數據的大小。

Shuffle結果寫入

寫入過程:

ShuffleMapTask.runTask
    HashShuffleWriter.write
        BlockObjectWriter.write

HashShuffleWriter.write主要完成兩件事情:

  1. 判斷是否要進行聚合,比如<hello, 1><hello, 1>都要寫入的話,要先生成<hello, 2>,再進行后續的寫入工作;
  2. 利用Partitioner函數來決定<k, val>寫入哪一個文件中。
  3. 每一個臨時文件由三元組(shuffle_id, map_id, reduce_id)決定,;

shuffle結果讀取

ShuffledRDD的compute函數式讀取ShuffleMapTask計算結果的觸點。

ShuffleRDD.compute() -- 觸發讀取ShuffleMapTask的計算結果

override def compute(split:Partition, context:TaskContext) : Iterator[P] = {
    val dep = dependencies.head.asInstanceOf[ShuffleDependency[K,V,C]]
    SparkEnv.get.shuffleManager.getReader().**read()**.asInstanceOf[Iterator[P]] //getReader()返回HashShuffleReader
    ......
}

HashShuffleReader.read()

override def read() : Iterator[Product2[K,C]] = {
    val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context, Serializer.getSerializer(dep.serializer))
    .....
}

BlockStoreShuffleFetcher.fetch()

BlockStoreShuffleFetcher需要解決的問題:

  • 所要獲取的mapid的MapStatus的內容是什么;

  • 如何根據獲得的MapStatus取相應的BlockManager獲取數據。

  • 一個ShuffleMapTask會產生一個MapStatus,MapStatus中含有當前ShuffleMapTask產生的數據落到各個Partition中的大小,如果為0則表示該分區沒有數據產生;

  • 索引為reduceId,如果array(0) == 0則表示上一個ShuffleMapTask中生成的數據中沒有任何內容可以作為reduceId為0的ResultTask的輸入;

  • 如果所要獲取的文件落在本地,則調用getLocal讀取;否則發送請求到遠端BlockManager。

Spark內存的消耗。
Spark對內存的要求較高,在ShuffleMapTask和ResultTask中,由於需要先將計算結果保存在內存,然后寫入磁盤,如果每個數據分區的數據很大則會消耗大量的內存。

  • 每個Writer開啟100KB的緩存;
  • Records會占用大量內存;
  • 在ResultTask的combine階段,利用HashMap來緩存數據。如果讀取的數據量很大或則分區很多,都會導致內存不足。

Memory Store -- 獲取緩存的數據

在Spark運行過程中,可以將結果顯示地保存下來,那么如果想獲取緩存中的數據該怎么辦?
此處輸入圖片的描述

  • CacheManager:RDD在進行計算轉換的時候,通過CacheManager來獲取數據,並通過CacheManager來存儲計算結果;
  • BlockManager:CacheManager在讀取和存儲數據的時候主要依賴BlockManager來操作,它決定數據是從內存還是磁盤讀取數據;
  • MemoryStore:負責將數據保存在或從內存中讀取數據;
  • DiskStore:復雜將數據保存在或從內存中讀取數據;
  • BlockManagerWorker:數據寫入本地的MemoryStore或DiskStore是一個同步操作,為了保證容錯性還需要將數據復制到其他節點,由BlockManagerWorker異步完成數據復制操作;
  • ConnectionManager:負責與其他計算節點建立連接,並負責數據的發送和接收;
  • BlockManagerMaster:該模塊只運行在Driver Application所在的Executor,功能是負責記錄下所有BlockId存儲在哪個SlaveWorker上。

存儲子模塊啟動過程分析

每個存儲子模塊有SparkEnv來創建,創建過程在SparkEnv.create中完成。

數據寫入過程

此處輸入圖片的描述

① RDD.iterator是與Storage子系統交互的入口;
② CacheManager.getOrCompute調用BlockManager中的put接口來寫入數據;
③ 數據優先寫入到MemoryStore,如果內存已滿,則將最近使用次數較少的數據寫入磁盤;
④ 通知BlockManagerMaster有新的數據寫入,在BlockManagerMaster中保存元數據;
⑤ 如果數據備份數目大於1,則將寫入的數據與其他Slave Worker同步。

數據讀取過程

  • 數據讀取的入口是BlockManager.get(),先嘗試從本地獲取,如果所要獲取的內容不在本地,則發起遠程獲取。
  • 遠程獲取的代碼調用路徑為:getRemote -> doGetRemote;

TachyonStore

Spark優先將計算結果存儲到內存中,當內存不足的時候,寫到外部磁盤,到底是怎樣做的呢?

  • Spark實際上將中間結果放在了當前JVM的內存中,也就是JVM既是計算引擎,又是存儲引擎。
  • 當計算引擎中的錯誤導致JVM進程退出時,會導致所有存儲的內存全部消失;
  • 大量的Cache又會使得JVM發生GC的概率增大,嚴重影響計算性能。
  • 因此,使用Tachyon代替JVM的存儲功能。

Tachyon以Master/Worker的方式組織集群,由Master負責管理、維護文件系統,文件數據存儲在Worker節點中。

  • 底層支持Plugable的文件系統,如HDFS用於用戶指定文件的持久化;
  • 使用Journal機制持久化文件系統中的Metadata;
  • 利用ZooKeeper構件Master的HA;
  • 采用和Spark RDD類似的Lineage的思想用於災難恢復。

在最新的Spark中,Storage子系統引入了TachyonStore,在內存中實現了HDFS文件系統的接口,主要目的是盡可能的利用內存來作為數據持久層,避免過多的磁盤讀寫操作。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM