spark 任務運行原理


調優概述

在開發完Spark作業之后,就該為作業配置合適的資源了。Spark的資源參數,基本都可以在spark-submit命令中作為參數設置。很多Spark初學者,通常不知道該設置哪些必要的參數,以及如何設置這些參數,最后就只能胡亂設置,甚至壓根兒不設置。資源參數設置的不合理,可能會導致沒有充分利用集群資源,作業運行會極其緩慢;或者設置的資源過大,隊列沒有足夠的資源來提供,進而導致各種異常。總之,無論是哪種情況,都會導致Spark作業的運行效率低下,甚至根本無法運行。因此我們必須對Spark作業的資源使用原理有一個清晰的認識,並知道在Spark作業運行過程中,有哪些資源參數是可以設置的,以及如何設置合適的參數值。

Spark作業基本運行原理

Spark基本運行原理

詳細原理見上圖。我們使用spark-submit提交一個Spark作業之后,這個作業就會啟動一個對應的Driver進程。根據你使用的部署模式(deploy-mode)不同,Driver進程可能在本地啟動,也可能在集群中某個工作節點上啟動。Driver進程本身會根據我們設置的參數,占有一定數量的內存和CPU core。而Driver進程要做的第一件事情,就是向集群管理器(可以是Spark Standalone集群,也可以是其他的資源管理集群,美團•大眾點評使用的是YARN作為資源管理集群)申請運行Spark作業需要使用的資源,這里的資源指的就是Executor進程。YARN集群管理器會根據我們為Spark作業設置的資源參數,在各個工作節點上,啟動一定數量的Executor進程,每個Executor進程都占有一定數量的內存和CPU core。

在申請到了作業執行所需的資源之后,Driver進程就會開始調度和執行我們編寫的作業代碼了。Driver進程會將我們編寫的Spark作業代碼分拆為多個stage,每個stage執行一部分代碼片段,並為每個stage創建一批task,然后將這些task分配到各個Executor進程中執行。task是最小的計算單元,負責執行一模一樣的計算邏輯(也就是我們自己編寫的某個代碼片段),只是每個task處理的數據不同而已。一個stage的所有task都執行完畢之后,會在各個節點本地的磁盤文件中寫入計算中間結果,然后Driver就會調度運行下一個stage。下一個stage的task的輸入數據就是上一個stage輸出的中間結果。如此循環往復,直到將我們自己編寫的代碼邏輯全部執行完,並且計算完所有的數據,得到我們想要的結果為止。

Spark是根據shuffle類算子來進行stage的划分。如果我們的代碼中執行了某個shuffle類算子(比如reduceByKey、join等),那么就會在該算子處,划分出一個stage界限來。可以大致理解為,shuffle算子執行之前的代碼會被划分為一個stage,shuffle算子執行以及之后的代碼會被划分為下一個stage。因此一個stage剛開始執行的時候,它的每個task可能都會從上一個stage的task所在的節點,去通過網絡傳輸拉取需要自己處理的所有key,然后對拉取到的所有相同的key使用我們自己編寫的算子函數執行聚合操作(比如reduceByKey()算子接收的函數)。這個過程就是shuffle。

當我們在代碼中執行了cache/persist等持久化操作時,根據我們選擇的持久化級別的不同,每個task計算出來的數據也會保存到Executor進程的內存或者所在節點的磁盤文件中。

因此Executor的內存主要分為三塊:第一塊是讓task執行我們自己編寫的代碼時使用,默認是占Executor總內存的20%;第二塊是讓task通過shuffle過程拉取了上一個stage的task的輸出后,進行聚合等操作時使用,默認也是占Executor總內存的20%;第三塊是讓RDD持久化時使用,默認占Executor總內存的60%。

task的執行速度是跟每個Executor進程的CPU core數量有直接關系的。一個CPU core同一時間只能執行一個線程。而每個Executor進程上分配到的多個task,都是以每個task一條線程的方式,多線程並發運行的。如果CPU core數量比較充足,而且分配到的task數量比較合理,那么通常來說,可以比較快速和高效地執行完這些task線程。

以上就是Spark作業的基本運行原理的說明,大家可以結合上圖來理解。理解作業基本原理,是我們進行資源參數調優的基本前提。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM