Spark性能調優
1、開發Spark項目的經驗准則
(1)盡量少生成RDD;
(2)盡量少對RDD進行算子操作,如果有可能,盡量在一個算子里面實現多個功能;
(3)盡量少使用發生Shuffle操作的算子,如GroupByKey、ReduceByKey、SortByKey;
(4)無論什么功能,性能第一;
(5)對於Accumulator這種分布式累加計算的變量的使用,在從Accumulator中獲取數據插入數據庫時,一定要在action操作之后;
2、Spark項目開發流程:
數據調研 --> 需求分析 --> 技術方案設計 --> 數據庫設計 --> 編碼實現 --> 單元測試 --> 本地測試 --> 性能調優 --> Troubshoting --> 數據傾斜解決
3、常規性能調優:
3.1、分配更多資源
性能和速度的提升在一定范圍內和運算資源成正比
(1)分配哪些資源?
executor | cpu per executor | memory per executor | driver memory ......
(2)在哪分配資源?
- spark_submit
- class wordCount - num-exector n 配置executor的數量(50 - 100) - driver-memory 100m 配置driver的內存大小(影響不大| 1-5G) - executor-memory 100m 配置每個executor能使用的內存大小(5 - nG) - exceutor-cores n 配置每個exceutor的cpu core數量 /path/wordConut.jar
分配更多executor和cup個數可以增大task並行度,增大內存對cache、shuffle和task任務執行的GC有益;
通過sparkconf.set(“spark.cores.max”,n)可以限制每個作業能夠使用的cpu core總數量,負責作業默認使用全部的CPU;
(3)應該調節到多大?
盡量調節到最大資源(可以使用的資源|exceutor數量、executor內存),假如擁有20台機器(每台2個CPU、4G內存),則共有80G內存、40個CPU,如果使用StandAlone模式,則可以啟動20個executor,每個executor分配2個CPU,4G內存;如果共有500G內存和100個CPU,則可以嘗試啟動50個exceutor,平均每個executor2個CPU、10G內存(Yarn調度模式)。
3.2、調節並行度
並行度就是指Spark作業中,每個Stage的task數量,就是Spark作業在各個階段(Stage)的並行度(Spark作業中每個action觸發一個job,每個job內的shuffle操作會將job拆分成多個Stage)。
(1)合理的設置並行度,可以充分利用集群資源,且減少每個task處理的數據量;
(2)task數量至少設置成與Spark application的總CPU核數相同;
(3)官方推薦task數量設置為cou core數量的2-3倍,原因是某些task運行的比較快,剩余的task運行過程中,已經運行完的task的cpu core便會空閑,但不會被Spark application釋放,因此如果設置為cou core數量的2-3倍,可以讓空閑出來的cpu繼續執行任務,從而提升性能。
(4)如何設置Spark application的並行度:
sparkConf.set("spark.default.parallelism" , "500");
這種並行度設置,只會在沒有使用SparkSQL的那些Stage生效,如果想調節SparkSQL的並行度,則可以通過 spark.sql.shuffle.partions調節。
3.3、RDD重構以及RDD持久化
(1)默認情況下,多次對一個RDD算子執行算子去獲取不同的RDD,都會對這個RDD以及之前的父RDD全部重新計算一次;
(2)盡量復用RDD,功能差不多的RDD,可以抽取為一個公共的RDD;
(3)公共RDD一定要持久化,將RDD的數據緩存到內存/磁盤;
(4)將持久化數據進行序列化,避免OOM;
(5)考慮數據的高可靠性,如果內存充足,可以使用雙副本機制進行持久化.
3.4、廣播大變量
如果task使用大變量(如存儲大量數據的map集合),將會導致性能受損,可以嘗試將大變量廣播出去;
(1)默認的task執行算子過程中,每個task都會獲取一份外部變量的副本,可能會增大網絡傳輸開銷(副本發送)、增大磁盤IO開銷、序列化導致內存值不足 、頻繁GC導致Spark作業暫停運行(task創建對象,堆內存放不下);
(2)BlockManager 負責管理每個Executor對應的內存和磁盤上的數據,BlockManager也許會從Driver上獲取變量副本,也可能從距離較近的executor的BlockManager獲取;
(3)廣播變量在Driver上有一份初始副本,task在運行時如果需要使用變量副本,會首先在本地executor對應的BlockManager中獲取,若本地沒有,則從Driver或者其它executor拉取並保存到本地;
(4)使用廣播變量后,不會每一個task擁有一份變量副本,而是每一個executor一份副本;
(5)廣播變量默認使用的最大內存可以設置為 ExecutorMemeory*60%*90%*80%。
3.5、使用Kryo序列化
(1)Spark內部默認使用java序列化機制,好處在於處理簡單,但是效率不高,並且會占用更多空間、速度慢,Spark默認支持Kryo序列化,性能更好。
(2)當Spark需要通過網絡進行傳輸數據,或者將數據溢寫到磁盤,Spark會將數據序列化,Kryo序列化機制啟用后生效的幾個地方:
-- 算子函數中使用外部變量
-- 持久化RDD時進行序列化 -- Stage之間的數據Shuffle
(3)使用Kryo序列化機制,需要注冊自定義類
sparkConf.registerKryoClasses(new Class[]{***.class});
3.6、使用 FastUtil 優化數據格式
Fastutil是java標准化集合框架(Map,List,Set)的類庫擴展以及替代品,可以減小內存占用並提供更快的查詢速度,Spark使用FastUtil的場景:
(1)如果算子函數中使用了外部變量,第一步可以廣播變量,第二步可以使用Kryo序列化機制,第三步如果是較大的數據集合可以使用fastutil進行重寫;
(2)Task要執行的計算邏輯里,有較大的集合時可以使用fastUtil,在一定程度上可以減小內存占用,避免頻繁GC;
3.7、調節數據本地化等待時長
(1)本地化級別
Process_local
Node_local
No_pref
rack_local
Any
(2)參數調節
spark.locality.wait.process spark.locality.wait 調節數據本地化等待時長 sparkConf.set("spark.locality.wait","10") spark.locality.node spark.locality.wait.rack
4、JVM調優:
4.1、調節Cache操作的內存占比
(1)JVM內存不足會導致
①頻繁minorGC,導致Spark作業頻繁停止工作
②老年代囤積大量短生命周期對象,導致頻繁fullGC,Spark作業長時間停止工作 ③嚴重影響Spark作業的性能和運行速度
(2)Spark作業運行過程中,對內存被划分為兩塊,一塊用來給RDD的Cache、Persist操作進行RDD數據緩存,另外一塊用來存儲Spark作業算子函數創建的對象和運算;
(3)默認60%內存給Cache操作,如果在某些情況下Cache對內存的要求不是很大,而task算子函數中創建的對象過多導致頻繁GC(可以通過Spark UI查看Yarn界面,查看Spark作業的運行統計,從而找到每個Stage的運行情況,包括每個task的運行時間、gc時間等),可以通過降低Cache內存占比的方式,給task更多的運算空間,從而避免頻繁GC;
(4)sparkConf.set("spark.storage.memoryFraction","0.4");
4.2、調節executor堆外內存與連接等待時長
(1)有時候Spark作業處理時會報錯如:shuffle file cannot find 、 executor lost 、 task lost 、 OOM ,則有可能是因為executor的堆外內存不夠用導致內存溢出,也可能導致后續的Stage的task在運行時從別的executor拉取shffle map output文件,但因為executor已經掛掉了,關聯的BlockManager也沒有了,所以會出現shuffle file cannot find 、 executor lost 、 task lost等報錯;
(2)可以通過參數調節executor的堆外內存大小來解決上述問題
-- conf spark.yarn.executor.memeroryOverhed=2048 針對基於yarn的提交模式
在spark的啟動指令中添加參數,默認情況下堆外內存大小為三百多MB,可調節為1G\2G\4G…,可以避免某些JVM OOM問題,同時讓Spark作業有較大性能提升;
(3)調節連接等待時長
當某個executor的task創建的對象特別大,頻繁的讓JVM內存溢滿進行垃圾回收,作業將停止工作無法提供相應,當下游的executor嘗試建立遠程網絡連接拉取數據,可能會因為超過默認的60s而失敗,因此導致Spark作業崩潰,也可能導致DAGSecheduler反復提交幾次stage,taskScheduler反復提交task,大大延長了作業時長;
可以通過參數調節等待時長,從而避免文件拉取失敗: --conf spark.core.connection.ack.wait.timeout = 300 ;
5、Shuffle調優:
Spark的一些算子會觸發shuffle,比如GroupByKey、ReduceByKey、CountByKey、Join等;
GroupByKey會把分布在集群各節點上的數據中同一個key對應的values集中到同一個節點的一個executor的一個task中進行分析處理 --- >; ReduceByKey對values集合進行reduce操作,最終變為一個value; Join只需要兩個RDD的Key相同,就會分發到同一個節點的executor中的task中; ......
5.1、開啟consolidation機制(針對HashShuffle)(效果明顯)
(1)Shuffle中的寫磁盤操作,基本就是shuffle中性能消耗最大的地方,Shuffle前半部分的task在寫入磁盤文件之前,都會先寫入一個內存緩沖,再溢寫到磁盤文件,而且Shuffle的前半部分Stage的task,每個task都會創建下一個Stage的task數量的文件;
(2)可以開啟Hash Shuffle的文件合並機制,從而在map端就將文件進行合並,避免產生大量小文件,此機制只適用於HashShuffle,如果不需要SortShuffle的排序機制,除了SortShuffle的Bypass機制,也可以開啟HashShuffle並啟用文件合並機制;只有並行執行的task會創建下一批task個數的文件,下一批task個數相同的並行的task會復用已有的輸出文件,但多批task同時執行,是不能復用輸出文件的;
(3)sparkConf.set.("spark.shuffle.consolidateFiles","true");
合並輸出文件對Spark作業有哪些影響?
①map task 減少,磁盤IO減少;
②網絡傳輸性能消耗減少;
5.2、調節Spark Shuffle ShuffleMapTask階段內存緩沖大小和ShuffleReduceTask階段內存占比(效果不明顯)
(1)spark.shuffle.file.buffer 默認32kb map端內存緩沖可能會引起頻繁Spill和磁盤IO消耗 spark.shuffle.memoryFraction 默認0.2 reduce端聚合內存比例過小可能會導致頻繁磁盤文件讀寫;
(2)默認shuffle的map task,在將數據輸出到磁盤文件之前,會統一先寫入每個task關聯的內存緩沖區,默認大小為32kb,當緩沖區滿了才會進行spill操作,如果文件很大,將會導致多次磁盤寫操作,如果reduce端內存不夠用,也可能會導致頻繁的spill;
(3)查看Spark UI,如果每個task的shuffle write和shuffle read很大,則可以考慮進行相應調優;
spark.shuffle.file.buffer 每次增加一倍 32kb -> 64kb -> 128kb ... spark.shuffle.memoryFraction 每次增加0.1 0.2 -> 0.3 -> 0.4 ...
5.3、HashShuffleManager和SortShuffleManager
(1) “spark.shuffle.manager” --> “hash” | “sort” | “tungsten-sort”
(2) HashShuffleManager和SortShuffleManager的不同:
①SortShuffleManager會對reduce task要處理的數據進行排序; ②SortShuffleManager會避免像HashShuffleManager一樣創建多份磁盤文件,一個task只會寫入一個磁盤文件,不同的reduce task的數據用offset來划定界限;
(3) tungsten-sort和sort機制差不多,但由於自己實現了內存管理機制,性能有較大提升,可以避免一些OOM、GC等內存相關的異常;
(4)SortShuffle可以通過開啟Bypass機制限制排序機制,即當輸出文件個數小於某個設定值時不會觸發排序機制;
6、算子調優
6.1、使用mapPartitions提升map操作的性能
Spark中每個Task處理一個RDD的Partition,如果是普通的map,加入partition內有100w條數據,那么task的map函數的fuction要執行和 計算100w次,如果使用mapPatitons,則一個task僅執行一次fuction(一次接收整個partiton的所有數據),效率比較高;相比較來說,map函數可以通過GC回收掉已經使用過的內存,但是mapPartitions因為一次傳入大量數據,容易導致OOM,所以比較適用於數據量不是很大的場景,所以在實際開發中應估測一下Partition的數據量和每個executor的內存資源。
6.2、filter之后使用coalesce減少分區數量
默認情況下經過filter之后,RDD的每個Partition的數據量將會變的不均勻,所以可能會有一些數據量較小的partition單獨啟動一個task進行處理,造成資源浪費,也可能會導致數據傾斜;coalesce算子主要就是在filter之后針對每個partition進行壓縮,減少partiton數量從而讓每個partion的數據量更加均勻;repartiton算子其實就是第二個參數為true的coalesce算子的實現;
6.3、使用repartition解決SparkSQL低並行度的性能問題
並行度可以通過參數自定義:
spark.default.parallelism 一般調節為application總CPU個數的2-3倍 textFile(path,n) 傳參時第二個參數n可以指定partition數量
當SparkSQL讀取Hive表對應的HDFS文件的block,可能會因為block數量少而導致並行度較低,而spark.default.parallelism參數只能對除SparkSQL意外的算子生效,如果需要增加並行度,則可以使用repartiton算子進行重分區以提高並行度。
6.4、使用foreachPartition優化寫數據庫性能
foreach對於每條數據都會建立和銷毀數據庫鏈接,並發送和執行多次SQL,對於性能消耗較大,在實際開發中,可以使用foreachPartion函數來進行數據庫寫操作,對於一個partion只會建立一次數據庫連接,並且只需要向數據庫發送一次SQL和多組參數,但因為一次對整個partiton進行操作,所以可能會引起OOM問題,需要估算一下數據量。
6.5、使用reduceByKey實現本地聚合
reduceByKey相較於普通shuffle,會進行一次map端本地聚合,在map端給每個stage的每個task創建的文件輸出數據之前,會進行本地聚合,所以使用reduceByKey算子后map端數據量會減少,從而也會減少磁盤IO和磁盤空間占用,也會降低網絡傳輸消耗,reduc端數據緩存的內存占用也會降低;reduceByKey適用於要實現類似於wordCount程序一樣的對每個key對應的value進行數據計算的場景,也適用於一些字符串拼接等較為復雜的使用 場景;
7、TroubleShooting
7.1、控制shuffle reduce端緩沖區大小以避免OOM
(1) reduce端task會拉取map端的一部分數據放在緩沖區,再在executor分配的內存中進行聚合操作,reduce端的緩沖區默認大小為48MB;
(2) 有時map端數據量非常大,從而導致寫出數據較多,reduce端的緩沖區被填滿,函數拉取緩沖區內數據進行處理,創建的大量對象來不及回收會導致OOM,所以可以適當減小緩沖區大小,從而使內存可以被及時回收;
(3) 如果整個Spark application的內存支援比較充足,而且map端的輸出數據量不是很大,則可以考慮加大緩沖區大小,以減少reduce task的拉取數據的次數,從而減少網絡性能消耗和reduce端聚合操作執行次數;
(4) sparkConf.set("spark.reducer.maxIInFlight","48");
7.2、解決JVM GC導致的shuffle文件拉取失敗的問題
(1)spark.shuffle.io.maxRetres默認值為3 shuffle文件拉取不到,最多重試次數spark.shuffle.io.retrywait 默認值5s 每次拉取的等待時長,如果重試次數為3,則3*5秒之后會報錯–shuffle file not find;
(2)可以通過對兩個參數進行預調節,從而盡量保證第二個stage的task能一定拉取到上一個stage的輸出文件,最多可以接受一個小時,因為full gc不可能一個小時都沒結束;
7.3、Yarn隊列資源不足導致application直接失敗
(1)當基於Yarn提交作業,可能會存在兩個同樣的任務導致內存資源不足,從而可能會導致兩種情況:
①Yarn發現資源不足,直接Fail;
②Yarn發現資源不足,后來的作業一直等待第一個作業運行完成后執行;
(2)解決方案:
①在J2EE系統中限制Spark作業的提交個數;
②分兩個調度隊列分別運行,避免小作業被大作業阻塞;
③無論如何都只同時運行一個作業並給與最大內存資源;
④在J2EE系統中使用線程池對作業進行調度管理,一個線程池對應一個資源隊列,線程池的容量設為1;
7.4、解決各種序列化導致的報錯
(1) 算子函數中,如果使用到了外部的自定義類型的變量,則自定義的變量必須是可序列化的;
(2) 如果要將自定義的類型作為RDD的元素類型,那么自定義類型也需要是可序列化的;
(3) 不能在上述情況下,使用一些第三方的不支持序列化的類型,如數據庫的鏈接類Connection conn;
7.5、解決算子函數返回NULL導致的問題
有些算子函數需要有一個返回值,但有時候我們可能不需要返回值,如果直接返回null則可能會報錯 – scala.Math(null) exception,解決辦法為:返回一些特殊值,比如 “-999”,然后算子獲取RDD后將"-999"數據過濾掉,最后使用coalesce算子壓縮partition數目,以提升性能;
7.6、解決Yarn-client模式導致的網卡流量激增問題
spark在yarn-client模式下,Application的注冊和task的調度是分離開的,driver啟動在本地,需要頻繁的和yarn集群上運行的的多個executor的每個task進行網絡通訊,如果task較多,則可能導致本地機器網卡流量激增,yarn-client一般用於測試工作,便於查看log和trouble shooting,生產環境中應采用yarn-cluster模式,該模式driver運行在集群上,所以網卡流量激增問題也不會發生;
7.7、解決yarn–cluster模式的JVM內存溢出無法執行問題
有時運行作業會出現本地client模式測試成功,但是cluster模式報出JVM 永久代(Permgen)溢出的錯誤,是因為本地client模式默認內存大小為128MB,但是cluster模式默認為82MB,可以在提交Spark作業時設置永久代內存大小:
-- conf spark.driver.extraJavaOptions = "-Xx:PermSize=128M -Xx:MaxPermSize=256MB"
如果是棧內存溢出,則可能是因為SparkSQL使用 “or” 過多,需要將SQL語句進行優化拆分;
7.8、錯誤的使用持久化和checkPoint
(1) 正確的使用持久化的方式
var usersRDD; ①usersRDD = usersRDD.cache(); ②val cacheUsersRDD = usersRDD.cache(); 如果直接用usersRDD調用cache算子,而不用對象接收,則會報file not find的錯誤;
(2)checkPoint的使用方式:
第一步:設置checkPoint目錄 sc.setCheckPointDir("path"); 第二步:對RDD執行checkPoint算子 rdd.checkPoint();
(3)checkPoint是將RDD的數據持久化一份到容錯系統(HDFS),如果cache失效,則checkPoint作為數據備份;
8、數據傾斜
8.1、如何定位數據傾斜問題?
數據傾斜的發生基本是因為shuffle操作引起,可以通過查看代碼中有哪些會觸發shuffle的算子(如groupByKey、reduceByKey、 countByKey、join),也可以看log日志文件,看哪些代碼導致了OOM或者哪個Stage拖后腿。
8.2、聚合源數據(重劍無鋒)
(1)通過groupByKey和reduceByKey算子可以對數據進行聚合操作,也可以在hive etl中使用reduceBykey函數將values聚合;
(2)數據傾斜即某個key對應大量的數據,可以直接對values進行預聚合從而減少數據量,比如按照key進行分組,將key對應的所有values全部用一種特殊的格式拼接到一個字符串里面(比如"key=sessionId,value : name=tom|age=22|sex=男"),處理之后每個key對應一條數據,在Spark程序中不再需要執行groupByKey+map操作,可以直接對每個key對應的value進行操作,從而也避免了shffle,所以一定程度上避免了數據傾斜。
(3)在實際的業務場景中,可能沒法對每條key對應的數據進行聚合,可以放粗粒度(比如數據內包含了幾個城市、幾天、幾個地區的數據),則可以按照城市粒度進行聚合,同樣也可以減輕數據傾斜問題。
(4)過濾導致數據傾斜的key: 如果可以接受某些數據在作業中被舍棄,則可以選擇在hive中過濾掉某些導致數據傾斜的key,從而也不會影響業務需求。
8.3、提高shuffle操作並行度(如果前面的方案都不適用,則可以嘗試這種方法)
(1)將reduce task數量變多,就可以讓每個reduce task分配更少的數據量,甚至解決數據傾斜問題;
(2)可以在shuffle操作的算子中傳入一個數字參數,從而提高並行度(如groupBeyKey、countBeyKey、reduceBeyKey),但是提高並行度並不能完全解決數據傾斜問題,治標不治本,只是盡可能的緩解和減輕shuffle中reduce task的壓力。
8.4、使用隨機key實現雙重聚合
(1)使用場景:
groupBeyKey、reduceBeyKey
先將key打散,將原來一樣的key隨機加上前綴從而將其分成多個組,然后針對多個組進行key的局部聚合操作,然后再去除key的前綴進行全局聚合;
8.5、將reduce joiin轉換為map join
(1)reduce join會發生shuffle操作,可以將原本兩個RDD中的一個廣播出來,然后進行map操作;
適用於哪種情況?
如果兩個RDD要進行join,其中一個RDD較小,可以將小的RDD廣播出去,小的RDD便會在每個executor的Blockmanager中駐留一份,從而避免了數據傾斜,如果兩個Rdd都比較大則不適合采用這種方案進行處理;
(2)這種方法不止適用於數據傾斜問題,即使沒有數據傾斜發生,也可以避免shuffle操作從而提升性能。
8.6、sample采樣傾斜key進行兩次join
(1)方案思路:關鍵在於將發生數據傾斜的key單獨拉出來放到一個RDD中,然后利用這個RDD和其它的RDD單獨進行join操作,從而將key對應的數據分散到多個task中進行join操作,避免了所有傾斜Key混在一個RDD中導致只有一個RDD發生數據傾斜;
(2)什么時候適用這個方案?
在join操作時一個或少數幾個key對應的數據特別多(可以用countBeKey計算每個key對應的數據個數),如果導致數據傾斜的key過多,則不適用這種方案。
8.7、使用隨機數以及擴容表進行join
(1)實現步驟:
①選擇一個RDD要用flatMap進行擴容,將每條數據映射為多條數據,每個映射出來的數據,都帶有一個n以內的隨機數作為key的前綴,一般情況下n取值10; ②將兩外一個RDD做普通的map操作,每條數據都打上n以內的隨機數作為前綴; ③最后將兩個處理后的RDD進行join操作。
(2)局限性:因為兩個RDD都比較大,所以無法將某個RDD擴容到特別大,一般為10倍,而且數據傾斜只是減輕和緩解並不能徹底解決數據傾斜問題;
8.8、自定義Partioner將數據進行自定義分區
