【大數據】Spark性能優化和故障處理


 

第一章 Spark 性能調優

1.1 常規性能調優

1.1.1 常規性能調優一:最優資源配置

Spark性能調優的第一步,就是為任務分配更多的資源,在一定范圍內,增加資源的分配與性能的提升是成正比的,實現了最優的資源配置后,在此基礎上再考慮進行后面論述的性能調優策略。

資源的分配在使用腳本提交Spark任務時進行指定,標准的Spark任務提交腳本如代碼清單2-1所示:

代碼清單2-1 標准Spark提交腳本

/usr/opt/modules/spark/bin/spark-submit \

--class com.atguigu.spark.Analysis \

--num-executors 80 \

--driver-memory 6g \

--executor-memory 6g \

--executor-cores 3 \

/usr/opt/modules/spark/jar/spark.jar \

可以進行分配的資源如表2-1所示:

2-1 可分配資源表

名稱

說明

--num-executors

配置Executor的數量

--driver-memory

配置Driver內存(影響不大)

--executor-memory

配置每個Executor的內存大小

--executor-cores

配置每個Executor的CPU core數量

調節原則:盡量將任務分配的資源調節到可以使用的資源的最大限度。

對於具體資源的分配,我們分別討論Spark的兩種Cluster運行模式:

第一種是Spark Standalone模式,你在提交任務前,一定知道或者可以從運維部門獲取到你可以使用的資源情況,在編寫submit腳本的時候,就根據可用的資源情況進行資源的分配,比如說集群有15台機器,每台機器為8G內存,2個CPU core,那么就指定15個Executor,每個Executor分配8G內存,2個CPU core。

第二種是Spark Yarn模式,由於Yarn使用資源隊列進行資源的分配和調度,在表寫submit腳本的時候,就根據Spark作業要提交到的資源隊列,進行資源的分配,比如資源隊列有400G內存,100個CPU core,那么指定50個Executor,每個Executor分配8G內存,2個CPU core。

對表2-1中的各項資源進行了調節后,得到的性能提升如表2-2所示:

2-2 資源調節后的性能提升

名稱

解析

 

 

增加Executor·個數

在資源允許的情況下,增加Executor的個數可以提高執行task的並行度。比如有4個Executor,每個Executor有2個CPU core,那么可以並行執行8個task,如果將Executor的個數增加到8個(資源允許的情況下),那么可以並行執行16個task,此時的並行能力提升了一倍。

 

 

 

增加每個Executor的CPU core個數

  在資源允許的情況下,增加每個Executor的Cpu core個數,可以提高執行task的並行度。比如有4個Executor,每個Executor有2個CPU core,那么可以並行執行8個task,如果將每個Executor的CPU core個數增加到4個(資源允許的情況下),那么可以並行執行16個task,此時的並行能力提升了一倍。

 

 

 

 

 

 

 

增加每個Executor的內存量

  在資源允許的情況下,增加每個Executor的內存量以后,對性能的提升有三點:

  1. 可以緩存更多的數據(即對RDD進行cache),寫入磁盤的數據相應減少,甚至可以不寫入磁盤,減少了可能的磁盤IO;
  2. 可以為shuffle操作提供更多內存,即有更多空間來存放reduce端拉取的數據,寫入磁盤的數據相應減少,甚至可以不寫入磁盤,減少了可能的磁盤IO;
  3. 可以為task的執行提供更多內存,在task的執行過程中可能創建很多對象,內存較小時會引發頻繁的GC,增加內存后,可以避免頻繁的GC,提升整體性能。

補充:生產環境Spark submit腳本配置

/usr/local/spark/bin/spark-submit \

--class com.atguigu.spark.dataetl \

--num-executors 80 \

--driver-memory 6g \

--executor-memexecutoory 6g \

--r-cores 3 \

--master yarn-cluster \

--queue root.default \

--conf spark.yarn.executor.memoryOverhead=2048 \

--conf spark.core.connection.ack.wait.timeout =300 \

/usr/local/spark/spark.jar

參數配置參考值:

--num-executors:50~100

--driver-memory:1G~5G

--executor-memory:6G~10G

--executor-cores:3

--master:實際生產環境一定使用yarn-cluster

1.1.2 常規性能調優二:RDD優化

1.2.1 RDD復用

在對RDD進行算子時,要避免相同的算子和計算邏輯之下對RDD進行重復的計算,如圖2-1所示:

 

2-1 RDD的重復計算

對圖2-1中的RDD計算架構進行修改,得到如圖2-2所示的優化結果:

 

2-2 RDD架構優化

1.2.2 RDD持久化

Spark中,當多次對同一個RDD執行算子操作時,每一次都會對這個RDD以之前的父RDD重新計算一次,這種情況是必須要避免的,對同一個RDD的重復計算是對資源的極大浪費,因此,必須對多次使用的RDD進行持久化,通過持久化將公共RDD的數據緩存到內存/磁盤中,之后對於公共RDD的計算都會從內存/磁盤中直接獲取RDD數據。

對於RDD的持久化,有兩點需要說明:

第一,RDD的持久化是可以進行序列化的,當內存無法將RDD的數據完整的進行存放的時候,可以考慮使用序列化的方式減小數據體積,將數據完整存儲在內存中。

第二,如果對於數據的可靠性要求很高,並且內存充足,可以使用副本機制,對RDD數據進行持久化。當持久化啟用了復本機制時,對於持久化的每個數據單元都存儲一個副本,放在其他節點上面,由此實現數據的容錯,一旦一個副本數據丟失,不需要重新計算,還可以使用另外一個副本。

1.2.3 RDD盡可能早的filter操作

獲取到初始RDD后,應該考慮盡早地過濾掉不需要的數據,進而減少對內存的占用,從而提升Spark作業的運行效率。

1.1.3 常規性能調優三:廣播大變量

默認情況下,task中的算子中如果使用了外部的變量,每個task都會獲取一份變量的復本,這就造成了內存的極大消耗。一方面,如果后續對RDD進行持久化,可能就無法將RDD數據存入內存,只能寫入磁盤,磁盤IO將會嚴重消耗性能;另一方面,task在創建對象的時候,也許會發現堆內存無法存放新創建的對象,這就會導致頻繁的GC,GC會導致工作線程停止,進而導致Spark暫停工作一段時間,嚴重影響Spark性能。

假設當前任務配置了20個Executor,指定500個task,有一個20M的變量被所有task共用,此時會在500個task中產生500個副本,耗費集群10G的內存,如果使用了廣播變量, 那么每個Executor保存一個副本,一共消耗400M內存,內存消耗減少了5倍。

廣播變量在每個Executor保存一個副本,此Executor的所有task共用此廣播變量,這讓變量產生的副本數量大大減少。

在初始階段,廣播變量只在Driver中有一份副本。task在運行的時候,想要使用廣播變量中的數據,此時首先會在自己本地的Executor對應的BlockManager中嘗試獲取變量,如果本地沒有,BlockManager就會從Driver或者其他節點的BlockM=anager上遠程拉取變量的復本,並由本地的BlockManager進行管理;之后此Executor的所有task都會直接從本地的BlockManager中獲取變量。

1.1.4 常規性能調優四:Kryo序列化

默認情況下,Spark使用Java的序列化機制。Java的序列化機制使用方便,不需要額外的配置,在算子中使用的變量實現Serializable接口即可,但是,Java序列化機制的效率不高,序列化速度慢並且序列化后的數據所占用的空間依然較大。

Kryo序列化機制比Java序列化機制性能提高10倍左右,Spark之所以沒有默認使用Kryo作為序列化類庫,是因為它不支持所有對象的序列化,同時Kryo需要用戶在使用前注冊需要序列化的類型,不夠方便,但Spark 2.0.0版本開始,簡單類型、簡單類型數組、字符串類型的Shuffling RDDs 已經默認使用Kryo序列化方式了。

Kryo序列化注冊方式的實例代碼如代碼清單2-3所示:

代碼清單2-3 Kryo序列化機制配置代碼

public class MyKryoRegistrator implements KryoRegistrator

{

  @Override

  public void registerClasses(Kryo kryo)

  {

    kryo.register(StartupReportLogs.class);

  }

}

配置Kryo序列化方式的實例代碼如代碼清單2-4所示:

代碼清單2-4 Kryo序列化機制配置代碼

//創建SparkConf對象

val conf = new SparkConf().setMaster(…).setAppName(…)

//使用Kryo序列化庫,如果要使用Java序列化庫,需要把該行屏蔽掉

conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");  

//Kryo序列化庫中注冊自定義的類集合,如果要使用Java序列化庫,需要把該行屏蔽掉

conf.set("spark.kryo.registrator", "atguigu.com.MyKryoRegistrator");

1.1.5 常規性能調優五:調節本地化等待時長

Spark作業運行過程中,Driver會對每一個stage的task進行分配。根據Spark的task分配算法,Spark希望task能夠運行在它要計算的數據算在的節點(數據本地化思想),這樣就可以避免數據的網絡傳輸。通常來說,task可能不會被分配到它處理的數據所在的節點,因為這些節點可用的資源可能已經用盡,此時,Spark會等待一段時間,默認3s,如果等待指定時間后仍然無法在指定節點運行,那么會自動降級,嘗試將task分配到比較差的本地化級別所對應的節點上,比如將task分配到離它要計算的數據比較近的一個節點,然后進行計算,如果當前級別仍然不行,那么繼續降級。

task要處理的數據不在task所在節點上時,會發生數據的傳輸。task會通過所在節點的BlockManager獲取數據,BlockManager發現數據不在本地時,戶通過網絡傳輸組件從數據所在節點的BlockManager處獲取數據。

網絡傳輸數據的情況是我們不願意看到的,大量的網絡傳輸會嚴重影響性能,因此,我們希望通過調節本地化等待時長,如果在等待時長這段時間內,目標節點處理完成了一部分task,那么當前的task將有機會得到執行,這樣就能夠改善Spark作業的整體性能。

Spark的本地化等級如表2-3所示:

2-3 Spark本地化等級

名稱

解析

PROCESS_LOCAL

進程本地化,task和數據在同一個Executor中,性能最好。

NODE_LOCAL

節點本地化,task和數據在同一個節點中,但是task和數據不在同一個Executor中,數據需要在進程間進行傳輸。

RACK_LOCAL

機架本地化,task和數據在同一個機架的兩個節點上,數據需要通過網絡在節點之間進行傳輸。

NO_PREF

對於task來說,從哪里獲取都一樣,沒有好壞之分。

ANY

task和數據可以在集群的任何地方,而且不在一個機架中,性能最差。

Spark項目開發階段,可以使用client模式對程序進行測試,此時,可以在本地看到比較全的日志信息,日志信息中有明確的task數據本地化的級別,如果大部分都是PROCESS_LOCAL,那么就無需進行調節,但是如果發現很多的級別都是NODE_LOCAL、ANY,那么需要對本地化的等待時長進行調節,通過延長本地化等待時長,看看task的本地化級別有沒有提升,並觀察Spark作業的運行時間有沒有縮短。

注意,過猶不及,不要將本地化等待時長延長地過長,導致因為大量的等待時長,使得Spark作業的運行時間反而增加了。

Spark本地化等待時長的設置如代碼清單2-5所示:

代碼清單2-5 Spark本地化等待時長設置示例

val conf = new SparkConf()

  .set("spark.locality.wait", "6")

1.2 算子調優

1.2.1 算子調優一:mapPartitions

普通的map算子對RDD中的每一個元素進行操作,而mapPartitions算子對RDD中每一個分區進行操作。如果是普通的map算子,假設一個partition有1萬條數據,那么map算子中的function要執行1萬次,也就是對每個元素進行操作。

 

2-3 map算子

如果是mapPartition算子,由於一個task處理一個RDD的partition,那么一個task只會執行一次function,function一次接收所有的partition數據,效率比較高。

 

2-4 mapPartitions算子

比如,當要把RDD中的所有數據通過JDBC寫入數據,如果使用map算子,那么需要對RDD中的每一個元素都創建一個數據庫連接,這樣對資源的消耗很大,如果使用mapPartitions算子,那么針對一個分區的數據,只需要建立一個數據庫連接。

mapPartitions算子也存在一些缺點:對於普通的map操作,一次處理一條數據,如果在處理了2000條數據后內存不足,那么可以將已經處理完的2000條數據從內存中垃圾回收掉;但是如果使用mapPartitions算子,但數據量非常大時,function一次處理一個分區的數據,如果一旦內存不足,此時無法回收內存,就可能會OOM,即內存溢出。

因此,mapPartitions算子適用於數據量不是特別大的時候,此時使用mapPartitions算子對性能的提升效果還是不錯的。(當數據量很大的時候,一旦使用mapPartitions算子,就會直接OOM)

在項目中,應該首先估算一下RDD的數據量、每個partition的數據量,以及分配給每個Executor的內存資源,如果資源允許,可以考慮使用mapPartitions算子代替map。

1.2.2 算子調優二:foreachPartition優化數據庫操作

在生產環境中,通常使用foreachPartition算子來完成數據庫的寫入,通過foreachPartition算子的特性,可以優化寫數據庫的性能。

如果使用foreach算子完成數據庫的操作,由於foreach算子是遍歷RDD的每條數據,因此,每條數據都會建立一個數據庫連接,這是對資源的極大浪費,因此,對於寫數據庫操作,我們應當使用foreachPartition算子。

mapPartitions算子非常相似,foreachPartition是將RDD的每個分區作為遍歷對象,一次處理一個分區的數據,也就是說,如果涉及數據庫的相關操作,一個分區的數據只需要創建一次數據庫連接,如圖2-5所示:

 

2-5 foreachPartition算子

使用了foreachPartition算子后,可以獲得以下的性能提升:

1. 對於我們寫的function函數,一次處理一整個分區的數據;

2. 對於一個分區內的數據,創建唯一的數據庫連接;

3. 只需要向數據庫發送一次SQL語句和多組參數;

在生產環境中,全部都會使用foreachPartition算子完成數據庫操作-foreachPartition算子存在一個問題,與mapPartitions算子類似,如果一個分區的數據量特別大,可能會造成OOM,即內存溢出。

1.2.3 算子調優三:filter與coalesce的配合使用

Spark任務中我們經常會使用filter算子完成RDD中數據的過濾,在任務初始階段,從各個分區中加載到的數據量是相近的,但是一旦進過filter過濾后,每個分區的數據量有可能會存在較大差異,如圖2-6所示:

 

2-6 分區數據過濾結果

根據圖2-6我們可以發現兩個問題:

1. 每個partition的數據量變小了,如果還按照之前與partition相等的task個數去處理當前數據,有點浪費task的計算資源;

2. 每個partition的數據量不一樣,會導致后面的每個task處理每個partition數據的時候,每個task要處理的數據量不同,這很有可能導致數據傾斜問題。

如圖2-6所示,第二個分區的數據過濾后只剩100條,而第三個分區的數據過濾后剩下800條,在相同的處理邏輯下,第二個分區對應的task處理的數據量與第三個分區對應的task處理的數據量差距達到了8倍,這也會導致運行速度可能存在數倍的差距,這也就是數據傾斜問題。

針對上述的兩個問題,我們分別進行分析:

1. 針對第一個問題,既然分區的數據量變小了,我們希望可以對分區數據進行重新分配,比如將原來4個分區的數據轉化到2個分區中,這樣只需要用后面的兩個task進行處理即可,避免了資源的浪費。

2. 針對第二個問題,解決方法和第一個問題的解決方法非常相似,對分區數據重新分配,讓每個partition中的數據量差不多,這就避免了數據傾斜問題。

那么具體應該如何實現上面的解決思路?我們需要coalesce算子。

repartition與coalesce都可以用來進行重分區,其中repartition只是coalesce接口中shuffle為true的簡易實現,coalesce默認情況下不進行shuffle,但是可以通過參數進行設置。

假設我們希望將原本的分區個數A通過重新分區變為B,那么有以下幾種情況:

  1. A > B(多數分區合並為少數分區)

① AB相差值不大

此時使用coalesce即可,無需shuffle過程。

② AB相差值很大

此時可以使用coalesce並且不啟用shuffle過程,但是會導致合並過程性能低下,所以推薦設置coalesce的第二個參數為true,即啟動shuffle過程。

  1. A < B(少數分區分解為多數分區)

此時使用repartition即可,如果使用coalesce需要將shuffle設置為true,否則coalesce無效。

我們可以在filter操作之后,使用coalesce算子針對每個partition的數據量各不相同的情況,壓縮partition的數量,而且讓每個partition的數據量盡量均勻緊湊,以便於后面的task進行計算操作,在某種程度上能夠在一定程度上提升性能。

注意:local模式是進程內模擬集群運行,已經對並行度和分區數量有了一定的內部優化,因此不用去設置並行度和分區數量。

1.2.4 算子調優四:repartition解決SparkSQL低並行度問題

Spark SQL的並行度不允許用戶自己指定,Spark SQL自己會默認根據hive表對應的HDFS文件的block個數自動設置Spark SQL所在的那個stage的並行度,但有時此默認並行度過低,導致任務運行緩慢。

由於Spark SQL所在stage的並行度無法手動設置,如果數據量較大,並且此stage中后續的transformation操作有着復雜的業務邏輯,而Spark SQL自動設置的task數量很少,這就意味着每個task要處理為數不少的數據量,然后還要執行非常復雜的處理邏輯,這就可能表現為第一個有Spark SQLstage速度很慢,而后續的沒有Spark SQLstage運行速度非常快。

為了解決Spark SQL無法設置並行度和task數量的問題,我們可以使用repartition算子。

 

2-7 repartition算子使用前后對比圖

Spark SQL這一步的並行度和task數量肯定是沒有辦法去改變了,但是,對於Spark SQL查詢出來的RDD,立即使用repartition算子,去重新進行分區,這樣可以重新分區為多個partition,從repartition之后的RDD操作,由於不再設計Spark SQL,因此stage的並行度就會等於你手動設置的值,這樣就避免了Spark SQL所在的stage只能用少量的task去處理大量數據並執行復雜的算法邏輯。使用repartition算子的前后對比如圖2-7所示。

1.2.5 算子調優五:reduceByKey本地聚合

reduceByKey相較於普通的shuffle操作一個顯著的特點就是會進行map端的本地聚合map端會先對本地的數據進行combine操作,然后將數據寫入給下個stage的每個task創建的文件中,也就是在map端,對每一個key對應的value,執行reduceByKey算子函數。reduceByKey算子的執行過程如圖2-8所示:

 

2-8 reduceByKey算子執行過程

使用reduceByKey對性能的提升如下:

  1. 本地聚合后,在map端的數據量變少,減少了磁盤IO,也減少了對磁盤空間的占用;
  2. 本地聚合后,下一個stage拉取的數據量變少,減少了網絡傳輸的數據量;
  3. 本地聚合后,在reduce端進行數據緩存的內存占用減少;
  4. 本地聚合后,在reduce端進行聚合的數據量減少。

基於reduceByKey的本地聚合特征,我們應該考慮使用reduceByKey代替其他的shuffle算子,例如groupByKeyreduceByKeygroupByKey的運行原理如圖2-9和圖2-10所示:

 

2-9 groupByKey原理

 

2-10 reduceByKey原理

根據上圖可知,groupByKey不會進行map端的聚合,而是將所有map端的數據shuffle到reduce端,然后在reduce端進行數據的聚合操作。由於reduceByKeymap端聚合的特性,使得網絡傳輸的數據量減小,因此效率要明顯高於groupByKey。

1.3 Shuffle調優

1.3.1 Shuffle調優一:調節map端緩沖區大小

Spark任務運行過程中,如果shuffle的map端處理的數據量比較大,但是map端緩沖的大小是固定的,可能會出現map端緩沖數據頻繁spill溢寫到磁盤文件中的情況,使得性能非常低下,通過調節map端緩沖的大小,可以避免頻繁的磁盤IO操作,進而提升Spark任務的整體性能。

map端緩沖的默認配置是32KB,如果每個task處理640KB的數據,那么會發生640/32 = 20次溢寫,如果每個task處理64000KB的數據,機會發生64000/32=2000此溢寫,這對於性能的影響是非常嚴重的。

map端緩沖的配置方法如代碼清單2-7所示:

代碼清單2-7 map端緩沖配置

val conf = new SparkConf()

  .set("spark.shuffle.file.buffer", "64")

1.3.2 Shuffle調優二:調節reduce端拉取數據緩沖區大小

Spark Shuffle過程中,shuffle reduce task的buffer緩沖區大小決定了reduce task每次能夠緩沖的數據量,也就是每次能夠拉取的數據量,如果內存資源較為充足,適當增加拉取數據緩沖區的大小,可以減少拉取數據的次數,也就可以減少網絡傳輸的次數,進而提升性能。

reduce端數據拉取緩沖區的大小可以通過spark.reducer.maxSizeInFlight參數進行設置,默認為48MB,該參數的設置方法如代碼清單2-8所示:

代碼清單2-8 reduce端數據拉取緩沖區配置

val conf = new SparkConf()

  .set("spark.reducer.maxSizeInFlight", "96")

1.3.3 Shuffle調優三:調節reduce端拉取數據重試次數

Spark Shuffle過程中,reduce task拉取屬於自己的數據時,如果因為網絡異常等原因導致失敗會自動進行重試。對於那些包含了特別耗時的shuffle操作的作業,建議增加重試最大次數(比如60次),以避免由於JVM的full gc或者網絡不穩定等因素導致的數據拉取失敗。在實踐中發現,對於針對超大數據量(數十億~上百億)的shuffle過程,調節該參數可以大幅度提升穩定性。

reduce端拉取數據重試次數可以通過spark.shuffle.io.maxRetries參數進行設置,該參數就代表了可以重試的最大次數。如果在指定次數之內拉取還是沒有成功,就可能會導致作業執行失敗,默認為3,該參數的設置方法如代碼清單2-9所示:

代碼清單2-9 reduce端拉取數據重試次數配置

val conf = new SparkConf()

  .set("spark.shuffle.io.maxRetries", "6")

1.3.4 Shuffle調優四:調節reduce端拉取數據等待間隔

Spark Shuffle過程中,reduce task拉取屬於自己的數據時,如果因為網絡異常等原因導致失敗會自動進行重試,在一次失敗后,會等待一定的時間間隔再進行重試,可以通過加大間隔時長(比如60s),以增加shuffle操作的穩定性。

reduce端拉取數據等待間隔可以通過spark.shuffle.io.retryWait參數進行設置,默認值為5s,該參數的設置方法如代碼清單2-10所示:

代碼清單2-10 reduce端拉取數據等待間隔配置

val conf = new SparkConf()

  .set("spark.shuffle.io.retryWait", "10s")

1.3.5 Shuffle調優五:調節SortShuffle排序操作閾值

對於SortShuffleManager,如果shuffle reduce task的數量小於某一閾值則shuffle write過程中不會進行排序操作,而是直接按照未經優化的HashShuffleManager的方式去寫數據,但是最后會將每個task產生的所有臨時磁盤文件都合並成一個文件,並會創建單獨的索引文件。

當你使用SortShuffleManager時,如果的確不需要排序操作,那么建議將這個參數調大一些,大於shuffle read task的數量,那么此時map-side就不會進行排序了,減少了排序的性能開銷,但是這種方式下,依然會產生大量的磁盤文件,因此shuffle write性能有待提高。

SortShuffleManager排序操作閾值的設置可以通過spark.shuffle.sort. bypassMergeThreshold這一參數進行設置,默認值為200,該參數的設置方法如代碼清單2-11所示:

代碼清單2-10 reduce端拉取數據等待間隔配置

val conf = new SparkConf()

  .set("spark.shuffle.sort.bypassMergeThreshold", "400")

1.4 JVM調優

對於JVM調優,首先應該明確,full gc/minor gc,都會導致JVM的工作線程停止工作,即stop the world。

1.4.1 JVM調優一:降低cache操作的內存占比

1. 靜態內存管理機制

根據Spark靜態內存管理機制,堆內存被划分為了兩塊,Storage和Execution。Storage主要用於緩存RDD數據和broadcast數據,Execution主要用於緩存在shuffle過程中產生的中間數據,Storage占系統內存的60%,Execution占系統內存的20%,並且兩者完全獨立。

在一般情況下,Storage的內存都提供給了cache操作,但是如果在某些情況下cache操作內存不是很緊張,而task的算子中創建的對象很多,Execution內存又相對較小,這回導致頻繁的minor gc,甚至於頻繁的full gc,進而導致Spark頻繁的停止工作,性能影響會很大。

Spark UI中可以查看每個stage的運行情況,包括每個task的運行時間、gc時間等等,如果發現gc太頻繁,時間太長,就可以考慮調節Storage的內存占比,讓task執行算子函數式,有更多的內存可以使用。

Storage內存區域可以通過spark.storage.memoryFraction參數進行指定,默認為0.6,即60%,可以逐級向下遞減,如代碼清單2-6所示:

代碼清單2-6 Storage內存占比設置

val conf = new SparkConf()

  .set("spark.storage.memoryFraction", "0.4")

2. 統一內存管理機制

根據Spark統一內存管理機制,堆內存被划分為了兩塊,Storage和Execution。Storage主要用於緩存數據,Execution主要用於緩存在shuffle過程中產生的中間數據,兩者所組成的內存部分稱為統一內存,Storage和Execution各占統一內存的50%,由於動態占用機制的實現,shuffle過程需要的內存過大時,會自動占用Storage的內存區域,因此無需手動進行調節。

1.4.2 JVM調優二:調節Executor堆外內存 

Executor的堆外內存主要用於程序的共享庫、Perm Space、 線程Stack和一些Memory mapping等, 或者類C方式allocate object。

有時,如果你的Spark作業處理的數據量非常大,達到幾億的數據量,此時運行Spark作業會時不時地報錯,例如shuffle output file cannot find,executor lost,task lost,out of memory等,這可能是Executor的堆外內存不太夠用,導致Executor在運行的過程中內存溢出。

stage的task在運行的時候,可能要從一些Executor中去拉取shuffle map output文件,但是Executor可能已經由於內存溢出掛掉了,其關聯的BlockManager也沒有了,這就可能會報出shuffle output file cannot find,executor lost,task lost,out of memory等錯誤,此時,就可以考慮調節一下Executor的堆外內存,也就可以避免報錯,與此同時,堆外內存調節的比較大的時候,對於性能來講,也會帶來一定的提升。

默認情況下,Executor堆外內存上限大概為300多MB,在實際的生產環境下,對海量數據進行處理的時候,這里都會出現問題,導致Spark作業反復崩潰,無法運行,此時就會去調節這個參數,到至少1G,甚至於2G、4G。

Executor堆外內存的配置需要在spark-submit腳本里配置,如代碼清單2-7所示:

代碼清單2-7 Executor堆外內存配置

--conf spark.yarn.executor.memoryOverhead=2048

以上參數配置完成后,會避免掉某些JVM OOM的異常問題,同時,可以提升整體Spark作業的性能。

1.4.3 JVM調優三:調節連接等待時長

Spark作業運行過程中,Executor優先從自己本地關聯的BlockManager中獲取某份數據,如果本地BlockManager沒有的話,會通過TransferService遠程連接其他節點上Executor的BlockManager來獲取數據。

如果task在運行過程中創建大量對象或者創建的對象較大,會占用大量的內存,這回導致頻繁的垃圾回收,但是垃圾回收會導致工作現場全部停止,也就是說,垃圾回收一旦執行,Spark的Executor進程就會停止工作,無法提供相應,此時,由於沒有響應,無法建立網絡連接,會導致網絡連接超時。

在生產環境下,有時會遇到file not found、file lost這類錯誤,在這種情況下,很有可能是Executor的BlockManager在拉取數據的時候,無法建立連接,然后超過默認的連接等待時長60s后,宣告數據拉取失敗,如果反復嘗試都拉取不到數據,可能會導致Spark作業的崩潰。這種情況也可能會導致DAGScheduler反復提交幾次stage,TaskScheduler返回提交幾次task,大大延長了我們的Spark作業的運行時間。

此時,可以考慮調節連接的超時時長,連接等待時長需要在spark-submit腳本中進行設置,設置方式如代碼清單2-8所示:

代碼清單2-8 連接等待時長配置

--conf spark.core.connection.ack.wait.timeout=300

調節連接等待時長后,通常可以避免部分的XX文件拉取失敗、XX文件lost等報錯。

第二章 Spark 數據傾斜

Spark中的數據傾斜問題主要指shuffle過程中出現的數據傾斜問題,是由於不同的key對應的數據量不同導致的不同task所處理的數據量不同的問題。

例如,reduce點一共要處理100萬條數據,第一個和第二個task分別被分配到了1萬條數據,計算5分鍾內完成,第三個task分配到了98萬數據,此時第三個task可能需要10個小時完成,這使得整個Spark作業需要10個小時才能運行完成,這就是數據傾斜所帶來的后果。

注意,要區分開數據傾斜與數據量過量這兩種情況,數據傾斜是指少數task被分配了絕大多數的數據,因此少數task運行緩慢;數據過量是指所有task被分配的數據量都很大,相差不多,所有task都運行緩慢。

數據傾斜的表現:

1. Spark作業的大部分task都執行迅速,只有有限的幾個task執行的非常慢,此時可能出現了數據傾斜,作業可以運行,但是運行得非常慢;

2. Spark作業的大部分task都執行迅速,但是有的task在運行過程中會突然報出OOM,反復執行幾次都在某一個task報出OOM錯誤,此時可能出現了數據傾斜,作業無法正常運行。

定位數據傾斜問題:

1. 查閱代碼中的shuffle算子,例如reduceByKey、countByKey、groupByKeyjoin等算子,根據代碼邏輯判斷此處是否會出現數據傾斜;

2. 查看Spark作業的log文件,log文件對於錯誤的記錄會精確到代碼的某一行,可以根據異常定位到的代碼位置來明確錯誤發生在第幾個stage,對應的shuffle算子是哪一個;

2.1 解決方案一:聚合原數據

  1. 避免shuffle過程

絕大多數情況下,Spark作業的數據來源都是Hive表,這些Hive表基本都是經過ETL之后的昨天的數據。

為了避免數據傾斜,我們可以考慮避免shuffle過程,如果避免了shuffle過程,那么從根本上就消除了發生數據傾斜問題的可能。

如果Spark作業的數據來源於Hive表,那么可以先在Hive表中對數據進行聚合,例如按照key進行分組,將同一key對應的所有value用一種特殊的格式拼接到一個字符串里去,這樣,一個key就只有一條數據了;之后,對一個key的所有value進行處理時,只需要進行map操作即可,無需再進行任何的shuffle操作。通過上述方式就避免了執行shuffle操作,也就不可能會發生任何的數據傾斜問題。

對於Hive表中數據的操作,不一定是拼接成一個字符串,也可以是直接對key的每一條數據進行累計計算。

2.2 解決方案二:過濾導致傾斜的key

如果在Spark作業中允許丟棄某些數據,那么可以考慮將可能導致數據傾斜的key進行過濾,濾除可能導致數據傾斜的key對應的數據,這樣,在Spark作業中就不會發生數據傾斜了。

2.3 解決方案三:提高shuffle操作中的reduce並行度

當方案一和方案二對於數據傾斜的處理沒有很好的效果時,可以考慮提高shuffle過程中的reduce端並行度,reduce端並行度的提高就增加了reduce端task的數量,那么每個task分配到的數據量就會相應減少,由此緩解數據傾斜問題。

  1. reduce端並行度的設置

在大部分的shuffle算子中,都可以傳入一個並行度的設置參數,比如reduceByKey(500),這個參數會決定shuffle過程中reduce端的並行度,在進行shuffle操作的時候,就會對應着創建指定數量的reduce task。對於Spark SQL中的shuffle類語句,比如group by、join等,需要設置一個參數,即spark.sql.shuffle.partitions,該參數代表了shuffle read task的並行度,該值默認是200,對於很多場景來說都有點過小。

增加shuffle read task的數量,可以讓原本分配給一個task的多個key分配給多個task,從而讓每個task處理比原來更少的數據。舉例來說,如果原本有5個key,每個key對應10條數據,這5個key都是分配給一個task的,那么這個task就要處理50條數據。而增加了shuffle read task以后,每個task就分配到一個key,即每個task就處理10條數據,那么自然每個task的執行時間都會變短了。

  1. reduce端並行度設置存在的缺陷

提高reduce端並行度並沒有從根本上改變數據傾斜的本質和問題(方案一和方案二從根本上避免了數據傾斜的發生),只是盡可能地去緩解和減輕shuffle reduce task的數據壓力,以及數據傾斜的問題,適用於有較多key對應的數據量都比較大的情況。

該方案通常無法徹底解決數據傾斜,因為如果出現一些極端情況,比如某個key對應的數據量有100萬,那么無論你的task數量增加到多少,這個對應着100萬數據的key肯定還是會分配到一個task中去處理,因此注定還是會發生數據傾斜的。所以這種方案只能說是在發現數據傾斜時嘗試使用的第一種手段,嘗試去用嘴簡單的方法緩解數據傾斜而已,或者是和其他方案結合起來使用。

在理想情況下,reduce端並行度提升后,會在一定程度上減輕數據傾斜的問題,甚至基本消除數據傾斜;但是,在一些情況下,只會讓原來由於數據傾斜而運行緩慢的task運行速度稍有提升,或者避免了某些task的OOM問題,但是,仍然運行緩慢,此時,要及時放棄方案三,開始嘗試后面的方案。

2.4 解決方案四:使用隨機key實現雙重聚合

當使用了類似於groupByKey、reduceByKey這樣的算子時,可以考慮使用隨機key實現雙重聚合,如圖3-1所示:

 

3-1 隨機key實現雙重聚合

首先,通過map算子給每個數據的key添加隨機數前綴,對key進行打散,將原先一樣的key變成不一樣的key,然后進行第一次聚合,這樣就可以讓原本被一個task處理的數據分散到多個task上去做局部聚合;隨后,去除掉每個key的前綴,再次進行聚合。

此方法對於由groupByKey、reduceByKey這類算子造成的數據傾斜由比較好的效果,僅僅適用於聚合類的shuffle操作,適用范圍相對較窄。如果是join類的shuffle操作,還得用其他的解決方案。

此方法也是前幾種方案沒有比較好的效果時要嘗試的解決方案。

2.5 解決方案五:將reduce join轉換為map join

正常情況下,join操作都會執行shuffle過程,並且執行的是reduce join,也就是先將所有相同的key和對應的value匯聚到一個reduce task中,然后再進行join。普通join的過程如下圖所示:

 

3-2 普通join過程

普通的join是會走shuffle過程的,而一旦shuffle,就相當於會將相同key的數據拉取到一個shuffle read task中再進行join,此時就是reduce join。但是如果一個RDD是比較小的,則可以采用廣播小RDD全量數據+map算子來實現與join同樣的效果,也就是map join,此時就不會發生shuffle操作,也就不會發生數據傾斜。

注意,RDD是並不能進行廣播的,只能將RDD內部的數據通過collect拉取到Driver內存然后再進行廣播

  1. 核心思路:

不使用join算子進行連接操作,而使用Broadcast變量與map類算子實現join操作,進而完全規避掉shuffle類的操作,徹底避免數據傾斜的發生和出現。將較小RDD中的數據直接通過collect算子拉取到Driver端的內存中來,然后對其創建一個Broadcast變量;接着對另外一個RDD執行map類算子,在算子函數內,從Broadcast變量中獲取較小RDD的全量數據,與當前RDD的每一條數據按照連接key進行比對,如果連接key相同的話,那么就將兩個RDD的數據用你需要的方式連接起來。

根據上述思路,根本不會發生shuffle操作,從根本上杜絕了join操作可能導致的數據傾斜問題。

join操作有數據傾斜問題並且其中一個RDD的數據量較小時,可以優先考慮這種方式,效果非常好。map join的過程如圖3-3所示:

 

3-3 map join過程

  1. 不適用場景分析:

由於Spark的廣播變量是在每個Executor中保存一個副本,如果兩個RDD數據量都比較大,那么如果將一個數據量比較大的 RDD做成廣播變量,那么很有可能會造成內存溢出。

2.6 解決方案六:sample采樣對傾斜key單獨進行join

Spark中,如果某個RDD只有一個key,那么在shuffle過程中會默認將此key對應的數據打散,由不同的reduce端task進行處理。

由單個key導致數據傾斜時,可有將發生數據傾斜的key單獨提取出來,組成一個RDD,然后用這個原本會導致傾斜的key組成的RDD根其他RDD單獨join,此時,根據Spark的運行機制,此RDD中的數據會在shuffle階段被分散到多個task中去進行join操作。傾斜key單獨join的流程如圖3-4所示:

 

3-4 傾斜key單獨join流程

1. 適用場景分析:

對於RDD中的數據,可以將其轉換為一個中間表,或者是直接使用countByKey()的方式,看一個這個RDD中各個key對應的數據量,此時如果你發現整個RDD就一個key的數據量特別多,那么就可以考慮使用這種方法。

當數據量非常大時,可以考慮使用sample采樣獲取10%的數據,然后分析這10%的數據中哪個key可能會導致數據傾斜,然后將這個key對應的數據單獨提取出來。

2. 不適用場景分析:

如果一個RDD中導致數據傾斜的key很多,那么此方案不適用。

2.7 解決方案七:使用隨機數以及擴容進行join

如果在進行join操作時,RDD中有大量key導致數據傾斜,那么進行分拆key也沒什么意義,此時就只能使用最后一種方案來解決問題了,對於join操作,我們可以考慮對其中一個RDD數據進行擴容,另一個RDD進行稀釋后再join。

我們會將原先一樣的key通過附加隨機前綴變成不一樣的key,然后就可以將這些處理后的“不同key”分散到多個task中去處理,而不是讓一個task處理大量的相同key。這一種方案是針對有大量傾斜key的情況,沒法將部分key拆分出來進行單獨處理,需要對整個RDD進行數據擴容,對內存資源要求很高。

1. 核心思想:

選擇一個RDD,使用flatMap進行擴容,對每條數據的key添加數值前綴(1~N的數值),將一條數據映射為多條數據;(擴容)

選擇另外一個RDD,進行map映射操作,每條數據的key都打上一個隨機數作為前綴(1~N的隨機數);(稀釋)

將兩個處理后的RDD,進行join操作。

 

3-6 使用隨機數以及擴容進行join

2. 局限性:

如果兩個RDD都很大,那么將RDD進行N倍的擴容顯然行不通;

使用擴容的方式只能緩解數據傾斜,不能徹底解決數據傾斜問題。

  1. 使用方案七對方案六進一步優化分析:

RDD中有幾個key導致數據傾斜時,方案六不再適用,而方案七又非常消耗資源,此時可以引入方案七的思想完善方案六:

1. 對包含少數幾個數據量過大的key的那個RDD,通過sample算子采樣出一份樣本來,然后統計一下每個key的數量,計算出來數據量最大的是哪幾個key。

2. 然后將這幾個key對應的數據從原來的RDD中拆分出來,形成一個單獨的RDD,並給每個key都打上n以內的隨機數作為前綴,而不會導致傾斜的大部分key形成另外一個RDD。

3. 接着將需要join的另一個RDD,也過濾出來那幾個傾斜key對應的數據並形成一個單獨的RDD,將每條數據膨脹成n條數據,這n條數據都按順序附加一個0~n的前綴,不會導致傾斜的大部分key也形成另外一個RDD。

4. 再將附加了隨機前綴的獨立RDD與另一個膨脹n倍的獨立RDD進行join,此時就可以將原先相同的key打散成n份,分散到多個task中去進行join了。

5. 而另外兩個普通的RDD就照常join即可。

6. 最后將兩次join的結果使用union算子合並起來即可,就是最終的join結果。

第三章 Spark Troubleshooting

3.1 故障排除一:控制reduce端緩沖大小以避免OOM

Shuffle過程,reduce端task並不是等到map端task將其數據全部寫入磁盤后再去拉取,而是map端寫一點數據,reduce端task就會拉取一小部分數據,然后立即進行后面的聚合、算子函數的使用等操作。

reduce端task能夠拉取多少數據,由reduce拉取數據的緩沖區buffer來決定,因為拉取過來的數據都是先放在buffer中,然后再進行后續的處理,buffer的默認大小為48MB。

reducetask會一邊拉取一邊計算,不一定每次都會拉滿48MB的數據,可能大多數時候拉取一部分數據就處理掉了。

雖然說增大reduce端緩沖區大小可以減少拉取次數,提升Shuffle性能,但是有時map端的數據量非常大,寫出的速度非常快,此時reduce端的所有task在拉取的時候,有可能全部達到自己緩沖的最大極限值,即48MB,此時,再加上reduce端執行的聚合函數的代碼,可能會創建大量的對象,這可難會導致內存溢出,即OOM。

如果一旦出現reduce端內存溢出的問題,我們可以考慮減小reduce端拉取數據緩沖區的大小,例如減少為12MB。

在實際生產環境中是出現過這種問題的,這是典型的以性能換執行的原理。reduce端拉取數據的緩沖區減小,不容易導致OOM,但是相應的,reudce端的拉取次數增加,造成更多的網絡傳輸開銷,造成性能的下降。

注意,要保證任務能夠運行,再考慮性能的優化。

3.2 故障排除二:JVM GC導致的shuffle文件拉取失敗

Spark作業中,有時會出現shuffle file not found的錯誤,這是非常常見的一個報錯,有時出現這種錯誤以后,選擇重新執行一遍,就不再報出這種錯誤。

出現上述問題可能的原因是Shuffle操作中,后面stage的task想要去上一個stage的task所在的Executor拉取數據,結果對方正在執行GC,執行GC會導致Executor內所有的工作現場全部停止,比如BlockManager、基於netty的網絡通信等,這就會導致后面的task拉取數據拉取了半天都沒有拉取到,就會報出shuffle file not found的錯誤,而第二次再次執行就不會再出現這種錯誤。

可以通過調整reduce端拉取數據重試次數和reduce端拉取數據時間間隔這兩個參數來對Shuffle性能進行調整,增大參數值,使得reduce端拉取數據的重試次數增加,並且每次失敗后等待的時間間隔加長。

代碼清單4-1 JVM GC導致的shuffle文件拉取失敗

val conf = new SparkConf()

  .set("spark.shuffle.io.maxRetries", "6")

  .set("spark.shuffle.io.retryWait", "6s")

3.3 故障排除三:解決各種序列化導致的報錯

Spark作業在運行過程中報錯,而且報錯信息中含有Serializable等類似詞匯,那么可能是序列化問題導致的報錯。

序列化問題要注意以下三點:

  1. 作為RDD的元素類型的自定義類,必須是可以序列化的;
  2. 算子函數里可以使用的外部的自定義變量,必須是可以序列化的;
  3. 不可以在RDD的元素類型、算子函數里使用第三方的不支持序列化的類型,例如Connection。

3.4 故障排除四:解決算子函數返回NULL導致的問題

在一些算子函數里,需要我們有一個返回值,但是在一些情況下我們不希望有返回值,此時我們如果直接返回NULL,會報錯,例如Scala.Math(NULL)異常。

如果你遇到某些情況,不希望有返回值,那么可以通過下述方式解決:

  1. 返回特殊值,不返回NULL,例如“-1”;

2. 在通過算子獲取到了一個RDD之后,可以對這個RDD執行filter操作,進行數據過濾,將數值為-1的數據給過濾掉;

3. 在使用完filter算子后,繼續調用coalesce算子進行優化。

3.5 故障排除五:YARN-CLIENT模式導致的網卡流量激增問題

YARN-client模式的運行原理如下圖所示:

 

4-1 YARN-client模式運行原理

YARN-client模式下,Driver啟動在本地機器上,而Driver負責所有的任務調度,需要與YARN集群上的多個Executor進行頻繁的通信。

假設有100個Executor, 1000個task,那么每個Executor分配到10個task,之后,Driver要頻繁地跟Executor上運行的1000個task進行通信,通信數據非常多,並且通信品類特別高。這就導致有可能在Spark任務運行過程中,由於頻繁大量的網絡通訊,本地機器的網卡流量會激增。

注意,YARN-client模式只會在測試環境中使用,而之所以使用YARN-client模式,是由於可以看到詳細全面的log信息,通過查看log,可以鎖定程序中存在的問題,避免在生產環境下發送故障。

在生產環境下,使用的一定是YARN-cluster模式。在YARN-cluster模式下,就不會造成本地機器網卡流量激增問題,如果YARN-cluster模式下存在網絡通信的問題,需要運維團隊進行解決。

3.6 故障排除六:YARN-CLUSTER模式的JVM棧內存溢出無法執行問題

YARN-cluster模式的運行原理如下圖所示:

 

4-1 YARN-client模式運行原理

Spark作業中包含SparkSQL的內容時,可能會碰到YARN-client模式下可以運行,但是YARN-cluster模式下無法提交運行(報出OOM錯誤)的情況。

YARN-client模式下,Driver是運行在本地機器上的,Spark使用的JVM的PermGen的配置(JDK1.8之前),是本地機器上的spark-class文件,JVM永久代的大小是128MB,這個是沒有問題的,但是在YARN-cluster模式下,Driver運行在YARN集群的某個節點上,使用的是沒有經過配置的默認設置,PermGen永久代大小為82MB。

SparkSQL的內部要進行很復雜的SQL的語義解析、語法樹轉換等等,非常復雜,如果sql語句本身就非常復雜,那么很有可能會導致性能的損耗和內存的占用,特別是對PermGen的占用會比較大。

所以,此時如果PermGen的占用好過了82MB,但是又小於128MB,就會出現YARN-client模式下可以運行,YARN-cluster模式下無法運行的情況。

解決上述問題的方法時增加PermGen的容量,需要在spark-submit腳本中對相關參數進行設置,設置方法如代碼清單4-2所示。

代碼清單4-2 配置

--conf spark.driver.extraJavaOptions="-XX:PermSize=128M -XX:MaxPermSize=256M"

通過上述方法就設置了Driver永久代的大小,默認為128MB,最大256MB,這樣就可以避免上面所說的問題。

3.7 故障排除七:解決SparkSQL導致的JVM棧內存溢出

SparkSQLsql語句有成百上千的or關鍵字時,就可能會出現Driver端的JVM棧內存溢出。

JVM棧內存溢出基本上就是由於調用的方法層級過多,產生了大量的,非常深的,超出了JVM棧深度限制的遞歸。(我們猜測SparkSQL有大量or語句的時候,在解析SQL時,例如轉換為語法樹或者進行執行計划的生成的時候,對於or的處理是遞歸,or非常多時,會發生大量的遞歸)

此時,建議將一條sql語句拆分為多條sql語句來執行,每條sql語句盡量保證100個以內的子句。根據實際的生產環境試驗,一條sql語句的or關鍵字控制在100個以內,通常不會導致JVM棧內存溢出。

3.8 故障排除八:持久化后的RDD數據丟失

Spark持久化在大部分情況下是沒有問題的,但是有時數據可能會丟失,如果數據一旦丟失,就需要對丟失的數據重新進行計算,計算完后再緩存和使用,為了避免數據的丟失,可以選擇對這個RDD進行checkpoint,也就是將數據持久化一份到容錯的文件系統上(比如HDFS)。

一個RDD緩存並checkpoint后,如果一旦發現緩存丟失,就會優先查看checkpoint數據存不存在,如果有,就會使用checkpoint數據,而不用重新計算。也即是說,checkpoint可以視為cache的保障機制,如果cache失敗,就使用checkpoint的數據。

使用checkpoint的優點在於提高了Spark作業的可靠性,一旦緩存出現問題,不需要重新計算數據,缺點在於,checkpoint時需要將數據寫入HDFS等文件系統,對性能的消耗較大。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM