Spark源碼系列(十一)spark源碼解析總結


========== Spark 通信架構 ==========


1、spark 一開始使用 akka 作為網絡通信框架,spark 2.X 版本以后完全拋棄 akka,而使用 netty 作為新的網絡通信框架。
最主要原因:spark 對 akka 沒有維護,需要 akka 更新,spark 的發展受到了 akka 的牽制,akka 版本之間無法通信,即 akka 兼容性問題。
2、RpcEnv:RPC 上下文環境,每個 Rpc 端點運行時依賴的上下文環境稱之為 RpcEnv。類似於 SparkContext,默認由 NettyRpcEnv 實現,由 NettyRpcEnvFactory 創建 RpcEnv。
3、RpcEndpoint:RPC 端點,Spark 針對於每個節點(Client/Master/Worker)都稱之一個 Rpc 端點且都實現 RpcEndpoint 接口,內部根據不同端點的需求,設計不同的消息和不同的業務處理,如果需要發送(詢問)則調用 Dispatcher。代理是 RpcEndpointRef。
4、Dispatcher:消息分發器,針對於 RPC 端點需要發送消息或者從遠程 RPC 接收到的消息,分發至對應的指令收件箱/發件箱。
5、Inbox:指令消息收件箱,一個本地端點對應一個收件箱,Dispatcher 在每次向 Inbox 存入消息時,都將對應 EndpointData 加入內部待 Receiver Queue 中。
6、OutBox:指令消息發件箱,一個遠程端點對應一個發件箱,當消息放入 Outbox 后,緊接着將消息通過 TransportClient 發送出去。
7、TransportClient:Netty 通信客戶端,主要負責將相對應的 OutBox 中的數據發送給遠程 TransportServer。
8、TransportServer:Netty 通信服務端,主要用於接收遠程 RpcEndpoint 發送過來的消息,並把消息傳送給 Dispatcher。

========== Spark 腳本解析 ==========
1、start-slave.sh 用於啟動 slave 節點,最終啟動的類是 org.apache.spark.deploy.worker.Worker 類。
2、start-master.sh 用於啟動 master 節點,最終啟動的類是 org.apache.spark.deploy.master.Master 類。
3、spark-submit 和 spark-shell 最終都會調用 spark-class 腳本,通過 spark-class 腳本啟動相對應的入口類。

========== Spark standalone 模式啟動流程 ==========


1、Master 和 Worker 都繼承了 RpcEndpoint 類,成為了具體的消息發送與接收端點,整個應用是利用 Actor 模型實現的異步消息通信架構。
2、Master 節點在啟動的時候的主要任務是創建了通信架構中的 RpcEnv,並注冊了 Master 成為端點。
3、Worker 節點在啟動的時候的主要任務是創建了通信架構中的 RpcEnv,並注冊了 Worker 成為端點,並且獲取了 Master 端點的代理,通過端點代理向 Master 發送消息。
4、Worker 節點在啟動的時候執行 onStar 方法,向 Master 進行了注冊。

========== Spark 應用提交流程 ==========


1、Driver 提交流程:用戶通過 spark-submit 將 jar 包和相對應的參數提交給 spark 框架,內部實現是通過 ClientEndpoint 向 Master 發送了 RequestSubmitDriver 消息,Master 獲取消息之后通過 Worker 進行 LaunchDriver 操作。
2、Driver 的進程啟動:主要通過 Worker 節點的 DriverRunner 來啟動整個的 Driver 進程。
3、注冊 Application:Driver 進程在啟動之后,通過 SparkContext 的初始化操作,創建了對應的 StandaloneSchedulerBackend,實現了向 Master 進行當前應用的注冊。
4、啟動 Executor 進程:當 Driver 向 Master 進行注冊之后,Master 通過 scheduler() 方法來對當前的 App 進行 Executor 的分配,實現上是通過 Worker 的 ExecutorRunner 來進行 Executor 的創建和運行。
5、啟動 Task 運行:當 Driver 收到所有的 Executor 資源后,通過 RDD 的 action 操作,觸發 SparkContext.runJob 方法,進而調用 Dagscheduler() 方法進行當前 DAG 的運行。通過向 Executor 發送 LaunchTask 消息來啟動 Executor 上的任務運行。
6、Task 運行完成:當 Executor 運任務完成之后,會通知 Driver 當前任務的運行狀態,然后執行任務 或者退出整個應用。

========== Spark shuffle 過程 ==========
MapReduce Shuffle 過程
1、在 spill(刷寫)階段,數據直接寫入到 kvbuffer 數據緩沖器中。會寫兩種類型的數據。一種是 kvmeta 數據,用於存放分區信息、索引信息;另一種是 (k, v) 對類型的數據,是實際的數據。
2、會以一個起點反向來寫,即當遇到 spill 進程啟動的時候,寫入點會重新進行選擇。

Hash Shuffle 過程
1、未優化版本,每一個 task 任務都會根據 reduce 任務的個數創建對應數量的 bucket,bucket 其實就是寫入緩沖區,每一個 bucket 都會存入一個文件,這個文件叫做 blockfile。最大的缺點是:產生的文件過多。
2、在優化版本中,主要通過 consolidation 這個參數進行優化,實現了 ShuffleFileGroup 的概念,不同批次的 task 任務可以復用最終寫入的文件,來整體減少文件的數量。

Sort Shuffle 過程
1、Sort Shuffle 整個過程的實現和 MapReduce Shuffle 過程類似。
2、Bypass 機制:Hash Shuffle 在 reduce 的數量比價少的時候性能要比 Sort Shuffle 要高,所以如果你的 reduce 的數量少於 Bypass 定義的數值的時候,Sort Shuffle 在 task 任務寫出的時候會采用 Hash 方式,而不會采用 ApplyOnlyMap 以及排序的方法。

========== Spark 內存管理與分配 ==========
1、內存分配模式:靜態分配和統一分配。靜態分配就是固定大小分配,統一分配就是存儲區和 Shuffle 區可以動態占用。
2、有幾種內存配置模式:
  (1)other 區,一般占用 20% 的內存區域,主要是用於代碼運行以及相關數據的運行。
  (2)Execution 區,這個區域一般占用 20% 的內存區域,主要用於 Shuffle 過程的內存消耗,通過 spark.shuffle.memeoryFaction 參數進行控制。
  (3)Storage 區,這個區域主要用於 RDD 的緩存,通過 spark.storage.memeoryFaction 參數進行控制。
3、spark 目前支持堆內內存和堆外內存,堆外內存主要用於存儲序列化后的二進制數據。

========== Spark 部署模式 ==========


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM