第1章 Spark 整體概述1.1 整體概念1.2 RDD 抽象1.3 計算抽象1.4 集群模式1.5 RPC 網絡通信抽象1.6 啟動 Standalone 集群1.7 核心組件1.8 核心組件交互流程1.9 Block 管理1.10整體應用第2章 Spark 通信架構2.1 通信組件概覽2.2 Endpoint 啟動過程2.3 Endpoint Send&Ask 流程2.4 Endpoint Receive 流程2.5 Endpoint Inbox 處理流程2.6 Endpoint 畫像第3章 腳本解析3.1 start-daemon.sh3.2 spark-class3.3 start-master.sh3.4 start-slaves.sh3.5 start-all.sh3.6 spark-submit第4章 Master 節點啟動4.1 腳本概覽4.2 啟動流程4.3 OnStart 監聽事件4.4 RpcMessage 處理 (receiveAndReply)4.5 OneWayMessage 處理 (receive)4.6 Master 對 RpcMessage/OneWayMessage 處理邏輯第5章 Worker 節點啟動5.1 腳本概覽5.2 啟動流程5.3 OnStart 監聽事件5.4 RpcMessage 處理 (receiveAndReply)5.5 OneWayMessage 處理 (receive)第6章 Client 啟動流程6.1 腳本概覽6.2 SparkSubmit 啟動流程6.3 Client 啟動流程6.4 Client 的 OnStart 監聽事件6.5 RpcMessage 處理 (receiveAndReply)6.6 OneWayMessage 處理(receive)第7章 Driver 和 DriverRunner7.1 Master 對 Driver 資源分配7.2 Worker 運行 DriverRunner7.3 DriverRunner 創建並運行 DriverWrapper第8章 SparkContext 解析8.1 SparkContext 解析8.2 SparkContext 創建過程8.3 SparkContext 簡易結構與交互關系8.4 Master 對 Application 資源分配8.5 Worker 創建 Executor第9章 Job 提交和 Task 的拆分9.1 整體預覽9.2 Code 轉化為初始 RDDs9.3 RDD 分解為待執行任務集合(TaskSet)9.4 TaskSet 封裝為 TaskSetManager 並提交至 Driver9.5 Driver 將 TaskSetManager 分解為 TaskDescriptions 並發布任務到 Executor第10章 Task 執行和回饋10.1 Task 的執行流程10.2 Task 的回饋流程10.3 Task 的迭代流程10.4 精彩圖解第11章 Spark 的數據存儲11.1 存儲子系統概覽11.2 啟動過程分析11.3 通信層11.4 存儲層11.4.1 Disk Store11.4.2 Memory Store11.5 數據寫入過程分析11.5.1 序列化與否11.6 數據讀取過程分析11.6.1 本地讀取11.6.2 遠程讀取11.7 Partition 如何轉化為 Block11.8 partition 和 block 的對應關系第12章 Spark Shuffle 過程12.1 MapReduce 的 Shuffle 過程介紹12.1.1 Spill 過程(刷寫過程)12.1.2 Merge12.1.3 Copy12.1.4 Merge Sort12.2 HashShuffle 過程介紹12.3 SortShuffle 過程介紹12.4 TungstenShuffle 過程介紹12.5 MapReduce 與 Spark 過程對比第13章 Spark 內存管理13.1 堆內和堆外內存規划13.1.1 堆內內存13.1.2 堆外內存13.1.3 內存管理接口13.2 內存空間分配13.2.1 靜態內存管理13.2.2 統一內存管理13.3 存儲內存管理13.3.1 RDD 的持久化機制13.3.2 RDD 緩存的過程13.3.3 淘汰和落盤13.4 執行內存管理13.4.1 多任務間內存分配13.4.2 Shuffle 的內存占用第14章 部署模式解析14.1 部署模式概述14.2 standalone 框架14.2.1 Standalone 模式下任務運行過程14.2.2 總結14.3 yarn 集群模式14.4 mesos 集群模式14.5 spark 三種部署模式的區別14.6 異常場景分析14.6.1 異常分析1:Worker 異常退出14.6.2 異常分析2:Executor 異常退出14.6.3 異常分析3:Master 異常退出第15章 wordcount 程序運行原理窺探15.1 spark 之 scala 實現 wordcount15.2 原理窺探
第1章 Spark 整體概述

1.1 整體概念
Apache Spark 是一個開源的通用集群計算系統,它提供了 High-level 編程 API,支持 Scala、Java 和 Python 三種編程語言。Spark 內核使用 Scala 語言編寫,通過基於 Scala 的函數式編程特性,在不同的計算層面進行抽象,
代碼設計非常優秀
。
1.2 RDD 抽象
RDD(Resilient Distributed Datasets),彈性分布式數據集,它是對分布式數據集的一種內存抽象,通過受限的共享內存方式來提供容錯性,同時這種內存模型使得計算比傳統的數據流模型要高效。RDD 具有 5 個重要的特性,如下圖所示:
![]()
上圖展示了 2 個 RDD 進行 JOIN 操作,體現了 RDD 所具備的 5 個主要特性,如下所示:
• 1)一組分區
• 2)計算每一個數據分片的函數
• 3)RDD 上的一組依賴
• 4)可選,對於鍵值對 RDD,有一個 Partitioner(通常是 HashPartitioner)
• 5)可選,一組 Preferred location 信息(例如,HDFS 文件的 Block 所在 location 信息)
有了上述特性,能夠非常好地通過 RDD 來表達分布式數據集,並作為構建 DAG 圖的基礎:首先抽象一個分布式計算任務的邏輯表示,最終將任務在實際的物理計算環境中進行處理執行。
1.3 計算抽象
在描述 Spark 中的計算抽象,我們首先需要了解如下幾個概念:
1)Application
• 用戶編寫的 Spark 程序,完成一個計算任務的處理。它是由一個 Driver 程序和一組運行於 Spark 集群上的 Executor 組成。
2)Job
• 用戶程序中,每次調用 Action 時,邏輯上會生成一個 Job,一個 Job 包含了多個 Stage 。
3)Stage
• Stage 包括兩類:ShuffleMapStage 和 ResultStage,如果用戶程序中調用了需要進行 Shuffle 計算的 Operator,如 groupByKey 等,就會以 Shuffle 為邊界分成 ShuffleMapStage 和 ResultStage。
4)TaskSet
• 基於 Stage 可以直接映射為 TaskSet,一個 TaskSet 封裝了一次需要運算的、具有相同處理邏輯的 Task,這些 Task 可以並行計算,粗粒度的調度是以 TaskSet 為單位的。
5)Task
• Task 是在物理節點上運行的基本單位,Task 包含兩類:ShuffleMapTask 和 ResultTask,分別對應於 Stage 中 ShuffleMapStage 和 ResultStage 中的一個執行基本單元。
下面,我們看一下,上面這些基本概念之間的關系,如下圖所示:![]()
上圖,為了簡單,每個 Job 假設都很簡單,並且只需要進行一次 Shuffle 處理,所以都對應 2 個 Stage。實際應用中,一個 Job 可能包含若干個 Stage,或者是一個相對復雜的 Stage DAG。
在 Standalone 模式下,默認使用的是 FIFO 這種簡單的調度策略,在進行調度的過程中,大概流程如下圖所示:
![]()
從用戶提交 Spark 程序,最終生成 TaskSet,而在調度時,通過 TaskSetManager 來管理一個 TaskSet(包含一組可在物理節點上執行的 Task),這里面 TaskSet 必須要按照順序執行才能保證計算結果的正確性,因為 TaskSet 之間是有序依賴的(上溯到 ShuffleMapStage 和 ResultStage),只有一個 TaskSet 中的所有 Task 都運行完成后,才能調度下一個 TaskSet 中的 Task 去執行。
1.4 集群模式
Spark 集群在設計的時候,並沒有在資源管理的設計上對外封閉,而是充分考慮了未來對接一些更強大的資源管理系統,如 YARN、Mesos 等,所以 Spark 架構設計將資源管理單獨抽象出一層,通過這種抽象能夠構建一種適合企業當前技術棧的插件式資源管理模塊,從而為不同的計算場景提供不同的資源分配與調度策略。Spark 集群模式架構,如下圖所示:
![]()
上圖中,Spark集群Cluster Manager目前支持如下三種模式:
1)Standalone 模式
• Standalone 模式是 Spark 內部默認實現的一種集群管理模式,這種模式是通過集群中的 Master 來統一管理資源,而與 Master 進行資源請求協商的是 Driver 內部的 StandaloneSchedulerBackend(實際上是其內部的 StandaloneAppClient 真正與 Master 通信),后面會詳細說明。
2)YARN 模式
• YARN 模式下,可以將資源的管理統一交給 YARN 集群的 ResourceManager 去管理,選擇這種模式,可以更大限度的適應企業內部已有的技術棧,如果企業內部已經在使用 Hadoop 技術構建大數據處理平台。
3)Mesos 模式
• 隨着 Apache Mesos 的不斷成熟,一些企業已經在嘗試使用 Mesos 構建數據中心的操作系統(DCOS),Spark 構建在 Mesos 之上,能夠支持細粒度、粗粒度的資源調度策略(Mesos 的優勢),也可以更好地適應企業內部已有技術棧。
• 那么,Spark 中是怎么考慮滿足這一重要的設計決策的呢?也就是說,如何能夠保證 Spark 非常容易的讓第三方資源管理系統輕松地接入進來。我們深入到類設計的層面看一下,如下類圖所示:
![]()
• 可以看出,Task 調度直接依賴 SchedulerBackend,SchedulerBackend 與實際資源管理模塊交互實現資源請求。這里面,CoarseGrainedSchedulerBackend 是 Spark 中與資源調度相關的最重要的抽象,它需要抽象出與 TaskScheduler 通信的邏輯,同時還要能夠與各種不同的第三方資源管理系統無縫地交互。實際上,CoarseGrainedSchedulerBackend 內部采用了一種 ResourceOffer 的方式來處理資源請求。
1.5 RPC 網絡通信抽象
Spark RPC 層是基於優秀的網絡通信框架 Netty 設計開發的,但是 Spark 提供了一種很好地抽象方式,將底層的通信細節屏蔽起來,而且也能夠基於此來設計滿足擴展性,比如,如果有其他不基於 Netty 的網絡通信框架的新的RPC接入需求,可以很好地擴展而不影響上層的設計。RPC 層設計,如下圖類圖所示:
![]()
任何兩個 Endpoint 只能通過消息進行通信,可以實現一個 RpcEndpoint 和一個 RpcEndpointRef。想要與 RpcEndpoint 通信,需要獲取到該 RpcEndpoint 對應的 RpcEndpointRef 即可,而且管理 RpcEndpoint 和 RpcEndpointRef 創建及其通信的邏輯,統一在 RpcEnv 對象中管理。
1.6 啟動 Standalone 集群
Standalone 模式下,Spark 集群采用了簡單的 Master-Slave 架構模式,Master 統一管理所有的 Worker,這種模式很常見,我們簡單地看下 Spark Standalone 集群啟動的基本流程,如下圖所示:
![]()
可以看到,Spark 集群采用的消息的模式進行通信,也就是 EDA 架構模式,借助於 RPC 層的優雅設計,任何兩個 Endpoint 想要通信,發送消息並攜帶數據即可。上圖的流程描述如下所示:
• 1)Master 啟動時首先創一個 RpcEnv 對象,負責管理所有通信邏輯。
• 2)Master 通過 RpcEnv 對象創建一個 Endpoint,Master 就是一個 Endpoint,Worker 可以與其進行通信。
• 3)Worker 啟動時也是創一個 RpcEnv 對象。
• 4)Worker 通過 RpcEnv 對象創建一個 Endpoint。
• 5)Worker 通過 RpcEnv 對,建立到 Master 的連接,獲取到一個 RpcEndpointRef 對象,通過該對象可以與 Master 通信。
• 6)Worker 向 Master 注冊,注冊內容包括主機名、端口、CPU Core 數量、內存數量。
• 7)Master 接收到 Worker 的注冊,將注冊信息維護在內存中的 Table 中,其中還包含了一個到 Worker 的 RpcEndpointRef 對象引用。
• 8)Master 回復 Worker 已經接收到注冊,告知 Worker 已經注冊成功。
• 9)此時如果有用戶提交 Spark 程序,Master 需要協調啟動 Driver;而 Worker 端收到成功注冊響應后,開始周期性向 Master 發送心跳。
1.7 核心組件
集群處理計算任務的運行時(即用戶提交了 Spark 程序),最核心的頂層組件就是 Driver 和 Executor,它們內部管理很多重要的組件來協同完成計算任務,核心組件棧如下圖所示:
![]()
Driver 和 Executor 都是運行時創建的組件,一旦用戶程序運行結束,他們都會釋放資源,等待下一個用戶程序提交到集群而進行后續調度。上圖,我們列出了大多數組件,其中 SparkEnv 是一個重量級組件,他們內部包含計算過程中需要的主要組件,而且,Driver 和 Executor 共同需要的組件在 SparkEnv 中也包含了很多。這里,我們不做過多詳述,后面交互流程等處會說明大部分組件負責的功能。
1.8 核心組件交互流程
在 Standalone 模式下,Spark 中各個組件之間交互還是比較復雜的,但是對於一個通用的分布式計算系統來說,這些都是非常重要而且比較基礎的交互。首先,為了理解組件之間的主要交互流程,我們給出一些基本要點:
• 一個 Application 會啟動一個 Driver
• 一個 Driver 負責跟蹤管理該 Application 運行過程中所有的資源狀態和任務狀態
• 一個 Driver 會管理一組 Executor
• 一個 Executor 只執行屬於一個 Driver 的 Task
核心組件之間的主要交互流程,如下圖所示:![]()
上圖中,通過不同顏色或類型的線條,給出了如下 6 個核心的交互流程,我們會詳細說明:
橙色:提交用戶 Spark 程序
用戶提交一個 Spark 程序,主要的流程如下所示:
•1)用戶 spark-submit 腳本提交一個 Spark 程序,會創建一個 ClientEndpoint 對象,該對象負責與 Master 通信交互
•2)ClientEndpoint 向 Master 發送一個 RequestSubmitDriver 消息,表示提交用戶程序
•3)Master 收到 RequestSubmitDriver 消息,向 ClientEndpoint 回復 SubmitDriverResponse,表示用戶程序已經完成注冊
•4)ClientEndpoint 向 Master 發送 RequestDriverStatus 消息,請求 Driver 狀態
•5)如果當前用戶程序對應的 Driver 已經啟動,則 ClientEndpoint 直接退出,完成提交用戶程序
紫色:啟動 Driver 進程
當用戶提交用戶 Spark 程序后,需要啟動 Driver 來處理用戶程序的計算邏輯,完成計算任務,這時 Master 協調需要啟動一個 Driver,具體流程如下所示:
•1)Maser 內存中維護着用戶提交計算的任務 Application,每次內存結構變更都會觸發調度,向 Worker 發送 LaunchDriver 請求
•2)Worker 收到 LaunchDriver 消息,會啟動一個 DriverRunner 線程去執行 LaunchDriver 的任務
•3)DriverRunner 線程在 Worker 上啟動一個新的 JVM 實例,該 JVM 實例內運行一個 Driver 進程,該 Driver 會創建 SparkContext 對象
紅色:注冊 Application
Dirver 啟動以后,它會創建 SparkContext 對象,初始化計算過程中必需的基本組件,並向 Master 注冊 Application,流程描述如下:
•1)創建 SparkEnv 對象,創建並管理一些數基本組件
•2)創建 TaskScheduler,負責 Task 調度
•3)創建 StandaloneSchedulerBackend,負責與 ClusterManager 進行資源協商
•4)創建 DriverEndpoint,其它組件可以與 Driver 進行通信
•5)在 StandaloneSchedulerBackend 內部創建一個 StandaloneAppClient,負責處理與 Master 的通信交互
•6)StandaloneAppClient 創建一個 ClientEndpoint,實際負責與 Master 通信
•7)ClientEndpoint 向 Master 發送 RegisterApplication 消息,注冊 Application
•8)Master 收到 RegisterApplication 請求后,回復 ClientEndpoint 一個 RegisteredApplication 消息,表示已經注冊成功
藍色:啟動 Executor 進程
•1)Master 向 Worker 發送 LaunchExecutor 消息,請求啟動 Executor;同時 Master 會向 Driver 發送 ExecutorAdded 消息,表示 Master 已經新增了一個 Executor(此時還未啟動)
•2)Worker 收到 LaunchExecutor 消息,會啟動一個 ExecutorRunner 線程去執行 LaunchExecutor 的任務
•3)Worker 向 Master 發送 ExecutorStageChanged 消息,通知 Executor 狀態已發生變化
•4)Master 向 Driver 發送 ExecutorUpdated 消息,此時 Executor 已經啟動
粉色:啟動 Task 執行
•1)StandaloneSchedulerBackend 啟動一個 DriverEndpoint
•2)DriverEndpoint 啟動后,會周期性地檢查 Driver 維護的 Executor 的狀態,如果有空閑的 Executor 便會調度任務執行
•3)DriverEndpoint 向 TaskScheduler 發送 Resource Offer 請求
•4)如果有可用資源啟動 Task,則 DriverEndpoint 向 Executor 發送 LaunchTask 請求
•5)Executor 進程內部的 CoarseGrainedExecutorBackend 調用內部的 Executor 線程的 launchTask 方法啟動 Task
•6)Executor 線程內部維護一個線程池,創建一個 TaskRunner 線程並提交到線程池執行
綠色:Task 運行完成
•1)Executor 進程內部的 Executor 線程通知 CoarseGrainedExecutorBackend,Task 運行完成
•2)CoarseGrainedExecutorBackend 向 DriverEndpoint 發送 StatusUpdated 消息,通知 Driver 運行的 Task 狀態發生變更
•3)StandaloneSchedulerBackend 調用 TaskScheduler 的 updateStatus 方法更新 Task 狀態
•4)StandaloneSchedulerBackend 繼續調用 TaskScheduler 的 resourceOffers 方法,調度其他任務運行
1.9 Block 管理
Block 管理,主要是為 Spark 提供的 Broadcast 機制提供服務支撐的。Spark 中內置采用 TorrentBroadcast 實現,該 Broadcast 變量對應的數據(Task 數據)或數據集(如 RDD),默認會被切分成若干 4M 大小的 Block,Task 運行過程中讀取到該 Broadcast 變量,會以 4M 為單位的 Block 為拉取數據的最小單位,最后將所有的 Block 合並成 Broadcast 變量對應的完整數據或數據集。將數據切分成 4M 大小的 Block,Task 從多個 Executor 拉取 Block,可以非常好地均衡網絡傳輸負載,提高整個計算集群的穩定性。
通常,用戶程序在編寫過程中,會對某個變量進行 Broadcast,該變量稱為 Broadcast 變量。在實際物理節點的 Executor 上執行 Task 時,需要讀取 Broadcast 變量對應的數據集,那么此時會根據需要拉取 DAG 執行流上游已經生成的數據集。采用 Broadcast 機制,可以有效地降低數據在計算集群環境中傳輸的開銷。具體地,如果一個用戶對應的程序中的 Broadcast 變量,對應着一個數據集,它在計算過程中需要拉取對應的數據,如果在同一個物理節點上運行着多個 Task,多個 Task 都需要該數據,有了 Broadcast 機制,只需要拉取一份存儲在本地物理機磁盤即可,供多個 Task 計算共享。
另外,用戶程序在進行調度過程中,會根據調度策略將 Task 計算邏輯數據(代碼)移動到對應的 Worker 節點上,最優情況是對本地數據進行處理,那么代碼(序列化格式)也需要在網絡上傳輸,也是通過 Broadcast 機制進行傳輸,不過這種方式是首先將代碼序列化到 Driver 所在 Worker 節點,后續如果 Task 在其他 Worker 中執行,需要讀取對應代碼的 Broadcast 變量,首先就是從 Driver 上拉取代碼數據,接着其他晚一些被調度的 Task 可能直接從其他 Worker 上的 Executor 中拉取代碼數據。
我們通過以 Broadcast 變量 taskBinary 為例,說明 Block 是如何管理的,如下圖所示:![]()
上圖中,Driver 負責管理所有的 Broadcast 變量對應的數據所在的 Executor,即一個 Executor 維護一個 Block 列表。在 Executor 中運行一個 Task 時,執行到對應的 Broadcast 變量 taskBinary,如果本地沒有對應的數據,則會向 Driver 請求獲取 Broadcast 變量對應的數據,包括一個或多個 Block 所在的 Executor 列表,然后該 Executor 根據 Driver 返回的 Executor 列表,直接通過底層的 BlockTransferService 組件向對應 Executor 請求拉取 Block。Executor 拉取到的 Block 會緩存到本地,同時向 Driver 報告該 Executor 上存在的 Block 信息,以供其他 Executor 執行 Task 時獲取 Broadcast 變量對應的數據。
1.10整體應用
用戶通過 spark-submit 提交或者運行 spark-shell REPL,集群創建 Driver,Driver 加載 Application,最后 Application 根據用戶代碼轉化為 RDD,RDD 分解為 Tasks,Executor 執行 Task 等系列知識,整體交互藍圖如下:
![]()
第2章 Spark 通信架構
Spark作為分布式計算框架,多個節點的設計與相互通信模式是其重要的組成部分。Spark 一開始使用 Akka 作為內部通信部件。在 Spark 1.3 年代,為了解決大塊數據(如 Shuffle)的傳輸問題,Spark 引入了 Netty 通信框架。到了 Spark 1.6,Spark 可以配置使用 Akka 或者 Netty 了,這意味着 Netty 可以完全替代 Akka了。再到 Spark 2,Spark 已經完全拋棄 Akka了,全部使用 Netty 了。
為什么呢?官方的解釋是:
•1)很多 Spark 用戶也使用 Akka,但是由於 Akka 不同版本之間無法互相通信,這就要求用戶必須使用跟 Spark 完全一樣的 Akka 版本,導致用戶無法升級 Akka。
•2)Spark 的 Akka 配置是針對 Spark 自身來調優的,可能跟用戶自己代碼中的 Akka 配置沖突。
•3)Spark 用的 Akka 特性很少,這部分特性很容易自己實現。同時,這部分代碼量相比 Akka 來說少很多,debug 比較容易。如果遇到什么 bug,也可以自己馬上 fix,不需要等 Akka 上游發布新版本。而且,Spark 升級 Akka 本身又因為第一點會強制要求用戶升級他們使用的 Akka,對於某些用戶來說是不現實的。
SPARK 的通信架構 - Actor 比較,如下圖所示:![]()
2.1 通信組件概覽
對源碼分析,對於設計思路理解如下:
![]()
•1)RpcEndpoint:RPC 端點,Spark 針對於每個節點(Client/Master/Worker)都稱之一個 Rpc 端點且都實現 RpcEndpoint 接口,內部根據不同端點的需求,設計不同的消息和不同的業務處理,如果需要發送(詢問)則調用 Dispatcher。
•2)RpcEnv:RPC 上下文環境,每個 Rpc 端點運行時依賴的上下文環境稱之為 RpcEnv。
•3)Dispatcher:消息分發器,針對於 RPC 端點需要發送消息或者從遠程 RPC 接收到的消息,分發至對應的指令收件箱/發件箱。如果指令接收方是自己存入收件箱,如果指令接收方為非自身端點,則放入發件箱。
•4)Inbox:指令消息收件箱,一個本地端點對應一個收件箱,Dispatcher 在每次向 Inbox 存入消息時,都將對應 EndpointData 加入內部待 Receiver Queue中,另外 Dispatcher 創建時會啟動一個單獨線程進行輪詢 Receiver Queue,進行收件箱消息消費。
•5)OutBox:指令消息發件箱,一個遠程端點對應一個發件箱,當消息放入 Outbox 后,緊接着將消息通過 TransportClient 發送出去。消息放入發件箱以及發送過程是在同一個線程中進行,這樣做的主要原因是遠程消息分為 RpcOutboxMessage,OneWayOutboxMessage 兩種消息,而針對於需要應答的消息直接發送且需要得到結果進行處理
•6)TransportClient:Netty 通信客戶端,根據 OutBox 消息的 receiver 信息,請求對應遠程 TransportServer。
•7)TransportServer:Netty 通信服務端,一個 RPC 端點一個 TransportServer,接受遠程消息后調用 Dispatcher 分發消息至對應收發件箱。
注意
:
TransportClient 與 TransportServer 通信虛線表示兩個 RpcEnv 之間的通信,圖示沒有單獨表達式。
一個 Outbox 一個 TransportClient,圖示沒有單獨表達式。
一個 RpcEnv 中存在兩個 RpcEndpoint,一個代表本身啟動的 RPC 端點,另外一個為 RpcEndpointVerifier。
Spark的通信架構 – 高層視圖
![]()
Spark 的通信架構 – 類圖
![]()
2.2 Endpoint 啟動過程
啟動的流程如下:
![]()
Endpoint 啟動后,默認會向 Inbox 中添加 OnStart 消息,不同的端點(Master/Worker/Client)消費 OnStart 指令時,進行相關端點的啟動額外處理。
Endpoint 啟動時,會默認啟動 TransportServer,且啟動結束后會進行一次同步測試 rpc 可用性(askSync-BoundPortsRequest)。
Dispatcher 作為一個分發器,內部存放了 Inbox,Outbox 的等相關句柄和存放了相關處理狀態數據,結構大致如下:
![]()
2.3 Endpoint Send&Ask 流程
Endpoint 的消息發送與請求流程,如下:
![]()
Endpoint 根據業務需要存入兩個維度的消息組合:send/ask 某個消息,receiver 是自身與非自身
•1)OneWayMessage:send + 自身,直接存入收件箱
•2)OneWayOutboxMessage:send + 非自身,存入發件箱並直接發送
•3)RpcMessage:ask + 自身,直接存入收件箱,另外還需要存入 LocalNettyRpcCallContext,需要回調后再返回
•4)RpcOutboxMessage:ask + 非自身,存入發件箱並直接發送,需要回調后再返回
2.4 Endpoint Receive 流程
Endpoint 的消息的接收,流程如下:
![]()
上圖 ServerBootstrap 為 Netty 啟動服務,SocketChanel為Netty 數據通道。
上述包含 TransportSever 啟動與消息接收兩個流程。
2.5 Endpoint Inbox 處理流程
Spark 在 Endpoint 的設計上核心設計即為 Inbox 與 Outbox,其中 Inbox 核心要點為:
•1)內部的處理流程拆分為多個消息指令(InboxMessage)存放入 Inbox。
•2)當 Dispatcher 啟動最后,會啟動一個名為【dispatcher-event-loop】的線程掃描 Inbox 待處理 InboxMessage,並調用 Endpoint 根據 InboxMessage 類型做相應處理
•3)當 Dispatcher 啟動最后,默認會向 Inbox 存入 OnStart 類型的 InboxMessage,Endpoint 在根據 OnStart 指令做相關的額外啟動工作,三端啟動后所有的工作都是對 OnStart 指令處理衍生出來的,因此可以說 OnStart 指令是相互通信的源頭。![]()
消息指令類型大致如下三類:
•1)OnStart/OnStop
•2)RpcMessage/OneWayMessage
•3)RemoteProcessDisconnected/RemoteProcessConnected/RemoteProcessConnectionError
2.6 Endpoint 畫像

第3章 腳本解析
在看源碼之前,我們一般會看相關腳本了解其初始化信息以及 Bootstrap 類,Spark 也不例外,而 Spark 中相關的腳本如下:
%SPARK_HOME%/sbin/start-master.sh
%SPARK_HOME%/sbin/start-slaves.sh
%SPARK_HOME%/sbin/start-all.sh
%SPARK_HOME%/bin/spark-submit
啟動腳本中對於公共處理部分進行抽取為獨立的腳本,如下:
腳本 | 說明 |
---|---|
sbin/spark-config.sh | 初始化環境變量 SPARK_CONF_DIR, PYTHONPATH |
bin/load-spark-env.sh | 初始化環境變量 SPARK_SCALA_VERSION,調用 %SPARK_HOME% |
conf/spark-env.sh | 加載用戶自定義環境變量 |
3.1 start-daemon.sh
主要完成進程相關基本信息初始化,然后調用 bin/spark-class 進行守護進程啟動,該腳本是創建端點的通用腳本,三端各自腳本都會調用 spark-daemon.sh 腳本啟動各自進程

詳解如下:
1)初始化 SPRK_HOME、SPARK_CONF_DIR、SPARK_IDENT_STRING、SPARK_LOG_DIR 環境變量 (如果不存在)
2)初始化日志並測試日志文件夾讀寫權限,初始化 PID 目錄並校驗 PID 信息
3)調用 /bin/spark-class 腳本,/bin/spark-class 見下面
3.2 spark-class
Master 調用舉例:
bin/spark-class \
--class org.apache.spark.deploy.master.Master \
--host $SPARK_MASTER_HOST \
--port $SPARK_MASTER_PORT \
--webui-port $SPARK_MASTER_WEBUI_PORT $ORIGINAL_ARGS
1)初始化 RUNNER(java)、SPARK_JARS_DIR (%SPARK_HOME%/jars)、LAUNCH_CLASSPATH 信息
2)調用 ("$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@") 獲取最終執行的 shell 語句
3)執行最終的 shell 語句,示例如下:
/opt/module/jdk1.8.0_144 \
-cp /opt/module/spark-2.1.1-bin-hadoop2.7/conf/:/opt/module/spark-2.1.1-bin-hadoop2.7/jars/*:/opt/module/hadoop-2.7.2/etc/hadoop/ \
-Xmx1g \
-XX:MaxPermSize=256m \
org.apache.spark.deploy.master.Master \
--host hadoop102 \
--port 7077 \
--webui-port 8080
如果是 Client,那么可能為 r,或者 python 腳本。
3.3 start-master.sh
啟動 Master 的腳本,流程如下:

詳解如下:
1)用戶執行 start-master.sh 腳本,初始化環境變量 SPARK_HOME (如果 PATH 不存在 SPARK_HOME,初始化腳本的上級目錄為 SPARK_HOME),調用 spark-config.sh,調用 load-spark-env.sh
2)如果環境變量 SPARK_MASTER_HOST、SPARK_MASTER_PORT、SPARK_MASTER_WEBUI_PORT 不存在,進行初始化 7077,hostname -f,8080
3)調用 spark-daemon.sh 腳本啟動 master 進程,如下:
spark-daemon.sh start org.apache.spark.deploy.master.Master 1 \
--host $SPARK_MASTER_HOST \
--port $SPARK_MASTER_PORT \
--webui-port $SPARK_MASTER_WEBUI_PORT $ORIGINAL_ARGS)
3.4 start-slaves.sh
啟動 Worker 的腳本,流程如下:

詳解如下:
1)用戶執行 start-slaves.sh 腳本,初始化環境變量 SPARK_HOME,調用 spark-config.sh,調用 load-spark-env.sh,初始化 Master host/port 信息
2)調用 slaves.sh 腳本,讀取 conf/slaves 文件並遍歷,通過 ssh 連接到對應 slave 節點,啟動 ${SPARK_HOME}/sbin/start-slave.sh spark://$SPARK_MASTER_HOST:$SPARK_MASTER_PORT
3)start-slave.sh 在各個節點中,初始化環境變量 SPARK_HOME,調用 spark-config.sh,調用 load-spark-env.sh,根據 $SPARK_WORKER_INSTANCES 計算 WEBUI_PORT 端口 (worker 端口號依次遞增) 並啟動 Worker 進程,如下:
${SPARK_HOME}/sbin/spark-daemon.sh \
start org.apache.spark.deploy.worker.Worker $WORKER_NUM \
--webui-port "$WEBUI_PORT" $PORT_FLAG $PORT_NUM $MASTER "$@"
3.5 start-all.sh
屬於快捷腳本,內部調用 start-master.sh 與 start-slaves.sh 腳本,並無額外工作。
3.6 spark-submit
任務提交的基本腳本,流程如下:

詳解如下:
1)直接調用 spark-class 腳本進行進程創建,示例如下:
./spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://hadoop102:7077 \
../examples/jars/spark-examples_2.11-2.1.0.jar 10
2)如果是 java/scala 任務,那么最終調用 SparkSubmit.scala 進行任務處理,示例如下:
/opt/module/jdk1.8.0_144 -cp \
/opt/module/spark-2.1.1-bin-hadoop2.7/conf/:/opt/module/spark-2.1.1-bin-hadoop2.7/jars/*:/opt/module/hadoop-2.7.2/etc/hadoop/ \
-Xmx1g -XX:MaxPermSize=256m \
org.apache.spark.deploy.SparkSubmit \
--master spark://hadoop102:7077 \
--class org.apache.spark.examples.SparkPi \
../examples/jars/spark-examples_2.11-2.1.0.jar 10
第4章 Master 節點啟動
Master 作為 Endpoint 的具體實例,下面我們介紹一下 Master 啟動以及 OnStart 指令后的相關工作。
4.1 腳本概覽
下面是一個舉例:
/opt/module/jdk1.8.0_144 \
-cp /opt/module/spark-2.1.1-bin-hadoop2.7/conf/:/opt/module/spark-2.1.1-bin-hadoop2.7/jars/*:/opt/module/hadoop-2.7.2/etc/hadoop/ \
-Xmx1g \
-XX:MaxPermSize=256m \
org.apache.spark.deploy.master.Master \
--host hadoop102 \
--port 7077 \
4.2 啟動流程
Master 的啟動流程如下:

詳解如下:
1)SparkConf:加載 key 以 spark. 開頭的系統屬性 (Utils.getSystemProperties)。
2)MasterArguments:
a) 解析 Master 啟動的參數:
--ip -i --host -h --port -p --webui-port --properties-file
b)將 --properties-file (沒有配置默認為 conf/spark-defaults.conf) 中以 spark. 開頭的配置存入 SparkConf。
3)NettyRpcEnv 中的內部處理遵循 RpcEndpoint 統一處理,這里不再贅述。
4)BoundPortsResponse 返回 rpcEndpointPort、webUIPort、restPort 真實端口。
5)最終守護進程會一直存在等待結束信 awaitTermination。
4.3 OnStart 監聽事件
Master 的啟動完成后異步執行工作如下:

詳解如下:
1)【dispatcher-event-loop】線程掃描到 OnStart 指令后會啟動相關 MasterWebUI (默認端口 8080),根據配置選擇安裝 ResetServer (默認端口 6066)。
2)另外新起【master-forward-message-thread】線程定期檢查 Worker 心跳是否超時。
3)如果 Worker 心跳檢測超時,那么對 Worker 下的發布的所有任務所屬 Driver 進行 ExecutorUpdated 發送,同時自己再重新 LaunchDriver。
4.4 RpcMessage 處理 (receiveAndReply)

4.5 OneWayMessage 處理 (receive)

4.6 Master 對 RpcMessage/OneWayMessage 處理邏輯
這部分對整體 Master 理解作用不是很大且理解比較抽象,可以先讀后續內容,回頭再考慮看這部分內容,或者不讀。

第5章 Worker 節點啟動
Worker 作為 Endpoint 的具體實例,下面我們介紹一下 Worker 啟動以及 OnStart 指令后的額外工作。
5.1 腳本概覽
下面是一個舉例:
/opt/module/jdk1.8.0_144 \
-cp /opt/module/spark-2.1.1-bin-hadoop2.7/conf/:/opt/module/spark-2.1.1-bin-hadoop2.7/jars/*:/opt/module/hadoop-2.7.2/etc/hadoop/ \
-Xmx1g \
-XX:MaxPermSize=256m \
org.apache.spark.deploy.worker.Worker \
--webui-port 8081
spark://hadoop102:7077
5.2 啟動流程
Worker 的啟動流程如下:

詳解如下:
1)SparkConf:加載 key 以 spark. 開頭的系統屬性 (Utils.getSystemProperties)。
2)WorkerArguments:
a) 解析 Master 啟動的參數:
--ip -i --host -h --port -p --cores -c --memory -m --work-dir --webui-port --properties-file
b) 將 --properties-file (沒有配置默認為conf/spark-defaults.conf) 中以 spark. 開頭的配置存入 SparkConf。
c) 在沒有配置情況下,cores 默認為服務器 CPU 核數。
d) 在沒有配置情況下,memory 默認為服務器內存減 1G,如果低於 1G 取 1G。
e) webUiPort 默認為 8081。
3)NettyRpcEnv 中的內部處理遵循 RpcEndpoint 統一處理,這里不再贅述。
4)最終守護進程會一直存在等待結束信 awaitTermination。
5.3 OnStart 監聽事件
Worker 的啟動完成后異步執行工作如下:

詳解如下:
1)【dispatcher-event-loop】線程掃描到 OnStart 指令后會啟動相關 WorkerWebUI (默認端口 8081)。
2)Worker 向 Master 發起一次 RegisterWorker 指令。
3)另起【master-forward-message-thread】線程定期執行 ReregisterWithMaster 任務,如果注冊成功 (RegisteredWorker) 則跳過,否則再次向 Master 發起 RegisterWorker 指令,直到超過最大次數報錯 (默認16次)。
4)Master 如果可以注冊,則維護對應的 WorkerInfo 對象並持久化,完成后向 Worker 發起一條 RegisteredWorker 指令,如果 Master 為 standby 狀態,則向 Worker 發起一條 MasterInStandby 指令。
5)Worker 接受 RegisteredWorker 后,提交【master-forward-message-thread】線程定期執行 SendHeartbeat 任務,完成后向 Worker 發起一條 WorkerLatestState 指令。
6)Worker 發心跳檢測,會觸發更新 Master 對應 WorkerInfo 對象,如果 Master 檢測到異常,則發起 ReconnectWorker 指令至 Worker,Worker 則再次執行 ReregisterWithMaster 工作。
5.4 RpcMessage 處理 (receiveAndReply)

5.5 OneWayMessage 處理 (receive)

第6章 Client 啟動流程
Client 作為 Endpoint 的具體實例,下面我們介紹一下 Client 啟動以及 OnStart 指令后的額外工作。
6.1 腳本概覽
下面是一個舉例:
/opt/module/jdk1.8.0_144 \
-cp /opt/module/spark-2.1.1-bin-hadoop2.7/conf/:/opt/module/spark-2.1.1-bin-hadoop2.7/jars/*:/opt/module/hadoop-2.7.2/etc/hadoop/ \
-Xmx1g
-XX:MaxPermSize=256m
org.apache.spark.deploy.SparkSubmit
--master spark://hadoop102:7077
--class org.apache.spark.examples.SparkPi
../examples/jars/spark-examples_2.11-2.1.0.jar 10
6.2 SparkSubmit 啟動流程
SparkSubmit 的啟動流程如下:

詳解如下:
1)SparkSubmitArguments:
a) 解析 Client 啟動的參數
--name --master --class --deploy-mode
--num-executors --executor-cores --total-executor-cores --executor-memory
--driver-memory --driver-cores --driver-class-path --driver-java-options --driver-library-path
--properties-file
--kill --status --supervise --queue
--files --py-files
--archives --jars --packages --exclude-packages --repositories
--conf (解析存入 Map:sparkProperties 中)
--proxy-user --principal --keytab --help --verbose --version --usage-error
b) 合並 --properties-file (沒有配置默認為 conf/spark-defaults.conf) 文件配置項 (不在 --conf 中的配置 ) 至 sparkProperties
c) 刪除 sparkProperties 中不以 spark. 開頭的配置項目
d) 啟動參數為空的配置項從 sparkProperties 中合並
e) 根據 action (SUBMIT、KILL、REQUEST_STATUS) 校驗各自必需參數是否有值
2)Case Submit:
a) 獲取childMainClass
[--deploy-mode] = clent(默認):用戶任務啟動類 mainClass (--class)
[--deploy-mode] = cluster & [--master] = spark:* & useRest org.apache.spark.deploy.rest.RestSubmissionClient
[--deploy-mode] = cluster & [--master] = spark:* & !useRest org.apache.spark.deploy.Client
[--deploy-mode] = cluster & [--master] = yarn org.apache.spark.deploy.yarn.Client
[--deploy-mode] = cluster & [--master] = mesos:* org.apache.spark.deploy.rest.RestSubmissionClient
b) 獲取 childArgs (子運行時對應命令行組裝參數)
[--deploy-mode] = cluster & [--master] = spark:* & useRest 包含 primaryResource 與 mainClass
[--deploy-mode] = cluster & [--master] = spark:* & !useRest 包含 --supervise --memory --cores launch childArg, primaryResource, mainClass
[--deploy-mode] = cluster & [--master] = yarn --class --arg --jar/--primary-py-file/--primary-r-file
[--deploy-mode] = cluster & [--master] = mesos:* primaryResource
c) 獲取 childClasspath
[--deploy-mode] = clent 讀取 --jars 配置,與 primaryResource 信息 (../examples/jars/spark-examples_2.11-2.1.0.jar)
d) 獲取 sysProps
將 sparkPropertie 中的所有配置封裝成新的 sysProps 對象,另外還增加了一下額外的配置項目
e) 將 childClasspath 通過當前的類加載器加載中
f) 將 sysProps 設置到當前 jvm 環境中
g) 最終反射執行 childMainClass,傳參為 childArgs
6.3 Client 啟動流程
Client 的啟動流程如下:

詳解如下:
1)SparkConf:加載 key 以 spark. 開頭的系統屬性 (Utils.getSystemProperties)。
2)ClientArguments:
a) 解析 Client 啟動的參數:
--cores -c --memory -m --supervise -s --verbose -v
launch jarUrl master mainClass
kill master driverId
b) 將 --properties-file (沒有配置默認為 conf/spark-defaults.conf) 中以 spark. 開頭的配置存入 SparkConf。
c) 在沒有配置情況下,cores 默認為 1 核。
d) 在沒有配置情況下,memory 默認為 1G。
e) NettyRpcEnv 中的內部處理遵循 RpcEndpoint 統一處理,這里不再贅述。
3)最終守護進程會一直存在等待結束信 awaitTermination。
6.4 Client 的 OnStart 監聽事件
Client 的啟動完成后異步執行工作如下:

詳解如下:
1)如果是發布任務(case launch),Client 創建一個 DriverDescription,並向 Master 發起 RequestSubmitDriver 請求。
a) Command 中的 mainClass 為: org.apache.spark.deploy.worker.DriverWrapper
b) Command 中的 arguments 為: Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass)
2)Master 接受 RequestSubmitDriver 請求后,將 DriverDescription 封裝為 一個DriverInfo。
a) startTime 與 submitDate 都為當前時間
b) driverId 格式為:driver-yyyyMMddHHmmss-nextId,nextId 是全局唯一的
3)Master 持久化 DriverInfo,並加入待調度列表中 (waitingDrivers),觸發公共資源調度邏輯。
4)Master 公共資源調度結束后,返回 SubmitDriverResponse給Client。
6.5 RpcMessage 處理 (receiveAndReply)
無。
6.6 OneWayMessage 處理(receive)

第7章 Driver 和 DriverRunner
Client 向 Master 發起 RequestSubmitDriver 請求,Master 將 DriverInfo 添加待調度列表中 (waitingDrivers),下面針對於 Driver 進一步梳理。
7.1 Master 對 Driver 資源分配
大致流程如下:

詳解如下:
waitingDrivers 與 aliveWorkers 進行資源匹配:
1)在 waitingDrivers 循環內,輪詢所有 aliveWorker。
2)如果 aliveWorker 滿足當前 waitingDriver 資源要求,給 Worker 發送 LaunchDriver 指令並將 waitingDriver 移除 waitingDrivers,則進行下一次 waitingDriver 的輪詢工作。
3)如果輪詢完所有 aliveWorker 都不滿足 waitingDriver 資源要求,則進行下一次 waitingDriver 的輪詢工作。
4)所有發起的輪詢開始點都上次輪詢結束點的下一個點位開始。
7.2 Worker 運行 DriverRunner
Driver 的啟動,流程如下:

詳解如下:
1)當 Worker 遇到 LaunchDriver 指令時,創建並啟動一個 DriverRunner。
2)DriverRunner 啟動一個線程 DriverRunner for [driverId] 處理 Driver 啟動工作。
3)DriverRunner for [driverId]:
a) 添加 JVM 鈎子,針對於每個 diriverId 創建一個臨時目錄。
b) 將 DriverDesc.jarUrl 通過 Netty 從 Driver 機器遠程拷貝過來。
c) 根據 DriverDesc.command 模板構建本地執行的 command 命令,並啟動該 command 對應的 Process 進程。
d) 將 Process 的輸出流輸出到文件 stdout/stderror,如果 Process 啟動失敗,進行 1-5 的秒的反復啟動工作,直到啟動成功,在釋放 Worker 節點的 DriverRunner 的資源。
7.3 DriverRunner 創建並運行 DriverWrapper
DriverWrapper 的運行,流程如下:

詳解如下:
1)DriverWapper 創建了一個 RpcEndpoint 與 RpcEnv。
2)RpcEndpoint 為 WorkerWatcher,主要目的為監控 Worker 節點是否正常,如果出現異常就直接退出。
3)然后當前的 ClassLoader 加載 userJar,同時執行 userMainClass。
4)執行用戶的 main 方法后關閉 workerWatcher。
第8章 SparkContext 解析
8.1 SparkContext 解析
SparkContext 是用戶通往 Spark 集群的唯一入口,任何需要使用 Spark 的地方都需要先創建 SparkContext,那么 SparkContext 做了什么?
首先 SparkContext 是在 Driver 程序里面啟動的,可以看做 Driver 程序和 Spark 集群的一個連接,SparkContext 在初始化的時候,創建了很多對象,如下圖所示:

上圖列出了 SparkContext 在初始化創建的時候的一些主要組件的構建。
8.2 SparkContext 創建過程

詳解如下:
SparkContext 在新建時:
1)內部創建一個 SparkEnv,SparkEnv 內部創建一個 RpcEnv。
a) RpcEnv 內部創建並注冊一個 MapOutputTrackerMasterEndpoint(該 Endpoint 暫不介紹)
2)接着創建 DAGScheduler、TaskSchedulerImpl、SchedulerBackend。
a) TaskSchedulerImpl 創建時創建 SchedulableBuilder,SchedulableBuilder 根據類型分為 FIFOSchedulableBuilder、FairSchedulableBuilder 兩類
3)最后啟動 TaskSchedulerImpl,TaskSchedulerImpl 啟動 SchedulerBackend。
a) SchedulerBackend 啟動時創建 ApplicationDescription、DriverEndpoint、StandloneAppClient
b) StandloneAppClient 內部包括一個 ClientEndpoint
8.3 SparkContext 簡易結構與交互關系

詳解如下:
1)SparkContext:是用戶 Spark 執行任務的上下文,用戶程序內部使用 Spark 提供的 Api 直接或間接創建一個 SparkContext。
2)SparkEnv:用戶執行的環境信息,包括通信相關的端點。
3)RpcEnv:SparkContext 中遠程通信環境。
4)ApplicationDescription:應用程序描述信息,主要包含 appName、maxCores、memoryPerExecutorMB、coresPerExecutor、Command (CoarseGrainedExecutorBackend)、appUiUrl 等。
5)ClientEndpoint:客戶端端點,啟動后向 Master 發起注冊 RegisterApplication 請求。
6)Master:接受 RegisterApplication 請求后,進行 Worker 資源分配,並向分配的資源發起 LaunchExecutor 指令。
7)Worker:接受 LaunchExecutor 指令后,運行 ExecutorRunner。
8)ExecutorRunner:運行 applicationDescription 的 Command 命令,最終 Executor,同時向 DriverEndpoint 注冊 Executor 信息。
8.4 Master 對 Application 資源分配
當 Master 接受 Driver 的 RegisterApplication 請求后,放入 waitingDrivers 隊列中,在同一調度中進行資源分配,分配過程如下:

詳解如下:
waitingApps 與 aliveWorkers 進行資源匹配:
1)如果 waitingApp 配置了 app.desc.coresPerExecutor:
a) 輪詢所有有效可分配的 worker,每次分配一個 executor,executor 的核數為 minCoresPerExecutor(app.desc.coresPerExecutor),直到不存在有效可分配資源或者 app 依賴的資源已全部被分配。
2)如果 waitingApp 沒有配置 app.desc.coresPerExecutor:
a) 輪詢所有有效可分配的 worker,每個 worker 分配一個 executor,executor 的核數為從 minCoresPerExecutor(為固定值1) 開始遞增,直到不存在有效可分配資源或者 app 依賴的資源已全部被分配。
3)其中有效可分配 worker 定義為滿足一次資源分配的 worker:
a) cores 滿足:usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor
b) memory 滿足(如果是新的 Executor):usableWorkers(pos).memoryFree - assignedExecutors(pos) * memoryPerExecutor >= memoryPerExecutor
注意:Master 針對於 applicationInfo 進行資源分配時,只有存在有效可用的資源就直接分配,而分配剩余的 app.coresLeft 則等下一次再進行分配。
8.5 Worker 創建 Executor

(圖解:橙色組件是 Endpoint 組件)
詳解如下:
Worker 啟動 Executor
1)在 Worker 的 tempDir 下面創建 application 以及 executor 的目錄,並 chmod 700 操作權限。
2)創建並啟動 ExecutorRunner 進行 Executor 的創建。
3)向 Master 發送 Executor 的狀態情況。
ExecutorRnner
1)新線程【ExecutorRunner for [executorId]】讀取 ApplicationDescription 將其中 Command 轉化為本地的 Command 命令。
2)調用 Command 並將日志輸出至 executor 目錄下的 stdout 和 stderr 日志文件中,Command 對應的 java 類為 CoarseGrainedExecutorBackend。
CoarseGrainedExecutorBackend
1)創建一個 SparkEnv,創建 ExecutorEndpoint(CoarseGrainedExecutorBackend)以及 WorkerWatcher。
2)ExecutorEndpoint 創建並啟動后,向 DriverEndpoint 發送 RegisterExecutor 請求並等待返回。
3)DriverEndpoint 處理 RegisterExecutor 請求,返回 ExecutorEndpointRegister 的結果。
4)如果注冊成功,ExecutorEndpoint 內部再創建 Executor 的處理對象。
至此,Spark 運行任務的容器框架就搭建完成。
第9章 Job 提交和 Task 的拆分
在前面的章節 Client 的加載中,Spark 的 DriverRunner 已開始執行用戶任務類(比如:org.apache.spark.examples.SparkPi),下面我們開始針對於用戶任務類(或者任務代碼)進行分析:
9.1 整體預覽

詳解如下:
1)Code:指的用戶編寫的代碼
2)RDD:彈性分布式數據集,用戶編碼根據 SparkContext 與 RDD 的 api 能夠很好的將 Code 轉化為 RDD 數據結構(下文將做轉化細節介紹)。
3)DAGScheduler:有向無環圖調度器,將 RDD 封裝為 JobSubmitted 對象存入 EventLoop (實現類DAGSchedulerEventProcessLoop) 隊列中。
4)EventLoop: 定時掃描未處理 JobSubmitted 對象,將 JobSubmitted 對象提交給 DAGScheduler。
5)DAGScheduler:針對於 JobSubmitted 進行處理,最終將 RDD 轉化為執行 TaskSet,並將 TaskSet 提交至 TaskScheduler。
6)TaskScheduler: 根據 TaskSet 創建 TaskSetManager 對象存入 SchedulableBuilder 的數據池(Pool)中,並調用 DriverEndpoint 喚起消費(ReviveOffers)操作。
7)DriverEndpoint:接受 ReviveOffers 指令后將 TaskSet 中的 Tasks 根據相關規則均勻分配給Executor。
8)Executor:啟動一個 TaskRunner 執行一個 Task。
9.2 Code 轉化為初始 RDDs
我們的用戶代碼通過調用 Spark 的 Api(比如:SparkSession.builder.appName("Spark Pi").getOrCreate()),該 Api 會創建 Spark 的上下文(SparkContext),當我們調用 transform 類方法(如:parallelize(),map())都會創建(或者裝飾已有的)Spark 數據結構(RDD),如果是 action 類操作(如:reduce()),那么將最后封裝的 RDD 作為一次 Job 提交,存入待調度隊列中(DAGSchedulerEventProcessLoop )待后續異步處理。
如果多次調用 action 類操作,那么封裝的多個 RDD 作為多個 Job 提交。
流程如下:

詳解如下:
ExecuteEnv(執行環境)
1)這里可以是通過 spark-submit 提交的 MainClass,也可以是 spark-shell 腳本。
2)MainClass:代碼中必定會創建或者獲取一個 SparkContext。
3)spark-shell:默認會創建一個 SparkContext。
RDD(彈性分布式數據集)
1)create:可以直接創建(如:sc.parallelize(1 until n, slices) ),也可以在其他地方讀取(如:sc.textFile("README.md"))等。
2)transformation:rdd 提供了一組 api 可以進行對已有 RDD 進行反復封裝成為新的 RDD,這里采用的是`裝飾者設計模式`,下面為部分裝飾器類圖。
3)action:當調用 RDD 的 action 類操作方法時(collect、reduce、lookup、save ),這觸發 DAGScheduler 的 Job 提交。
4)DAGScheduler:創建一個名為 JobSubmitted 的消息至 DAGSchedulerEventProcessLoop 阻塞消息隊列(LinkedBlockingDeque)中。
5)DAGSchedulerEventProcessLoop:啟動名為【dag-scheduler-event-loop】的線程實時消費消息隊列。
6)【dag-scheduler-event-loop】處理完成后回調 JobWaiter。
7)DAGScheduler:打印 Job 執行結果。
8)JobSubmitted:相關代碼如下(其中 jobId 為 DAGScheduler 全局遞增 Id)。
eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions.toArray, callSite, waiter,
SerializationUtils.clone(properties)))
部分裝飾器類圖

最終示例:

最終轉化的 RDD 分為四層,每層都依賴於上層 RDD,將 ShffleRDD 封裝為一個 Job 存入 DAGSchedulerEventProcessLoop 待處理,如果我們的代碼中存在幾段上面示例代碼,那么就會創建對應對的幾個 ShffleRDD 分別存入 DAGSchedulerEventProcessLoop 中。
9.3 RDD 分解為待執行任務集合(TaskSet)
Job 提交后,DAGScheduler 根據 RDD 層次關系解析為對應的 Stages,同時維護 Job 與 Stage 的關系。
將最上層的 Stage 根據並發關系(findMissingPartitions)分解為多個 Task,將這個多個 Task 封裝為 TaskSet 提交給 TaskScheduler。非最上層的 Stage 的存入處理的列表中(waitingStages += stage)
流程如下:

詳解如下:
1)DAGSchedulerEventProcessLoop中,線程【dag-scheduler-event-loop】處理到 JobSubmitted
2)調用 DAGScheduler 進行 handleJobSubmitted
a) 首先根據 RDD 依賴關系依次創建 Stage 族,Stage 分為 ShuffleMapStage、ResultStage 兩類,如下圖所示:
b) 更新 jobId 與 StageId 關系 Map
c) 創建 ActiveJob,調用 LiveListenerBug,發送 SparkListenerJobStart 指令
d) 找到最上層 Stage 進行提交,下層 Stage 存入 waitingStage 中待后續處理
1) 調用 OutputCommitCoordinator 進行 stageStart() 處理
2) 調用 LiveListenerBug,發送 SparkListenerStageSubmitted 指令
3) 調用 SparkContext的broadcast 方法獲取 Broadcast 對象,根據 Stage 類型創建對應多個 Task,一個 Stage 根據 findMissingPartitions 分為多個對應的 Task,Task 分為 ShuffleMapTask、ResultTask
4) 將 Task 封裝為 TaskSet,調用 TaskScheduler.submitTasks(taskSet) 進行 Task 調度,關鍵代碼如下:
taskScheduler.submitTasks(new TaskSet(
tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
ShuffleMapStage、ResultStage 兩類

9.4 TaskSet 封裝為 TaskSetManager 並提交至 Driver
TaskScheduler 將 TaskSet 封裝為 TaskSetManager(new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt)),存入待處理任務池(Pool)中,發送 DriverEndpoint 喚起消費(ReviveOffers)指令。

詳解如下:
1)DAGSheduler 將 TaskSet 提交給 TaskScheduler 的實現類,這里是 TaskChedulerImpl。
2)TaskSchedulerImpl 創建一個 TaskSetManager 管理 TaskSet,關鍵代碼如下:
new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt)
3)同時將 TaskSetManager 添加 SchedduableBuilder 的任務池 Poll 中。
4)調用 SchedulerBackend 的實現類進行 reviveOffers,這里是 standlone 模式的實現類 StandaloneSchedulerBackend。
5)SchedulerBackend 發送 ReviveOffers 指令至 DriverEndpoint。
9.5 Driver 將 TaskSetManager 分解為 TaskDescriptions 並發布任務到 Executor
Driver 接受喚起消費指令后,將所有待處理的 TaskSetManager 與 Driver 中注冊的 Executor 資源進行匹配,最終一個 TaskSetManager 得到多個 TaskDescription 對象,按照 TaskDescription 相對應的 Executor 發送 LaunchTask 指令。

詳解如下:
當 Driver 獲取到 ReviveOffers(請求消費)指令時
1)首先根據 executorDataMap 緩存信息得到可用的 Executor 資源信息(WorkerOffer),關鍵代碼如下:
val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
val workOffers = activeExecutors.map { case (id, executorData) =>
new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
}.toIndexedSeq
2)接着調用 TaskScheduler 進行資源匹配,方法定義如下:
def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {..}
a) 將 WorkerOffer 資源打亂,如:val shuffledOffers = Random.shuffle(offers)
b) 將 Pool 中待處理的 TaskSetManager 取出,如:val sortedTaskSets = rootPool.getSortedTaskSetQueue
c) 並循環處理 sortedTaskSets 並與 shuffledOffers 循環匹配,如果 shuffledOffers(i) 有足夠的 CPU 資源( if (availableCpus(i) >= CPUS_PER_TASK)),調用 TaskSetManager 創建 TaskDescription 對象(taskSet.resourceOffer(execId, host, maxLocality)),最終創建了多個 TaskDescription,TaskDescription 定義如下:
new TaskDescription(
taskId,
attemptNum,
execId,
taskName,
index,
sched.sc.addedFiles,
sched.sc.addedJars,
task.localProperties,
serializedTask)
3)如果 TaskDescriptions 不為空,循環 TaskDescriptions,序列化 TaskDescription 對象,並向 ExecutorEndpoint 發送 LaunchTask 指令,關鍵代碼如下:
for (task <- taskDescriptions.flatten) {
val serializedTask = TaskDescription.encode(task)
val executorData = executorDataMap(task.executorId)
executorData.freeCores -= scheduler.CPUS_PER_TASK
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
}
第10章 Task 執行和回饋
DriverEndpoint 最終生成多個可執行的 TaskDescription 對象,並向各個 ExecutorEndpoint 發送 LaunchTask 指令,本節內容將關注 ExecutorEndpoint 如何處理 LaunchTask 指令,處理完成后如何回饋給 DriverEndpoint,以及整個 job 最終如何多次調度直至結束。
10.1 Task 的執行流程
Executor 接受 LaunchTask 指令后,開啟一個新線程 TaskRunner 解析 RDD,並調用 RDD 的 compute 方法,歸並函數得到最終任務執行結果。

詳解如下:
1)ExecutorEndpoint 接受到 LaunchTask 指令后,解碼出 TaskDescription,調用 Executor 的 launchTask 方法。
2)Executor 創建一個 TaskRunner 線程,並啟動線程,同時將改線程添加到 Executor 的成員對象中,代碼如下:
private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
runningTasks.put(taskDescription.taskId, taskRunner)
TaskRunner
1)首先向 DriverEndpoint 發送任務最新狀態為 RUNNING。
2)從 TaskDescription 解析出 Task,並調用 Task 的 run 方法。
Task
1)創建 TaskContext 以及 CallerContext (與 HDFS 交互的上下文對象)。
2)執行 Task 的 runTask 方法:
a) 如果 Task 實例為 ShuffleMapTask:解析出 RDD 以及 ShuffleDependency 信息,調用 RDD 的 compute() 方法將結果寫 Writer 中(Writer 這里不介紹,可以作為黑盒理解,比如寫入一個文件中),返回 MapStatus 對象。
b) 如果 Task 實例為 ResultTask:解析出 RDD 以及合並函數信息,調用函數將調用后的結果返回。
TaskRunner 將 Task 執行的結果序列化,再次向 DriverEndpoint 發送任務最新狀態為 FINISHED。
10.2 Task 的回饋流程
TaskRunner 執行結束后,都將執行狀態發送至 DriverEndpoint,DriverEndpoint 最終反饋指令 CompletionEvent 發送至 DAGSchedulerEventProcessLoop 中。

詳解如下:
1)DriverEndpoint 接收到 StatusUpdate 消息后,調用 TaskScheduler 的 statusUpdate(taskId, state, result) 方法
2)TaskScheduler 如果任務結果是完成,那么清除該任務處理中的狀態,並調動 TaskResultGetter 相關方法,關鍵代碼如下:
val taskSet = taskIdToTaskSetManager.get(tid)
taskIdToTaskSetManager.remove(tid)
taskIdToExecutorId.remove(tid).foreach { executorId =>
executorIdToRunningTaskIds.get(executorId).foreach { _.remove(tid) }
}
taskSet.removeRunningTask(tid)
if (state == TaskState.FINISHED) {
taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
} else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
}
TaskResultGetter 啟動線程啟動線程【task-result-getter】進行相關處理:
1)通過解析或者遠程獲取得到 Task 的 TaskResult 對象。
2)調用 TaskSet 的 handleSuccessfulTask 方法,TaskSet 的 handleSuccessfulTask 方法直接調用 TaskSetManager 的 handleSuccessfulTask 方法。
TaskSetManager
1)更新內部 TaskInfo 對象狀態,並將該 Task 從運行中 Task 的集合刪除,代碼如下:
val info = taskInfos(tid)
info.markFinished(TaskState.FINISHED, clock.getTimeMillis())
removeRunningTask(tid)
2)調用 DAGScheduler 的 taskEnded 方法,關鍵代碼如下:
sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info)
DAGScheduler 向 DAGSchedulerEventProcessLoop 存入 CompletionEvent 指令,CompletionEvent 對象定義如下:
private[scheduler] case class CompletionEvent(
task: Task[_],
reason: TaskEndReason,
result: Any,
accumUpdates: Seq[AccumulatorV2[_, _]],
taskInfo: TaskInfo) extends DAGSchedulerEvent
10.3 Task 的迭代流程
DAGSchedulerEventProcessLoop 中針對於 CompletionEvent 指令,調用 DAGScheduler 進行處理,DAGScheduler 更新 Stage 與該 Task 的關系狀態,如果 Stage 下 Task 都返回,則做下一層 Stage 的任務拆解與運算工作,直至 Job 被執行完畢:

詳解如下:
1)DAGSchedulerEventProcessLoop 接收到 CompletionEvent 指令后,調用 DAGScheduler 的 handleTaskCompletion 方法。
2)DAGScheduler 根據 Task 的類型分別處理。
3)如果 Task 為 ShuffleMapTask
a) 等待回饋的 Partitions 減去當前 partitionId
b) 如果所有 task 都返回,則 markStageAsFinished(shuffleStage),同時向 MapOutputTrackerMaster 注冊 MapOutputs 信息,且 markMapStageJobAsFinished
c) 調用 submitWaitingChildStages(shuffleStage) 進行下層 Stages 的處理,從而迭代處理,最終處理到 ResultTask,job 結束,關鍵代碼如下:
private def submitWaitingChildStages(parent: Stage) {
...
val childStages = waitingStages.filter(_.parents.contains(parent)).toArray
waitingStages --= childStages
for (stage <- childStages.sortBy(_.firstJobId)) {
submitStage(stage)
}
}
4)如果 Task 為 ResultTask
a) 該 job 的 partitions 都已返回,則 markStageAsFinished(resultStage),並 cleanupStateForJobAndIndependentStages(job),關鍵代碼如下:
for (stage <- stageIdToStage.get(stageId)) {
if (runningStages.contains(stage)) {
logDebug("Removing running stage %d".format(stageId))
runningStages -= stage
}
for ((k, v) <- shuffleIdToMapStage.find(_._2 == stage)) {
shuffleIdToMapStage.remove(k)
}
if (waitingStages.contains(stage)) {
logDebug("Removing stage %d from waiting set.".format(stageId))
waitingStages -= stage
}
if (failedStages.contains(stage)) {
logDebug("Removing stage %d from failed set.".format(stageId))
failedStages -= stage
}
}
// data structures based on StageId
stageIdToStage -= stageId
jobIdToStageIds -= job.jobId
jobIdToActiveJob -= job.jobId
activeJobs -= job
至此,用戶編寫的代碼最終調用 Spark 分布式計算完畢。
10.4 精彩圖解
Spark的交互流程 – 節點啟動

Spark的交互流程 – 應用提交

Spark的交互流程 – 任務運行

Spark的交互流程 – 任務運行

第11章 Spark 的數據存儲
Spark 計算速度遠勝於 Hadoop 的原因之一就在於中間結果是緩存在內存而不是直接寫入到 disk,本文嘗試分析 Spark 中存儲子系統的構成,並以數據寫入和數據讀取為例,講述清楚存儲子系統中各部件的交互關系。
11.1 存儲子系統概覽
Storage 模塊主要分為兩層:
1) 通信層:storage 模塊采用的是 master-slave 結構來實現通信層,master 和 slave 之間傳輸控制信息、狀態信息,這些都是通過通信層來實現的。
2) 存儲層:storage 模塊需要把數據存儲到 disk 或是 memory 上面,有可能還需 replicate(復制) 到遠端,這都是由存儲層來實現和提供相應接口。
而其他模塊若要和 storage 模塊進行交互,storage 模塊提供了統一的操作類 BlockManager,外部類與 storage 模塊打交道都需要通過調用 BlockManager 相應接口來實現。

上圖是Spark存儲子系統中幾個主要模塊的關系示意圖,現簡要說明如下:
1)CacheManager RDD 在進行計算的時候,通過 CacheManager 來獲取數據,並通過 CacheManager 來存儲計算結果。
2)BlockManager CacheManager 在進行數據讀取和存取的時候主要是依賴 BlockManager 接口來操作,BlockManager 決定數據是從內存(MemoryStore) 還是從磁盤(DiskStore) 中獲取。
3)MemoryStore 負責將數據保存在內存或從內存讀取。
4)DiskStore 負責將數據寫入磁盤或從磁盤讀入。
5)BlockManagerWorker 數據寫入本地的 MemoryStore 或 DiskStore 是一個同步操作,為了容錯還需要將數據復制到別的計算結點,以防止數據丟失的時候還能夠恢復,數據復制的操作是異步完成,由 BlockManagerWorker 來處理這一部分事情。
6)ConnectionManager 負責與其它計算結點建立連接,並負責數據的發送和接收。
7)BlockManagerMaster 注意該模塊只運行在 Driver Application 所在的 Executor,功能是負責記錄下所有 BlockIds 存儲在哪個 SlaveWorker 上,比如 RDD Task 運行在機器 A,所需要的 BlockId 為 3,但在機器 A 上沒有 BlockId 為 3 的數值,這個時候 Slave worker 需要通過 BlockManager 向 BlockManagerMaster 詢問數據存儲的位置,然后再通過 ConnectionManager 去獲取。
11.2 啟動過程分析
上述的各個模塊由 SparkEnv 來創建,創建過程在 SparkEnv.create 中完成,代碼如下:
val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
"BlockManagerMaster",
new BlockManagerMasterActor(isLocal, conf)), conf)
val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, serializer, conf)
val connectionManager = blockManager.connectionManager
val broadcastManager = new BroadcastManager(isDriver, conf)
val cacheManager = new CacheManager(blockManager)
下面這段代碼容易讓人疑惑,看起來像是在所有的 cluster node 上都創建了 BlockManagerMasterActor,其實不然,仔細看 registerOrLookup 函數的實現。如果當前節點是 driver 則創建這個 actor,否則建立到 driver 的連接。代碼如下:
def registerOrLookup(name: String, newActor: => Actor): ActorRef = {
if (isDriver) {
logInfo("Registering " + name)
actorSystem.actorOf(Props(newActor), name = name)
} else {
val driverHost: String = conf.get("spark.driver.host", "localhost")
val driverPort: Int = conf.getInt("spark.driver.port", 7077)
Utils.checkHost(driverHost, "Expected hostname")
val url = s"akka.tcp://spark@$driverHost:$driverPort/user/$name"
val timeout = AkkaUtils.lookupTimeout(conf)
logInfo(s"Connecting to $name: $url")
Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
}
}
初始化過程中一個主要的動作就是 BlockManager 需要向 BlockManagerMaster 發起注冊。
11.3 通信層

BlockManager 包裝了 BlockManagerMaster,發送信息包裝成 BlockManagerInfo。Spark 在 Driver 和 Worker 端都創建各自的 BlockManager,並通過 BlockManagerMaster 進行通信,通過 BlockManager 對 Storage 模塊進行操作。
BlockManager 對象在 SparkEnv.create 函數中進行創建,代碼如下:
def registerOrLookupEndpoint(
name: String, endpointCreator: => RpcEndpoint):
RpcEndpointRef = {
if (isDriver) {
logInfo("Registering " + name)
rpcEnv.setupEndpoint(name, endpointCreator)
} else {
RpcUtils.makeDriverRef(name, conf, rpcEnv)
}
}
......
val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
BlockManagerMaster.DRIVER_ENDPOINT_NAME,
new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
conf, isDriver)
// NB: blockManager is not valid until initialize() is called later.
val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
serializer, conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager,numUsableCores)
並且在創建之前對當前節點是否是 Driver 進行了判斷。如果是,則創建這個 Endpoint;否則,創建 Driver 的連接。
在創建 BlockManager 之后,BlockManager 會調用 initialize 方法初始化自己。並且初始化的時候,會調用 BlockManagerMaster 向 Driver 注冊自己,同時,在注冊時也啟動了Slave Endpoint。另外,向本地 shuffle 服務器注冊 Executor 配置,如果存在的話。代碼如下:
def initialize(appId: String): Unit = {
......
master.registerBlockManager(blockManagerId, maxMemory, slaveEndpoint)
// Register Executors' configuration with the local shuffle service, if one should exist.
if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
registerWithExternalShuffleServer()
}
}
而 BlockManagerMaster 將注冊請求包裝成 RegisterBlockManager 注冊到 Driver。Driver 的 BlockManagerMasterEndpoint 會調用 register 方法,通過對消息 BlockManagerInfo 檢查,向 Driver 注冊,代碼如下:
private def register(id: BlockManagerId, maxMemSize: Long, slaveEndpoint: RpcEndpointRef) {
val time = System.currentTimeMillis()
if (!blockManagerInfo.contains(id)) {
blockManagerIdByExecutor.get(id.executorId) match {
case Some(oldId) =>
// A block manager of the same executor already exists, so remove it (assumed dead)
logError("Got two different block manager registrations on same executor - "
+ s" will replace old one $oldId with new one $id")
removeExecutor(id.executorId)
case None =>
}
logInfo("Registering block manager %s with %s RAM, %s".format(
id.hostPort, Utils.bytesToString(maxMemSize), id))
blockManagerIdByExecutor(id.executorId) = id
blockManagerInfo(id) = new BlockManagerInfo(
id, System.currentTimeMillis(), maxMemSize, slaveEndpoint)
}
listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxMemSize))
}
不難發現 BlockManagerInfo 對象被保存到 Map 映射中。在通信層中 BlockManagerMaster 控制着消息的流向,這里采用了模式匹配,所有的消息模式都在 BlockManagerMessage 中。
11.4 存儲層

Spark Storage 的最小存儲單位是 block,所有的操作都是以 block 為單位進行的。
在 BlockManager 被創建的時候 MemoryStore 和 DiskStore 對象就被創建出來了。代碼如下:
val diskBlockManager = new DiskBlockManager(this, conf)
private[spark] val memoryStore = new MemoryStore(this, maxMemory)
private[spark] val diskStore = new DiskStore(this, diskBlockManager)
11.4.1 Disk Store
由於當前的 Spark 版本對 Disk Store 進行了更細粒度的分工,把對文件的操作提取出來放到了 DiskBlockManager 中,DiskStore 僅僅負責數據的存儲和讀取。
Disk Store 會配置多個文件目錄,Spark 會在不同的文件目錄下創建文件夾,其中文件夾的命名方式是:spark-UUID(隨機UUID碼)。Disk Store 在存儲的時候創建文件夾。並且根據【高內聚,低耦合】原則,這種服務型的工具代碼就放到了 Utils 中(調用路徑:DiskStore.putBytes —> DiskBlockManager.createLocalDirs —> Utils.createDirectory),代碼如下:
def createDirectory(root: String, namePrefix: String = "spark"): File = {
var attempts = 0
val maxAttempts = MAX_DIR_CREATION_ATTEMPTS
var dir: File = null
while (dir == null) {
attempts += 1
if (attempts > maxAttempts) {
throw new IOException("Failed to create a temp directory (under " + root + ") after " +
maxAttempts + " attempts!")
}
try {
dir = new File(root, namePrefix + "-" + UUID.randomUUID.toString)
if (dir.exists() || !dir.mkdirs()) {
dir = null
}
} catch { case e: SecurityException => dir = null; }
}
dir.getCanonicalFile
}
在 DiskBlockManager 里,每個 block 都被存儲為一個 file,通過計算 blockId 的 hash 值,將 block 映射到文件中。
def getFile(filename: String): File = {
// Figure out which local directory it hashes to, and which subdirectory in that
val hash = Utils.nonNegativeHash(filename)
val dirId = hash % localDirs.length
val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
// Create the subdirectory if it doesn't already exist
val subDir = subDirs(dirId).synchronized {
val old = subDirs(dirId)(subDirId)
if (old != null) {
old
} else {
val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
if (!newDir.exists() && !newDir.mkdir()) {
throw new IOException(s"Failed to create local dir in $newDir.")
}
subDirs(dirId)(subDirId) = newDir
newDir
}
}
new File(subDir, filename)
}
def getFile(blockId: BlockId): File = getFile(blockId.name)
通過 hash 值的取模運算,求出 dirId 和 subDirId。然后,在從 subDirs 中找到 subDir,如果 subDir 不存在,則創建一個新 subDir。最后,以 subDir 為路徑,blockId 的 name 屬性為文件名,新建該文件。
文件創建完之后,那么 Spark 就會在 DiskStore 中向文件寫與之映射的 block,代碼如下:
override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): PutResult = {
val bytes = _bytes.duplicate()
logDebug(s"Attempting to put block $blockId")
val startTime = System.currentTimeMillis
val file = diskManager.getFile(blockId)
val channel = new FileOutputStream(file).getChannel
Utils.tryWithSafeFinally {
while (bytes.remaining > 0) {
channel.write(bytes)
}
} {
channel.close()
}
val finishTime = System.currentTimeMillis
logDebug("Block %s stored as %s file on disk in %d ms".format(
file.getName, Utils.bytesToString(bytes.limit), finishTime - startTime))
PutResult(bytes.limit(), Right(bytes.duplicate()))
}
讀取過程就簡單了,DiskStore 根據 blockId 讀取與之映射的 file 內容,當然,這中間需要從 DiskBlockManager 中得到文件信息。代碼如下:
private def getBytes(file: File, offset: Long, length: Long): Option[ByteBuffer] = {
val channel = new RandomAccessFile(file, "r").getChannel
Utils.tryWithSafeFinally {
// For small files, directly read rather than memory map
if (length < minMemoryMapBytes) {
val buf = ByteBuffer.allocate(length.toInt)
channel.position(offset)
while (buf.remaining() != 0) {
if (channel.read(buf) == -1) {
throw new IOException("Reached EOF before filling buffer\n" +
s"offset=$offset\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
}
}
buf.flip()
Some(buf)
} else {
Some(channel.map(MapMode.READ_ONLY, offset, length))
}
} {
channel.close()
}
}
override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
val file = diskManager.getFile(blockId.name)
getBytes(file, 0, file.length)
}
11.4.2 Memory Store
相對 Disk Store,Memory Store 就顯得容易很多。Memory Store 用一個 LinkedHashMap 來管理,其中 Key 是 blockId,Value 是 MemoryEntry 樣例類,MemoryEntry 存儲着數據信息。代碼如下:
private case class MemoryEntry(value: Any, size: Long, deserialized: Boolean)
private val entries = new LinkedHashMap[BlockId, MemoryEntry](32, 0.75f, true)
在 MemoryStore 中存儲 block 的前提是當前內存有足夠的空間存放。通過對 tryToPut 函數的調用對內存空間進行判斷。代碼如下:
def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): PutResult = {
// Work on a duplicate - since the original input might be used elsewhere.
lazy val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer]
val putAttempt = tryToPut(blockId, () => bytes, size, deserialized = false)
val data =
if (putAttempt.success) {
assert(bytes.limit == size)
Right(bytes.duplicate())
} else {
null
}
PutResult(size, data, putAttempt.droppedBlocks)
}
在 tryToPut 函數中,通過調用 enoughFreeSpace 函數判斷內存空間。如果內存空間足夠,那么就把 block 放到 LinkedHashMap 中;如果內存不足,那么就告訴 BlockManager 內存不足,如果允許 Disk Store,那么就把該 block 放到 disk 上。代碼如下:
private def tryToPut(blockId: BlockId, value: () => Any, size: Long, deserialized: Boolean): ResultWithDroppedBlocks = {
var putSuccess = false
val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
accountingLock.synchronized {
val freeSpaceResult = ensureFreeSpace(blockId, size)
val enoughFreeSpace = freeSpaceResult.success
droppedBlocks ++= freeSpaceResult.droppedBlocks
if (enoughFreeSpace) {
val entry = new MemoryEntry(value(), size, deserialized)
entries.synchronized {
entries.put(blockId, entry)
currentMemory += size
}
val valuesOrBytes = if (deserialized) "values" else "bytes"
logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format(
blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(freeMemory)))
putSuccess = true
} else {
lazy val data = if (deserialized) {
Left(value().asInstanceOf[Array[Any]])
} else {
Right(value().asInstanceOf[ByteBuffer].duplicate())
}
val droppedBlockStatus = blockManager.dropFromMemory(blockId, () => data)
droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }
}
releasePendingUnrollMemoryForThisTask()
}
ResultWithDroppedBlocks(putSuccess, droppedBlocks)
}
Memory Store 讀取 block 也很簡單,只需要從 LinkedHashMap 中取出 blockId 的 Value 即可。代碼如下:
override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
val entry = entries.synchronized {
entries.get(blockId)
}
if (entry == null) {
None
} else if (entry.deserialized) {
Some(entry.value.asInstanceOf[Array[Any]].iterator)
} else {
val buffer = entry.value.asInstanceOf[ByteBuffer].duplicate() // Doesn't actually copy data
Some(blockManager.dataDeserialize(blockId, buffer))
}
}
11.5 數據寫入過程分析

數據寫入的簡要流程:
1)RDD.iterator 是與 storage 子系統交互的入口。
2)CacheManager.getOrCompute 調用 BlockManager 的 put 接口來寫入數據。
3)數據優先寫入到 MemoryStore 即內存,如果 MemoryStore 中的數據已滿則將最近使用次數不頻繁的數據寫入到磁盤。
4)通知 BlockManagerMaster 有新的數據寫入,在 BlockManagerMaster 中保存元數據。
5)將寫入的數據與其它 slave worker 進行同步,一般來說在本機寫入的數據,都會另先一台機器來進行數據的備份,即 replicanumber=1。
其實,我們在 put 和 get block 的時候並沒有那么復雜,前面的細節 BlockManager 都包裝好了,我們只需要調用 BlockManager 中的 put 和 get 函數即可。
代碼如下:
def putBytes(
blockId: BlockId,
bytes: ByteBuffer,
level: StorageLevel,
tellMaster: Boolean = true,
effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
require(bytes != null, "Bytes is null")
doPut(blockId, ByteBufferValues(bytes), level, tellMaster, effectiveStorageLevel)
}
private def doPut(
blockId: BlockId,
data: BlockValues,
level: StorageLevel,
tellMaster: Boolean = true,
effectiveStorageLevel: Option[StorageLevel] = None)
: Seq[(BlockId, BlockStatus)] = {
require(blockId != null, "BlockId is null")
require(level != null && level.isValid, "StorageLevel is null or invalid")
effectiveStorageLevel.foreach { level =>
require(level != null && level.isValid, "Effective StorageLevel is null or invalid")
}
val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
val putBlockInfo = {
val tinfo = new BlockInfo(level, tellMaster)
val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
if (oldBlockOpt.isDefined) {
if (oldBlockOpt.get.waitForReady()) {
logWarning(s"Block $blockId already exists on this machine; not re-adding it")
return updatedBlocks
}
oldBlockOpt.get
} else {
tinfo
}
}
val startTimeMs = System.currentTimeMillis
var valuesAfterPut: Iterator[Any] = null
var bytesAfterPut: ByteBuffer = null
var size = 0L
val putLevel = effectiveStorageLevel.getOrElse(level)
val replicationFuture = data match {
case b: ByteBufferValues if putLevel.replication > 1 =>
// Duplicate doesn't copy the bytes, but just creates a wrapper
val bufferView = b.buffer.duplicate()
Future {
replicate(blockId, bufferView, putLevel)
}(futureExecutionContext)
case _ => null
}
putBlockInfo.synchronized {
logTrace("Put for block %s took %s to get into synchronized block"
.format(blockId, Utils.getUsedTimeMs(startTimeMs)))
var marked = false
try {
val (returnValues, blockStore: BlockStore) = {
if (putLevel.useMemory) {
(true, memoryStore)
} else if (putLevel.useOffHeap) {
(false, externalBlockStore)
} else if (putLevel.useDisk) {
(putLevel.replication > 1, diskStore)
} else {
assert(putLevel == StorageLevel.NONE)
throw new BlockException(
blockId, s"Attempted to put block $blockId without specifying storage level!")
}
}
val result = data match {
case IteratorValues(iterator) =>
blockStore.putIterator(blockId, iterator, putLevel, returnValues)
case ArrayValues(array) =>
blockStore.putArray(blockId, array, putLevel, returnValues)
case ByteBufferValues(bytes) =>
bytes.rewind()
blockStore.putBytes(blockId, bytes, putLevel)
}
size = result.size
result.data match {
case Left (newIterator) if putLevel.useMemory => valuesAfterPut = newIterator
case Right (newBytes) => bytesAfterPut = newBytes
case _ =>
}
if (putLevel.useMemory) {
result.droppedBlocks.foreach { updatedBlocks += _ }
}
val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
if (putBlockStatus.storageLevel != StorageLevel.NONE) {
marked = true
putBlockInfo.markReady(size)
if (tellMaster) {
reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
}
updatedBlocks += ((blockId, putBlockStatus))
}
} finally {
if (!marked) {
blockInfo.remove(blockId)
putBlockInfo.markFailure()
logWarning(s"Putting block $blockId failed")
}
}
}
logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
if (putLevel.replication > 1) {
data match {
case ByteBufferValues(bytes) =>
if (replicationFuture != null) {
Await.ready(replicationFuture, Duration.Inf)
}
case _ =>
val remoteStartTime = System.currentTimeMillis
if (bytesAfterPut == null) {
if (valuesAfterPut == null) {
throw new SparkException(
"Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
}
bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
}
replicate(blockId, bytesAfterPut, putLevel)
logDebug("Put block %s remotely took %s"
.format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
}
}
BlockManager.dispose(bytesAfterPut)
if (putLevel.replication > 1) {
logDebug("Putting block %s with replication took %s"
.format(blockId, Utils.getUsedTimeMs(startTimeMs)))
} else {
logDebug("Putting block %s without replication took %s"
.format(blockId, Utils.getUsedTimeMs(startTimeMs)))
}
updatedBlocks
}
對於 doPut 函數,主要做了以下幾個操作:
1)創建 BlockInfo 對象存儲 block 信息。
2)將 BlockInfo 加鎖,然后根據 Storage Level 判斷存儲到 Memory 還是 Disk。同時,對於已經准備好讀的 BlockInfo 要進行解鎖。
3)根據 block 的副本數量決定是否向遠程發送副本。
11.5.1 序列化與否
寫入的具體內容可以是序列化之后的 bytes 也可以是沒有序列化的 value. 此處有一個對 scala 的語法中 Either, Left, Right 關鍵字的理解。
11.6 數據讀取過程分析
def get(blockId: BlockId): Option[Iterator[Any]] = {
val local = getLocal(blockId)
if (local.isDefined) {
logInfo("Found block %s locally".format(blockId))
return local
}
val remote = getRemote(blockId)
if (remote.isDefined) {
logInfo("Found block %s remotely".format(blockId))
return remote
}
None
}
11.6.1 本地讀取
首先在查詢本機的 MemoryStore 和 DiskStore 中是否有所需要的 block 數據存在,如果沒有則發起遠程數據獲取。
11.6.2 遠程讀取
遠程獲取調用路徑, getRemote --> doGetRemote, 在 doGetRemote 中最主要的就是調用 BlockManagerWorker.syncGetBlock 來從遠程獲得數據。
def syncGetBlock(msg: GetBlock, toConnManagerId: ConnectionManagerId): ByteBuffer = {
val blockManager = blockManagerWorker.blockManager
val connectionManager = blockManager.connectionManager
val blockMessage = BlockMessage.fromGetBlock(msg)
val blockMessageArray = new BlockMessageArray(blockMessage)
val responseMessage = connectionManager.sendMessageReliablySync(
toConnManagerId, blockMessageArray.toBufferMessage)
responseMessage match {
case Some(message) => {
val bufferMessage = message.asInstanceOf[BufferMessage]
logDebug("Response message received " + bufferMessage)
BlockMessageArray.fromBufferMessage(bufferMessage).foreach(blockMessage => {
logDebug("Found " + blockMessage)
return blockMessage.getData
})
}
case None => logDebug("No response message received")
}
null
}
上述這段代碼中最有意思的莫過於 sendMessageReliablySync,遠程數據讀取毫無疑問是一個異步 i/o 操作,這里的代碼怎么寫起來就像是在進行同步的操作一樣呢。也就是說如何知道對方發送回來響應的呢?
別急,繼續去看看 sendMessageReliablySync 的定義:
def sendMessageReliably(connectionManagerId: ConnectionManagerId, message: Message)
: Future[Option[Message]] = {
val promise = Promise[Option[Message]]
val status = new MessageStatus(
message, connectionManagerId, s => promise.success(s.ackMessage))
messageStatuses.synchronized {
messageStatuses += ((message.id, status))
}
sendMessage(connectionManagerId, message)
promise.future
}
要是我說秘密在這里,你肯定會說我在扯淡,但確實在此處。注意到關鍵字 Promise 和 Future 沒?
如果這個 future 執行完畢,返回 s.ackMessage。我們再看看這個 ackMessage 是在什么地方被寫入的呢。看一看 ConnectionManager.handleMessage 中的代碼片段:
case bufferMessage: BufferMessage =>
{
if (authEnabled) {
val res = handleAuthentication(connection, bufferMessage)
if (res == true) {
// message was security negotiation so skip the rest
logDebug("After handleAuth result was true, returning")
return
}
}
if (bufferMessage.hasAckId) {
val sentMessageStatus = messageStatuses. synchronized {
messageStatuses.get(bufferMessage.ackId) match {
case Some(status) =>{
messageStatuses -= bufferMessage.ackId
status
}
case None =>{
throw new Exception("Could not find reference for received ack message " +
message.id)
null
}
}
}
sentMessageStatus. synchronized {
sentMessageStatus.ackMessage = Some(message)
sentMessageStatus.attempted = true
sentMessageStatus.acked = true
sentMessageStaus.markDone()
}
}
}
注意
:此處的所調用的 sentMessageStatus.markDone 就會調用在 sendMessageReliablySync 中定義的 promise.Success,不妨看看 MessageStatus 的定義。
class MessageStatus(
val message: Message,
val connectionManagerId: ConnectionManagerId,
completionHandler: MessageStatus => Unit) {
var ackMessage: Option[Message] = None
var attempted = false
var acked = false
def markDone() { completionHandler(this) }
}
11.7 Partition 如何轉化為 Block
在 storage 模塊里面所有的操作都是和 block 相關的,但是在 RDD 里面所有的運算都是基於 partition 的,那么 partition 是如何與 block 對應上的呢?
RDD 計算的核心函數是 iterator() 函數:
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
if (storageLevel != StorageLevel.NONE) {
SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
} else {
computeOrReadCheckpoint(split, context)
}
}
如果當前 RDD 的 storage level 不是 NONE 的話,表示該 RDD 在 BlockManager 中有存儲,那么調用 CacheManager 中的 getOrCompute() 函數計算 RDD,在這個函數中 partition 和 block 發生了關系:
首先根據 RDD id 和 partition index 構造出 block id (rdd_xx_xx),接着從 BlockManager 中取出相應的 block。
如果該 block 存在,表示此 RDD 在之前已經被計算過和存儲在 BlockManager 中,因此取出即可,無需再重新計算。
如果該 block 不存在則需要調用 RDD 的 computeOrReadCheckpoint() 函數計算出新的 block,並將其存儲到 BlockManager 中。
需要注意的是 block 的計算和存儲是阻塞的,若另一線程也需要用到此 block 則需等到該線程 block 的 loading 結束。
def getOrCompute[T](rdd:RDD[T],split:Partition,context:TaskContext,storageLevel:StorageLevel):Iterator[T]=
{
val key = "rdd_%d_%d".format(rdd.id, split.index)
logDebug("Looking for partition " + key)
blockManager.get(key) match {
case Some(values) =>
// Partition is already materialized, so just return its values
return values.asInstanceOf[Iterator[T]]
case None =>
// Mark the split as loading (unless someone else marks it first)
loading. synchronized {
if (loading.contains(key)) {
logInfo("Another thread is loading %s, waiting for it to finish...".format(key))
while (loading.contains(key)) {
try {
loading.wait()
} catch {
case _:
Throwable =>}
}
logInfo("Finished waiting for %s".format(key))
// See whether someone else has successfully loaded it. The main way this would fail
// is for the RDD-level cache eviction policy if someone else has loaded the same RDD
// partition but we didn't want to make space for it. However, that case is unlikely
// because it's unlikely that two threads would work on the same RDD partition. One
// downside of the current code is that threads wait serially if this does happen.
blockManager.get(key) match {
case Some(values) =>
return values.asInstanceOf[Iterator[T]]
case None =>
logInfo("Whoever was loading %s failed; we'll try it ourselves".format(key))
loading.add(key)
}
} else {
loading.add(key)
}
}
try {
// If we got here, we have to load the split
logInfo("Partition %s not found, computing it".format(key))
val computedValues = rdd.computeOrReadCheckpoint(split, context)
// Persist the result, so long as the task is not running locally
if (context.runningLocally) {
return computedValues
}
val elements = new ArrayBuffer[Any]
elements++ = computedValues
blockManager.put(key, elements, storageLevel, true)
return elements.iterator.asInstanceOf[Iterator[T]]
} finally {
loading. synchronized {
loading.remove(key)
loading.notifyAll()
}
}
}
這樣 RDD 的 transformation、action 就和 block 數據建立了聯系,雖然抽象上我們的操作是在 partition 層面上進行的,但是 partitio n最終還是被映射成為 block,因此實際上我們的所有操作都是對 block 的處理和存取。
11.8 partition 和 block 的對應關系
在 RDD 中,核心的函數是 iterator:
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
if (storageLevel != StorageLevel.NONE) {
SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
} else {
computeOrReadCheckpoint(split, context)
}
}
如果當前 RDD 的 storage level 不是 NONE 的話,表示該 RDD 在 BlockManager 中有存儲,那么調用 CacheManager 中的 getOrCompute 函數計算 RDD,在這個函數中 partition 和 block 就對應起來了:
getOrCompute 函數會先構造 RDDBlockId,其中 RDDBlockId 就把 block 和 partition 聯系起來了,RDDBlockId 產生的 name 就是 BlockId 的 name 屬性,形式是:rdd_rdd.id_partition.index。
def getOrCompute[T](
rdd: RDD[T],
partition: Partition,
context: TaskContext,
storageLevel: StorageLevel): Iterator[T] = {
val key = RDDBlockId(rdd.id, partition.index)
logDebug(s"Looking for partition $key")
blockManager.get(key) match {
case Some(blockResult) =>
val existingMetrics = context.taskMetrics
.getInputMetricsForReadMethod(blockResult.readMethod)
existingMetrics.incBytesRead(blockResult.bytes)
val iter = blockResult.data.asInstanceOf[Iterator[T]]
new InterruptibleIterator[T](context, iter) {
override def next(): T = {
existingMetrics.incRecordsRead(1)
delegate.next()
}
}
case None =>
val storedValues = acquireLockForPartition[T](key)
if (storedValues.isDefined) {
return new InterruptibleIterator[T](context, storedValues.get)
}
try {
logInfo(s"Partition $key not found, computing it")
val computedValues = rdd.computeOrReadCheckpoint(partition, context)
if (context.isRunningLocally) {
return computedValues
}
val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks)
val metrics = context.taskMetrics
val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
metrics.updatedBlocks = Some(lastUpdatedBlocks ++ updatedBlocks.toSeq)
new InterruptibleIterator(context, cachedValues)
} finally {
loading.synchronized {
loading.remove(key)
loading.notifyAll()
}
}
}
}
同時 getOrCompute 函數會對 block 進行判斷:
如果該 block 存在,表示此 RDD 在之前已經被計算過和存儲在 BlockManager 中,因此取出即可,無需再重新計算。
如果該 block 不存在則需要調用 RDD 的 computeOrReadCheckpoint() 函數計算出新的block,並將其存儲到 BlockManager 中。
需要注意的是 block 的計算和存儲是阻塞的,若另一線程也需要用到此 block 則需等到該線程 block 的 loading 結束。
第12章 Spark Shuffle 過程
12.1 MapReduce 的 Shuffle 過程介紹
Shuffle 的本義是洗牌、混洗,把一組有一定規則的數據盡量轉換成一組無規則的數據,越隨機越好。MapReduce 中的 Shuffle 更像是洗牌的逆過程,把一組無規則的數據盡量轉換成一組具有一定規則的數據。
為什么 MapReduce 計算模型需要 Shuffle 過程?我們都知道 MapReduce 計算模型一般包括兩個重要的階段:Map 是映射,負責數據的過濾分發;Reduce 是規約,負責數據的計算歸並。Reduce 的數據來源於 Map,Map 的輸出即是 Reduce 的輸入,Reduce 需要通過 Shuffle來 獲取數據。
從 Map 輸出到 Reduce 輸入的整個過程可以廣義地稱為 Shuffle。Shuffle 橫跨 Map 端和 Reduce 端,在 Map 端包括 Spill 過程,在 Reduce 端包括 copy 和 sort 過程,如圖所示:
![]()
12.1.1 Spill 過程(刷寫過程)
Spill 過程包括輸出、排序、溢寫、合並等步驟,如圖所示:
![]()
Collect
每個 Map 任務不斷地以
<key, value>
對的形式把數據輸出到內存中構造的一個環形數據結構中。使用環形數據結構是為了更有效地使用內存空間,在內存中放置盡可能多的數據。
這個數據結構其實就是個字節數組,叫 kvbuffer,名如其義,但是這里面不光放置了<key, value>
數據,還放置了一些索引數據,給放置索引數據的區域起了一個 kvmeta 的別名,在 kvbuffer 的一塊區域上穿了一個 IntBuffer(字節序采用的是平台自身的字節序)的馬甲。<key, value>
數據區域和索引數據區域在 kvbuffer 中是相鄰不重疊的兩個區域,用一個分界點來划分兩者,分界點不是亘古不變的,而是每次 Spill 之后都會更新一次。初始的分界點是 0,<key, value>
數據的存儲方向是向上增長,索引數據的存儲方向是向下增長,如圖所示:
![]()
kvbuffer 的存放指針 bufindex 是一直悶着頭地向上增長,比如 bufindex 初始值為 0,一個 Int 型的 key 寫完之后,bufindex 增長為 4,一個 Int 型的 value 寫完之后,bufindex 增長為 8。
索引是對<key, value>
在 kvbuffer 中的索引,是個四元組,包括:value 的起始位置、key 的起始位置、partition 值、value 的長度,占用四個 Int 長度,kvmeta 的存放指針 kvindex 每次都是向下跳四個“格子”,然后再向上一個格子一個格子地填充四元組的數據。比如 Kvindex 初始位置是 -4,當第一個<key, value>
寫完之后,(kvindex+0) 的位置存放 value 的起始位置、(kvindex+1) 的位置存放 key 的起始位置、(kvindex+2) 的位置存放 partition 的值、(kvindex+3) 的位置存放 value 的長度,然后 kvindex 跳到 -8 位置,等第二個<key, value>
和索引寫完之后,kvindex 跳到-32 位置。kvbuffer 的大小雖然可以通過參數設置,但是總共就那么大,
<key, value>
和索引不斷地增加,加着加着,kvbuffer 總有不夠用的那天,那怎么辦?把數據從內存刷到磁盤上再接着往內存寫數據,把 kvbuffer 中的數據刷到磁盤上的過程就叫 Spill
,多么明了的叫法,內存中的數據滿了就自動地 spill 到具有更大空間的磁盤。關於 Spill 觸發的條件,也就是 kvbuffer 用到什么程度開始 Spill,還是要講究一下的。如果把 kvbuffer 用得死死得,一點縫都不剩的時候再開始 Spill,那 Map 任務就需要等 Spill 完成騰出空間之后才能繼續寫數據;如果 kvbuffer 只是滿到一定程度,比如 80% 的時候就開始 Spill,那在 Spill 的同時,Map 任務還能繼續寫數據,如果 Spill 夠快,Map 可能都不需要為空閑空間而發愁。兩利相衡取其大,一般選擇后者。
Spill 這個重要的過程是由 Spill 線程承擔,Spill 線程從 Map 任務接到“命令”之后就開始正式干活,干的活叫 SortAndSpill,原來不僅僅是 Spill,在 Spill 之前還有個頗具爭議性的 Sort。
Sort
先把 kvbuffer 中的數據按照 partition 值和 key 兩個關鍵字升序排序,移動的只是索引數據,排序結果是 kvmeta 中數據按照 partition 為單位聚集在一起,同一 partition 內的按照 key 有序。
Spill
Spill 線程為這次 Spill 過程創建一個磁盤文件:從所有的本地目錄中輪詢查找能存儲這么大空間的目錄,找到之后在其中創建一個類似於 “spill12.out” 的文件。Spill 線程根據排過序的 kvmeta 挨個 partition 的把
<key, value>
數據吐到這個文件中,一個 partition 對應的數據吐完之后順序地吐下個 partition,直到把所有的 partition 遍歷完。一個 partition 在文件中對應的數據也叫段 (segment)。所有的 partition 對應的數據都放在這個文件里,雖然是順序存放的,但是怎么直接知道某個 partition 在這個文件中存放的起始位置呢?強大的索引又出場了。有一個三元組記錄某個 partition 對應的數據在這個文件中的索引:起始位置、原始數據長度、壓縮之后的數據長度,一個 partition 對應一個三元組。然后把這些索引信息存放在內存中,如果內存中放不下了,后續的索引信息就需要寫到磁盤文件中了:從所有的本地目錄中輪訓查找能存儲這么大空間的目錄,找到之后在其中創建一個類似於 “spill12.out.index” 的文件,文件中不光存儲了索引數據,還存儲了 crc32 的校驗數據。(spill12.out.index 不一定在磁盤上創建,如果內存(默認 1M 空間)中能放得下就放在內存中,即使在磁盤上創建了,和 spill12.out 文件也不一定在同一個目錄下。)
每一次 Spill 過程就會最少生成一個 out 文件,有時還會生成 index 文件,Spill 的次數也烙印在文件名中。索引文件和數據文件的對應關系如下圖所示:
![]()
在 Spill 線程如火如荼的進行 SortAndSpill 工作的同時,Map 任務不會因此而停歇,而是一無既往地進行着數據輸出。Map 還是把數據寫到 kvbuffer 中,那問題就來了:
<key, value>
只顧着悶頭按照 bufindex 指針向上增長,kvmeta 只顧着按照 kvindex 向下增長,是保持指針起始位置不變繼續跑呢,還是另謀它路?如果保持指針起始位置不變,很快 bufindex 和 kvindex 就碰頭了,碰頭之后再重新開始或者移動內存都比較麻煩,不可取。Map 取 kvbuffer 中剩余空間的中間位置,用這個位置設置為新的分界點,bufindex 指針移動到這個分界點,kvindex 移動到這個分界點的 -16 位置,然后兩者就可以和諧地按照自己既定的軌跡放置數據了,當 Spill 完成,空間騰出之后,不需要做任何改動繼續前進。分界點的轉換如下圖所示:
![]()
Map 任務總要把輸出的數據寫到磁盤上,即使輸出數據量很小在內存中全部能裝得下,在最后也會把數據刷到磁盤上。
12.1.2 Merge
![]()
Map 任務如果輸出數據量很大,可能會進行好幾次 Spill,out 文件和 Index 文件會產生很多,分布在不同的磁盤上。最后把這些文件進行合並的 merge 過程閃亮登場。
Merge 過程怎么知道產生的 Spill 文件都在哪了呢?從所有的本地目錄上掃描得到產生的 Spill 文件,然后把路徑存儲在一個數組里。Merge 過程又怎么知道 Spill 的索引信息呢?沒錯,也是從所有的本地目錄上掃描得到 Index 文件,然后把索引信息存儲在一個列表里。到這里,又遇到了一個值得納悶的地方。在之前 Spill 過程中的時候為什么不直接把這些信息存儲在內存中呢,何必又多了這步掃描的操作?特別是 Spill 的索引數據,之前當內存超限之后就把數據寫到磁盤,現在又要從磁盤把這些數據讀出來,還是需要裝到更多的內存中。之所以多此一舉,是因為這時 kvbuffer 這個內存大戶已經不再使用可以回收,有內存空間來裝這些數據了。(對於內存空間較大的土豪來說,用內存來省卻這兩個 io 步驟還是值得考慮的。)
然后為 merge 過程創建一個叫 file.out 的文件和一個叫 file.out.Index 的文件用來存儲最終的輸出和索引。
一個 partition 一個 partition 的進行合並輸出。對於某個 partition 來說,從索引列表中查詢這個 partition 對應的所有索引信息,每個對應一個段插入到段列表中。也就是這個 partition 對應一個段列表,記錄所有的 Spill 文件中對應的這個 partition 那段數據的文件名、起始位置、長度等等。
然后對這個 partition 對應的所有的 segment 進行合並,目標是合並成一個 segment。當這個 partition 對應很多個 segment 時,會分批地進行合並:先從 segment 列表中把第一批取出來,以 key 為關鍵字放置成最小堆,然后從最小堆中每次取出最小的<key, value>
輸出到一個臨時文件中,這樣就把這一批段合並成一個臨時的段,把它加回到 segment 列表中;再從 segment 列表中把第二批取出來合並輸出到一個臨時 segment,把其加入到列表中;這樣往復執行,直到剩下的段是一批,輸出到最終的文件中。
最終的索引數據仍然輸出到 Index 文件中。
Map 端的 Shuffle 過程到此結束。
12.1.3 Copy
Reduce 任務通過 HTTP 向各個 Map 任務拖取它所需要的數據。每個節點都會啟動一個常駐的 HTTP server,其中一項服務就是響應 Reduce 拖取 Map 數據。當有 MapOutput 的 HTTP 請求過來的時候,HTTP server 就讀取相應的 Map 輸出文件中對應這個 Reduce 部分的數據通過網絡流輸出給 Reduce。
Reduce 任務拖取某個 Map 對應的數據,如果在內存中能放得下這次數據的話就直接把數據寫到內存中。Reduce 要向每個 Map 去拖取數據,在內存中每個 Map 對應一塊數據,當內存中存儲的 Map 數據占用空間達到一定程度的時候,開始啟動內存中 merge,把內存中的數據 merge 輸出到磁盤上一個文件中。
如果在內存中不能放得下這個 Map 的數據的話,直接把 Map 數據寫到磁盤上,在本地目錄創建一個文件,從 HTTP 流中讀取數據然后寫到磁盤,使用的緩存區大小是 64K。拖一個 Map 數據過來就會創建一個文件,當文件數量達到一定閾值時,開始啟動磁盤文件 merge,把這些文件合並輸出到一個文件。
有些 Map 的數據較小是可以放在內存中的,有些 Map 的數據較大需要放在磁盤上,這樣最后 Reduce 任務拖過來的數據有些放在內存中了有些放在磁盤上,最后會對這些來一個全局合並。
12.1.4 Merge Sort
這里使用的 Merge 和 Map 端使用的 Merge 過程一樣。Map 的輸出數據已經是有序的,Merge 進行一次合並排序,所謂 Reduce 端的 sort 過程就是這個合並的過程。一般 Reduce 是一邊 copy 一邊 sort,即 copy 和 sort 兩個階段是重疊而不是完全分開的。
Reduce 端的 Shuffle 過程至此結束。
12.2 HashShuffle 過程介紹
Spark 豐富了任務類型,有些任務之間數據流轉不需要通過 Shuffle,但是有些任務之間還是需要通過 Shuffle 來傳遞數據,比如 wide dependency 的 group by key。
Spark 中需要 Shuffle 輸出的 Map 任務會為每個 Reduce 創建對應的 bucket,Map 產生的結果會根據設置的 partitioner 得到對應的 bucketId,然后填充到相應的 bucket 中去。每個 Map 的輸出結果可能包含所有的 Reduce 所需要的數據,所以每個 Map 會創建 R 個 bucket(R 是 reduce 的個數),M 個 Map 總共會創建 M*R 個 bucket。
Map 創建的 bucket 其實對應磁盤上的一個文件,Map 的結果寫到每個 bucket 中其實就是寫到那個磁盤文件中,這個文件也被稱為 blockFile,是 Disk Block Manager 管理器通過文件名的 Hash 值對應到本地目錄的子目錄中創建的。每個 Map 要在節點上創建 R 個磁盤文件用於結果輸出,Map 的結果是直接輸出到磁盤文件上的,100KB 的內存緩沖是用來創建 Fast Buffered OutputStream 輸出流。這種方式一個問題就是 Shuffle 文件過多。
![]()
1)每一個 Mapper 創建出和 Reducer 數目相同的 bucket,bucket 實際上是一個 buffer,其大小為 spark.shuffle.file.buffer.kb(默認 32KB)。
2)Mapper 產生的結果會根據設置的 partition 算法填充到每個 bucket 中去,然后再寫入到磁盤文件。
3)Reducer 從遠端或是本地的 block manager 中找到相應的文件讀取數據。
針對上述 Shuffle 過程產生的文件過多問題,Spark 有另外一種改進的 Shuffle 過程:
consolidation Shuffle
,以期顯著減少 Shuffle 文件的數量。在 consolidation Shuffle 中每個 bucket 並非對應一個文件,而是對應文件中的一個 segment 部分。Job 的 map 在某個節點上第一次執行,為每個 reduce 創建 bucke 對應的輸出文件,把這些文件組織成ShuffleFileGroup
,當這次 map 執行完之后,這個 ShuffleFileGroup 可以釋放為下次循環利用;當又有 map 在這個節點上執行時,不需要創建新的 bucket 文件,而是在上次的 ShuffleFileGroup 中取得已經創建的文件繼續追加寫一個 segment;當前次 map 還沒執行完,ShuffleFileGroup 還沒有釋放,這時如果有新的 map 在這個節點上執行,無法循環利用這個 ShuffleFileGroup,而是只能創建新的 bucket 文件組成新的 ShuffleFileGroup 來寫輸出。
![]()
比如一個 Job 有 3 個 Map 和 2 個 reduce:
(1) 如果此時集群有 3 個節點有空槽,每個節點空閑了一個 core,則 3 個 Map 會調度到這 3 個節點上執行,每個 Map 都會創建 2 個 Shuffle 文件,總共創建 6 個 Shuffle 文件;
(2) 如果此時集群有 2 個節點有空槽,每個節點空閑了一個 core,則 2 個 Map 先調度到這 2 個節點上執行,每個 Map 都會創建 2 個 Shuffle 文件,然后其中一個節點執行完 Map 之后又調度執行另一個 Map,則這個 Map 不會創建新的 Shuffle 文件,而是把結果輸出追加到之前 Map 創建的 Shuffle 文件中;總共創建 4 個 Shuffle 文件;
(3) 如果此時集群有 2 個節點有空槽,一個節點有 2 個空 core 一個節點有 1 個空 core,則一個節點調度 2 個 Map 一個節點調度 1 個 Map,調度 2 個 Map 的節點上,一個 Map 創建了 Shuffle 文件,后面的 Map 還是會創建新的 Shuffle 文件,因為上一個 Map 還正在寫,它創建的 ShuffleFileGroup 還沒有釋放;總共創建 6 個 Shuffle 文件。
優點:
1)快-不需要排序,也不需要維持 hash 表
2)不需要額外空間用作排序
3)不需要額外IO-數據寫入磁盤只需一次,讀取也只需一次
缺點:
1)當 partitions 大時,輸出大量的文件(cores * R),性能開始降低
2)大量的文件寫入,使文件系統開始變為隨機寫,性能比順序寫要降低 100 倍
3)緩存空間占用比較大
Reduce 去拖 Map 的輸出數據,Spark 提供了兩套不同的拉取數據框架:通過 socket 連接去取數據;使用n etty 框架去取數據。
每個節點的 Executor 會創建一個 BlockManager,其中會創建一個 BlockManagerWorker 用於響應請求。當 Reduce 的 GET_BLOCK 的請求過來時,讀取本地文件將這個 blockId 的數據返回給 Reduce。如果使用的是 Netty 框架,BlockManager 會創建 ShuffleSender 用於發送 Shuffle 數據。並不是所有的數據都是通過網絡讀取,對於在本節點的 Map 數據,Reduce 直接去磁盤上讀取而不再通過網絡框架。
Reduce 拖過來數據之后以什么方式存儲呢?Spark Map 輸出的數據沒有經過排序,Spark Shuffle 過來的數據也不會進行排序,Spark 認為 Shuffle 過程中的排序不是必須的
,並不是所有類型的 Reduce 需要的數據都需要排序,強制地進行排序只會增加 Shuffle 的負擔。Reduce 拖過來的數據會放在一個 HashMap 中,HashMap 中存儲的也是<key, value>
對,key 是 Map 輸出的 key,Map 輸出對應這個 key 的所有 value 組成 HashMap 的 value。Spark 將 Shuffle 取過來的每一個<key, value>
對插入或者更新到 HashMap 中,來一個處理一個。HashMap 全部放在內存中。
Shuffle 取過來的數據全部存放在內存中,對於數據量比較小或者已經在 Map 端做過合並處理的 Shuffle 數據,占用內存空間不會太大,但是對於比如 group by key 這樣的操作,Reduce 需要得到 key 對應的所有 value,並將這些 value 組一個數組放在內存中,這樣當數據量較大時,就需要較多內存。
當內存不夠時,要不就失敗,要不就用老辦法把內存中的數據移到磁盤上放着。Spark 意識到在處理數據規模遠遠大於內存空間時所帶來的不足,引入了一個具有外部排序的方案。Shuffle 過來的數據先放在內存中,當內存中存儲的<key, value>
對超過 1000 並且內存使用超過 70% 時,判斷節點上可用內存如果還足夠,則把內存緩沖區大小翻倍,如果可用內存不再夠了,則把內存中的<key, value>
對排序然后寫到磁盤文件中。最后把內存緩沖區中的數據排序之后和那些磁盤文件組成一個最小堆,每次從最小堆中讀取最小的數據,這個和 MapReduce 中的 merge 過程類似。
12.3 SortShuffle 過程介紹
從 1.2.0 開始默認為 sort shuffle(spark.shuffle.manager = sort),實現邏輯類似於 Hadoop MapReduce,Hash Shuffle 每一個 reducers 產生一個文件,但是 Sort Shuffle 只是產生一個按照 reducer id 排序可索引的文件,這樣,只需獲取有關文件中的相關數據塊的位置信息,並 fseek 就可以讀取指定 reducer 的數據。但對於 rueducer 數比較少的情況,Hash Shuffle 明顯要比 Sort Shuffle 快,因此 Sort Shuffle 有個 “fallback” 計划,對於 reducers 數少於 “spark.shuffle.sort.bypassMergeThreshold” (200 by default),我們使用 fallback 計划,hashing 相關數據到分開的文件,然后合並這些文件為一個,具體實現為 BypassMergeSortShuffleWriter。
![]()
在 map 進行排序,在 reduce 端應用 Timsort[1] 進行合並。map 端是否容許 spill,通過 spark.shuffle.spill 來設置,默認是 true。設置為 false,如果沒有足夠的內存來存儲 map 的輸出,那么就會導致 OOM 錯誤,因此要慎用。
用於存儲 map 輸出的內存為:“JVM Heap Size” * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction,默認為: “JVM Heap Size” * 0.2 * 0.8 = “JVM Heap Size” * 0.16。如果你在同一個執行程序中運行多個線程(設定 spark.executor.cores/ spark.task.cpus 超過 1),每個 map 任務存儲的空間為 “JVM Heap Size” * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction / spark.executor.cores * spark.task.cpus,默認 2 個 cores,那么為 0.08 * “JVM Heap Size”。
![]()
spark 使用 AppendOnlyMap 存儲 map 輸出的數據,利用開源 hash 函數 MurmurHash3 和平方探測法把 key 和 value 保存在相同的 array 中。這種保存方法可以是 spark 進行 combine。如果 spill 為 true,會在 spill 前 sort。
與 hash shuffle 相比,sort shuffle 中每個 Mapper 只產生一個數據文件和一個索引文件,數據文件中的數據按照 Reducer 排序,但屬於同一個 Reducer 的數據不排序。Mapper 產生的數據先放到 AppendOnlyMap 這個數據結構中,如果內存不夠,數據則會 spill 到磁盤,最后合並成一個文件。
與 Hash shuffle 相比,shuffle 文件數量減少,內存使用更加可控。但排序會影響速度。
優點:
1)map 創建文件量較少。
2)少量的 IO 隨機操作,大部分是順序讀寫。
缺點:
1)要比 Hash Shuffle 要慢,需要自己通過 spark.shuffle.sort.bypassMergeThreshold 來設置合適的值。
2)如果使用 SSD 盤存儲 shuffle 數據,那么 Hash Shuffle 可能更合適。
12.4 TungstenShuffle 過程介紹
Tungsten-sort 算不得一個全新的 shuffle 方案,它在
特定場景下
基於類似現有的 Sort Based Shuffle 處理流程,對內存 /CPU/Cache 使用做了非常大的優化。帶來高效的同時,也就限定了自己的使用場景。如果 Tungsten-sort 發現自己無法處理,則會自動使用 Sort Based Shuffle 進行處理。Tungsten 中文是鎢絲的意思。 Tungsten Project 是 Databricks 公司提出的對 Spark 優化內存和 CPU 使用的計划,該計划初期似乎對 Spark SQL 優化的最多。不過部分 RDD API 還有 Shuffle 也因此受益。
Tungsten-sort 優化點主要在三個方面:
1)直接在 serialized binary data 上 sort 而不是 java objects,減少了 memory 的開銷和 GC 的 overhead。
2)提供 cache-efficient sorter,使用一個 8bytes 的指針,把排序轉化成了一個指針數組的排序。
3)spill 的 merge 過程也無需反序列化即可完成。這些優化的實現導致引入了一個新的內存管理模型,類似 OS 的 Page,對應的實際數據結構為 MemoryBlock,支持 off-heap 以及 in-heap 兩種模式。為了能夠對 Record 在這些 MemoryBlock 進行定位,引入了 Pointer(指針)的概念。
如果你還記得 Sort Based Shuffle 里存儲數據的對象 PartitionedAppendOnlyMap,這是一個放在 JVM heap 里普通對象,在 Tungsten-sort 中,他被替換成了類似操作系統內存頁的對象。如果你無法申請到新的 Page,這個時候就要執行 spill 操作,也就是寫入到磁盤的操作。具體觸發條件,和 Sort Based Shuffle 也是類似的。
Spark 默認開啟的是 Sort Based Shuffle,想要打開 Tungsten-sort,請設置
spark.shuffle.manager=tungsten-sort
對應的實現類是:org.apache.spark.shuffle.unsafe.UnsafeShuffleManager
名字的來源是因為使用了大量 JDK Sun Unsafe API。當且僅當下面條件都滿足時,才會使用新的 Shuffle 方式:
1)Shuffle dependency 不能帶有 aggregation 或者輸出需要排序
2)Shuffle 的序列化器需要是 KryoSerializer 或者 Spark SQL's 自定義的一些序列化方式.
3)Shuffle 文件的數量不能大於 16777216。
4)序列化時,單條記錄不能大於 128 MB。
可以看到,能使用的條件還是挺苛刻的。
這些限制來源於哪里
參看如下代碼,page 的大小:
this.pageSizeBytes = (int) Math.min(PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES,shuffleMemoryManager.pageSizeBytes());
這就保證了頁大小不超過 PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES 的值,該值就被定義成了 128M。
而產生這個限制的具體設計原因,我們還要仔細分析下 Tungsten 的內存模型,如下圖所示:
![]()
這張圖其實畫的是 on-heap 的內存邏輯圖,其中 #Page 部分為 13bit,Offset 為 51bit,你會發現 2^51 >> 128M 的。但是在 Shuffle 的過程中,對 51bit 做了壓縮,使用了 27bit,具體如下:
[24 bit partition number][13 bit memory page number][27 bit offset in page]
這里預留出的 24bi t給了 partition number,為了后面的排序用。上面的好幾個限制其實都是因為這個指針引起的:
第一個是 partition 的限制,前面的數字 16777216 就是來源於 partition number 使用 24bit 表示的。
第二個是 page number。
第三個是偏移量,最大能表示到 2^27=128M。那一個 Task 能管理到的內存是受限於這個指針的,最多是 2^13 * 128M 也就是 1TB 左右。有了這個指針,我們就可以定位和管理到 off-heap 或者 on-heap 里的內存了。這個模型還是很漂亮的,內存管理也非常高效,記得之前的預估 PartitionedAppendOnlyMap 的內存是非常困難的,但是通過現在的內存管理機制,是非常快速並且精確的。
對於第一個限制,那是因為后續 Shuffle Write 的 sort 部分,只對前面 24bit 的 partiton number 進行排序,key 的值沒有被編碼到這個指針,所以沒辦法進行 ordering。
同時,因為整個過程是追求不反序列化的,所以不能做 aggregation。Shuffle Write
核心類:
org.apache.spark.shuffle.unsafe.UnsafeShuffleWriter
數據會通過 UnsafeShuffleExternalSorter.insertRecordIntoSorter 一條一條寫入到 serOutputStream 序列化輸出流。
這里消耗內存的地方是
serBuffer = new MyByteArrayOutputStream(1024 * 1024)
默認是 1M,類似於 Sort Based Shuffle 中的 ExternalSorter,在 Tungsten Sort 對應的為 UnsafeShuffleExternalSorter,記錄序列化后就通過 sorter.insertRecord 方法放到 sorter 里去了。
這里 sorter 負責申請 Page,釋放 Page,判斷是否要進行 spill 都這個類里完成。代碼的架子其實和 Sort Based 是一樣的。
![]()
(另外,值得注意的是,這張圖里進行 spill 操作的同時檢查內存可用而導致的 Exeception 的 bug 已經在 1.5.1 版本被修復了,忽略那條路徑)
內存是否充足的條件依然 shuffleMemoryManager 來決定,也就是所有 Task Shuffle 申請的 Page 內存總和不能大於下面的值:
ExecutorHeapMemeory * 0.2 * 0.8
上面的數字可通過下面兩個配置來更改:
spark.shuffle.memoryFraction=0.2
spark.shuffle.safetyFraction=0.8
UnsafeShuffleExternalSorter 負責申請內存,並且會生成該條記錄最后的邏輯地址,也就前面提到的 Pointer。
接着 Record 會繼續流轉到 UnsafeShuffleInMemorySorter 中,這個對象維護了一個指針數組:
private long[] pointerArray;
數組的初始大小為 4096,后續如果不夠了,則按每次兩倍大小進行擴充。
假設 100 萬條記錄,那么該數組大約是 8M 左右,所以其實還是很小的。一旦 spill 后該 UnsafeShuffleInMemorySorter 就會被賦為 null,被回收掉。
我們回過頭來看 spill,其實邏輯上也異常簡單了,UnsafeShuffleInMemorySorter 會返回一個迭代器,該迭代器粒度每個元素就是一個指針,然后到根據該指針可以拿到真實的 record,然后寫入到磁盤,因為這些 record 在一開始進入 UnsafeShuffleExternalSorter 就已經被序列化了,所以在這里就純粹變成寫字節數組了。形成的結構依然和 Sort Based Shuffle 一致,一個文件里不同的 partiton 的數據用 fileSegment 來表示,對應的信息存在一個 index 文件里。
另外寫文件的時候也需要一個 buffer:
spark.shuffle.file.buffer=32k
另外從內存里拿到數據放到 DiskWriter,這中間還要有個中轉,是通過:
final byte[] writeBuffer = new byte[DISK_WRITE_BUFFER_SIZE=1024 * 1024];
來完成的,都是內存,所以很快。
Task 結束前,我們要做一次 mergeSpills 操作,然后形成一個 shuffle 文件。這里面其實也挺復雜的,
如果開啟了
spark.shuffle.unsafe.fastMergeEnabled=true
並且沒有開啟
spark.shuffle.compress=true
或者壓縮方式為:
LZFCompressionCodec
則可以非常高效的進行合並,叫做 transferTo。不過無論是什么合並,都不需要進行反序列化。Shuffle Read
Shuffle Read 完全復用 HashShuffleReader,具體參看 Sort-Based Shuffle。
12.5 MapReduce 與 Spark 過程對比
MapReduce 和 Spark 的 Shuffle 過程對比如下:

第13章 Spark 內存管理
Spark 作為一個基於內存的分布式計算引擎,其內存管理模塊在整個系統中扮演着非常重要的角色。理解 Spark 內存管理的基本原理,有助於更好地開發 Spark 應用程序和進行性能調優。本文中闡述的原理基於 Spark 2.1 版本。
在執行 Spark 的應用程序時,Spark 集群會啟動 Driver 和 Executor 兩種 JVM 進程,前者為主控進程,負責創建 Spark 上下文,提交 Spark 作業(Job),並將作業轉化為計算任務(Task),在各個 Executor 進程間協調任務的調度,后者負責在工作節點上執行具體的計算任務,並將結果返回給 Driver,同時為需要持久化的 RDD 提供存儲功能。由於 Driver 的內存管理相對來說較為簡單,本文主要對 Executor 的內存管理進行分析,下文中的 Spark 內存均特指 Executor 的內存
。
13.1 堆內和堆外內存規划
作為一個 JVM 進程,Executor 的內存管理建立在 JVM 的內存管理之上,Spark 對 JVM 的堆內(On-heap)空間進行了更為詳細的分配,以充分利用內存。同時,Spark 引入了堆外(Off-heap)內存,使之可以直接在工作節點的系統內存中開辟空間,進一步優化了內存的使用。堆內和堆外內存示意圖如下:
![]()
13.1.1 堆內內存
堆內內存的大小,由 Spark 應用程序啟動時的
-executor-memory
或spark.executor.memory
參數配置。Executor 內運行的並發任務共享 JVM 堆內內存,這些任務在緩存 RDD 數據和廣播(Broadcast)數據時占用的內存被規划為存儲(Storage)內存
,而這些任務在執行 Shuffle 時占用的內存被規划為執行(Execution)內存
,剩余的部分不做特殊規划,那些 Spark 內部的對象實例,或者用戶定義的 Spark 應用程序中的對象實例,均占用剩余的空間。不同的管理模式下,這三部分占用的空間大小各不相同(下面第 2 小節會進行介紹)。
Spark 對堆內內存的管理是一種邏輯上的規划式的管理
,因為對象實例占用內存的申請和釋放都由 JVM 完成,Spark 只能在申請后和釋放前記錄這些內存,我們來看其具體流程:
申請內存:
1)Spark 在代碼中 new 一個對象實例
2)JVM 從堆內內存分配空間,創建對象並返回對象引用
3)Spark 保存該對象的引用,記錄該對象占用的內存
釋放內存:
1)Spark 記錄該對象釋放的內存,刪除該對象的引用
2)等待 JVM 的垃圾回收機制釋放該對象占用的堆內內存我們知道,
JVM 的對象可以以序列化的方式存儲
,序列化的過程是將對象轉換為二進制字節流,本質上可以理解為將非連續空間的鏈式存儲轉化為連續空間或塊存儲
,在訪問時則需要進行序列化的逆過程--反序列化,將字節流轉化為對象,序列化的方式可以節省存儲空間,但增加了存儲和讀取時候的計算開銷。
對於 Spark 中序列化的對象,由於是字節流的形式,其占用的內存大小可直接計算,而對於非序列化的對象,其占用的內存是通過周期性地采樣近似估算而得,即並不是每次新增的數據項都會計算一次占用的內存大小,這種方法降低了時間開銷但是有可能誤差較大,導致某一時刻的實際內存有可能遠遠超出預期。此外,在被 Spark 標記為釋放的對象實例,很有可能在實際上並沒有被 JVM 回收,導致實際可用的內存小於 Spark 記錄的可用內存。所以 Spark 並不能准確記錄實際可用的堆內內存,從而也就無法完全避免內存溢出(OOM, Out of Memory)的異常。
雖然不能精准控制堆內內存的申請和釋放,但 Spark 通過對存儲內存和執行內存各自獨立的規划管理,可以決定是否要在存儲內存里緩存新的 RDD,以及是否為新的任務分配執行內存,在一定程度上可以提升內存的利用率,減少異常的出現。
13.1.2 堆外內存
為了進一步優化內存的使用以及提高 Shuffle 時排序的效率,Spark 引入了
堆外(Off-heap)內存
,使之可以直接在工作節點的系統內存中開辟空間
,存儲經過序列化的二進制數據。利用 JDK Unsafe API(從 Spark 2.0 開始,在管理堆外的存儲內存時不再基於 Tachyon,而是與堆外的執行內存一樣,基於 JDK Unsafe API 實現),Spark 可以直接操作系統堆外內存,減少了不必要的內存開銷,以及頻繁的 GC 掃描和回收,提升了處理性能。堆外內存可以被精確地申請和釋放
,而且序列化的數據占用的空間可以被精確計算,所以相比堆內內存來說降低了管理的難度,也降低了誤差。
在默認情況下堆外內存並不啟用,可通過配置spark.memory.offHeap.enabled
參數啟用,並由spark.memory.offHeap.size
參數設定堆外空間的大小。除了沒有 other 空間,堆外內存與堆內內存的划分方式相同,所有運行中的並發任務共享存儲內存和執行內存。
13.1.3 內存管理接口
Spark 為存儲內存和執行內存的管理提供了統一的接口--MemoryManager,同一個 Executor 內的任務都調用這個接口的方法來申請或釋放內存:
內存管理接口的主要方法:
// 申請存儲內存
def acquireStorageMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean
// 申請展開內存
def acquireUnrollMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean
// 申請執行內存
def acquireExecutionMemory(numBytes: Long, taskAttemptId: Long, memoryMode: MemoryMode): Long
// 釋放存儲內存
def releaseStorageMemory(numBytes: Long, memoryMode: MemoryMode): Unit
// 釋放執行內存
def releaseExecutionMemory(numBytes: Long, taskAttemptId: Long, memoryMode: MemoryMode): Unit
// 釋放展開內存
def releaseUnrollMemory(numBytes: Long, memoryMode: MemoryMode): Unit
Spark的內存管理 – 內存管理接口

我們看到,在調用這些方法時都需要指定其內存模式(MemoryMode),這個參數決定了是在堆內還是堆外完成這次操作。MemoryManager 的具體實現上,Spark 1.6 之后默認為統一管理(Unified Memory Manager)方式,1.6 之前采用的靜態管理(Static Memory Manager)方式仍被保留,可通過配置
spark.memory.useLegacyMode
參數啟用。兩種方式的區別在於對空間分配的方式,下面的第 2 小節會分別對這兩種方式進行介紹。
13.2 內存空間分配
13.2.1 靜態內存管理
在 Spark 最初采用的靜態內存管理機制下,存儲內存、執行內存和其他內存的大小在 Spark 應用程序運行期間均為固定的,但用戶可以應用程序啟動前進行配置。
靜態內存管理圖示--堆內

可以看到,可用的堆內內存的大小需要按照下面的方式計算:
可用堆內內存空間:
可用的存儲內存 = systemMaxMemory * spark.storage.memoryFraction * spark.storage.safetyFraction
可用的執行內存 = systemMaxMemory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction
其中 systemMaxMemory 取決於當前 JVM 堆內內存的大小,最后可用的執行內存或者存儲內存要在此基礎上與各自的 memoryFraction 參數和 safetyFraction 參數相乘得出。上述計算公式中的兩個 safetyFraction 參數,其意義在於在邏輯上預留出 1-safetyFraction 這么一塊保險區域,降低因實際內存超出當前預設范圍而導致 OOM 的風險(上文提到,對於非序列化對象的內存采樣估算會產生誤差)。值得注意的是,這個預留的保險區域僅僅是一種邏輯上的規划,在具體使用時 Spark 並沒有區別對待,和 “其它內存” 一樣交給了 JVM 去管理。
堆外的空間分配較為簡單,只有存儲內存和執行內存,如下圖所示。可用的執行內存和存儲內存占用的空間大小直接由參數
spark.memory.storageFraction
決定,由於堆外內存占用的空間可以被精確計算,所以無需再設定保險區域。
靜態內存管理圖示--堆外

靜態內存管理機制實現起來較為簡單,但如果用戶不熟悉 Spark 的存儲機制,或沒有根據具體的數據規模和計算任務或做相應的配置,很容易造成 “一半海水,一半火焰” 的局面,即存儲內存和執行內存中的一方剩余大量的空間,而另一方卻早早被占滿,不得不淘汰或移出舊的內容以存儲新的內容。由於新的內存管理機制的出現,這種方式目前已經很少有開發者使用,出於兼容舊版本的應用程序的目的,Spark 仍然保留了它的實現。
13.2.2 統一內存管理
Spark 1.6 之后引入的統一內存管理機制,與靜態內存管理的區別在於存儲內存和執行內存共享同一塊空間,可以動態占用對方的空閑區域。
統一內存管理圖示--堆內

統一內存管理圖示--堆外

其中最重要的優化在於
動態占用機制
,其規則如下:
1)設定基本的存儲內存和執行內存區域(spark.storage.storageFraction
參數),該設定確定了雙方各自擁有的空間的范圍。
2)雙方的空間都不足時,則存儲到硬盤;若己方空間不足而對方空余時,可借用對方的空間;(存儲空間不足是指不足以放下一個完整的 Block)。
3)執行內存的空間被對方占用后,可讓對方將占用的部分轉存到硬盤,然后 “歸還” 借用的空間。
4)存儲內存的空間被對方占用后,無法讓對方 “歸還”,因為需要考慮 Shuffle 過程中的很多因素,實現起來較為復雜。
動態占用機制圖示

憑借統一內存管理機制,Spark 在一定程度上提高了堆內和堆外內存資源的利用率,降低了開發者維護 Spark 內存的難度,但並不意味着開發者可以高枕無憂。譬如,所以如果存儲內存的空間太大或者說緩存的數據過多,反而會導致頻繁的全量垃圾回收,降低任務執行時的性能,因為緩存的 RDD 數據通常都是長期駐留內存的。所以要想充分發揮 Spark 的性能,需要開發者進一步了解存儲內存和執行內存各自的管理方式和實現原理。
13.3 存儲內存管理
13.3.1 RDD 的持久化機制
彈性分布式數據集(RDD)作為 Spark 最根本的數據抽象,是只讀的分區記錄(Partition)的集合,只能基於在穩定物理存儲中的數據集上創建,或者在其他已有的 RDD 上執行轉換(Transformation)操作產生一個新的 RDD。轉換后的 RDD 與原始的 RDD 之間產生的依賴關系,構成了血統(Lineage)。憑借血統,Spark 保證了每一個 RDD 都可以被重新恢復。但 RDD 的所有轉換都是惰性的,即只有當一個返回結果給 Driver 的行動(Action)發生時,Spark 才會創建任務讀取 RDD,然后真正觸發轉換的執行。
Task 在啟動之初讀取一個分區時,會先判斷這個分區是否已經被持久化,如果沒有則需要檢查 Checkpoint 或按照血統重新計算。所以如果一個 RDD 上要執行多次行動,可以在第一次行動中使用 persist 或 cache 方法,在內存或磁盤中持久化或緩存這個 RDD,從而在后面的行動時提升計算速度。事實上,cache 方法是使用默認的 MEMORY_ONLY 的存儲級別將 RDD 持久化到內存,故緩存是一種特殊的持久化。堆內和堆外存儲內存的設計,便可以對緩存 RDD 時使用的內存做統一的規划和管理
(存儲內存的其他應用場景,如緩存 broadcast 數據,暫時不在本文的討論范圍之內)。
RDD 的持久化由 Spark 的 Storage 模塊負責,實現了 RDD 與物理存儲的解耦合。Storage 模塊負責管理 Spark 在計算過程中產生的數據,將那些在內存或磁盤、在本地或遠程存取數據的功能封裝了起來。在具體實現時 Driver 端和 Executor 端的 Storage 模塊構成了主從式的架構,即 Driver 端的 BlockManager 為 Master,Executor 端的 BlockManager 為 Slave。Storage 模塊在邏輯上以 Block 為基本存儲單位,RDD 的每個 Partition 經過處理后唯一對應一個 Block(BlockId 的格式為 rdd_RDD-ID_PARTITION-ID )。Master 負責整個 Spark 應用程序的 Block 的元數據信息的管理和維護,而 Slave 需要將 Block 的更新等狀態上報到 Master,同時接收 Master 的命令,例如新增或刪除一個 RDD。
Storage 模塊示意圖

在對 RDD 持久化時,Spark 規定了 MEMORY_ONLY、MEMORY_AND_DISK 等 7 種不同的 存儲級別,而存儲級別是以下 5 個變量的組合:
存儲級別
class StorageLevel private(
private var _useDisk: Boolean, // 磁盤
private var _useMemory: Boolean, // 這里其實是指堆內內存
private var _useOffHeap: Boolean, // 堆外內存
private var _deserialized: Boolean, // 是否為非序列化
private var _replication: Int = 1 // 副本個數
)
通過對數據結構的分析,可以看出存儲級別從三個維度定義了 RDD 的 Partition(同時也就是 Block)的存儲方式:
1)存儲位置:磁盤/堆內內存/堆外內存。如 MEMORY_AND_DISK 是同時在磁盤和堆內內存上存儲,實現了冗余備份。OFF_HEAP 則是只在堆外內存存儲,目前選擇堆外內存時不能同時存儲到其他位置。
2)存儲形式:Block 緩存到存儲內存后,是否為非序列化的形式。如 MEMORY_ONLY 是非序列化方式存儲,OFF_HEAP 是序列化方式存儲。
3)副本數量:大於 1 時需要遠程冗余備份到其他節點。如 DISK_ONLY_2 需要遠程備份 1 個副本。
13.3.2 RDD 緩存的過程
RDD 在緩存到存儲內存之前,Partition 中的數據一般以迭代器(Iterator)的數據結構來訪問,這是 Scala 語言中一種遍歷數據集合的方法。通過 Iterator 可以獲取分區中每一條序列化或者非序列化的數據項 (Record),這些 Record 的對象實例在邏輯上占用了 JVM 堆內內存的 other 部分的空間,同一 Partition 的不同 Record 的空間並不連續。
RDD 在緩存到存儲內存之后,Partition 被轉換成 Block,Record 在堆內或堆外存儲內存中占用一塊連續的空間。將 Partition 由不連續的存儲空間轉換為連續存儲空間的過程,Spark 稱之為 “展開”(Unroll)
。Block 有序列化和非序列化兩種存儲格式,具體以哪種方式取決於該 RDD 的存儲級別。非序列化的 Block 以一種 DeserializedMemoryEntry 的數據結構定義,用一個數組存儲所有的對象實例,序列化的 Block 則以 SerializedMemoryEntry 的數據結構定義,用字節緩沖區(ByteBuffer)來存儲二進制數據。每個 Executor 的 Storage 模塊用一個鏈式 Map 結構(LinkedHashMap)來管理堆內和堆外存儲內存中所有的 Block 對象的實例,對這個 LinkedHashMap 新增和刪除間接記錄了內存的申請和釋放。
因為不能保證存儲空間可以一次容納 Iterator 中的所有數據,當前的計算任務在 Unroll 時要向 MemoryManager 申請足夠的 Unroll 空間來臨時占位,空間不足則 Unroll 失敗,空間足夠時可以繼續進行。對於序列化的 Partition,其所需的 Unroll 空間可以直接累加計算,一次申請。而非序列化的 Partition 則要在遍歷 Record 的過程中依次申請,即每讀取一條 Record,采樣估算其所需的 Unroll 空間並進行申請,空間不足時可以中斷,釋放已占用的 Unroll 空間。如果最終 Unroll 成功,當前 Partition 所占用的 Unroll 空間被轉換為正常的緩存 RDD 的存儲空間,如下圖所示。
Spark Unroll 示意圖

在靜態內存管理時,Spark 在存儲內存中專門划分了一塊 Unroll 空間,其大小是固定的,統一內存管理時則沒有對 Unroll 空間進行特別區分,當存儲空間不足時會根據動態占用機制進行處理。
13.3.3 淘汰和落盤
由於同一個 Executor 的所有的計算任務共享有限的存儲內存空間,當有新的 Block 需要緩存但是剩余空間不足且無法動態占用時,就要對 LinkedHashMap 中的舊 Block 進行淘汰(Eviction),而被淘汰的 Block 如果其存儲級別中同時包含存儲到磁盤的要求,則要對其進行落盤(Drop),否則直接刪除該 Block。
存儲內存的淘汰規則為:
1)被淘汰的舊 Block 要與新 Block 的 MemoryMode 相同,即同屬於堆外或堆內內存。
2)新舊 Block 不能屬於同一個 RDD,避免循環淘汰。
3)舊 Block 所屬 RDD 不能處於被讀狀態,避免引發一致性問題。
4)遍歷 LinkedHashMap 中 Block,按照最近最少使用(LRU)的順序淘汰,直到滿足新 Block 所需的空間。其中 LRU 是 LinkedHashMap 的特性。
落盤的流程則比較簡單,如果其存儲級別符合_useDisk 為 true
的條件,再根據其_deserialized
判斷是否是非序列化的形式,若是則對其進行序列化,最后將數據存儲到磁盤,在 Storage 模塊中更新其信息。
13.4 執行內存管理
13.4.1 多任務間內存分配
Executor 內運行的任務同樣共享執行內存,Spark 用一個 HashMap 結構保存了任務到內存耗費的映射。每個任務可占用的執行內存大小的范圍為 1/2N ~ 1/N,其中 N 為當前 Executor 內正在運行的任務的個數。每個任務在啟動之時,要向 MemoryManager 請求申請最少為 1/2N 的執行內存,如果不能被滿足要求則該任務被阻塞,直到有其他任務釋放了足夠的執行內存,該任務才可以被喚醒。
13.4.2 Shuffle 的內存占用
執行內存主要用來存儲任務在執行 Shuffle 時占用的內存,Shuffle 是按照一定規則對 RDD 數據重新分區的過程,我們來看 Shuffle 的 Write 和 Read 兩階段對執行內存的使用:
Shuffle Write
1)若在 map 端選擇普通的排序方式,會采用 ExternalSorter 進行外排,在內存中存儲數據時主要占用堆內執行空間。
2)若在 map 端選擇 Tungsten 的排序方式,則采用 ShuffleExternalSorter 直接對以序列化形式存儲的數據排序,在內存中存儲數據時可以占用堆外或堆內執行空間,取決於用戶是否開啟了堆外內存以及堆外執行內存是否足夠。
Shuffle Read
1)在對 reduce 端的數據進行聚合時,要將數據交給 Aggregator 處理,在內存中存儲數據時占用堆內執行空間。
2)如果需要進行最終結果排序,則要將再次將數據交給 ExternalSorter 處理,占用堆內執行空間。在 ExternalSorter 和 Aggregator 中,Spark 會使用一種叫 AppendOnlyMap 的哈希表在堆內執行內存中存儲數據,但在 Shuffle 過程中所有數據並不能都保存到該哈希表中,當這個哈希表占用的內存會進行周期性地采樣估算,當其大到一定程度,無法再從 MemoryManager 申請到新的執行內存時,Spark 就會將其全部內容存儲到磁盤文件中,這個過程被稱為溢存(Spill),溢存到磁盤的文件最后會被歸並(Merge)。
Shuffle Write 階段中用到的 Tungsten 是 Databricks 公司提出的對 Spark 優化內存和 CPU 使用的計划,解決了一些 JVM 在性能上的限制和弊端。Spark 會根據 Shuffle 的情況來自動選擇是否采用 Tungsten 排序。Tungsten 采用的頁式內存管理機制建立在 MemoryManager 之上,即 Tungsten 對執行內存的使用進行了一步的抽象,這樣在 Shuffle 過程中無需關心數據具體存儲在堆內還是堆外。每個內存頁用一個 MemoryBlock 來定義,並用 Object obj 和 long offset 這兩個變量統一標識一個內存頁在系統內存中的地址。堆內的 MemoryBlock 是以 long 型數組的形式分配的內存,其 obj 的值為是這個數組的對象引用,offset 是 long 型數組的在 JVM 中的初始偏移地址,兩者配合使用可以定位這個數組在堆內的絕對地址;堆外的 MemoryBlock 是直接申請到的內存塊,其 obj 為 null,offset 是這個內存塊在系統內存中的 64 位絕對地址。Spark 用 MemoryBlock 巧妙地將堆內和堆外內存頁統一抽象封裝,並用頁表(pageTable)管理每個 Task 申請到的內存頁。Tungsten 頁式管理下的所有內存用 64 位的邏輯地址表示,由頁號和頁內偏移量組成:
頁號:占 13 位,唯一標識一個內存頁,Spark 在申請內存頁之前要先申請空閑頁號。
頁內偏移量:占 51 位,是在使用內存頁存儲數據時,數據在頁內的偏移地址。
有了統一的尋址方式,Spark 可以用 64 位邏輯地址的指針定位到堆內或堆外的內存,整個 Shuffle Write 排序的過程只需要對指針進行排序,並且無需反序列化,整個過程非常高效,對於內存訪問效率和 CPU 使用效率帶來了明顯的提升。Spark 的存儲內存和執行內存有着截然不同的管理方式:對於存儲內存來說,Spark 用一個 LinkedHashMap 來集中管理所有的 Block,Block 由需要緩存的 RDD 的 Partition 轉化而成;而對於執行內存,Spark 用 AppendOnlyMap 來存儲 Shuffle 過程中的數據,在 Tungsten 排序中甚至抽象成為頁式內存管理,開辟了全新的 JVM 內存管理機制。
第14章 部署模式解析
14.1 部署模式概述
Spark 支持的主要的三種分布式部署方式分別是 standalone、spark on mesos 和 spark on YARN。standalone 模式,即獨立模式,自帶完整的服務,可單獨部署到一個集群中,無需依賴任何其他資源管理系統。它是 Spark 實現的資源調度框架,其主要的節點有 Client 節點、Master 節點和 Worker 節點。而 yarn 是統一的資源管理機制,在上面可以運行多套計算框架,如 map reduce、storm 等根據 driver 在集群中的位置不同,分為 yarn client 和 yarn cluster。而 mesos 是一個更強大的分布式資源管理框架,它允許多種不同的框架部署在其上,包括 yarn。基本上,Spark 的運行模式取決於傳遞給 SparkContext 的 MASTER 環境變量的值,個別模式還需要輔助的程序接口來配合使用,目前支持的 Master 字符串及 URL 包括:
![]()
用戶在提交任務給 Spark 處理時,以下兩個參數共同決定了 Spark 的運行方式:
• --master MASTER_URL :決定了 Spark 任務提交給哪種集群處理。
• --deploy-mode DEPLOY_MODE :決定了 Driver 的運行方式,可選值為 Client 或者 Cluster。
14.2 standalone 框架
standalone 集群由三個不同級別的節點組成,分別是:
1)Master 主控節點,可以類比為董事長或總舵主,在整個集群之中,最多只有一個 Master 處在 Active 狀態。
2)Worker 工作節點,這個是 manager,是分舵主, 在整個集群中,可以有多個 Worker,如果 Worker 為零,什么事也做不了。
3)Executor 干苦力活的,直接受 Worker 掌控,一個 Worker 可以啟動多個 executor,啟動的個數受限於機器中的 cpu 核數。
這三種不同類型的節點各自運行於自己的JVM進程之中。Standalone 模式下,集群啟動時包括 Master 與 Worker,其中 Master 負責接收客戶端提交的作業,管理 Worker。根據作業提交的方式不同,分為 driver on client 和 drvier on worker。如下圖所示,上圖為 driver on client 模式,下圖為 driver on work 模式。兩種模式的主要不同點在於 driver 所在的位置。
在 standalone 部署模式下又分為 client 模式和 cluster 模式。
在client 模式下,driver 和 client 運行於同一 JVM 中,不由 worker 啟動,該 JVM 進程直到 spark application 計算完成返回結果后才退出。如下圖所示:
![]()
而在 cluster 模式下,driver 由 worker 啟動,client 在確認 spark application 成功提交給 cluster 后直接退出,並不等待 spark application 運行結果返回。如下圖所示:
![]()
從部署圖來進行分析,每個 JVM 進程在啟動時的文件依賴如何得到滿足。
1)Master 進程最為簡單,除了 spark jar 包之外,不存在第三方庫依賴。
2)Driver 和 Executor 在運行的時候都有可能存在第三方包依賴,分開來講。
3)Driver 比較簡單,spark-submit 在提交的時候會指定所要依賴的 jar 文件從哪里讀取。
4)Executor 由 Worker 來啟動,Worker 需要下載 Executor 啟動時所需要的 jar 文件,那么從哪里下載呢?![]()
Spark Standalone 模式,即獨立模式,自帶完整的服務,可單獨部署到一個集群中,無需依賴其他資源管理系統。在該模式下,用戶可以通過手動啟動 Master 和 Worker 來啟動一個獨立的集群。其中,Master 充當了資源管理的角色,Workder 充當了計算節點的角色。在該模式下,Spark Driver 程序在客戶端(Client)運行,而 Executor 則在 Worker 節點上運行。以下是一個運行在 Standalone 模式下,包含一個 Master 節點,兩個 Worker 節點的 Spark 任務調度交互部署架構圖。
![]()
從上面的 Spark 任務調度過程可以看到:
1)整個集群分為 Master 節點和 Worker 節點,其 Driver 程序運行在客戶端。Master 節點負責為任務分配 Worker 節點上的計算資源,兩者會通過相互通信來同步資源狀態,見途中紅色雙向箭頭。
2)客戶端啟動任務后會運行 Driver 程序,Driver 程序中會完成 SparkContext 對象的初始化,並向 Master 進行注冊。
3)每個 Workder 節點上會存在一個或者多個 ExecutorBackend 進程。每個進程包含一個 Executor 對象,該對象持有一個線程池,每個線程池可以執行一個任務(Task)。ExecutorBackend 進程還負責跟客戶端節點上的 Driver 程序進行通信,上報任務狀態。
14.2.1 Standalone 模式下任務運行過程
上面的過程反映了 Spark 在 standalone 模式下,整體上客戶端、Master 和 Workder 節點之間的交互。對於一個任務的具體運行過程需要更細致的分解,分解運行過程見圖中的小字。
1) 用戶通過 bin/spark-submit 部署工具或者 bin/spark-class 啟動應用程序的 Driver 進程,Driver 進程會初始化 SparkContext 對象,並向 Master 節點進行注冊。
• 1、Master 節點接受 Driver 程序的注冊,檢查它所管理的 Worker 節點,為該 Driver 程序分配需要的計算資源 Executor。Worker 節點完成 Executor 的分配后,向 Master 報告 Executor 的狀態。
• 2、Worker 節點上的 ExecutorBackend 進程啟動后,向 Driver 進程注冊。
2) Driver 進程內部通過 DAG Schaduler、Stage Schaduler、Task Schaduler 等過程完成任務的划分后,向 Worker 節點上的 ExecutorBackend 分配 TASK。
• 1、ExecutorBackend 進行 TASK 計算,並向 Driver 報告 TASK 狀態,直至結束。
• 2、Driver 進程在所有 TASK 都處理完成后,向 Master 注銷。
14.2.2 總結
Spark 能夠以 standalone 模式運行,這是 Spark 自身提供的運行模式,用戶可以通過手動啟動 master 和 worker 進程來啟動一個獨立的集群,也可以在一台機器上運行這些守護進程進行測試。standalone 模式可以用在生產環境,它有效的降低了用戶學習、測試 Spark 框架的成本。
standalone 模式目前只支持跨應用程序的簡單 FIFO 調度。然而,為了允許多個並發用戶,你可以控制每個應用使用的資源的最大數。默認情況下,它會請求使用集群的全部 CUP 內核。
缺省情況下,standalone 任務調度允許 worker 的失敗(在這種情況下它可以將失敗的任務轉移給其他的 worker)。但是,調度器使用 master 來做調度,這會產生一個單點問題
:如果 master 崩潰,新的應用不會被創建。為了解決這個問題,可以通過 zookeeper 的選舉機制在集群中啟動多個 master,也可以使用本地文件實現單節點恢復。
14.3 yarn 集群模式
Apache yarn 是 apache Hadoop 開源項目的一部分。設計之初是為了解決 mapreduce 計算框架資源管理的問題。到 haodoop 2.0 使用 yarn 將 mapreduce 的分布式計算和資源管理區分開來。它的引入使得 Hadoop 分布式計算系統進入了平台化時代,即
各種計算框架可以運行在一個集群中
,由資源管理系統 YRAN 進行統一的管理和調度,從而共享整個集群資源、提高資源利用率。
YARN 總體上也 Master/Slave 架構--ResourceManager/NodeManager。前者(RM)負責對各個 NodeManager(NM) 上的資源進行統一管理和調度。而 Container 是資源分配和調度的基本單位,其中封裝了機器資源,如內存、CPU、磁盤和網絡等,每個任務會被分配一個 Container,該任務只能在該 Container 中執行,並使用該 Container 封裝的資源。NodeManager 的作用則是負責接收並啟動應用的 Container、而向 RM 回報本節點上的應用 Container 運行狀態和資源使用情況。ApplicationMaster 與具體的 Application 相關,主要負責同 ResourceManager 協商以獲取合適的 Container,並跟蹤這些 Container 的狀態和監控其進度。如下圖所示為 yarn 集群的一般模型。
簡單架構圖如下:
![]()
詳細架構圖如下:
![]()
Spark 在 yarn 集群上的部署方式分為兩種,yarn cluster(driver 運行在 master 上)和 yarn client(driver 運行在 client 上)。
driver on master 如下圖所示:
![]()
• (1) Spark Yarn Client 向 YARN 中提交應用程序,包括 Application Master 程序、啟動 Application Master 的命令、需要在 Executor 中運行的程序等。
• (2) Resource manager 收到請求后,在其中一個 Node Manager 中為應用程序分配一個 Container,要求它在 Container 中啟動應用程序的 Application Master,Application Master 初始化 sparkContext 以及創建 DAG Scheduler 和 Task Scheduler。
• (3) Application Master 根據 SparkContext 中的配置,向 Resource Manager 申請 Container,同時,Application Master 向 Resource Manager 注冊,這樣用戶可通過 Resource Manager 查看應用程序的運行狀態。
• (4) Resource Manager 在集群中尋找符合條件的 Node Manager,在 Node Manager 啟動 Container,要求 Container 啟動 Executor。
• (5) Executor 啟動后向 Application Master 注冊,並接收 Application Master 分配的 Task。
• (6) 應用程序運行完成后,Application Master 向 Resource Manager 申請注銷並關閉自己。
driver on client 如下圖所示:
![]()
• (1) Spark Yarn Client 向 YARN 的 Resource Manager 申請啟動 Application Master。同時在 SparkContent 初始化中將創建 DAG Scheduler 和 Task Scheduler 等。
• (2) ResourceManager 收到請求后,在集群中選擇一個 NodeManager,為該應用程序分配第一個 Container,要求它在這個 Container 中啟動應用程序的 ApplicationMaster,與 YARN-Cluster 區別的是在該 ApplicationMaster 不運行 SparkContext,只與 SparkContext 進行聯系進行資源的分派。
• (3) Client 中的 SparkContext 初始化完畢后,與 Application Master 建立通訊,向 Resource Manager 注冊,根據任務信息向 Resource Manager 申請資源 (Container)。
• (4) 當 Application Master 申請到資源后,便與 Node Manager 通信,要求它啟動 Container。
• (5) Container 啟動后向 Driver 中的 SparkContext 注冊,並申請 Task。
• (6) 應用程序運行完成后,Client 的 SparkContext 向 ResourceManage r申請注銷並關閉自己。
Yarn-client 和Yarn cluster 模式對比可以看出,在 Yarn-client(Driver on client)中,Application Master 僅僅從 Yarn 中申請資源給 Executor,之后 client 會跟 container 通信進行作業的調度。如果 client 離集群距離較遠,建議不要采用此方式,不過此方式有利於交互式的作業。
![]()
Spark 能夠以集群的形式運行,可用的集群管理系統有 Yarn、Mesos 等。集群管理器的核心功能是資源管理和任務調度。以 Yarn 為例,Yarn 以 Master/Slave 模式工作,在 Master 節點運行的是 Resource Manager(RM),負責管理整個集群的資源和資源分配。在 Slave 節點運行的 Node Manager(NM),是集群中實際擁有資源的工作節點。我們提交 Job 以后,會將組成 Job 的多個 Task 調度到對應的 Node Manager 上進行執行。另外,在 Node Manager 上將資源以 Container 的形式進行抽象,Container 包括兩種資源 內存 和 CPU。
以下是一個運行在 Yarn 集群上,包含一個 Resource Manager 節點,三個 Node Manager 節點(其中,兩個是 Worker 節點,一個 Master 節點)的 Spark 任務調度交換部署架構圖。
![]()
從上面的Spark任務調度過程圖可以看到:
1)整個集群分為 Master 節點和 Worker 節點,它們都存在於 Node Manager 節點上,在客戶端提交任務時由 Resource Manager 統一分配,運行 Driver 程序的節點被稱為 Master 節點,執行具體任務的節點被稱為 Worder 節點。Node Manager 節點上資源的變化都需要及時更新給 Resource Manager,見圖中紅色雙向箭頭。
2)Master 節點上常駐 Master 守護進程 -- Driver 程序,Driver 程序中會創建 SparkContext對 象,並負責跟各個 Worker 節點上的 ExecutorBackend 進程進行通信,管理 Worker 節點上的任務,同步任務進度。實際上,在 Yarn 中 Node Manager 之間的關系是平等的,因此 Driver 程序會被調度到任何一個 Node Manager 節點。
3)每個 Worker 節點上會存在一個或者多個 ExecutorBackend 進程。每個進程包含一個 Executor 對象,該對象持有一個線程池,每個線程池可以執行一個任務(Task)。ExecutorBackend 進程還負責跟 Master 節點上的 Driver 程序進行通信,上報任務狀態。
集群下任務運行過程
上面的過程反映出了 Spark 在集群模式下,整體上 Resource Manager 和 Node Manager 節點間的交互,Master 和 Worker 之間的交互。對於一個任務的具體運行過程需要更細致的分解,分解運行過程見圖中的小字。
• 1) 用戶通過 bin/spark-submit 部署工具或者 bin/spark-class 向 Yarn 集群提交應用程序。
• 2) Yarn 集群的 Resource Manager 為提交的應用程序選擇一個 Node Manager 節點並分配第一個 Container,並在該節點的 Container 上啟動 SparkContext 對象。
• 3) SparkContext 對象向 Yarn 集群的 Resource Manager 申請資源以運行 Executor。
• 4) Yarn 集群的 Resource Manager 分配 Container 給 SparkContext 對象,SparkContext 和相關的 Node Manager 通訊,在獲得的 Container 上啟動 ExecutorBackend 守護進程,ExecutorBackend 啟動后開始向 SparkContext 注冊並申請 Task。
• 5) SparkContext 分配 Task 給 ExecutorBackend 執行。
• 6) ExecutorBackend 開始執行 Task,並及時向 SparkContext 匯報運行狀況。Task 運行完畢,SparkContext 歸還資源給 Node Manager,並注銷退。
14.4 mesos 集群模式
Mesos 是 apache 下的開源分布式資源管理框架。起源於
加州大學伯克利分校
,后被 Twitter 推廣使用。Mesos 上可以部署多種分布式框架,Mesos 的架構圖如下圖所示,其中 Framework 是指外部的計算框架,如 Hadoop、Mesos 等,這些計算框架可通過注冊的方式接入 Mesos,以便 Mesos 進行統一管理和資源分配。
![]()
在 Mesos 上運行的 Framework 由兩部分組成:一個是 scheduler ,通過注冊到 Master 來獲取集群資源。另一個是在 Slave 節點上運行的 executor 進程,它可以執行 Framework 的 task 。 Master 決定為每個 Framework 提供多少資源,Framework 的 scheduler 來選擇其中提供的資源。當 Framework 同意了提供的資源,它通過 Master 將 task 發送到提供資源的 Slaves 上運行。Mesos c的資源分配圖如下圖所示:
![]()
(1) Slave1 向 Master 報告,有 4 個 CPU 和 4 GB 內存可用。
(2) Master 發送一個 Resource Offer 給 Framework1 來描述 Slave1 有多少可用資源。
(3) FrameWork1 中的 FW Scheduler 會答復 Master,我有兩個 Task 需要運行在 Slave1,一個 Task 需要<2個CPU,1 GB內存="">
,另外一個 Task 需要<1個CPU,2 GB內存="">
。
(4) 最后,Master 發送這些 Tasks 給 Slave1。然后,Slave1 還有 1 個 CPU 和 1GB 內存沒有使用,所以分配模塊可以把這些資源提供給 Framework2。
Spark 可作為其中一個分布式框架部署在 mesos 上,部署圖與 mesos 的一般框架部署圖類似,如下圖所示,這里不再重述。
![]()
14.5 spark 三種部署模式的區別
在這三種部署模式中,standalone 作為 spark 自帶的分布式部署模式,是最簡單也是最基本的 spark 應用程序部署模式,這里就不再贅述。這里就講一下 yarn 和 mesos 的區別:
(1) 就兩種框架本身而言,mesos上可部署 yarn 框架。而 yarn 是更通用的一種部署框架,而且技術較成熟。
(2) mesos 雙層調度機制,能支持多種調度模式,而 yarn 通過 Resource Mananger 管理集群資源,只能使用一種調度模式。Mesos 的雙層調度機制為:mesos 可接入如 yarn 一般的分布式部署框架,但 Mesos 要求可接入的框架必須有一個調度器模塊,該調度器負責框架內部的任務調度。當一個 Framework 想要接入 mesos 時,需要修改自己的調度器,以便向 mesos 注冊,並獲取 mesos 分配給自己的資源,這樣再由自己的調度器將這些資源分配給框架中的任務,也就是說,整個 mesos 系統采用了雙層調度框架:第一層,由 mesos 將資源分配給框架;第二層,框架自己的調度器將資源分配給自己內部的任務。
(3) mesos 可實現粗、細粒度資源調度,可動態分配資源,而 yarn 只能實現靜態資源分配。其中粗粒度和細粒度調度定義如下:
粗粒度模式(Coarse-grained Mode):程序運行之前就要把所需要的各種資源(每個 executor 占用多少資源,內部可運行多少個 executor)申請好,運行過程中不能改變。
細粒度模式(Fine-grained Mode):為了防止資源浪費,對資源進行按需分配。與粗粒度模式一樣,應用程序啟動時,先會啟動 executor,但每個 executor 占用資源僅僅是自己運行所需的資源,不需要考慮將來要運行的任務,之后,mesos 會為每個 executor 動態分配資源,每分配一些,便可以運行一個新任務,單個 Task 運行完之后可以馬上釋放對應的資源。每個 Task 會匯報狀態給 Mesos Slave 和 Mesos Master,便於更加細粒度管理和容錯,這種調度模式類似於 MapReduce 調度模式,每個 Task 完全獨立,優點是便於資源控制和隔離,但缺點也很明顯,短作業運行延遲大。
從 yarn 和 mesos 的區別可看出,它們各自有優缺點。因此實際使用中,選擇哪種框架,要根據本公司的實際需要而定,可考慮現有的大數據生態環境。如我司采用 yarn 部署 spark,原因是,我司早已有較成熟的 hadoop 的框架,考慮到使用的方便性,采用了 yarn 模式的部署。
14.6 異常場景分析
上面說明的是正常情況下,各節點的消息分發細節。那么如果在運行中,集群中的某些節點出現了問題,整個集群是否還能夠正常處理 Application 中的任務呢?
14.6.1 異常分析1:Worker 異常退出

在 Spark 運行過程中,經常碰到的問題就是 Worker 異常退出,當 Worker 退出時,整個集群會有哪些故事發生呢?請看下面的具體描述:
1)Worker 異常退出,比如說有意識的通過 kill 指令將 Worker 殺死。
2)Worker 在退出之前,會將自己所管控的所有小弟 Executor 全干掉。
3)Worker 需要定期向 Master 改善心跳消息的,現在 Worker 進程都已經玩完了,哪有心跳消息,所以 Master 會在超時處理中意識到有一個 “分舵” 離開了。
4)Master 非常傷心,傷心的 Master 將情況匯報給了相應的 Driver。
Driver 通過兩方面確認分配給自己的 Executor 不幸離開了,一是 Master 發送過來的通知,二是 Driver 沒有在規定時間內收到 Executor 的 StatusUpdate,於是 Driver 會將注冊的 Executor 移除。
后果分析
Worker 異常退出會帶來哪些影響:
1)Executor 退出導致提交的 Task 無法正常結束,會被再一次提交運行。
2)如果所有的 Worker 都異常退出,則整個集群不可用。
3)需要有相應的程序來重啟 Worker 進程,比如使用 supervisord 或 runit。
測試步驟
1)啟動 Master。
2)啟動 Worker。
3)啟動 spark-shell。
4)手工 kill 掉 Worker 進程。
5)用 jps 或 ps -ef | grep -i java 來查看啟動着的 java 進程。
異常退出的代碼處理
定義 ExecutorRunner.scala 的 start 函數
def start() {
workerThread = new Thread("ExecutorRunner for " + fullId) {
override def run() { fetchAndRunExecutor() }
}
workerThread.start()
// Shutdown hook that kills actors on shutdown.
shutdownHook = new Thread() {
override def run() {
killProcess(Some("Worker shutting down"))
}
}
Runtime.getRuntime.addShutdownHook(shutdownHook)
}
killProcess 的過程就是停止相應 CoarseGrainedExecutorBackend 的過程。
Worker 停止的時候,一定要先將自己啟動的 Executor 停止掉。這是不是很像水滸中宋江的手段,李逵就是這樣不明不白的把命給丟了。
小結
需要特別指出的是,當 Worker 在啟動 Executor 的時候,是通過 ExecutorRunner 來完成的,ExecutorRunner 是一個獨立的線程,和 Executor 是一對一的關系,這很重要。Executor 作為一個獨立的進程在運行,但會受到 ExecutorRunner 的嚴密監控。
14.6.2 異常分析2:Executor 異常退出

后果分析
Executor 作為 Standalone 集群部署方式下的最底層員工,一旦異常退出,其后果會是什么呢?
1)Executor 異常退出,ExecutorRunner 注意到異常,將情況通過 ExecutorStateChanged 匯報給 Master。
2)Master 收到通知之后,非常不高興,盡然有小弟要跑路,那還了得,要求 Executor 所屬的 Worker 再次啟動。
3)Worker 收到 LaunchExecutor 指令,再次啟動 Executor。
測試步驟
1)啟動 Master
2)啟動 Worker
3)啟動 spark-shell
4)手工 kill 掉 CoarseGrainedExecutorBackend
fetchAndRunExecutor
fetchAndRunExecutor 負責啟動具體的 Executor,並監控其運行狀態,具體代碼邏輯如下所示
def fetchAndRunExecutor() {
try {
// Create the executor's working directory
val executorDir = new File(workDir, appId + "/" + execId)
if (!executorDir.mkdirs()) {
throw new IOException("Failed to create directory " + executorDir)
}
// Launch the process
val command = getCommandSeq
logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))
val builder = new ProcessBuilder(command: _*).directory(executorDir)
val env = builder.environment()
for ((key, value) {
logInfo("Runner thread for executor " + fullId + " interrupted")
state = ExecutorState.KILLED
killProcess(None)
}
case e: Exception => {
logError("Error running executor", e)
state = ExecutorState.FAILED
killProcess(Some(e.toString))
}
}
}
14.6.3 異常分析3:Master 異常退出

Worker 和 Executor 異常退出的場景都講到了,我們剩下最后一種情況了,Master 掛掉了怎么辦?
后果分析
帶頭大哥如果不在了,會是什么后果呢?
1)Worker 沒有匯報的對象了,也就是如果 Executor 再次跑飛,Worker 是不會將 Executor 啟動起來的,大哥沒給指令。
2)無法向集群提交新的任務。
3)老的任務即便結束了,占用的資源也無法清除,因為資源清除的指令是 Master 發出的。
第15章 wordcount 程序運行原理窺探
15.1 spark 之 scala 實現 wordcount
在 spark 中使用 scala 來實現 wordcount(統計單詞出現次數模型)更加簡單,相對 java 代碼上更加簡潔,其函數式編程的思維邏輯也更加直觀。
package com.spark.firstApp
import org.apache.spark.{SparkContext, SparkConf}
/**
* scala 實現 wordcount
*/
object WordCount1 {
def main(args: Array[String]) {
if (args.length == 0) {
System.err.println("Usage: WordCount1 <file1>")
System.exit(1)
}
/**
* 1、實例化 SparkConf
* 2、構建 SparkContext,SparkContext 是 spark 應用程序的唯一入口
* 3. 通過 SparkContext 的 textFile 方法讀取文本文件
*/
val conf = new SparkConf().setAppName("WordCount1").setMaster("local")
val sc = new SparkContext(conf)
/**
* 4、通過 flatMap 對文本中每一行的單詞進行拆分(分割符號為空格),並利用 map 進行函數轉換形成 (K,V) 對形式,再進行 reduceByKey,打印輸出 10 個結果
* 函數式編程更加直觀的反映思維邏輯
*/
sc.textFile(args(0)).flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _).take(10).foreach(println)
sc.stop()
}
}
15.2 原理窺探
在 spark 集群中運行 wordcount 程序其主要業務邏輯比較簡單,涵蓋一下 3 個過程:
1)讀取存儲介質上的文本文件(一般存儲在 hdfs 上);
2)對文本文件內容進行解析,按照單詞進行分組統計匯總;
3)將過程 2 的分組結果保存到存儲介質上。(一般存儲在 hdfs 或者 RMDB 上)
雖然 wordcount 的業務邏輯非常簡單,但其應用程序在 spark 中的運行過程卻巧妙得體現了 spark 的核心精髓--分布式彈性數據集
、內存迭代
以及函數式編程
等特點。下圖對 spark 集群中 wordcount 的運行過程進行剖析,加深對 spark 技術原理窺探。

該圖橫向分割下面給出了 wordcount 的 scala 核心程序實現,該程序在 spark 集群的運行過程涉及幾個核心的 RDD,主要有 textFileRDD、flatMapRDD、mapToPairRDD、shuffleRDD(reduceByKey)等。
應用程序通過 textFile 方法讀取 hdfs 上的文本文件,數據分片的形式以 RDD 為統一模式將數據加載到不同的物理節點上,如上圖所示的節點 1、節點 2 到節點 n;並通過一系列的數據轉換,如利用 flatMap 將文本文件中對應每行數據進行拆分(文本文件中單詞以空格為分割符號),形成一個以每個單詞為核心新的數據集合 RDD;之后通過 MapRDD 繼續轉換形成形成 (K,V) 對數據形式,以便進一步使用 reduceByKey 方法,該方法會觸發 shuffle 行為,促使不同的單詞到對應的節點上進行匯聚統計(實際上在跨節點進行數據 shuffle 之前會在本地先對相同單詞進行合並累加),形成 wordcount 的統計結果;最終通過 saveAsTextFile 方法將數據保存到 hdfs 上。具體的運行邏輯原理以及過程上圖給出了詳細的示意說明。