Spark為什么比Hadoop要快?
Spark比hadoop快的原因,我認為主要是spark的DAG機制優於hadoop太多,spark的DAG機制以及RDD的設計避免了很多落盤的操作,在窄依賴的情況下可以在內存中完成end to end的計算,相比於hadoop的map reduce編程模型來說,少了很多IO的開銷。其次還有幾個其他方面的助攻
- shuffl機制的不同
雖然2者都會遇到shuffle的場景,但是spark的shuffle更加先進。對於MapReduce,其在Shuffle時需要花費大量時間進行排序,排序在MapReduce的Shuffle中似乎是不可避免的。而spark在shuffle的時候,不一定會用到排序,有可能會使用Hash的形式 - mr是多進程,spark則是多線程
線程相比於進程,少了很多CPU調度下的資源切換。
spark的shfflue機制
上面說到spark比mr優越的一個原因也是shffle機制的不同。具體來說,spark的shffle機制從它誕生到現在一共有這么幾種。
shuffle的意思就是打亂數據順序使得相同的key被統一到同一個分區。
spark中只有2類stage:ShuffleMapStage和ResultStage。但其總的過程包含有map階段(shuffle write)和reduce階段(shuffle read),兩個階段位於不同的stage中。
ResultStage基本上對應代碼中的action算子, 而ShuffleMapStage 的則伴隨着 shuffle IO。spark的mr和hadoop的mapreduce不是一個東西。map端task和reduce端task不在相同的stage中,map task位於ShuffleMapStage,reduce task位於ResultStage。map task會先執行,將上一個stage得到的最后結果寫出,后執行的reduce task拉取上一個stage進行合並。
對於一次shuffle,map過程和reduce過程都有若干個task來執行。對於task的個數,map端的task個數和RDD的partition個數相同,reduce 端的 stage 默認取spark.default.parallelism 這個配置項的值作為分區數,如果沒有配置,則以 map 端的最后一個 RDD 的分區數作為其分區數,分區數就將決定 reduce 端的 task 的個數。
在Spark的源碼中,負責shuffle過程的執行、計算和處理的組件主要就是ShuffleManager。
spark1.2
在Spark 1.2以前,默認的shuffle計算引擎是HashShuffleManager。它的計算模式比較的簡單粗暴,詳細如下:
- shuffle write階段
這個階段將stage中每個task處理的數據根據算子進行“划分”。比如reduceByKey,就是對相同的key執行hash算法,從而將相同都寫入同一個磁盤文件中,而每一個磁盤文件都只屬於下游stage的一個task。在將數據寫入磁盤之前,會先將數據寫入內存緩沖中,當內存緩沖填滿之后,才會溢寫到磁盤文件中去。
- shuffle read階段
stage的每一個task就需要將上一個stage的計算結果中的所有相同key,從各個節點上通過網絡都拉取到自己所在的節點上,然后進行key的聚合或連接等操作。由於shuffle write的過程中,task給下游stage的每個task都創建了一個磁盤文件,因此shuffle read的過程中,每個task只要從上游stage的所有task所在節點上,拉取屬於自己的那一個磁盤文件即可。
那么針對這種簡單粗暴的HashShuffleManager,有着一個非常嚴重的弊端:會產生大量的中間磁盤文件,這樣大量的磁盤IO操作會很影響性能。磁盤文件的數量由下一個stage的task數量決定,即下一個stage的task有多少個,當前stage的每個task就要創建多少份磁盤文件。比如下一個 stage 總共有 100 個 task,那么當前 stage 的每個 task 都要創建 100 份磁盤文件,如果當前stage有50個 task,那么總共會建立5000個磁盤文件。
優化之后的HashShuffleManager
由於原版的HashShuffleManager,HashShuffleManager后期進行了優化,這里說的優化是指可以設置一個參數,
spark.shuffle.consolidateFiles=true。該參數默認值為false,通常來說如果我們使用HashShuffleManager,那么都建議開啟這個選項。開啟consolidate機制之后,在shuffle write過程中,task不會為下游stage的每個task創建一個磁盤文件,此時會出現shuffleFileGroup的概念,每個 shuffleFileGroup會對應一批磁盤文件,磁盤文件的數量與下游stage的task數量是相同的。而此時就會根據Executor數,並行執行task。第一批並行執行的每個task都會創建一個shuffleFileGroup,並將數據寫入對應的磁盤文件內。當Executor執行完一批task,接着執行下一批task時,下一批task就會復用之前已有的shuffleFileGroup,將數據寫入已有的磁盤文件中,而不會寫入新的磁盤文件中。即運行在同一個Executor的task會復用之前的磁盤文件。 這樣就可以有效將多個task的磁盤文件進行一定程度上的合並,從而大幅度減少磁盤文件的數量,進而提升shuffle write的性能。
當前spark默認使用的SortShuffleManager
在Spark 1.2以后的版本中,默認的ShuffleManager改成了SortShuffleManager。SortShuffleManager的運行機制主要分成兩種,一種是普通運行機制,另一種是bypass運行機制。當shuffle read task的數量小於等於spark.shuffle.sort.bypassMergeThreshold參數的值時(默認為200),就會啟用bypass機制。
普通運行模式
在普通模式下,數據會先寫入一個內存數據結構中,此時根據不同的shuffle算子,可以選用不同的數據結構。如果是由聚合操作的shuffle算子,就是用map的數據結構(邊聚合邊寫入內存),如果是join的算子,就使用array的數據結構(直接寫入內存)。等到內存容量到了臨界值就准備溢寫到磁盤。在溢寫到磁盤文件之前,會先根據key對內存數據結構中已有的數據進行排序,排序之后,會分批將數據寫入磁盤文件,每批次默認1萬條數據。此時task往磁盤溢寫,會產生多個臨時文件,最后會將所有的臨時文件都進行合並,合並成一個大文件。最終只剩下兩個文件,一個是合並之后的數據文件,一個是索引文件,標識了下游各個task的數據在文件中的start offset與end offset。下游的task根據索引文件讀取相應的數據文件。需要注意的是,此處所說的兩個文件,是指上游一個task生成兩個文件,而非所有的task最終只有兩個文件。
bypass機制
觸發bypass機制的條件:
- shuffle map task的數量小於spark.shuffle.sort.bypassMergeThreshold參數的值(默認200)
- 不是聚合類的shuffle算子(比如groupByKey)
排序的時間復雜度最高不能優於O(nlogn),那么如果將排序的時間復雜度省下,那么shuffle性能將會提升很多。bypass機制與普通SortShuffleManager運行機制的不同在於,bypass機制就是利用了hash的O(1)時間復雜度取代了排序的操作開銷,提升了這部分的性能。
task會為每個下游task都創建一個臨時磁盤文件,並將數據按key進行hash然后根據key的hash值,將key寫入對應的磁盤文件之中。如上,寫入磁盤文件時也是先寫入內存緩沖,緩沖寫滿之后再溢寫到磁盤文件的。最后,同樣會將所有臨時磁盤文件都合並成一個磁盤文件,並創建一個單獨的索引文件。
針對shuffle機制的調優策略
- spark.shuffle.file.buffer
該參數用於設置shuffle write task的BufferedOutputStream的buffer緩沖大小(默認是32K)。將數據寫到磁盤文件之前,會先寫入buffer緩沖中,待緩沖寫滿之后,才會溢寫到磁盤。
如果作業可用的內存資源較為充足的話,可以適當增加這個參數的大小(比如64k),從而減少shuffle write過程中溢寫磁盤文件的次數,也就可以減少磁盤IO次數,進而提升性能。在實踐中發現,合理調節該參數。
- spark.reducer.maxSizeInFlight:
該參數用於設置shuffle read task的buffer緩沖大小,而這個buffer緩沖決定了每次能夠拉取多少數據。如果作業可用的內存資源較為充足的話,可以適當增加這個參數的大小(比如96m),從而減少拉取數據的次數,也就可以減少網絡傳輸的次數,進而提升性能。
- spark.shuffle.io.maxRetries & spark.shuffle.io.retryWait:
spark.shuffle.io.retryWait:huffle read task從shuffle write task所在節點拉取屬於自己的數據時,如果因為網絡異常導致拉取失敗,是會自動進行重試的。該參數就代表了可以重試的最大次數。(默認是3次)
spark.shuffle.io.retryWait:該參數代表了每次重試拉取數據的等待間隔。(默認為5s)
一般的調優都是將重試次數調高,不調整時間間隔。
- spark.shuffle.memoryFraction:
該參數代表了Executor內存中,分配給shuffle read task進行聚合操作的內存比例。
- spark.shuffle.manager
該參數用於設置shufflemanager的類型(默認為sort)。Spark1.5x以后有三個可選項:
Hash:spark1.x版本的默認值,HashShuffleManager
Sort:spark2.x版本的默認值,普通機制,當shuffle read task 的數量小於等於spark.shuffle.sort.bypassMergeThreshold參數,自動開啟bypass 機制
tungsten-sort:
復制代碼spark.shuffle.sort.bypassMergeThreshold
參數說明:當ShuffleManager為SortShuffleManager時,如果shuffle read task的數量小於這個閾值(默認是200),則shuffle write過程中不會進行排序操作。
調優建議:當你使用SortShuffleManager時,如果的確不需要排序操作,那么建議將這個參數調大一些
- spark.shuffle.consolidateFiles:
如果使用HashShuffleManager,該參數有效。如果設置為true,那么就會開啟consolidate機制,也就是開啟優化后的HashShuffleManager。
如果的確不需要SortShuffleManager的排序機制,那么除了使用bypass機制,還可以嘗試將spark.shffle.manager參數手動指定為hash,使用HashShuffleManager,同時開啟consolidate機制。在實踐中嘗試過,發現其性能比開啟了bypass機制的SortShuffleManager要高出10%~30%。
Spark基石--RDD
RDD又叫彈性分布式數據集,表示一個只讀的包含多個並行分區的集合。其中
- 彈性意味着:當計算過程中內存不足時可刷寫到磁盤等外存上,可與外存做靈活的數據交換
- 分布式:分區使得可以在多個機器上並行計算
- 一組只讀的、可分區的分布式數據集合,集合內包含了多個分區
一個最簡單的rdd至少要包含以下幾個東西,這些都被定義在spark core組件的rdd的抽象類中
- 一組分區
- 定義在這個分區上的操作
- 當前rdd和其他rdd的依賴
可選的有 - Preferred Location
是一個列表,用於存儲每個 Partition 的優先位置。對於每個 HDFS 文件來說,這個列表保存的是每個 Partition 所在的塊的位置,也就是該文件的「划分點」
- 分區方式
RDD 的分區方式主要包含兩種:Hash Partitioner 和 Range Partitioner,這兩種分區類型都是針對 Key-Value 類型的數據,如是非 Key-Value 類型則分區函數為 None。Hash 是以 Key 作為分區條件的散列分布,分區數據不連續,極端情況也可能散列到少數幾個分區上導致數據不均等;Range 按 Key 的排序平衡分布,分區內數據連續,大小也相對均等。
那么我們把rdd畫出來之后它大概是這個樣子
圖所示是 RDD 的內部結構圖,它是一個只讀、有屬性的數據集。它的屬性用來描述當前數據集的狀態。
數據集data由數據的分區(partition)組成,並由(block)映射成真實數據。
RDD 的主要屬性可以分為 3 類:與上述圖中代碼對應,細節上來說主要有:
-
- 與其他 RDD 的關系(parents、dependencies)
-
- 數據(partitioner、checkpoint、storage level、iterator 等)
-
- RDD 自身屬性(sparkcontext、sparkconf)
RDD 自身屬性
從自身屬性說起,自身屬性包含spark上下文,以及初始化sc時候的配置信息conf
SparkContext 是 Spark job 的入口,由 Driver 創建在 client 端, 並向集群資源管理器如yarn等申請資源這些都是由sc完成
conf就是我們設置的一些job參數
數據集list
RDD 內部的數據集合在邏輯上和物理上被划分成多個小子集合,這樣的每一個子集合我們將其稱為分區(Partitions),分區的個數會決定並行計算的粒度,有多少分區就有多少的task,而每一個分區數值的計算都是在一個單獨的任務中進行的,因此並行任務的個數也是由 RDD分區的個數決定的。但事實上 RDD 只是數據集的抽象,分區內部並不會存儲具體的數據。Partition 類內包含一個 index 成員,表示該分區在 RDD 內的編號,通過 RDD 編號+分區編號可以確定該分區對應的唯一塊編號,再利用底層數據存儲層提供的接口就能從存儲介質(如:HDFS、Memory)中提取出分區對應的數據。
RDD 的分區方式主要包含兩種:Hash Partitioner 和 Range Partitioner,這兩種分區類型都是針對 Key-Value 類型的數據,如是非 Key-Value 類型則分區函數為 None。Hash 是以 Key 作為分區條件的散列分布,分區數據不連續,極端情況也可能散列到少數幾個分區上導致數據不均等;Range 按 Key 的排序平衡分布,分區內數據連續,大小也相對均等。
Preferred Location 是一個列表,用於存儲每個 Partition 的優先位置。對於每個 HDFS 文件來說,這個列表保存的是每個 Partition 所在的塊的位置,也就是該文件的「划分點」。
Storage Level 是 RDD 持久化的存儲級別,RDD 持久化可以調用兩種方法:cache 和 persist:persist 方法可以自由的設置存儲級別,默認是持久化到內存;cache 方法是將 RDD 持久化到內存,cache 的內部實際上是調用了persist 方法,由於沒有開放存儲級別的參數設置,所以是直接持久化到內存。
checkpoint
Checkpoint 是 Spark 提供的一種緩存機制,當需要計算依賴鏈非常長又想避免重新計算之前的 RDD 時,可以對 RDD 做 Checkpoint 處理,檢查 RDD 是否被物化或計算,並將結果持久化到磁盤或 HDFS 內。Checkpoint 會把當前 RDD 保存到一個目錄,要觸發 action 操作的時候它才會執行。在 Checkpoint 應該先做持久化(persist 或者 cache)操作,否則就要重新計算一遍。若某個 RDD 成功執行 checkpoint,它前面的所有依賴鏈會被銷毀。
與 Spark 提供的另一種緩存機制 cache 相比:cache 緩存數據由 executor 管理,若 executor 消失,它的數據將被清除,RDD 需要重新計算;而 checkpoint 將數據保存到磁盤或 HDFS 內,job 可以從 checkpoint 點繼續計算。Spark 提供了 rdd.persist(StorageLevel.DISK_ONLY) 這樣的方法,相當於 cache 到磁盤上,這樣可以使 RDD 第一次被計算得到時就存儲到磁盤上,它們之間的區別在於:persist 雖然可以將 RDD 的 partition 持久化到磁盤,但一旦作業執行結束,被 cache 到磁盤上的 RDD 會被清空;而 checkpoint 將 RDD 持久化到 HDFS 或本地文件夾,如果不被手動 remove 掉,是一直存在的。
spark血統
一個作業從開始到結束的計算過程中產生了多個 RDD,RDD 之間是彼此相互依賴的,我們把這種父子依賴的關系稱之為「血統」。RDD 只支持粗顆粒變換,即只記錄單個塊(分區)上執行的單個操作,然后創建某個 RDD 的變換序列(血統 lineage)存儲下來。
變換序列指每個 RDD 都包含了它是如何由其他 RDD 變換過來的以及如何重建某一塊數據的信息。因此 RDD 的容錯機制又稱「血統」容錯。 要實現這種「血統」容錯機制,最大的難題就是如何表達父 RDD 和子 RDD 之間的依賴關系。
分區機制
RDD 的分區機制有兩個關鍵點:一個是關鍵參數,即 Spark 的默認並發數 spark.default.parallelism;另一個是關鍵原則,RDD 分區盡可能使得分區的個數等於集群核心數目。
設置的時候可以在spark conf下設置
- spark.default.parallelism
- 或者在rdd轉換的時候做repartition
RDD常用操作/算子
action算子
transform算子
spark 數據傾斜問題的處理方案
數據傾斜也是我們經常遇到的問題之一,那么通常情況下,判斷spark數據傾斜的方法有2個
- client模式下,會有“進度條”,會顯示當前位於那個stage,以及當前stage的task數量,當進度條突然變得很慢的時候,或者卡在某個stage不動的話,一般都是出現了數據傾斜(很少會出現資源被占)
- spark UI: 從spark ui中可以看出每個task處理的數據量和消耗的時間
數據傾斜發生的原因
數據傾斜的原理很簡單:在進行shuffle的時候,必須將各個節點上相同的key拉取到某個節點上的一個task來進行處理,比如按照key進行聚合或join等操作。此時如果某個key對應的數據量特別大的話,就會發生數據傾斜。比如大部分key對應10條數據,但是個別key卻對應了100萬條數據,那么大部分task可能就只會分配到10條數據,然后1秒鍾就運行完了;但是個別task可能分配到了100萬數據,要運行一兩個小時。因此,整個Spark作業的運行進度是由運行時間最長的那個task決定的。
因此出現數據傾斜的時候,Spark作業看起來會運行得非常緩慢,甚至可能因為某個task處理的數據量過大導致內存溢出。
下圖就是一個很清晰的例子:
hello這個key,在三個節點上對應了總共7條數據,這些數據都會被拉取到同一個task中進行處理;而world和you這兩個key分別才對應1條數據,所以另外兩個task只要分別處理1條數據即可。此時第一個task的運行時間可能是另外兩個task的7倍,而整個stage的運行速度也由運行最慢的那個task所決定。
那么面對數據傾斜大概有如下幾個方案
map側代替join側
小表join大表,如果小表能夠被存儲在work的內存中的話,通過Spark的Broadcast機制,將Reduce側Join轉化為Map側Join,避免Shuffle, 整個流程從寬依賴轉變為窄依賴,從而完全消除Shuffle帶來的數據。
隨機前綴后綴key
為數據量特別大的Key增加隨機前/后綴,使得原來Key相同的數據變為Key不相同的數據,從而使傾斜的數據集分散到不同的Task中,徹底解決數據傾斜問題。Join另一則的數據中,與傾斜Key對應的部分數據,與隨機前綴集作笛卡爾乘積,從而保證無論數據傾斜側傾斜Key如何加前綴,都能與之正常Join。
增加task的並行度
也可以被稱為提高shfflue操作的並行度,在對RDD執行shuffle算子時,給shuffle算子傳入一個參數,比如reduceByKey(1000),該參數就設置了這個shuffle算子執行時shuffle read task的數量。對於Spark SQL中的shuffle類語句,比如group by、join等,需要設置一個參數,即spark.sql.shuffle.partitions,該參數代表了shuffle read task的並行度,該值默認是200
增加shuffle read task的數量,可以讓原本分配給一個task的多個key分配給多個task,從而讓每個task處理比原來更少的數據。舉例來說,如果原本有5個key,每個key對應10條數據,這5個key都是分配給一個task的,那么這個task就要處理50條數據。而增加了shuffle read task以后,每個task就分配到一個key,即每個task就處理10條數據,那么自然每個task的執行時間都會變短了。具體原理如下圖所示。
參考文獻或轉載
shfflue機制 https://juejin.im/post/6844904099872243719
spark rdd
[數據傾斜1](http://www.jasongj.com/spark/skew/, https://blog.csdn.net/u012501054/article/details/101371050)