RDD(Resiliennt Distributed Datasets)抽象彈性分布式數據集對於Spark來說的彈性計算到底提現在什么地方?
自動進行內存和磁盤數據這兩種存儲方式的切換
Spark 可以使用 persist 和 cache 方法將任意 RDD 緩存到內存或者磁盤文件系統中。數據會優先存儲到內存中,當內存不足以存放RDD中的數據的時候,就會持久化到磁盤上。這樣,就可以最大化的利益內存以達到最高的計算效率;同時又有磁盤作為兜底存儲方案以確保計算結果的正確性。
基於Linage的高效容錯機制
Linage是用於記錄RDD的父子依賴關系,子RDD會記錄父RDD,且各個分片之間的數據互不影響。當出現錯誤的時候,只需要恢復單個Split的特定部分即可。常規容錯方式有兩種:第一種是數據Check Poin檢查點;第二個是記錄數據的更新。一般意義上CheckPoin的基本工作方式是通過數據中心的網絡鏈接到不同的機器上,然后每次操作的時候都要復制數據集。相當於每個更新都對應一個記錄且同步到分布式集群中的各個節點上。由此集群間網絡和磁盤資源耗損比較大。但是Spark的RDD只有在Action操作的時候才會真正觸發計算,而Transform操作是惰性的,所以期間只有在Action操作的時候才會記錄到CheckPoint中。
Task失敗自動重試
缺省情況下,會自動重試4次。也可以在spark-submit的時候指定spark.task.maxFailures參數來修改缺省值
private[spark] class TaskSchedulerImpl( val sc: SparkContext, val maxTaskFailures: Int, isLocal: Boolean = false) extends TaskScheduler with Logging { import TaskSchedulerImpl._ def this(sc: SparkContext) = { this(sc, sc.conf.get(config.TASK_MAX_FAILURES)) } ...
而其中的config.TASK_MAX_FAILURES來自:https://github.com/apache/spark/blob/e1ea806b3075d279b5f08a29fe4c1ad6d3c4191a/core/src/main/scala/org/apache/spark/internal/config/package.scala
private[spark] val TASK_MAX_FAILURES = ConfigBuilder("spark.task.maxFailures") .intConf .createWithDefault(4)
TaskScheduler從每一個Stage的DAGScheduler中獲取TaskSet,運行並校驗是否存在故障。如果存在故障則會重試指定測試
Stage失敗自動重試
Stage對象可以記錄並跟蹤多個StageInfo,缺省的重試次數也是4次。且可以直接運行計算失敗的階段,值計算失敗的數據分片。 Stage是Spark Job運行時均有相同邏輯功能和並行計算任務的一個基本單元。其中的所有任務都依賴同樣的Shuffle,每個DAG任務都是通過DAGScheduler在Stage的邊界處發生Shuffle形成Stage,然后DAGScheduler按照這些拓撲結構執行
/** * Number of consecutive stage attempts allowed before a stage is aborted. */ private[scheduler] val maxConsecutiveStageAttempts = sc.getConf.getInt("spark.stage.maxConsecutiveAttempts", DAGScheduler.DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS)
源碼:https://github.com/apache/spark/blob/094563384478a402c36415edf04ee7b884a34fc9/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
Checkpoint和Persist
Checkpoin和Persist都可以由我們自己來調用。checkpoint是對RDD進行標記,並且產生對應的一系列文件,且所有父依賴都會被刪除,是整個Linage的終點。persist工作的主體RDD會把計算的分片結果保存在內存或磁盤上,以確保下次針對同一個RDD的調用可以重用。 兩種的區別如下:
- Persist將RDD緩存之后,其Linage關系任然存在,在節點宕機或RDD部分緩存丟失的時候,RDD任然可以根據Linage關系來重新運算;而Checkpoin將RDD寫入到文件系統之后,將不再維護Linage
- rdd.persist即使調用的是DISK_ONLY操作,也就是只寫入文件系統,該寫入rdd是由BlockManager管理,executor程序停止后BlockManager也就停止了,所以其持久化到磁盤中的數據也會被清理掉;而checkpoint持久化到文件系統(HDFS文件或者是本地文件系統),不會被刪除,還可以供其他程序調用。
數據調度彈性
上面所提到的DAGScheduler和TaskScheduler和資源無關。Spark將執行模型抽象為DAG,可以讓多個Stage任務串聯或者並行執行,而無需將中間結果寫入到HDFS中。這樣當某個節點故障的時候,可以由其他節點來執行出錯的任務。
數據分片coalesce
Spark進行數據分片的時候,默認將數據存放在內存中,當內存不夠的時候,會將一部分存放到磁盤上。如經過分片后,某個Partition非常小,就可以合並多個小的partition來計算,而不用每個partition都起一個線程。這樣就提高了效率,也不會因為大量的線程而導致OOM