本課主題
- 大數據性能調優的本質
- Spark 性能調優要點分析
- Spark 資源使用原理流程
- Spark 資源調優最佳實戰
- Spark 更高性能的算子
引言
我們談大數據性能調優,到底在談什么,它的本質是什么,以及 Spark 在性能調優部份的要點,這兩點讓在進入性能調優之前都是一個至關重要的問題,它的本質限制了我們調優到底要達到一個什么樣的目標或者說我們是從什么本源上進行調優。希望這篇文章能為讀者帶出以下的啟發:
- 了解大數據性能調優的本質
- 了解 Spark 性能調優要點分析
- 了解 Spark 在資源優化上的一些參數調優
- 了解 Spark 的一些比較高效的 RDD 操作算子
大數據性能調優的本質
編程的時候發現一個驚人的規律,軟件是不存在的!所有編程高手級別的人無論做什么類型的編程,最終思考的都是硬件方面的問題!最終思考都是在一秒、一毫秒、甚至一納秒到底是如何運行的,並且基於此進行算法實現和性能調優,最后都是回到了硬件!
在大數據性能的調優,它的本質是硬件的調優!即基於 CPU(計算)、Memory(存儲)、IO-Disk/ Network(數據交互) 基礎上構建算法和性能調優!我們在計算的時候,數據肯定是存儲在內存中的。磁盤IO怎么去處理和網絡IO怎么去優化。
Spark 性能調優要點分析
在大數據性能本質的思路上,我們應該需要在那些方面進行調優呢?比如:
- 並行度
- 壓縮
- 序例化
- 數據傾斜
- JVM調優 (例如 JVM 數據結構化優化)
- 內存調優
- Task性能調優 (例如包含 Mapper 和 Reducer 兩種類型的 Task)
- Shuffle 網絡調優 (例如小文件合並)
- RDD 算子調優 (例如 RDD 復用、自定義 RDD)
- 數據本地性
- 容錯調優
- 參數調優
大數據最怕的就是數據本地性(內存中)和數據傾斜或者叫數據分布不均衡、數據轉輸,這個是所有分布式系統的問題!數據傾斜其實是跟你的業務緊密相關的。所以調優 Spark 的重點一定是在數據本地性和數據傾斜入手。
- 資源分配和使用:你能夠申請多少資源以及如何最優化的使用計算資源
- 關發調優:如何基於 Spark 框架內核原理和運行機制最優化的實現代碼功能
- Shuffle調優:分布式系統必然面臨的殺手級別的問題
- 數據傾斜:分布式系統業務本身有數據傾斜
Spark 資源使用原理流程
這是一張來至於官方的經典資源使用流程圖,這里有三大組件,第一部份是 Driver 部份,第二就是具體處理數據的部份,第三就是資源管理部份。這一張圖中間有一個過程,這表示在程序運行之前向資源管理器申請資源。在實際生產環境中,Cluster Manager 一般都是 Yarn 的 ResourceManager,Driver 會向 ResourceManager 申請計算資源(一般情況下都是在發生計算之前一次性進行申請請求),分配的計算資源就是 CPU Core 和 Memory,我們具體的 Job 里的 Task 就是基於這些分配的內存和 Cores 構建的線程池來運行 Tasks 的。
[下圖是 Spark 官方網站上的經典Spark架框圖]

當然在 Task 運行的過程中會大量的消耗內存,而Task又分為 Mapper 和 Reducer 兩種不同類型的 Task,也就是 ShuffleMapTask 和 ResultTask 兩種類型,這類有一個很關建的調優點就是如何對內存進行使用。在一個 Task 運行的時候,默應會占用 Executor 總內存的 20%,Shuffle 拉取數據和進行聚合操作等占用了 20% 的內存,剩下的大概有 60% 是用於 RDD 持久化 (例如 cache 數據到內存),Task 在運行時候是跑在 Core 上的,比較理想的是有足夠的 Core 同時數據分布比較均勻,這個時候往往能夠充分利用集群的資源。
核心調優參數如下:
num-executors executor-memory executor-cores driver-memory spark.default.parallelizm spark.storage.memoryFraction spark.shuffle.memoryFraction
- num-executors:該參數一定會被設置,Yarn 會按照 Driver 的申請去最終為當前的 Application 生產指定個數的 Executors,實際生產環境下應該分配80個左右 Executors 會比較合適呢。
- executor-memory:這個定義了每個 Executor 的內存,它與 JVM OOM 緊密相關,很多時候甚至決定了 Spark 運行的性能。實際生產環境下建義是 8G 左右,很多時候 Spark 運行在 Yarn 上,內存占用量不要超過 Yarn 的內存資源的 50%。
- executor-cores:決定了在 Executors 中能夠並行執行的 Tasks 的個數。實際生產環境下應該分配4個左右,一般情況下不要超過 Yarn 隊列中 Cores 總數量的 50%。
- driver-memory:默應是 1G
- spark.default.parallelizm:並行度問題,如果不設置這個參數,Spark 會跟據 HDFS 中 Block 的個數去設置這一個數量,原理是默應每個 Block 會對應一個 Task,默應情況下,如果數據量不是太多就不可以充份利用 executor 設置的資源,就會浪費了資源。建義設置為 100個,最好 700個左右。Spark官方的建義是每一個 Core 負責 2-3 個 Task。
- spark.storage.memoryFraction:默應占用 60%,如果計算比較依賴於歷史數據則可以調高該參數,當如果計算比較依賴 Shuffle 的話則需要降低該比例。
- spark.shuffle.memoryFraction:默應占用 20%,如果計算比較依賴 Shuffle 的話則需要調高該比例。
Spark 更高性能的算子
Shuffle 分開兩部份,一個是 Mapper 端的Shuffle,另外一個就是 Reducer端的 Shuffle,性能調優有一個很重要的總結就是盡量不使用 Shuffle 類的算子,我們能避免就盡量避免,因為一般進行 Shuffle 的時候,它會把集群中多個節點上的同一個 Key 匯聚在同一個節點上,例如 reduceByKey。然后會優先把結果數據放在內存中,但如果內存不夠的話會放到磁盤上。Shuffle 在進行數據抓取之前,為了整個集群的穩定性,它的 Mapper 端會把數據寫到本地文件系統。這可能會導致大量磁盤文件的操作。如何避免Shuffle可以考慮以下:
- 采用 Map 端的 Join (RDD1 + RDD2 )先把一個 RDD1的數據收集過來,然后再通過 sc.broadcast( ) 把數據廣播到 Executor 上;
- 如果無法避免Shuffle,退而求其次就是需要更多的機器參與 Shuffle 的過程,這個時候就需要充份地利用 Mapper 端和 Reducer 端機制的計算資源,盡量使用 Mapper 端的 Aggregrate 功能,e.g. aggregrateByKey 操作。相對於 groupByKey而言,更傾向於使用 reduceByKey( ) 和 aggregrateByKey( ) 來取代 groupByKey,因為 groupByKey 不會進行 Mapper 端的操作,aggregrateByKey 可以給予更多的控制。
- 如果一批一批地處理數據來說,可以使用 mapPartitions( ),但這個算子有可能會出現 OOM 機會,它會進行 JVM 的 GC 操作!
- 如果進行批量插入數據到數據庫的話,建義采用foreachPartition( ) 。
- 因為我們不希望有太多的數據碎片,所以能批量處理就盡量批量處理,你可以調用 coalesce( ) ,把一個更多的並行度的分片變得更少,假設有一萬個數據分片,想把它變得一百個,就可以使用 coalesce( )方法,一般在 filter( ) 算子之后就會用 coalesce( ),這樣可以節省資源。
- 官方建義使用 repartitionAndSortWithPartitions( );
- 數據進行復用時一般都會進行持久化 persisit( );
- 建義使用 mapPartitionWithIndex( );
- 也建義使用 tree 開頭的算子,比如說 treeReduce( ) 和 treeAggregrate( );
總結
大數據必然要思考的核心性能問題不外乎 CPU 計算、內存管理、磁盤和網絡IO操作,這是無可避免的,但是可以基於這個基礎上進行優化,思考如何最優化的使用計算資源,思考如何在優化代碼,在代碼層面上防避墜入性能弱點;思考如何減少網絡傳輸和思考如何最大程度的實現數據分布均衡。
在資源管理調優方面可以設置一些參數,比如num-executors、executor-memory、executor-cores、driver-memory、spark.default.parallelizm、spark.storage.memoryFraction、spark.shuffle.memoryFraction
Shuffle 所導致的問題是所有分布式系統都無法避免的,但是如何把 Shuffle 所帶來的性能問題減少最低,是一個很可靠的優化方向。Shuffle 的第一階段即Mapper端在默應情況下會寫到本地,而reducer通過網絡抓取的同一個 Key 在不同節點上都把它抓取過來,內存可能不夠,不夠的話就寫到磁盤中,這可能會導致大量磁盤文件的操作。在實際編程的時候,可以用一些比較高效的RDD算子,例如 reduceByKey、aggregrateByKey、coalesce、foreachPartition、repartitionAndSortWithPartitions。
參考資料
資料來源來至
[1] DT大數據夢工廠 大數據商業案例以及性能調優
第20課:大數據性能調優的本質和Spark性能調優要點分析
第21課:Spark性能調優之系統資源使用原理和調優最佳實踐
第22課:Spark性能調優之使用更高性能算子及其源碼剖析
