Spark性能優化點
一、分配更多的資源
它是性能優化調優的王道,就是增加和分配更多的資源,這對於性能和速度上的提升是顯而易見的,
基本上,在一定范圍之內,增加資源與性能的提升,是成正比的;寫完了一個復雜的spark作業之后,進行性能調 優的時候,首先第一步,就是要來調節優的資源配置;在這個基礎之上,如果說你的spark作業,能夠分配的資源達到 了你的能力范圍的頂端之后,無法再分配更多的資源了,公司資源有限;那么才是考慮去做后面的這些性能調優的點。
1.1 參數調節到多大,算是最大?
第一種情況:standalone模式
先計算出公司spark集群上的所有資源 每台節點的內存大小和cpu核數,
比如:一共有20台worker節點,每台節點8g內存,10個cpu。
實際任務在給定資源的時候,可以給20個executor、每個executor的內存8g、每個executor的使用的cpu個數 10。
第二種情況:Yarn 先計算出yarn集群的所有大小,比如一共500g內存,100個cpu;
這個時候可以分配的大資源,比如給定50個executor、每個executor的內存大小10g,每個executor使用的cpu 個數為2。
使用原則:你能使用的資源有多大,就盡量去調節到大的大小(executor的數量:幾十個到上百個不等;executor的內存;exector的cpu個數)
1.2 為什么調大資源以后性能可以提升
二、提高並行度
2.1 Spark的並行度指的是什么
spark作業中,各個stage的task的數量,也就代表了spark作業在各個階段stage的並行度!
當分配完所能分配的大資源了,然后對應資源去調節程序的並行度,如果並行度沒有與資源相匹配,那么導致你 分配下去的資源都浪費掉了。同時並行運行,還可以讓每個task要處理的數量變少(很簡單的原理。合理設置並行度, 可以充分利用集群資源,減少每個task處理數據量,而增加性能加快運行速度。)
舉例說明:
假如, 現在已經在spark‐submit 腳本里面,給我們的spark作業分配了足夠多的資源,比如50個executor ,每 個executor 有10G內存,每個executor有3個cpu core 。 基本已經達到了spark集群或者yarn集群上限。task沒有 設置,或者設置的很少,比如就設置了100個task、50個executor、每個executor有3個core ,也就是說 Application 任何一個stage運行的時候,都有總數150個cpu core ,可以並行運行。
但是你現在只有100個task,平均分配一下,每個executor 分配到2個task,那么同時在運行的task,只有100個 task,每個executor 只會並行運行 2個task。 每個executor 剩下的一個cpu core 就浪費掉了!你的資源,雖然分 配充足了,但是問題是, 並行度沒有與資源相匹配,導致你分配下去的資源都浪費掉了。合理的並行度的設置,應該要 設置的足夠大,大到可以完全合理的利用你的集群資源; 比如上面的例子,總共集群有150個cpu core ,可以並行運 行150個task。那么你就應該將你的Application 的並行度,至少設置成150個,才能完全有效的利用你的集群資源, 讓150個task並行執行,而且task增加到150個以后,即可以同時並行運行,還可以讓每個task要處理的數量變少; 比 如總共150G的數據要處理, 如果是100個task ,每個task 要計算1.5G的數據。 現在增加到150個task,每個task只 要處理1G數據。
2.2 如何提高並行度
1) 可以設置task的數量
至少設置成與spark Application 的總cpu core 數量相同(理想情況,150個core,分配150task,一起運 行,差不多同一時間運行完畢)官方推薦,task數量,設置成spark Application 總cpu core數量的2~3倍 。 比如150個cpu core ,基本設置task數量為300~500. 與理想情況不同的,有些task會運行快一點,比如50s就完 了,有些task 可能會慢一點,要一分半才運行完,所以如果你的task數量,剛好設置的跟cpu core 數量相同,可能會 導致資源的浪費。
因為比如150個task中10個先運行完了,剩余140個還在運行,但是這個時候,就有10個cpu core空閑出來了,導 致浪費。如果設置2~3倍,那么一個task運行完以后,另外一個task馬上補上來,盡量讓cpu core不要空閑。同時盡量 提升spark運行效率和速度。提升性能。
2) 如何設置task數量來提升並行度
設置參數spark.defalut.parallelism
默認是沒有值的,如果設置了值為10,它會在shuffle的過程才會起作用。
比如:val rdd2 = rdd1.reduceByKey(_+_)
此時rdd2的分區數就是10,rdd1的分區數不受這個參數的影響。
可以通過在構建SparkConf對象的時候設置,例如:new SparkConf().set("spark.defalut.parallelism","500")
3) 給RDD重新設置partition的數量
使用rdd.repartition 來重新分區,該方法會生成一個新的rdd,使其分區數變大。
此時由於一個partition對應一個task,那么對應的task個數越多,通過這種方式也可以提高並行度。
4) 提高spark sql 運行的task數量
通過設置參數 spark.sql.shuffle.partitions=500 默認為200;
可以適當增大,來提高並行度。 比如設置為 spark.sql.shuffle.partitions=500
三、RDD的重用和持久化
3.1 實際開發遇到的情況說明
如上圖所示的計算邏輯:
(1)當第一次使用rdd2做相應的算子操作得到rdd3的時候,就會從rdd1開始計算,先讀取HDFS上的文件,然后對rdd1 做對應的算子操作得到rdd2,再由rdd2計算之后得到rdd3。同樣為了計算得到rdd4,前面的邏輯會被重新計算。
(2)默認情況下多次對一個rdd執行算子操作,去獲取不同的rdd,都會對這個rdd及之前的父rdd全部重新計算一次。 這種情況在實際開發代碼的時候會經常遇到,但是我們一定要避免一個rdd重復計算多次,否則會導致性能急劇降低。
總結:可以把多次使用到的rdd,也就是公共rdd進行持久化,避免后續需要,再次重新計算,提升效率。
3.2 如何對rdd進行持久化
可以調用rdd的cache或者persist方法。
(1)cache方法默認是把數據持久化到內存中 ,例如:rdd.cache ,其本質還是調用了persist方法
(2)persist方法中有豐富的緩存級別,這些緩存級別都定義在StorageLevel這個object中,可以結合實際的應用場 景合理的設置緩存級別。例如: rdd.persist(StorageLevel.MEMORY_ONLY),這是cache方法的實現。
3.3 rdd持久化時可以采用序列化
(1)如果正常將數據持久化在內存中,那么可能會導致內存的占用過大,這樣的話,也許會導致OOM內存溢出。
(2)當純內存無法支撐公共RDD數據完全存放的時候,就優先考慮使用序列化的方式在純內存中存儲。將RDD的每個 partition的數據,序列化成一個字節數組;序列化后,大大減少內存的空間占用。
(3)序列化的方式,唯一的缺點就是,在獲取數據的時候,需要反序列化。但是可以減少占用的空間和便於網絡傳輸
(4)如果序列化純內存方式,還是導致OOM,內存溢出;就只能考慮磁盤的方式,內存+磁盤的普通方式(無序列化)。 (5)為了數據的高可靠性,而且內存充足,可以使用雙副本機制,進行持久化。
持久化的雙副本機制,持久化后的一個副本,因為機器宕機了,副本丟了,就還是得重新計算一次;
持久化的每個數據單元,存儲一份副本,放在其他節點上面,從而進行容錯;
一個副本丟了,不用重新計算,還可以使用另外一份副本。這種方式,僅僅針對你的內存資源極度充足。
3.4 廣播變量
1) 場景描述
在實際工作中可能會遇到這樣的情況,由於要處理的數據量非常大,這個時候可能會在一個stage中出現大量的 task,比如有1000個task,這些task都需要一份相同的數據來處理業務,這份數據的大小為100M,該數據會拷貝 1000份副本,通過網絡傳輸到各個task中去,給task使用。這里會涉及大量的網絡傳輸開銷,同時至少需要的內存 為1000*100M=100G,這個內存開銷是非常大的。不必要的內存的消耗和占用,就導致了,你在進行RDD持久化 到內存,也許就沒法完全在內存中放下;就只能寫入磁盤,后導致后續的操作在磁盤IO上消耗性能;這對於 spark任務處理來說就是一場災難。
由於內存開銷比較大,task在創建對象的時候,可能會出現堆內存放不下所有對象,就會導致頻繁的垃圾回收器的 回收GC。GC的時候一定是會導致工作線程停止,也就是導致Spark暫停工作那么一點時間。頻繁GC的話,對 Spark作業的運行的速度會有相當可觀的影響。
2) 廣播變量引入
Spark中分布式執行的代碼需要傳遞到各個executor的task上運行。對於一些只讀、固定的數據,每次都需要Driver 廣播到各個Task上,這樣效率低下。廣播變量允許將變量只廣播(提前廣播)給各個executor。該executor上的各 個task再從所在節點的BlockManager(負責管理某個executor對應的內存和磁盤上的數據)獲取變量,而不是從 Driver獲取變量,從而提升了效率。
廣播變量,初始的時候,就在Drvier上有一份副本。通過在Driver把共享數據轉換成廣播變量。
task在運行的時候,想要使用廣播變量中的數據,此時首先會在自己本地的Executor對應的BlockManager中,嘗 試獲取變量副本;如果本地沒有,那么就從Driver遠程拉取廣播變量副本,並保存在本地的BlockManager中;
此后這個executor上的task,都會直接使用本地的BlockManager中的副本。那么這個時候所有該executor中的 task都會使用這個廣播變量的副本。也就是說一個executor只需要在第一個task啟動時,獲得一份廣播變量數據,之后 的task都從本節點的BlockManager中獲取相關數據。
executor的BlockManager除了從driver上拉取,也可能從其他節點的BlockManager上拉取變量副本,網絡距離 越近越好。
3) 使用廣播變量后的性能分析
比如一個任務需要50個executor,1000個task,共享數據為100M。
(1)在不使用廣播變量的情況下,1000個task,就需要該共享數據的1000個副本,也就是說有1000份數需要大量的網絡 傳輸和內存開銷存儲。耗費的內存大小1000*100=100G.
(2)使用了廣播變量后,50個executor就只需要50個副本數據,而且不一定都是從Driver傳輸到每個節點,還可能是就 近從近的節點的executor的blockmanager上拉取廣播變量副本,網絡傳輸速度大大增加;內存開銷 50*100M=5G
總結: 不使用廣播變量的內存開銷為100G,使用后的內存開銷5G,這里就相差了20倍左右的網絡傳輸性能損耗和內存開 銷,使用廣播變量后對於性能的提升和影響,還是很可觀的。
廣播變量的使用不一定會對性能產生決定性的作用。比如運行30分鍾的spark作業,可能做了廣播變量以后,速度 快了2分鍾,或者5分鍾。但是一點一滴的調優,積少成多。后還是會有效果的。
4) 如何使用廣播變量
(1) 通過sparkContext的broadcast方法把數據轉換成廣播變量,類型為Broadcast,
val broadcastArray: Broadcast[Array[Int]] = sc.broadcast(Array(1,2,3,4,5,6))
(2) 然后executor上的BlockManager就可以拉取該廣播變量的副本獲取具體的數據。
獲取廣播變量中的值可以通過調用其value方法 val array: Array[Int] = broadcastArray.value
四、使用Kryo序列化
4.1 spark序列化介紹
Spark在進行任務計算的時候,會涉及到數據跨進程的網絡傳輸、數據的持久化,這個時候就需要對數據進行序列 化。Spark默認采用Java的序列化器。默認java序列化的優缺點如下:
其好處: 處理起來方便,不需要我們手動做其他操作,只是在使用一個對象和變量的時候,需要實現Serializble接口。
其缺點: 默認的序列化機制的效率不高,序列化的速度比較慢;序列化以后的數據,占用的內存空間相對還是比較大。
Spark支持使用Kryo序列化機制。Kryo序列化機制,比默認的Java序列化機制,速度要快,序列化后的數據要更小,大概是Java序列化機制的1/10。所以Kryo序列化優化以后,可以讓網絡傳輸的數據變少;在集群中耗費的內存資源大大減少。
4.2 Kryo序列化啟用后生效的地方
Kryo序列化機制,一旦啟用以后,會生效的幾個地方:
(1)算子函數中使用到的外部變量
算子中的外部變量可能來着與driver需要涉及到網絡傳輸,就需要用到序列化。
最終可以優化網絡傳輸的性能,優化集群中內存的占用和消耗
(2)持久化RDD時進行序列化,StorageLevel.MEMORY_ONLY_SER
將rdd持久化時,對應的存儲級別里,需要用到序列化。
最終可以優化內存的占用和消耗;持久化RDD占用的內存越少,task執行的時候,創建的對象,就不至於頻繁 的占滿內存,頻繁發生GC。
(3) 產生shuffle的地方,也就是寬依賴
下游的stage中的task,拉取上游stage中的task產生的結果數據,跨網絡傳輸,需要用到序列化。
最終可以優化網絡傳輸的性能
4.3 如何開啟Kryo序列化機制
(1) 在構建sparkConf的時候設置相關參數
new SparkConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
Kryo之所以沒有被作為默認的序列化類庫的原因,主要是因為Kryo要求如果要達到它的佳性能的話,那么就一定 要注冊你自定義的類(如,你的算子函數中使用到了外部自定義類型的對象變量,這時,就要求必須注冊你的類,否 則Kryo達不到佳性能)。
Kryo也不支持所有實現了 java.io.Serializable 接口的類型,它需要你在程序中 register 需要序列化的類 型,以得到佳性能。
(2) 注冊需要通過Kryo序列化的一些自定義類
new SparkConf().registerKryoClasses(Array(classOf[Student]))
該方法需要一個Class類型的數組,表示可以一下子注冊多個需要實現Kryo序列化的類。
五、使用fastutil優化數據格式
5.1 fastutil 介紹
fastutil是擴展了Java標准集合框架(Map、List、Set;HashMap、ArrayList、HashSet)的類庫,提供了特殊類型 的map、set、list和queue;
fastutil能夠提供更小的內存占用,更快的存取速度;我們使用fastutil提供的集合類,來替代自己平時使用的JDK的 原生的Map、List、Set。
5.2 fastutil 好處
fastutil集合類,可以減小內存的占用,並且在進行集合的遍歷、根據索引(或者key)獲取元素的值和設置元素的值 的時候,提供更快的存取速度;
fastutil也提供了64位的array、set和list,以及高性能快速的,以及實用的IO類,來處理二進制和文本類型的文件;
fastutil新版本要求Java 7以及以上版本;
fastutil的每一種集合類型,都實現了對應的Java中的標准接口(比如fastutil的map,實現了Java的Map接口),因此可以直接放入已有系統的任何代碼中。
fastutil還提供了一些JDK標准類庫中沒有的額外功能(比如雙向迭代器。
fastutil除了對象和原始類型為元素的集合,fastutil也提供引用類型的支持,但是對引用類型是使用等於號(=)進 行比較的,而不是equals()方法。
fastutil盡量提供了在任何場景下都是速度快的集合類庫。
5.3 Spark中應用fastutil 的場景
1) 算子函數使用了外部變量
● 你可以使用Broadcast廣播變量優化;
● 可以使用Kryo序列化類庫,提升序列化性能和效率;
● 如果外部變量是某種比較大的集合,那么可以考慮使用fastutil改寫外部變量;
首先從源頭上就減少內存的占用(fastutil),通過廣播變量進一步減少內存占用,再通過Kryo序列化類庫進一步減少內存占用。
2) 算子函數里使用了比較大的集合Map/List
在你的算子函數里,也就是task要執行的計算邏輯里面,如果有邏輯中,出現,要創建比較大的Map、List等集合, 可能會占用較大的內存空間,而且可能涉及到消耗性能的遍歷、存取等集合操作;
那么此時,可以考慮將這些集合類型使用fastutil類庫重寫,
使用了fastutil集合類以后,就可以在一定程度上,減少task創建出來的集合類型的內存占用。
避免executor內存頻繁占滿,頻繁喚起GC,導致性能下降。
3) 關於fastutil 的調優說明
fastutil其實沒有你想象中的那么強大,也不會跟官網上說的效果那么一鳴驚人。
廣播變量、Kryo序列化類庫、 fastutil 都是之前所說的,對於性能來說,類似於一種調味品,烤雞,本來就很好吃了,然后加了一點特質的孜然麻辣 粉調料,就更加好吃了一點。
分配資源、並行度、RDD架構與持久化,這三個就是烤雞; broadcast、kryo、fastutil,類似於調料。
比如說,你的spark作業,經過之前一些調優以后,大概30分鍾運行完,現在加上broadcast、kryo、fastutil,也許就是優化到29分鍾運行完、或者更好一點,也許就是28分鍾、25分鍾。
真正有意義的就是后面要學習的shuffle調優,可能優化之后只需要15分鍾;
還有把groupByKey用reduceByKey改寫,執行本地聚合,也許10分鍾;
甚至可以向公司申請更多的資源,擴大整個集群的計算能力,后可能到達5分鍾就完成任務了。
4) fastutil 的使用
第一步:在pom.xml中引用fastutil的包
<dependency>
<groupId> fastutil </groupId>
<artifactId> fastutil </artifactId>
<version> 5.0.9 </version>
</dependency>
第二步:平時使用List (Integer)的替換成IntList即可。
List<Integer>的list對應的到fastutil就是IntList類型
使用說明: 基本都是類似於IntList的格式,前綴就是集合的元素類型;
特殊的就是Map,Int2IntMap,代表了key‐value映射的元素類型。
六、調節數據本地化等待時長
Spark在Driver上對Application的每一個stage的task進行分配之前,都會計算出每個task要計算的是哪個分片數 據,RDD的某個partition;Spark的task分配算法,優先會希望每個task正好分配到它要計算的數據所在的節點, 這樣的話就不用在網絡間傳輸數據;
但是通常來說,有時事與願違,可能task沒有機會分配到它的數據所在的節點,為什么呢,可能那個節點的計算資 源和計算能力都滿了;所以這種時候,通常來說,Spark會等待一段時間,默認情況下是3秒(不是絕對的,還有很 多種情況,對不同的本地化級別,都會去等待),到后實在是等待不了了,就會選擇一個比較差的本地化級別, 比如說將task分配到距離要計算的數據所在節點比較近的一個節點,然后進行計算。
6.1 本地化級別
(1)PROCESS_LOCAL:進程本地化
代碼和數據在同一個進程中,也就是在同一個executor中;計算數據的task由executor執行,數據在executor的 BlockManager中;性能好。
(2)NODE_LOCAL:節點本地化
代碼和數據在同一個節點中;比如說數據作為一個HDFS block塊,就在節點上,而task在節點上某個executor中 運行;或者是數據和task在一個節點上的不同executor中;數據需要在進程間進行傳輸;性能其次。
(3)RACK_LOCAL:機架本地化
數據和task在一個機架的兩個節點上;數據需要通過網絡在節點之間進行傳輸; 性能比較差。
(4) ANY:無限制
數據和task可能在集群中的任何地方,而且不在一個機架中;性能最差。
6.2 數據本地化等待時長
spark.locality.wait,默認是3s
首先采用佳的方式,等待3s后降級,還是不行,繼續降級...,后還是不行,只能夠采用差的。
七、降低cache操作的內存占比
7.1 為什么需要JVM調優
spark的scala代碼調用了很多java api。scala也是運行在java虛擬機中的。spark是運行在java虛擬機中的。
java虛擬機可能會產生什么樣的問題:內存不足??!!
我們的RDD的緩存、task運行定義的算子函數,可能會創建很多對象。都可能會占用大量內存,沒搞好的話,可能導致 JVM出問題。
JVM調優(Java虛擬機):JVM相關的參數,通常情況下,如果你的硬件配置、基礎的JVM的配置,都ok的話,JVM通常不 會造成太嚴重的性能問題;反而更多的是在troubleshooting(故障排除)中,JVM占了很重要的地位;JVM造成線上的 spark作業的運行報錯,甚至失敗(比如OOM)。
7.2 JVM堆內存模型圖
有通過new創建的對象的內存都在堆中分配,其大小可以通過‐Xmx和‐Xms來控制。
堆被划分為年輕代和老年代,年輕代又被進一步划分為Eden和Survivor區
JVM堆空間內存分配,在默認情況下:
年輕代 :用於存放新產生的對象。我們在spark task執行算子函數操作的時候,可能會創建很多對象,這些對象都 是要放入JVM年輕代中的,它占用堆中三分之一的堆內存空間。
里面又分為三個區域
eden區: 8/10 的年輕代空間
survivor0 : 1/10 的年輕代空間
survivor1 : 1/10 的年輕代空間
老年代 : 用於存放被長期引用的對象,它占用堆中三分之一的堆內存空間。
7.3 JVM工作闡述
每一次放對象的時候,都是放入eden區域,和其中一個survivor區域;另外一個survivor區域是空閑的。 當eden區域和一個survivor區域放滿了以后(spark運行過程中,產生的對象實在太多了),就會觸發minor gc,小型 垃圾回收。把不再使用的對象,從內存中清空,給后面新創建的對象騰出來點兒地方。
清理掉了不再使用的對象之后,那么也會將存活下來的對象(還要繼續使用的),放入之前空閑的那一個 survivor區域中。這里可能會出現一個問題。默認eden、survior0和survivor1的內存占比是8:1:1。問題是,如果存 活下來的對象是1.5,一個survivor區域放不下,將多余的對象,直接放入老年代了。
如果你的JVM內存不夠大的話,可能導致頻繁的年輕代內存滿溢,頻繁的進行minor gc(清理Eden區和 Survivor區)。頻繁的minor gc會導致短時間內,有些存活的對象,多次垃圾回收都沒有回收掉。會導致這種短生命周 期(其實不一定是要長期使用的)對象,年齡過大,垃圾回收次數太多還沒有回收到,跑到老年代。老年代中,可能會因 為內存不足,囤積一大堆,短生命周期的,本來應該在年輕代中的,可能馬上就要被回收掉的對象。此時可能導致老年代 頻繁滿溢。頻繁進行full gc(清理整個堆空間—包括年輕代和老年代)。full gc就會去回收老年代中的對象。 full gc / minor gc,無論是快,還是慢,都會導致jvm的工作線程停止工作,stop the world。 簡而言之,就是說gc的時候,spark停止工作了。等着垃圾回收結束。
內存不充足的時候,會出現的問題:
(1)、頻繁minor gc,也會導致頻繁spark停止工作
(2)、老年代囤積大量活躍對象(短生命周期的對象),導致頻繁full gc,full gc時間很長,短則數十秒,長則數分 鍾,甚至數小時。可能導致spark長時間停止工作。
(3)、頻繁gc會嚴重影響spark的性能和運行的速度。
7.4 降低cache操作的內存占比
spark中,堆內存又被划分成了兩塊兒,一塊兒是專門用來給RDD的cache、persist操作進行RDD數據緩存用的;
另外一塊兒,就是我們剛才所說的,用來給spark算子函數的運行使用的,存放函數中自己創建的對象。
默認情況下,給RDD cache操作的內存占比是0.6(spark.storage.memoryFraction=0.6),60%的內存都給 了cache操作了。如果某些情況下,cache不是那么的緊張,在task算子函數中創建的對象過多,然后內存又不太大,導 致了頻繁的minor gc,甚至頻繁full gc,導致spark頻繁的停止工作。性能影響會很大。
針對上述這種情況,大家可以在之前我們講過的那個spark ui。yarn去運行的話,那么就通過yarn的界面,去 查看你的spark作業的運行統計,很簡單大家一層一層點擊進去就好。可以看到每個stage的運行情況,包括每個task的 運行時間、gc時間等等。如果發現gc太頻繁,時間太長。此時就可以適當調節這個比例。
降低cache操作的內存占比,大不了用persist操作,選擇將一部分緩存的RDD數據寫入磁盤,或者序列化方式, 配合Kryo序列化類,減少RDD緩存的內存占用;降低cache操作內存占比;此時對應的算子函數的內存占比就提升了。這個 時候,可能就可以減少minor gc的頻率,同時減少full gc的頻率。對性能的提升是有一定的幫助的。
總之一句話,讓task執行算子函數時,有更多的內存可以使用。
7.5 降低cache操作的內存占比代碼實現
cache操作的內存占比為堆內存的0.6 也就是百分之60,可以適當調節,降低該值,
修改spark.storage.memoryFraction參數
可以設置為0.5‐‐‐>0.4‐‐‐‐‐‐>0.3
例如:
new SparkConf().set("spark.storage.memoryFraction","0.4")
把cache操作的內存占比修改為堆內存的百分之40,讓堆內存可以容納更多的對象,減少gc的頻率,提高spark任務運行 的速度和性能。