Spark實踐 -- 性能優化基礎


  1. 性能調優相關的原理講解、經驗總結;
  2. 掌握一整套Spark企業級性能調優解決方案;而不只是簡單的一些性能調優技巧。
  3. 針對寫好的spark作業,實施一整套數據傾斜解決方案:實際經驗中積累的數據傾斜現象的表現,以及處理后的效果總結。

調優前首先要對spark的作業流程清楚:

  • Driver到Executor的結構;
Master: Driver
    |-- Worker: Executor
            |-- job
                 |-- stage
                       |-- Task Task 
  • 一個Stage內,最終的RDD有多少個partition,就會產生多少個task,一個task處理一個partition的數據;
  • 作業划分為task分到Executor上,然后一個cpu core執行一個task;
  • BlockManager負責Executor,task的數據管理,task來它這里拿數據;

1.1 資源分配

性能調優的王道:分配更多資源。

  • 分配哪些資源? executor、cpu per executor、memory per executor、driver memory
  • 在哪里分配這些資源?
    在我們在生產環境中,提交spark作業時,用的spark-submit shell腳本,里面調整對應的參數
/usr/local/spark/bin/spark-submit \
--class cn.spark.sparktest.core.WordCountCluster \
--driver-memory 1000m \     #driver的內存,影響不大,只要不出driver oom
--num-executors 3 \         #executor的數量
--executor-memory 100m \    #每個executor的內存大小
--executor-cores 3 \        #每個executor的cpu core數量
/usr/local/SparkTest-0.0.1-SNAPSHOT-jar-with-dependencies.jar \

如何調節資源分配

  • 第一種,Spark Standalone 模式下資源分配。

假如說一共20台機器,每台機器能使用4G內存,2個cpu core。

executor = 20,對應worker節點數量;
executor-memory = 4G內存,對應每個worker能用的最大內存;
executor-cores = 2,對應每個worker給每個executor能用的最多個cpu core。
  • 第二種,Yarn模式資源隊列下資源調度。

應該去查看spark作業,要提交到的資源隊列,大概有多少資源?
一個原則,你能使用的資源有多大,就盡量去調節到最大的大小(executor的數量,幾十個到上百個不等;

為什么調節資源以后,性能可以提升?

  • num-executors:增加executor個數

    如果executor數量比較少,能夠並行執行的task就少,Application的並行執行的能力就很弱。

  • executor-cores:增加每個executor的cpu core,增加了執行的並行能力

    原本20個executor,各有2個cpu core。能夠並行40個task。
    現在每個executor的cpu core,增加到了5個。就能夠並行執行100個task。
    執行的速度,提升了2.5倍。
    但不超過服務器的cpu core數,不然會waiting

  • executor-memory:增加每個executor的內存量

    增加了內存量以后,對性能的提升有以下幾點,但是不超過分配各每個worker的內存

    1. 如果需要對RDD進行cache,可以緩存更多的數據,將更少的數據寫入磁盤。
    2. 對於shuffle操作減少了磁盤IO,reduce端需要內存來存放拉取的數據並進行聚合。如果內存不夠,會寫入磁盤。
    3. 對於task的執行,會創建很多對象。如果內存比較小,可能會頻繁導致JVM堆內存滿了,然后頻繁GC,垃圾回收,minor GC和full GC。速度變慢。

1.2 調節並行度

並行度:Spark作業中根據寬窄依賴拆成多個stage,各個stage的task數量,也就代表了job在各個階段(stage)的並行度。

SparkConf conf = new SparkConf()
  .set("spark.default.parallelism", "500")

spark自己的算法給task選擇Executor,Execuotr進程里面包含着task線程
舉例(wordCount):
map階段:每個task根據去找到自己需要的數據寫到文件去處理。生成的文件一定是存放相同的key對應的values,相同key的values一定是進入同一個文件。
reduce階段:每個stage1的task,會去上面生成的文件拉取數據;拉取到的數據,一定是相同key對應的數據。對相同的key,對應的values,才能去執行我們自定義的function操作(_ + _)

假設資源調到上限了,如果不調節並行度,導致並行度過低,會怎么樣?

假設task設置了100個task。有50個executor,每個executor有3個cpu core。
則Application任何一個stage運行的時候,都有總數在150個cpu core,可以並行運行。
同時在運行的task只有100個。每個executor剩下的一個cpu core,並行度沒有與資源相匹配,就浪費掉了。

合理的並行度的設置,應該要設置到可以完全合理的利用你的集群資源;
比如上面的例子,總共集群有150個cpu core,可以並行運行150個task。
即可以同時並行運行,還可以讓每個task要處理的數據量變少。
最終,就是提升你的整個Spark作業的性能和運行速度。

官方是推薦,task數量,設置成spark application總cpu core數量的2~3倍,比如150個cpu core,基本要設置task數量為300~500;

因為有些task會運行的快一點,比如50s就完了,有些task,可能會慢一點,要1分半才運行完,如果task數量設置成cpu core總數的2~3倍,那么一個task運行完了以后,另一個task馬上可以補上來,就盡量不讓cpu core空閑,盡量提升spark作業運行的效率和速度,提升性能。

1.3 重構RDD架構以及RDD持久化

  1. RDD架構重構與優化:盡量復用RDD,差不多的RDD抽取為一個共同的RDD,供后面的RDD計算時,反復使用。
  2. 公共RDD一定要實現持久化,將RDD的數據緩存到內存中/磁盤中,(BlockManager),以后無論對這個RDD做多少次計算,那么都是直接取這個RDD的同一份數據。
  3. 持久化是可以進行序列化的,如果正常將數據持久化在內存中,可能會導致內存的占用過大,導致OOM。
  4. 為了數據的高可靠性,而且內存充足,可以使用雙副本機制,進行持久化。

1.4 大變量進行廣播,使用Kryo序列化,本地化等待時間

廣播變量
session分析模塊中隨機抽取部分,time2sessionsRDD.flatMapToPair(),取session2extractlistMap中對應時間的list。
task執行的算子flatMapToPair算子,使用了外部的變量session2extractlistMap,每個task都要通過網絡的傳輸獲取一份變量的副本占網絡資源占內存。

  • 在driver上會有一份初始的副本。
  • task在運行的時候,想要使用廣播變量中的數據,此時首先會在自己本地的Executor對應的BlockManager中(負責管理某個Executor對應的內存和磁盤上的數據),嘗試獲取變量副本;
  • 如果本地沒有,那么就從Driver遠程拉取變量副本,並保存在本地的BlockManager中;
  • 此后這個executor上的task,都會直接使用本地的BlockManager中的副本。
  • BlockManager除了從driver上拉取,也可能從其他節點的BlockManager上拉取變量副本,距離越近越好。

==============

優化序列化格式
默認情況下,Spark內部是使用Java的對象輸入輸出流序列化機制,ObjectOutputStream / ObjectInputStream
這種默認序列化機制的好處在於,處理起來比較方便;也不需要我們手動去做什么事情,只是,你在算子里面使用的變量,必須是實現Serializable接口的,可序列化即可。
但是默認的序列化機制的效率不高速度慢;序列化數據占用的內存空間大。

Spark支持使用Kryo序列化機制。
Kryo序列化機制,比默認的Java序列化機制,速度要快,序列化后的數據要更小,大概是Java序列化機制的1/10。讓網絡傳輸的數據變少;耗費的內存資源大大減少。

Kryo序列化機制,一旦啟用以后,會生效的幾個地方:
1、算子函數中使用到的外部變量
2、持久化RDD時進行序列化,StorageLevel.MEMORY_ONLY_SER
3、shuffle

第一步:

SparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

第二步,注冊你使用到的,需要通過Kryo序列化的自定義類。

SparkConf.registerKryoClasses(new Class[]{CategorySortKey.class});

Kryo要求,如果要達到它的最佳性能的話,那么就一定要注冊你自定義的類(比如,你的算子函數中使用到了外部自定義類型的對象變量,就要求必須注冊你的類,否則Kryo達不到最佳性能)。

============

fastutil優化
由於java的集合類型在每個數據中除了數據,還有元素的位置長度等都要占用了空間,所以一般不推薦使用集合,而是使用java數組。

fastutil是擴展了Java標准集合框架(Map、List、Set;HashMap、ArrayList、HashSet)的類庫,提供了特殊類型的map、set、list和queue;能夠提供更小的內存占用,更快的存取速度。

Spark中應用fastutil的場景:
1、如果算子函數使用了外部變量是某種比較大的集合,那么可以考慮使用fastutil改寫外部變量,首先從源頭上就減少內存的占用,通過廣播變量進一步減少內存占用,再通過Kryo序列化類庫進一步減少內存占用。
2、在你的算子函數里,也就是task要執行的計算邏輯里面,要創建比較大的Map、List等集合,可以考慮將這些集合類型使用fastutil類庫重寫,減少task創建出來的集合類型的內存占用。

fastutil的使用,在pom.xml中引用fastutil的包

基本都是類似於IntList的格式,前綴就是集合的元素類型;特殊的就是Map,Int2IntMap,代表了key-value映射的元素類型。

==============

調節數據本地化等待時長

背景:所謂任務跟着數據跑,Driver對Application的每一個stage的task進行分配之前,都會計算出每個task要計算的是哪個分片數據。
分配算法會希望每個task正好分配到它要計算的數據所在的節點。
但是可能某task要分配過去的那個節點的計算資源和計算能力都滿了,Spark會等待一段時間,默認情況下是3s。
超過時間了會選擇一個比較差的本地化級別,將task分配到離要計算的數據所在節點比較近的一個節點,然后進行計算。

task會通過其所在節點的BlockManager來獲取數據,BlockManager發現自己本地沒有數據,
會通過一個getRemote()方法,通過TransferService(網絡數據傳輸組件)從數據所在節點的BlockManager中,獲取數據,通過網絡傳輸回task所在節點。

所以我們可以調節等待時長就是讓spark再等待多一下,不要到低一級的本地化級別。

spark.locality.wait.process
spark.locality.wait.node
spark.locality.wait.rack

new SparkConf()
  .set("spark.locality.wait", "10")

先用client模式,在本地就直接可以看到比較全的日志。
觀察日志,spark作業的運行日志顯示,觀察大部分task的數據本地化級別。
如果是發現,好多的級別都是NODE_LOCAL、ANY,那么最好就去調節一下數據本地化的等待時長。
看看大部分的task的本地化級別有沒有提升,spark作業的運行時間有沒有縮短。
如果spark作業的運行時間反而增加了,那就還是不要調節了。

本地化級別

  • PROCESS_LOCAL:進程本地化,代碼和數據在同一個進程中,也就是在同一個executor中;計算數據的task由executor執行,數據在executor的BlockManager中;性能最好
  • NODE_LOCAL:節點本地化,代碼和數據在同一個節點中;比如說,數據作為一個HDFS block塊,就在節點上,而task在節點上某個executor中運行;或者是,數據和task在一個節點上的不同executor中;數據需要在進程間進行傳輸
  • NO_PREF:對於task來說,數據從哪里獲取都一樣,沒有好壞之分
  • RACK_LOCAL:機架本地化,數據和task在一個機架的兩個節點上;數據需要通過網絡在節點之間進行傳輸
  • ANY:數據和task可能在集群中的任何地方,而且不在一個機架中,性能最差

1.5 JVM調優

首先估計GC的影響

GC調優的第一步就是去統計GC發生的頻率和GC消耗時間。
通過添加:

./bin/spark-submit \
--name "My app" \
--master local[4] \
--conf spark.eventLog.enabled=false \
--conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
myApp.jar

在作業運行的時候能夠看到worker的日志里面在每次GC的時候就打印出GC信息。

使用jvisualvm來監控Spark應用程序

可以看到Spark應用程序堆,線程的使用情況,看不到GC回收的次數時間什么的,從而根據這些數據去優化您的程序。

  • 在$SPARK_HOME/conf目錄下配置spark-default.conf文件,加入如下配置:
    spark.driver.extraJavaOptions   -Dcom.sun.management.jmxremote.port=1099 -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false
    
  • 啟動Spark應用程序
  • 打開jvisualvm.exe監控
    在JDK安裝的bin目錄下有個jvisualvm.exe,雙擊它,然后進行配置,依次選擇 文件-> 添加JMX連接,在連接文本框填上Driver機器的地址和端口

了解GC相關原理

了解更多GC調優方法前我們需要了解JVM內存管理:

  • Java堆空間被分成 Young 和 Old 兩個regions。Young generation 顧名思義保存短期使用的對象,而 Old generation 用於保存有更長使用周期的對象。

  • Young generation 被細分成三個regions:[Eden, Survivor1, Survivor2]。

  • 簡述GC:

    1. 當 Eden 空間滿了, Eden 會進行一次較小的minor GC,依然存活的對象會從Eden and Survivor1 復制到 Survivor2。
    2. 當Survivor2的 Object 足夠老或者 Survivor2 空間滿了, 對象就會被移到 Old。最后當 Old 空間也接近使用完,就會發生full GC。
    3. Eden的內存太小會頻繁的進行minor gc,導致有些短生命周期對象在沒有被回收掉,年齡變大放到老年代了,導致full gc。
      minor gc后會將存活下來的對象,放入之前空閑的那一個survivor區域中。默認eden、survior1和survivor2的內存占比是8:1:1。如果存活下來的對象是1.5,一個survivor區域放不下。
  • GC調優的目標

    1. 保證在Old generation只存儲有長期存活的RDD;
    2. Young generation有足夠的空間存儲短期的對象,避免full GC將作業執行時創建的短期的對象也回收掉
    3. 避免Young generation頻繁minor GC
  • GC調優做法:

    1. 通過收集GC狀態檢查是否有大量的垃圾回收,如果在作業完成前有多次 full GC,意味着沒有足夠的內存給執行的task。
    2. 如果進行了多次 minorGC,分配更多的內存給Eden也許會有幫助。只要將Eden的內存E設置成大於每個task需要的內存大小,Young generation 則設置成 -Xmn=4/3*E。
    3. 如果OldGen將要完全占滿,可以減少spark.memory.fraction。改變JVM的NewRatio參數,默認設置是2,表示Old generation占用了 2/3的 heap內存,應該設置得足夠大超過並超過spark.memory.fraction。另外可以考慮減少Young generation的大小通過調低 -Xmn。
    4. 嘗試使用G1GC 收集器 通過配置 -XX:+UseG1GC 。如果executor需要大的堆內存,那么通過配置-XX:G1HeapRegionSize來提高G1 region size 是很重要的。
    5. 如果作業從HDFS中讀取數據,可通過作業使用的block大小推測使用的內存大小,讀取出來的block通常是存儲大小的2-3倍。如果有4個作業去使用一個HDFS的128MB的block,我們預估Eden需要43128MB。

spark-submit腳本里面,去用--conf的方式,去添加配置:

--conf spark.memory.fraction,0.6 -> 0.5 -> 0.4 -> 0.2
--conf spark.shuffle.memoryFraction=0.3

調節executor堆外內存

有時候,如果你的spark作業處理的數據量特別特別大;
然后spark作業一運行,時不時的報錯,shuffle file cannot find,executor、task lost,out of memory(內存溢出);
可能是說executor的堆外內存不太夠用,導致executor在運行的過程中會內存溢出;
然后導致后續的stage的task在運行的時候,可能要從一些executor中去拉取shuffle map output文件,但是executor可能已經掛掉了,關聯的Block manager也沒有了;spark作業徹底崩潰。

上述情況下,就可以去考慮調節一下executor的堆外內存。避免掉某些JVM OOM的異常問題。
此外,堆外內存調節的比較大的時候,對於性能來說,也會帶來一定的提升。

spark-submit腳本里面,去用--conf的方式,去添加基於yarn的提交模式配置;

--conf spark.yarn.executor.memoryOverhead=2048

默認情況下,這個堆外內存上限大概是300多M;真正處理大數據的時候,這里都會出現問題,導致spark作業反復崩潰,無法運行;此時就會去調節這個參數到至少1G(1024M),甚至說2G、4G。

調節連接等待時長

我們知道 Executor,優先從自己本地關聯的BlockManager中獲取某份數據。
如果本地block manager沒有的話,那么會通過TransferService,去遠程連接其他節點上executor的block manager去獲取。
正好碰到那個exeuctor的JVM在垃圾回收,就會沒有響應,無法建立網絡連接;
spark默認的網絡連接的超時時長,是60s;
就會出現某某file。一串file id。uuid(dsfsfd-2342vs--sdf--sdfsd)。not found。file lost。
報錯幾次,幾次都拉取不到數據的話,可能會導致spark作業的崩潰。也可能會導致DAGScheduler,反復提交幾次stage。TaskScheduler,反復提交幾次task。大大延長我們的spark作業的運行時間。

可以考慮在spark-submit腳本里面,調節連接的超時時長:

--conf spark.core.connection.ack.wait.timeout=300

OOM相關

Spark中的OOM問題不外乎兩種情況:

  • map執行中內存溢出
  • shuffle后內存溢出,shuffle后內存溢出的shuffle操作包括join,reduceByKey,repartition等操作。

可以先理一下Spark內存模型,再總結各種OOM的情況相對應的解決辦法和性能優化方面的總結。

  • map過程產生大量對象導致內存溢出

這種溢出的原因是在單個map中產生了大量的對象導致的,

例如:rdd.map(x=>for(i <- 1 to 10000) yield i.toString)

這個操作在rdd每個對象都產生了10000個對象,這肯定很容易產生內存溢出的問題。
針對這種問題,在不增加內存的情況下,可以通過減少每個Task的大小,讓Executor的內存也能夠裝得下。
具體做法可以在會產生大量對象的map操作之前調用repartition方法,分區成更小的塊傳入map。

例如:rdd.repartition(10000).map(x=>for(i <- 1 to 10000) yield i.toString)。

面對這種問題注意,不能使用rdd.coalesce方法,這個方法只能減少分區,不能增加分區,不會有shuffle的過程。

  • 數據不平衡導致內存溢出

也可如上面進行repartition

  • coalesce減少分區導致內存溢出

因為hdfs中不適合存小文件,所以Spark計算后如果產生的文件太小,會調用coalesce合並文件再存入hdfs中。
這會導致一個問題:有100個文件,現在調用coalesce(10),意味着能夠有100個Task,最后只產生10個文件,
因為coalesce並不是shuffle操作,意味着coalesce並不是先執行100個Task,再將Task的執行結果合並成10個,
而是從頭到位只有10個Task在執行,每個Task同時一次讀取10個文件,使用的內存是原來的10倍,這導致了OOM。

解決這個問題的方法是令程序按照我們想的先執行100個Task再將結果合並成10個文件,
可以通過repartition解決,調用repartition(10),
因為這就有一個shuffle的過程,shuffle前后是兩個Stage,一個100個分區,一個是10個分區,就能按照我們的想法執行。

  • shuffle后內存溢出:

shuffle內存溢出的情況可以說都是shuffle后,單個文件過大導致的。
在Spark中,join,reduceByKey這一類型的過程,都會有shuffle的過程,在shuffle的使用,需要傳入一個partitioner,
默認的partitioner都是HashPatitioner,默認值是父RDD中最大的分區數,這個參數通過spark.default.parallelism控制(在spark-sql中用spark.sql.shuffle.partitions) ,
spark.default.parallelism參數只對HashPartitioner有效,所以如果是別的Partitioner或者自己實現的Partitioner就不能使用spark.default.parallelism這個參數來控制shuffle的並發量了。
如果是別的partitioner導致的shuffle內存溢出,就需要從partitioner的代碼增加partitions的數量。

1.6 Suffle調優

1.6.1 shuffle流程

shuffle,一定是分為兩個stage來完成的。
因為這其實是個逆向的過程,不是stage決定shuffle,是shuffle決定stage。
在某個action觸發job的時候,DAGScheduler會負責划分job為多個stage。划分的依據,就是如果發現有會觸發shuffle操作的算子(reduceByKey)。

每一個shuffle的前半部分stage的task,每個task都會創建下一個stage的task數量相同的文件
比如下一個stage會有10個task,那么當前stage每個task都會創建10份文件;
會將同一個key對應的values,寫入同一個文件中的;
不同節點上的task,也一定會將同一個key對應的values,寫入下一個stage的同一個task對應的文件中。

shuffle的后半部分stage的task,每個task都會從各個節點上的task寫的屬於自己的那一份文件中,拉取key, value對;
然后task會有一個內存緩沖區,然后會用HashMap,進行key, values的匯聚;
task會用我們自己定義的聚合函數reduceByKey(+),把所有values進行一對一的累加;聚合出來最終的值。

1.6.2 合並map端輸出文件(優化的hashShuffle)

sparConf().set("spark.shuffle.consolidateFiles", "true")

開啟了map端輸出文件的合並機制之后:

第一個stage,同時運行cpu core個task,比如cpu core是2個,並行運行2個task;每個task都創建下一個stage的task數量個文件;

第一個stage,並行運行的2個task執行完以后;就會執行另外兩個task;另外2個task不會再重新創建輸出文件;而是復用之前的task創建的map端輸出文件,將數據寫入上一批task的輸出文件中

第二個stage,task在拉取數據的時候,就不會去拉取上一個stage每一個task為自己創建的那份輸出文件了;而是拉取少量的輸出文件,每個輸出文件中,可能包含了多個task給自己的map端輸出。

1.6.3 調節map端內存緩沖與reduce端內存占比

深入一下shuffle原理:
shuffle的map task:
輸出到磁盤文件的時候,統一都會先寫入每個task自己關聯的一個內存緩沖區。
這個緩沖區大小,默認是32kb。當內存緩沖區滿溢之后,會進行spill溢寫操作到磁盤文件中去
數據量比較大的情況下可能導致多次溢寫。

shuffle的reduce端task:
在拉取到數據之后,會用hashmap的數據格式,來對各個key對應的values進行匯聚的時候。
使用的就是自己對應的executor的內存,executor(jvm進程,堆),默認executor內存中划分給reduce task進行聚合的比例,是0.2。
要是拉取過來的數據很多,那么在內存中,放不下;就將在內存放不下的數據,都spill(溢寫)到磁盤文件中去。
數據大的時候磁盤上溢寫的數據量越大,后面在進行聚合操作的時候,很可能會多次讀取磁盤中的數據,進行聚合。

怎么調節?
看Spark UI,shuffle的磁盤讀寫的數據量很大,就意味着最好調節一些shuffle的參數。進行調優。

set(spark.shuffle.file.buffer,64)      // 默認32k  每次擴大一倍,看看效果。
set(spark.shuffle.memoryFraction,0.2)  // 每次提高0.1,看看效果。

很多資料都會說這兩個參數,是調節shuffle性能的不二選擇,實際上不是這樣的。
以實際的生產經驗來說,這兩個參數沒有那么重要,shuffle的性能不是因為這方面的原因導致的。

1.6.4 HashShuffleManager與SortShuffleManager

上面我們所講shuffle原理是指HashShuffleManager,是很過時的shuffle manager。
之前講解的一些調優的點,比如consolidateFiles機制、map端緩沖、reduce端內存占比。這些對任何shuffle manager都是有用的。

在spark 1.5.x后,又出來了一種tungsten-sort shuffleMnager。效果跟sort shuffle manager是差不多的。
唯一的不同之處在於tungsten-sort shuffleMnager,是使用了自己實現的一套內存管理機制以及堆內存,性能上有很大的提升可以避免shuffle過程中產生的大量的OOM,GC。

SortShuffleManager與HashShuffleManager兩點不同:

  1. SortShuffleManager會對每個reduce task要處理的數據,進行排序(默認的)。
  2. SortShuffleManager會一個task,只會寫入一個磁盤文件,不同reduce task的數據,用offset來划分界定。

hash、sort、tungsten-sort。如何來選擇?

  1. 需不需要數據默認就讓spark給你進行排序?就好像mapreduce,默認就是有按照key的排序。
    如果不需要的話,其實還是建議搭建就使用最基本的HashShuffleManager,因為最開始就是考慮的是不排序,換取高性能;

2、什么時候需要用sort shuffle manager?如果你需要你的那些數據按key排序了,那么就選擇這種吧。
而且要注意,reduce task的數量應該是超過200的,這樣sort、merge(多個文件合並成一個)的機制,才能生效把。
但是這里要注意,你一定要自己考量一下,有沒有必要在shuffle的過程中,就做這個事情,畢竟對性能是有影響的。

3、如果你不需要排序,而且你希望你的每個task輸出的文件最終是會合並成一份的,你自己認為可以減少性能開銷;
可以去調節bypassMergeThreshold這個閾值,比如你的reduce task數量是500,默認閾值是200,所以默認還是會進行sort和直接merge的;
可以將閾值調節成550,不會進行sort,按照hash的做法,每個reduce task創建一份輸出文件,最后合並成一份文件。
(一定要提醒大家,這個參數,其實我們通常不會在生產環境里去使用,也沒有經過驗證說,這樣的方式,到底有多少性能的提升)

4、如果你想選用sort based shuffle manager,而且你們公司的spark版本比較高,
是1.5.x版本的,那么可以考慮去嘗試使用tungsten-sort shuffle manager。
看看性能的提升與穩定性怎么樣。(唉,開源出來的項目都是落后了快五年了的)

總結:
1、在生產環境中,不建議大家貿然使用第三點和第四點:
2、如果你不想要你的數據在shuffle時排序,那么就自己設置一下,用hash shuffle manager。
3、如果你的確是需要你的數據在shuffle時進行排序的,那么就默認不用動,默認就是sort shuffle manager;或者是什么?如果你壓根兒不care是否排序這個事兒,那么就默認讓他就是sort的。調節一些其他的參數(consolidation機制)。(80%,都是用這種)

new SparkConf().set("spark.shuffle.manager", "hash")  // hash、tungsten-sort、默認為sort
new SparkConf().set("spark.shuffle.sort.bypassMergeThreshold", "550")   // 默認200

當reduce task數量少於等於200;map task創建的輸出文件小於等於200的;會將所有的輸出文件合並為一份文件。且不進行sort排序,節省了性能開銷。

1.7 算子調優

1.7.1 MapPartitions提升Map類操作性能

這里需要稍微講一下RDD和DataFrame的區別。
RDD強調的是不可變對象,每個RDD都是不可變的,當調用RDD的map類型操作的時候,都是產生一個新的對象。
這就導致如果對一個RDD調用大量的map類型操作的話,每個map操作會產生一個到多個RDD對象,
這雖然不一定會導致內存溢出,但是會產生大量的中間數據,增加了gc操作。
另外RDD在調用action操作的時候,會出發Stage的划分,但是在每個Stage內部可優化的部分是不會進行優化的,
例如rdd.map(+1).map(+1),這個操作在數值型RDD中是等價於rdd.map(_+2)的,但是RDD內部不會對這個過程進行優化。

DataFrame則不同,DataFrame由於有類型信息所以是可變的,並且在可以使用sql的程序中,都有除了解釋器外,都會有一個sql優化器Catalyst,

上面說到的這些RDD的弊端,有一部分就可以使用mapPartitions進行優化,
mapPartitions可以同時替代rdd.map,rdd.filter,rdd.flatMap的作用,
所以在長操作中,可以在mapPartitons中將RDD大量的操作寫在一起,避免產生大量的中間rdd對象,
另外是mapPartitions在一個partition中可以復用可變類型,這也能夠避免頻繁的創建新對象。

普通的mapToPair,當一個partition中有1萬條數據,function要執行和計算1萬次。
但是,使用MapPartitions操作之后,一個task僅僅會執行一次function,function一次接收所有的partition數據。只要執行一次就可以了,性能比較高。
但是MapPartitions操作,對於大量數據來說,一次傳入一個function以后,可能一下子內存不夠而且又沒法騰出內存空間的話,可能就OOM!

在項目中,自己先去估算一下RDD的數據量,以及每個partition的量,還有自己分配給每個executor的內存資源,看看一下子內存容納所有的partition數據。

1.7.2 使用coalesce減少分區數量

repartition 是 coalesce

從源碼中可以看出repartition方法其實就是調用了coalesce方法,shuffle為true的情況. 現在假設RDD有X個分區,需要重新划分成Y個分區.
1.如果x<y,說明x個分區里有數據分布不均勻的情況,利用HashPartitioner把x個分區重新划分成了y個分區,此時,需要把shuffle設置成true才行,因為如果設置成false,不會進行shuffle操作,此時父RDD和子RDD之間是窄依賴,這時並不會增加RDD的分區.

2.如果x>y,需要先把x分區中的某些個分區合並成一個新的分區,然后最終合並成y個分區,此時,需要把coalesce方法的shuffle設置成false.

總結:如果想要增加分區的時候,可以用repartition或者coalesce+true。但是一定要有shuffle操作,分區數量才會增加。

RDD這種filter之后,RDD中的每個partition的數據量,可能都不太一樣了。
問題:
1、每個partition數據量變少了,但是在后面進行處理的時候,還跟partition數量一樣數量的task,來進行處理;有點浪費task計算資源。
2、每個partition的數據量不一樣,會導致后面的每個task處理每個partition的時候,每個task要處理的數據量就不同,處理速度相差大,導致數據傾斜。。。。

針對上述的兩個問題,能夠怎么解決呢?

1、針對第一個問題,希望可以進行partition的壓縮吧,因為數據量變少了,那么partition其實也完全可以對應的變少。
比如原來是4個partition,現在完全可以變成2個partition。
那么就只要用后面的2個task來處理即可。就不會造成task計算資源的浪費。

2、針對第二個問題,其實解決方案跟第一個問題是一樣的;也是去壓縮partition,盡量讓每個partition的數據量差不多。
那么這樣的話,后面的task分配到的partition的數據量也就差不多。
不會造成有的task運行速度特別慢,有的task運行速度特別快。避免了數據傾斜的問題。

主要就是用於在filter操作之后,添加coalesce算子,針對每個partition的數據量各不相同的情況,來壓縮partition的數量。
減少partition的數量,而且讓每個partition的數據量都盡量均勻緊湊。

1.7.3 foreachPartition優化寫數據庫性能

默認的foreach的性能缺陷在哪里?

  1. 對於每條數據,都要單獨去調用一次function,task為每個數據都要去執行一次function函數。如果100萬條數據,(一個partition),調用100萬次。性能比較差。
  2. 浪費數據庫連接資源。

在生產環境中,都使用foreachPartition來寫數據庫
1、對於我們寫的function函數,就調用一次,一次傳入一個partition所有的數據,但是太多也容易OOM。
2、主要創建或者獲取一個數據庫連接就可以
3、只要向數據庫發送一次SQL語句和多組參數即可

local模式跑的時候foreachPartition批量入庫會卡住,可能資源不足,因為用standalone集群跑的時候不會出現。

1.7.4 repartition 解決Spark SQL低並行度的性能問題

並行度:可以這樣調節:
1、spark.default.parallelism 指定為全部executor的cpu core總數的2~3倍
2、textFile(),傳入第二個參數,指定partition數量(比較少用)
但是通過spark.default.parallelism參數指定的並行度,只會在沒有Spark SQL的stage中生效
Spark SQL自己會默認根據hive表對應的hdfs文件的block,自動設置Spark SQL查詢所在的那個stage的並行度。

比如第一個stage,用了Spark SQL從hive表中查詢出了一些數據,然后做了一些transformation操作,接着做了一個shuffle操作(groupByKey),這些都不會應用指定的並行度可能會有非常復雜的業務邏輯和算法,就會導致第一個stage的速度,特別慢;下一個stage,在shuffle操作之后,做了一些transformation操作才會變成你自己設置的那個並行度。

解決上述Spark SQL無法設置並行度和task數量的辦法,是什么呢?

repartition算子,可以將你用Spark SQL查詢出來的RDD,使用repartition算子時,去重新進行分區,此時可以分區成多個partition,比如從20個partition分區成100個。
就可以避免跟Spark SQL綁定在一個stage中的算子,只能使用少量的task去處理大量數據以及復雜的算法邏輯。

1.7.5 reduceByKey本地聚合介紹

reduceByKey,相較於普通的shuffle操作(比如groupByKey),它有一個特點:底層基於CombineByKey,會進行map端的本地聚合。
對map端給下個stage每個task創建的輸出文件中,寫數據之前,就會進行本地的combiner操作,也就是說對每一個key,對應的values,都會執行你的算子函數。

用reduceByKey對性能的提升:
1、在本地進行聚合以后,在map端的數據量就變少了,減少磁盤IO。而且可以減少磁盤空間的占用。
2、下一個stage,拉取數據的量,也就變少了。減少網絡的數據傳輸的性能消耗。
3、在reduce端進行數據緩存的內存占用變少了,要進行聚合的數據量也變少了。

reduceByKey在什么情況下使用呢?
1、簡單的wordcount程序。
2、對於一些類似於要對每個key進行一些字符串拼接的這種較為復雜的操作,可以自己衡量一下,其實有時,也是可以使用reduceByKey來實現的。
但是不太好實現。如果真能夠實現出來,對性能絕對是有幫助的。

1.8 troubleshooting調優

1.8.1 控制shuffle reduce端緩沖大小以避免OOM

map端的task是不斷的輸出數據的,數據量可能是很大的。
但是,在map端寫過來一點數據,reduce端task就會拉取一小部分數據,先放在buffer中,立即進行后面的聚合、算子函數的應用。
每次reduece能夠拉取多少數據,就由buffer來決定。然后才用后面的executor分配的堆內存占比(0.2),hashmap,去進行后續的聚合、函數的執行。

reduce端緩沖默認是48MB(buffer),可能會出什么問題?
緩沖達到最大極限值,再加上你的reduce端執行的聚合函數的代碼,可能會創建大量的對象。reduce端的內存中,就會發生內存溢出的問題。
這個時候,就應該減少reduce端task緩沖的大小。我寧願多拉取幾次,但是每次同時能夠拉取到reduce端每個task的數量,比較少,就不容易發生OOM內存溢出的問題。

另外,如果你的Map端輸出的數據量也不是特別大,然后你的整個application的資源也特別充足,就可以嘗試去增加這個reduce端緩沖大小的,比如從48M,變成96M
這樣每次reduce task能夠拉取的數據量就很大。需要拉取的次數也就變少了。
最終達到的效果,就應該是性能上的一定程度上的提升。
設置
spark.reducer.maxSizeInFlight

1.8.2 解決JVM GC導致的shuffle文件拉取失敗

比如,executor的JVM進程,內存不夠了,發生GC,導致BlockManger,netty通信都停了。
下一個stage的executor,可能是還沒有停止掉的,task想要去上一個stage的task所在的exeuctor,去拉取屬於自己的數據,結果由於對方正在GC,就導致拉取了半天沒有拉取到。
可能會報錯shuffle file not found。
但是,可能下一個stage又重新提交了stage或task以后,再執行就沒有問題了,因為可能第二次就沒有碰到JVM在gc了。
有的時候,出現這種情況以后,會重新去提交stage、task。重新執行一遍,發現就好了。沒有這種錯誤了。

spark.shuffle.io.maxRetries 3
spark.shuffle.io.retryWait 5s

針對這種情況,我們完全可以進行預備性的參數調節。
增大上述兩個參數的值,達到比較大的一個值,盡量保證第二個stage的task,一定能夠拉取到上一個stage的輸出文件。
盡量避免因為gc導致的shuffle file not found,無法拉取到的問題。

1.8.3 yarn-cluster模式的JVM內存溢出無法執行問題

總結一下yarn-client和yarn-cluster模式的不同之處:
yarn-client模式,driver運行在本地機器上的;yarn-cluster模式,driver是運行在yarn集群上某個nodemanager節點上面的。
yarn-client的driver運行在本地,通常來說本地機器跟yarn集群都不會在一個機房的,所以說性能可能不是特別好;yarn-cluster模式下,driver是跟yarn集群運行在一個機房內,性能上來說,也會好一些。

實踐經驗,碰到的yarn-cluster的問題:

有的時候,運行一些包含了spark sql的spark作業,可能會碰到yarn-client模式下,可以正常提交運行;yarn-cluster模式下,可能是無法提交運行的,會報出JVM的PermGen(永久代)的內存溢出,會報出PermGen Out of Memory error log。

yarn-client模式下,driver是運行在本地機器上的,spark使用的JVM的PermGen的配置,是本地的spark-class文件(spark客戶端是默認有配置的),JVM的永久代的大小是128M,這個是沒有問題的;但是在yarn-cluster模式下,driver是運行在yarn集群的某個節點上的,使用的是沒有經過配置的默認設置(PermGen永久代大小),82M。

spark-submit腳本中,加入以下配置即可:
--conf spark.driver.extraJavaOptions="-XX:PermSize=128M -XX:MaxPermSize=256M"

另外,sql有大量的or語句。可能就會出現一個driver端的jvm stack overflow。
基本上就是由於調用的方法層級過多,因為產生了非常深的,超出了JVM棧深度限制的,遞歸。spark sql內部源碼中,在解析sqlor特別多的話,就會發生大量的遞歸。
建議不要搞那么復雜的spark sql語句。
采用替代方案:將一條sql語句,拆解成多條sql語句來執行。
每條sql語句,就只有100個or子句以內;一條一條SQL語句來執行。

1.9 數據傾斜條調優

1.9.1 數據傾斜的原理、現象、產生原因與定位

原因
在執行shuffle操作的時候,是按照key,來進行values的數據的輸出、拉取和聚合的。
同一個key的values,一定是分配到一個reduce task進行處理的。
假如多個key對應的values,總共是90萬。
可能某個key對應了88萬數據,分配到一個task上去面去執行,執行很慢。而另外兩個task,可能各幾十個key分配到了1萬數據,出現數據傾斜。
定位:在自己的程序里面找找,哪些地方用了會產生shuffle的算子,groupByKey、countByKey、reduceByKey、join。
看log,看看是執行到了第幾個stage。
哪一個stage,task特別慢或者只有一部分task在工作,就能夠從spark代碼的stage划分,通過stage定位到你的代碼,哪里發生了數據傾斜。

第一個方案:聚合源數據
做一些聚合的操作:groupByKey、reduceByKey,說白了就是對每個key對應的values執行一定的計算。

spark作業的數據來源如果是hive,可以直接在生成hive表的hive etl中,對數據進行聚合。
比如按key來分組,將key對應的所有的values,全部用一種特殊的格式,拼接到一個字符串里面去,每個key就只對應一條數據。比如

key=sessionid, value: action_seq=1|user_id=1|search_keyword=火鍋|category_id=001;action_seq=2|user_id=1|search_keyword=涮肉|category_id=001”。

然后在spark中,拿到key=sessionid,values<Iterable>。

或者在hive里面就進行reduceByKey計算。
spark中就不需要再去執行groupByKey+map這種操作了。
直接對每個key對應的values字符串進行map進行你需要的操作即可。也就根本不可能導致數據傾斜。

具體怎么去在hive etl中聚合和操作,就得根據你碰到數據傾斜問題的時候,你的spark作業的源hive表的具體情況,具體需求,具體功能,具體分析。

具體對於我們的程序來說,完全可以將aggregateBySession()這一步操作,放在一個hive etl中來做,形成一個新的表。
對每天的用戶訪問行為數據,都按session粒度進行聚合,寫一個hive sql。

在spark程序中,就不要去做groupByKey+mapToPair這種算子了
直接從當天的session聚合表中,用SparkSQL查詢出來對應的數據,即可。
這個RDD在后面就可以使用了。

第二個方案:過濾導致傾斜的key
如果你能夠接受某些數據,在spark作業中直接就摒棄掉,不使用。
比如說,總共有100萬個key。只有2個key,是數據量達到10萬的。其他所有的key,對應的數量都是幾十。
這個時候,你自己可以去取舍,如果業務和需求可以理解和接受的話,在你從hive表查詢源數據的時候,直接在sql中用where條件,過濾掉某幾個key。
那么這幾個原先有大量數據,會導致數據傾斜的key,被過濾掉之后,那么在你的spark作業中,自然就不會發生數據傾斜了。

1.9.2 提高shuffle操作的reduce並行度

將reduce task的數量,變多,就可以讓每個reduce task分配到更少的數據量。
這樣的話,也許就可以緩解,或者甚至是基本解決掉數據傾斜的問題。
提升shuffle reduce端並行度,怎么來操作?
在調用shuffle算子的時候,傳入進去一個參數。
就代表了那個shuffle操作的reduce端的並行度。那么在進行shuffle操作的時候,就會對應着創建指定數量的reduce task。
按照log,找到發生數據傾斜的shuffle操作,給它傳入一個並行度數字,這樣的話,原先那個task分配到的數據,肯定會變少。就至少可以避免OOM的情況,程序至少是可以跑的。

但是沒有從根本上改變數據傾斜的本質和問題。
不像第一個和第二個方案(直接避免了數據傾斜的發生)。
原理沒有改變,只是說,盡可能地去緩解和減輕shuffle reduce task的數據壓力,以及數據傾斜的問題。

實際生產環境中的經驗。
1、如果最理想的情況下,提升並行度以后,減輕了數據傾斜的問題,那么就最好。就不用做其他的數據傾斜解決方案了。
2、不太理想的情況下,就是比如之前某個task運行特別慢,要5個小時,現在稍微快了一點,變成了4個小時;或者是原先運行到某個task,直接OOM,現在至少不會OOM了,但是那個task運行特別慢,要5個小時才能跑完。
那么,如果出現第二種情況的話,各位,就立即放棄第三種方案,開始去嘗試和選擇后面的方案。

1.9.3 使用隨機key實現雙重聚合

1、原理
第一輪聚合的時候,對key進行打散,將原先一樣的key,變成不一樣的key,相當於是將每個key分為多組;比如原來是

(5,44)、(6,45)、(7,45)
就可以對key添加一個隨機數
(1_5,44)、(3_6,45)、(2_7,45)
針對多個組,進行key的局部聚合;
接着,再去除掉每個key的前綴,恢復成
(5,44)、(6,45)、(7,45)
然后對所有的key,進行全局的聚合。

對groupByKey、reduceByKey造成的數據傾斜,有比較好的效果。

2、使用場景
(1)groupByKey
(2)reduceByKey

1.9.4 將導致傾斜的key單獨進行join

這個方案關鍵之處在於:
將發生數據傾斜的key,單獨拉出來,放到一個RDD中去;
用這個原本會傾斜的key RDD跟其他RDD,單獨去join一下,
key對應的數據,可能就會分散到多個task中去進行join操作。
這個key跟之前其他的key混合在一個RDD中時,肯定是會導致一個key對應的所有數據,都到一個task中去,就會導致數據傾斜。

這種方案什么時候適合使用?

針對你的RDD的數據,你可以自己把它轉換成一個中間表,或者是直接用countByKey()的方式,你可以看一下這個RDD各個key對應的數據量;
RDD有一個或少數幾個key,是對應的數據量特別多;
此時可以采用咱們的這種方案,單拉出來那個最多的key;
單獨進行join,盡可能地將key分散到各個task上去進行join操作。

1.9.5 使用隨機數以及擴容表進行join

這個方案是沒辦法徹底解決數據傾斜的,更多的,是一種對數據傾斜的緩解。
局限性:
1、因為join兩個RDD都很大,就沒有辦法去將某一個RDD擴的特別大,一般是10倍。
2、如果就是10倍的話,那么數據傾斜問題,只能說是緩解和減輕,不能說徹底解決。

步驟:
1、選擇一個RDD,要用flatMap,進行擴容,將每條數據,映射為多條數據,每個映射出來的數據,都帶了一個n以內的隨機數,通常來說,會選擇10。
2、將另外一個RDD,做普通的map映射操作,每條數據,都打上一個10以內的隨機數。
3、最后,將兩個處理后的RDD,進行join操作。
4、join完以后,可以執行map操作,去將之前打上的隨機數給去掉,然后再和另外一個普通RDD join以后的結果,進行union操作。

sample采樣傾斜key並單獨進行join
將key,從另外一個RDD中過濾出的數據,可能只有一條,或者幾條,此時,咱們可以任意進行擴容,擴成1000倍。
將從第一個RDD中拆分出來的那個傾斜key RDD,打上1000以內的一個隨機數。
打散成100份,甚至1000份去進行join,那么就肯定沒有數據傾斜的問題了吧。
這種情況下,還可以配合上,提升shuffle reduce並行度,join(rdd, 1000)。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM