基礎
概述
- Spark計算平台有兩個重要角色,Driver和executor
- Driver
- Driver充當Application的master角色,負責任務執行計划生成和任務分發及調度;
- Driver負責生成邏輯查詢計划、物理查詢計划和把任務派發給executor
- Executor
- Executor充當worker角色,負責實際執行任務的task,計算的結果返回Driver。
- Executor接受任務后進行處理,離線計算也是按這個流程進行。
分工
- RDD Objects生成邏輯查詢計划
- 生成物理查詢計划DAGScheduler
- 任務調度TaskScheduler
- 任務執行Executor
作業提交流程
- client submit作業,通過反射invoke執行用戶代碼main函數,啟動CoarseGrainedExecutorBackend、初始化SparkContext。
- SparkContext初始化包括初始化監控頁面SparkUI、執行環境SparkEnv、安全管理器SecurityManager、stage划分及調度器DAGScheduler、task作業調度器TaskSchedulerImpl和與Executor通信的調度端CoarseGrainedSchedulerBackend。
- DAGScheduler將作業划分后,依次提交stage對應的taskSet給TaskSchedulerImpl。
- TaskSchedulerImpl會submit taskset給driver端的CoarseGrainedSchedulerBackend后端。
- CoarseGrainedSchedulerBackend會逐個LaunchTask
- 在遠端的CoarseGrainedExecutorBackend接收到task提交event后,會調用Executor執行task
- 最終task是由TaskRunner的run方法內運行。
Executor
- 首先executor端的rpc服務端點收到LaunchTask的消息,並對傳過來的任務數據進行反序列化成TaskDescription將任務交給Executor對象運行
- Executor根據傳過來的TaskDescription對象創建一個TaskRunner對象,並放到線程池中運行。這里的線程池用的是Executors.newCachedThreadPool,空閑是不會有線程在跑
- TaskRunner對任務進一步反序列化,調用Task.run方法執行任務運行邏輯
- ShuffleMapTask類型的任務會將rdd計算結果數據經過排序合並之后寫到一個文件中,並寫一個索引文件
- 任務運行完成后會更新一些任務統計量和度量系統中的一些統計量
- 最后會根據結果序列化后的大小選擇不同的方式將結果傳回driver。
共享變量
Broadcast Variable(廣播變量)
- Broadcast Variable會將使用到的變量,僅僅為每個節點拷貝一份,而不是給節點上的每個task拷貝一份。這樣可以優化性能,減少網絡傳輸及內存消耗。
- Broadcast Variable主要用於共享讀,是只讀的,沒法去寫
- 可以通過調用SparkContext的broadcast()方法,來針對某個變量創建廣播變量,返回類型是Broadcast
。然后在算子的函數內,使用廣播變量,此時每個節點都只會拷貝一份,每個節點可以使用廣播變量的value()方法獲取值。廣播變量是只讀的,不可寫。
Accumulator(累加變量)
Accumulator可以讓多個task共同操作一份變量,主要可以進行累加操作。
內存管理
- spark.executor.memory設置executor可用內存,包含
- reservedMemory :默認300M
- usableMemory
- Execution 內存: 存放 Shuffle、Join、Sort、Aggregation 等計算過程中的臨時數據
- Storage 內存: 存儲 spark 的 cache 數據,例如RDD的緩存、unroll數據;
- 用戶內存(User Memory):存儲 RDD 轉換操作所需要的數據,例如 RDD 依賴等信息
相關配置
- spark.memory.storageFraction:配置usableMemory中Storage內存占比
- spark.memory.offHeap.enabled:堆外內存是否開啟,默認不開啟
- spark.memory.offHeap.size:堆外內存大小
堆內內存
默認情況下,Spark 僅僅使用了堆內內存。Executor 端的堆內內存區域大致可以分為以下四大塊
- Execution 內存:主要用於存放 Shuffle、Join、Sort、Aggregation 等計算過程中的臨時數據
- Storage 內存:主要用於存儲 spark 的 cache 數據,例如RDD的緩存、unroll數據;
- 用戶內存(User Memory):主要用於存儲 RDD 轉換操作所需要的數據,例如 RDD 依賴等信息。
- 預留內存(Reserved Memory):系統預留內存,會用來存儲Spark內部對象。
堆外內存
通過 spark.memory.offHeap.enabled 參數啟用,並且通過 spark.memory.offHeap.size 設置堆外內存大小,單位為字節。如果堆外內存被啟用,那么 Executor 內將同時存在堆內和堆外內存,兩者的使用互補影響,
堆外內存只區分 Execution 內存和 Storage 內存
Execution 內存和 Storage 內存動態調整
- 若Execution內存與Storage內存都不足時,按照LRU規則存儲到磁盤;
- 若Execution內存不足,Storage內存有結余,Storage 內存的空間被占用后,目前的實現是無法讓對方"歸還"
- 若Storage內存不足,Execution內存有結余,Execution內存的空間被占用后,可讓對方將占用的部分轉存到硬盤,然后"歸還"借用的空間,因為Cache在內存的數據不一定后面會用
Task之間內存分布
- Task共享着 Execution 內存
- Spark 內部維護了一個 HashMap 用於記錄每個 Task 占用的內存
- 每個Task可用內存為Executor可用內存的 1/2N ~ 1/N,N是Task的個數
Spark Core
spark的shuffle
- 前一個stage 的 ShuffleMapTask 進行 shuffle write, 把數據存儲在 blockManager 上面, 並且把數據位置元信息上報到 driver 的 mapOutTrack 組件中, 下一個 stage 根據數據位置元信息, 進行 shuffle read, 拉取上個stage 的輸出數據。
shuffle write
分為三種writer, 分為 BypassMergeSortShuffleWriter, SortShuffleWriter 和 UnsafeShuffleWriter
BypassMergeSortShuffleWriter
- 開啟map side combine並且分區數較少
- BypassMergeSortShuffleWriter和Hash Shuffle中的HashShuffleWriter實現基本一致, 唯一的區別在於,map端的多個輸出文件會被匯總為一個文件。 所有分區的數據會合並為同一個文件,會生成一個索引文件,是為了索引到每個分區的起始地址,可以隨機 access 某個partition的所有數據。
- 這種方式不宜有太多分區,因為過程中會並發打開所有分區對應的臨時文件,會對文件系統造成很大的壓力。
- 給每個分區分配一個臨時文件,對每個 record的key 使用分區器(模式是hash,如果用戶自定義就使用自定義的分區器)找到對應分區的輸出文件句柄,直接寫入文件,沒有在內存中使用 buffer。 最后copyStream方法把所有的臨時分區文件拷貝到最終的輸出文件中,並且記錄每個分區的文件起始寫入位置,把這些位置數據寫入索引文件中。
SortShuffleWriter
- 使用 PartitionedAppendOnlyMap 或者 PartitionedPairBuffer 在內存中進行排序, 排序的 K 是(partitionId, hash(key)) 這樣一個元組。
- 如果超過內存 limit, 我 spill 到一個文件中,這個文件中元素也是有序的,首先是按照 partitionId的排序,如果 partitionId 相同, 再根據 hash(key)進行比較排序
- 如果需要輸出全局有序的文件的時候,就需要對之前所有的輸出文件 和 當前內存中的數據結構中的數據進行 merge sort, 進行全局排序
UnsafeShuffleWriter
- UnsafeShuffleWriter需要Serializer支持relocation
- UnsafeShuffleWriter 里面維護着一個 ShuffleExternalSorter, 用來做外部排序
區別 | UnsafeShuffleWriter | SortShuffleWriter |
---|---|---|
排序方式 | 最終只是 partition 級別的排序 | 先 partition 排序,相同分區 key有序 |
aggregation | 沒有飯序列化,沒有aggregation | 支持 aggregation |
shuffle read
內存管理——Tungsten
- TaskMemoryManager用來統一這兩種內存:堆內內存和堆外內存
- MemoryBlock 繼承 MemoryLocation 代表着對內存的定位,這個對象可以把off-heap 和on-heap 進行統一, MemoryLocation 對於off-heap的memory,obj為null,offset則為絕對的內存地址,對於on-heap的memory,obj則是JVM對象的基地址,offset則是相對於改對象基地址的偏移。
Spark SQL
Parser模塊
- 將sparkSql字符串切分成一個一個token,再根據一定語義規則解析為一個抽象語法樹/AST。Parser模塊目前基本都使用第三方類庫ANTLR來實現,比如Hive,presto,sparkSql等。
- SqlBaseLexer和SqlBaseParser都是使用ANTLR4自動生成的Java類。使用這兩個解析器將SQL字符串語句解析成了ANTLR4的ParseTree語法樹結構。然后在parsePlan過程中,使用AstBuilder.scala將ParseTree轉換成catalyst表達式邏輯計划LogicalPlan。
Analyzer模塊
- 基本的元數據信息schema catalog來表達這些token。最重要的元數據信息就是,
- 表的schema信息,主要包括表的基本定義(表名、列名、數據類型)、表的數據格式(json、text、parquet、壓縮格式等)、表的物理位置
- 基本函數信息,主要是指類信息
- Analyzer會再次遍歷整個AST,對樹上的每個節點進行數據類型綁定以及函數綁定,比如people詞素會根據元數據表信息解析為包含age、id以及name三列的表,people.age會被解析為數據類型為int的變量,sum會被解析為特定的聚合函數,
Optimizer模塊
- Optimizer是catalyst的核心,分為RBO和CBO兩種。
- RBO的優化策略就是對語法樹進行一次遍歷,模式匹配能夠滿足特定規則的節點,再進行相應的等價轉換,即將一棵樹等價地轉換為另一棵樹。SQL中經典的常見優化規則有,
+謂詞下推(predicate pushdown)
+常量累加(constant folding)
+列值裁剪(column pruning)- Limits合並(combine limits)
SparkPlanner模塊
- 至此,OLP已經得到了比較完善的優化,然而此時OLP依然沒有辦法真正執行,它們只是邏輯上可行,實際上spark並不知道如何去執行這個OLP。
- 此時就需要將左邊的OLP轉換為physical plan物理執行計划,將邏輯上可行的執行計划變為spark可以真正執行的計划。
- 比如join算子,spark根據不同場景為該算子制定了不同的算法策略,有broadcastHashJoin、shuffleHashJoin以及sortMergeJoin,物理執行計划實際上就是在這些具體實現中挑選一個耗時最小的算法實現,這個過程涉及到cost model/CBO
WholeStageCodegen
- WholeStageCodegen,將多個operators合並成一個java函數,從而提高執行速度
Spark MLLib
Spark Streaming
- 數據流經過Spark Streaming的receiver,數據切分為DStream(類似RDD,DStream是Spark Streaming中流數據的邏輯抽象),然后DStream被Spark Core的離線計算引擎執行並行處理。
流程
- 實時計算與離線計算一樣,主要組件是Driver和Executor的。不同的是多了數據采集和數據按時間分片過程,數據采集依賴外部數據源,這里用MessageQueue表示
- 數據分片則依靠內部一個時鍾Clock,按batch interval來定時對數據分片,然后把每一個batch interval內的數據提交處理。
- Executor從MessageQueue獲取數據並交給BlockManager管理,然后把元數據信息BlockID返給driver的Receiver Tracker
- driver端的Job Jenerator對一個batch的數據生成JobSet,最后把作業執行計划傳遞給executor處理。
Structure Streaming
將數據抽象為DataFrame,即無邊界的表,通過將數據源映射為一張無界長度的表,通過表的計算,輸出結果映射為另一張表。這樣以結構化的方式去操作流式數據,簡化了實時計算過程,同時還復用了其Catalyst引擎來優化SQL操作。此外還能支持增量計算和基於event time的計算。
Spark thrift server優化
- spark常駐Driver
- 增加用戶概念,application有用戶歸屬
實現流程
- spark thrift server基於hive jdbc服務實現
特性
- 支持手動緩存中間結果集,Statement級別復用
- 支持提交離線作業、監控離線作業、清理離線作業
- 增加SparkListener監控job狀態,然后取數據
- 調用spark restful接口 kill job
ES-Spark優化
Spark DirectQuery ES
-
原生API會解析json,拼接到query中,不靈活
- 例如query+highlight
- query + sort
- 等等
-
Spark-SQL 的 where 語句全部(或者部分)下沉到 ES里進行執行,依賴於倒排索引,DocValues,以及分片,並行化執行,ES能夠獲得比Spark-SQL更優秀的響應時間
-
分片數據Merge(Reduce操作,Spark 可以獲得更好的性能和分布式能力),更復雜的業務邏輯都交給Spark-SQL (此時數據規模已經小非常多了),並且可以做各種自定義擴展,通過udf等函數
-
ES 無需實現Merge操作,可以減輕內存負擔,提升並行Merge的效率(並且現階段似乎ES的Reduce是只能在單個實例里完成)
實現流程
## package.scala
+ 增加接口
def esDirectRDD(resource: String, cfg: scala.collection.Map[String, String]) = EsSpark.esDirectRDD(sc, resource, cfg)
## EsSpark
+ 增加接口
def esDirectRDD(sc: SparkContext, resource: String, cfg: Map[String, String]): RDD[(String, Map[String, AnyRef])] =
new ScalaEsRDD[Map[String, AnyRef]](sc, collection.mutable.Map(cfg.toSeq: _*) += (ES_RESOURCE_READ -> resource))
## ScalaESDirectRDD
+ 繼承AbstractDirectEsRDD
## AbstractDirectEsRDD
+ 抽象ESRDD類
## AbstractEsDirectRDDIterator
+ 對應AbstractEsRDDIterator
+ 處理直連查詢
## PartitionDirectReader
+ 處理director查詢
## SearchRequestBuilder
+ 增加buildDirect方法
## DirectQuery
+ 與ScrollQuery一致
+ 包含DirectReader的引用,處理結果集
+ 調用RestRepository,處理結果集
## RestRepository
### direrctQuery
+ 發起Restful請求,並使用傳入的Reader處理返回的結果;
+ 原始scroll接口,處理scroll查詢
+ 增加directQuery接口,處理direrctQuery查詢
### scanDirect
+ 直接創建DirectQuery
## DirectSearchRequestBuilder
+ 構建查詢;
## DirectReader
+ 對應ScrollReader
+ 處理結果集請求
+ 從Hits開始解析
## SimpleQueryParser
+ 增加parseESQuery
agg支持
- 實現流程類似,但執行節點為節點級別,而不是shard級別
- 數據結果解析,對agg別名設定規則
- 結果解析為同一個partition/各個shard對應一個partition,需要二次處理
- 后續:設定參數,進行shard級別聚合,並對聚合結果改寫、合並
spark MLLib
源碼
thrift server
ES-Hadoop
調優
動態資源分配
- spark.dynamicAllocation.enabled:該配置項用於配置是否使用動態資源分配,根據工作負載調整應用程序注冊的executor的數量。默認為false(至少在spark2.2-spark2.4中如此),在CDH發行版中默認為true,
- 如果啟用動態分配,在executor空閑spark.dynamicAllocation.executorIdleTimeout(默認60s)之后將被釋放。
動態資源分配策略
開啟動態分配策略后,application會在task因沒有足夠資源被掛起的時候去動態申請資源,這種情況意味着該application現有的executor無法滿足所有task並行運行。spark一輪一輪的申請資源,當有task掛起或等待spark.dynamicAllocation.schedulerBacklogTimeout(默認1s)時間的時候,會開始動態資源分配;之后會每隔spark.dynamicAllocation.sustainedSchedulerBacklogTimeout(默認1s)時間申請一次,直到申請到足夠的資源。每次申請的資源量是指數增長的,即1,2,4,8等。
之所以采用指數增長,出於兩方面考慮:其一,開始申請的少是考慮到可能application會馬上得到滿足;其次要成倍增加,是為了防止application需要很多資源,而該方式可以在很少次數的申請之后得到滿足。
資源回收策略
當application的executor空閑時間超過spark.dynamicAllocation.executorIdleTimeout(默認60s)后,就會被回收。
內存占用
調優內存的使用主要有三個方面的考慮:對象的內存占用量(你可能希望整個數據集都適合內存),訪問這些數據的開銷,垃圾回收的負載。
默認情況下,java的對象是可以快速訪問的,但是相比於內部的原始數據消耗估計2-5倍的空間。主要歸於下面三個原因:
1),每個不同的Java對象都有一個“對象頭”,它大約是16個字節,包含一個指向它的類的指針。對於一個數據很少的對象(比如一個Int字段),這可以比數據大。
2),Java字符串在原始字符串數據上具有大約40字節的開銷(因為它們將它們存儲在一個Chars數組中,並保留額外的數據,例如長度),並且由於String的內部使用UTF-16編碼而將每個字符存儲為兩個字節。因此,一個10個字符的字符串可以容易地消耗60個字節。
3),常用集合類(如HashMap和LinkedList)使用鏈接的數據結構,其中每個條目都有一個“包裝器”對象(例如Map.Entry)。該對象不僅具有頭部,還包括指針(通常為8個字節)到列表中的下一個對象。
4),原始類型的集合通常將它們存儲為“boxed”對象,如java.lang.Integer。
確定內存的消耗
最好的方式去計算一個數據的的內存消耗,就是創建一個RDD,然后加入cache,這樣就可以在web ui中Storage頁面看到了。頁面會告訴你,這個RDD消耗了多少內存。
要估計特定對象的內存消耗,請使用SizeEstimator的估計方法。這對於嘗試使用不同的數據布局來修剪內存使用情況以及確定廣播變量在每個執行程序堆中占用的空間量非常有用。
調優數據結構
減少內存消耗的第一種方法是避免使用增加負擔的java特性,例如基於指針的數據結構和包裝對象。下面幾種方法可以來避免這個。
- 1,將數據結構設計為偏好對象數組和原始類型,而不是標准的Java或Scala集合類(例如HashMap)。fastutil庫(http://fastutil.di.unimi.it/)為與Java標准庫兼容的原始類型提供方便的集合類。
- 2,盡可能避免使用有很多小對象和指針的嵌套結構。
- 3,針對關鍵詞,考慮使用數字ID或者枚舉對象而不是字符串。
- 4,如果您的RAM少於32 GB,請設置JVM標志-XX:+ UseCompressedOops使指針為四個字節而不是八個字節。您可以在spark-env.sh中添加這些選項。
序列化RDD
盡管進行了調優,當您的對象仍然太大而無法有效存儲時,一個簡單的方法來減少內存使用是使用RDD持久性API中的序列化StorageLevel(如MEMORY_ONLY_SER)以序列化形式存儲它們。Spark將會將每個RDD分區存儲為一個大字節數組。以序列化形式存儲數據的唯一缺點是數據訪問變慢,因為必須對每個對象進行反序列化。如果您想以序列化形式緩存數據,我們強烈建議使用Kryo,因為它會使數據比java序列化后的大小更小(而且肯定比原Java對象更小)。
垃圾回收調優
- 垃圾收集的成本與Java對象的數量成正比,因此使用較少對象的數據結構(例如,Ints數組,代替LinkedList)將大大降低了成本。一個更好的方法是以序列化形式持久化對象,如上所述:每個RDD分區將只有一個對象(一個字節數組)。在嘗試其他技術之前,如果GC是一個問題,首先要嘗試的是使用序列化緩存。
由於任務的運行內存和RDD的緩存內存的干擾,GC也會是一個問題。
測量GC的影響
GC調優的第一步是收集關於垃圾收集發生頻率和GC花費的時間的統計信息。通過將-verbose:gc -XX:+ PrintGCDetails -XX:+ PrintGCTimeStamps添加到Java選項來完成。下次運行Spark作業時,每當垃圾收集發生時,都會看到在工作日志中打印的消息。請注意,這些日志將在您的群集的Executor節點上(在其工作目錄中的stdout文件中),而不是您的driver功能中。
高級GC調優
Spark應用程序GC調優的目標是,確保生命周期比較長的RDD保存在老年代,新生代有足夠的空間保存生命周期比較短的對象。這有助於避免觸發Full GC去收集task運行期間產生的臨時變量。下面列舉幾個有用的步驟:
- 1),通過收集垃圾回收信息,判斷是否有太多的垃圾回收過程。假如full gc在一個task完成之前觸發了好幾次,那說明運行task的內存空間不足,需要加內存。
- 2),在gc的統計信息中,如果老年代接近滿了,減少用於緩存的內存(通過減小spark.memory.Fraction)。緩存較少的對象比降低運行速度對我們來說更有好處。另外,可以考慮減少年輕代。可以通過減小-Xmn參數設置的值,假如使用的話。假如沒有設置可以修改JVM的NewRation參數。大多數JVMs默認值是2,意思是老年代占用了三分之二的總內存。這個值要足夠大,相當於擴展了spark.memory.fraction.
- 3),如果有太多的minor gc,較少的major gc,增加Eden區內存會有幫助。將Eden區內存設置的比task運行估計內存稍微大一些。如果Eden區大小確定為E,那就將新生代的內存設置為-Xmn=4/3E,按比例增加內存是考慮到survivor區所占用的內存。
- 4),嘗試通過設置-XX:+UseG1GC垃圾回收器為G1。在垃圾回收器是瓶頸的一些情況下,它可以提高性能。請注意,對於大的Executor堆,通過使用-XX:G!HeapRegionSize去增大G1的堆大小,顯得尤為重要。
- 5),例如,如果您的任務是從HDFS讀取數據,則可以使用從HDFS讀取的數據塊的大小來估計任務使用的內存量。請注意,解壓縮塊的大小通常是塊大小的2或3倍。所以如果我們希望有3或4個任務的工作空間,HDFS塊的大小是64 MB,我們可以估計Eden的大小是4 * 3 * 64MB。
- 6),監控垃圾收集的頻率和時間如何隨着新設置的變化而變化。
經驗表明,GC調整的效果取決於您的應用程序和可用的內存量。
並行度
並發不足會導致集群浪費。
- Spark自動會根據文件的大小,是否可分割等因素來設置map的數目;
- 對於分布式reduce操作,例如groupbykey和reducebykey,reduce默認數量是分區數最大的父RDD的分區數;
- 你也可以通過設置spark.default.parallelism來改變默認值,建議值是每個CPU執行2-3個tasks。
Reduce任務的內存使用
- 內存溢出並一定是RDD不適合放在內存里面,可能因為task的數據集太大了。
- Spark的shuffle操作(sortByKey, groupByKey, reduceByKey, join, etc)會構建一個hash表,每個task執行一個分組的數據,單個往往會很大。
- 最簡單的改善方法是增加並行度,讓每個task的輸入變得更小。
- Spark可以高效的支持短達200ms的任務,因為復用了Executor的JVM,這可以降低啟動成本,所以你可以很安全的增加並行度,使其超過你的集群core數目。
廣播變量
- spark的廣播功能可以大幅度減少每個序列化后的task的大小,也可以減少在集群中執行一個job的代價。
- 如果你的任務中使用了大的對象,比如靜態表,可以考慮將它聲明成廣播變量。
- 在driver節點,spark會打印出每個task序列化后的大小,所以你可以通過查看task的大小判斷你的task是否過大,通常task的大小超過20KB就值得調優了。
數據本地化
- 數據的本地性可能會對Spark jobs產生重大影響。如果數據和在其上操作的代碼在一起,則計算往往是快速的。但如果代碼和數據分開,則必須要有一方進行移動。典型的情況是將序列化后的代碼移動到數據所在的地方,因為數據往往比代碼大很多。Spark構建調度計划的原則就是數據本地性。
- 根據數據和代碼當前的位置,數據本地性等級。
- Spark傾向於調度任務依據最高的數據本地性,但這往往是不可能的。在任何空閑的Executor上沒有未處理數據的情況下,Spark會切換到較低的數據本地性。這種情況下會有兩個選擇:
- 1),等待CPU空閑,然后在相同的server上啟動task。
- 2),立即在一個需要遷移數據的較遠位置啟動一個新的task。
- Spark的典型處理策略是等待繁忙CPU釋放,時間很短。一旦超時,將移動數據到空閑CPU的地方執行任務。每個級別之間的回退等待超時可以在一個參數中單獨配置或全部配置。如果任務較長,且數據本地性較差,可以適當調整Spark.locatity超時時間相關的配置。
從最近到最遠的順序列出如下:
PROCESS_LOCAL
數據和代碼在同一個JVM中,這是最佳的數據本地性。
NODE_LOCAL
數據和代碼在相同的節點。比如數據在同一節點的HDFS上,或者在統一節點的Executor上。由於數據要在多個進程間移動,所以比PROCESS_LOCAL稍慢。
NO_PREF
數據可以從任何地方快速訪問,沒有數據本地性。
RACK_LOCAL
數據和代碼在相同的機架。數據位於同一機架上的不同服務器上,因此需要通過網絡發送,通常通過單個交換機發送
ANY
數據在網絡上的其他地方,而不在同一個機架中。
數據傾斜
數據源
- 盡量使用可切分的格式代替不可切分的格式,或者保證各文件實際包含數據量大致相同。
調整並行度分散同一個Task的不同Key
Spark在做Shuffle時,默認使用HashPartitioner(非Hash Shuffle)對數據進行分區。如果並行度設置的不合適,可能造成大量不相同的Key對應的數據被分配到了同一個Task上,造成該Task所處理的數據遠大於其它Task,從而造成數據傾斜。
如果調整Shuffle時的並行度,使得原本被分配到同一Task的不同Key發配到不同Task上處理,則可降低原Task所需處理的數據量,從而緩解數據傾斜問題造成的短板效應。
適用場景
大量不同的Key被分配到了相同的Task造成該Task數據量過大。
自定義Partitioner
使用自定義的Partitioner(默認為HashPartitioner),將原本被分配到同一個Task的不同Key分配到不同Task。
適用場景
大量不同的Key被分配到了相同的Task造成該Task數據量過大。
將Reduce side Join轉變為Map side Join
正確的使用Broadcast實現Map側Join的方式是,通過SET spark.sql.autoBroadcastJoinThreshold=104857600;將Broadcast的閾值設置得足夠大。
適用場景
參與Join的一邊數據集足夠小,可被加載進Driver並通過Broadcast方法廣播到各個Executor中。
為skew的key增加隨機前/后綴
為數據量特別大的Key增加隨機前/后綴,使得原來Key相同的數據變為Key不相同的數據,從而使傾斜的數據集分散到不同的Task中,徹底解決數據傾斜問題。Join另一則的數據中,與傾斜Key對應的部分數據,與隨機前綴集作笛卡爾乘積,從而保證無論數據傾斜側傾斜Key如何加前綴,都能與之正常Join。
步驟
現通過如下操作,實現傾斜Key的分散處理
- 將leftRDD中傾斜的key(即9500048與9500096)對應的數據單獨過濾出來,且加上1到24的隨機前綴,並將前綴與原數據用逗號分隔(以方便之后去掉前綴)形成單獨的leftSkewRDD
- 將rightRDD中傾斜key對應的數據抽取出來,並通過flatMap操作將該數據集中每條數據均轉換為24條數據(每條分別加上1到24的隨機前綴),形成單獨的rightSkewRDD
- 將leftSkewRDD與rightSkewRDD進行Join,並將並行度設置為48,且在Join過程中將隨機前綴去掉,得到傾斜數據集的Join結果skewedJoinRDD
- 將leftRDD中不包含傾斜Key的數據抽取出來作為單獨的leftUnSkewRDD
- 對leftUnSkewRDD與原始的rightRDD進行Join,並行度也設置為48,得到Join結果unskewedJoinRDD
- 通過union算子將skewedJoinRDD與unskewedJoinRDD進行合並,從而得到完整的Join結果集
適用場景
兩張表都比較大,無法使用Map則Join。其中一個RDD有少數幾個Key的數據量過大,另外一個RDD的Key分布較為均勻。
大表隨機添加N種隨機前綴,小表擴大N倍
如果出現數據傾斜的Key比較多,上一種方法將這些大量的傾斜Key分拆出來,意義不大。此時更適合直接對存在數據傾斜的數據集全部加上隨機前綴,然后對另外一個不存在嚴重數據傾斜的數據集整體與隨機前綴集作笛卡爾乘積(即將數據量擴大N倍)。
適用場景
一個數據集存在的傾斜Key比較多,另外一個數據集數據分布比較均勻。
Adaptive Execution
http://www.jasongj.com/spark/adaptive_execution/