Spark數據本地化-->如何達到性能調優的目的
1.Spark數據的本地化:移動計算,而不是移動數據
2.Spark中的數據本地化級別:
TaskSetManager 的 Locality Levels 分為以下五個級別: |
PROCESS_LOCAL |
NODE_LOCAL |
NO_PREF |
RACK_LOCAL |
ANY |
PROCESS_LOCAL
進程本地化:task要計算的數據在同一個Executor中

NODE_LOCAL
節點本地化:速度比 PROCESS_LOCAL 稍慢,因為數據需要在不同進程之間傳遞或從文件中讀取
情況一:task要計算的數據是在同一個Worker的不同Executor進程中
情況二:task要計算的數據是在同一個Worker的
磁盤上,或在 HDFS 上,恰好有 block 在同一個節點上。
Spark計算數據來源於HDFS,那么最好的數據本地化級別就是NODE_LOCAL

NODE_PREF 沒有最佳
位置這一說,數據從哪里訪問都一樣快,不需要位置優先。比如說SparkSQL讀取MySql中的數據
RACK_LOCAL
機架本地化,數據在同一機架的不同節點上。需要通過網絡傳輸數據及文件 IO,比 NODE_LOCAL 慢
情況一:task計算的數據在Worker2的Executor中
情況二:task計算的數據在Worker2的磁盤上

ANY 跨機架,數據在非同一機架的網絡上,速度最慢
3.Spark中的數據本地化由誰負責?
DAGScheduler,TaskScheduler
val rdd1 = rdd1.cache
rdd1.map.filter.count()
Driver(TaskScheduler)在發送task之前,首先應該拿到RDD1緩存在哪一些節點上(node1,node2)-->
這一步就是由DAGScheduler通過cacheManager對象調用getPreferredLocations()來拿到RDD1緩存在哪些節點上,
TaskScheduler根據這些節點來發送task。
val rdd1 = sc.textFile("hdfs://...") //rdd1中封裝了是這個文件所對應的block的位置,getPreferredLocation()-->TaskScheduler調用拿到partition所對應的數據的位置
rdd1.map.filter.count()
Driver(TaskScheduler)在發送task之前,首先應該拿到rdd1數據所在的位置(node1,node2)-->RDD1封裝了這個文件所對應的block的位置,
TaskScheduler通過調用getPreferredLocations()拿到partition所對應的數據的位置,
TaskScheduler根據這些位置來發送相應的task
總的來說:
Spark中的數據本地化由DAGScheduler和TaskScheduler共同負責。
DAGScheduler切割Job,划分Stage, 通過調用submitStage來提交一個Stage對應的tasks,submitStage會調用submitMissingTasks,
submitMissingTasks 確定每個需要計算的 task 的preferredLocations,通過調用getPreferrdeLocations()得到partition 的優先位置,就是這個 partition 對應的 task 的優先位置,對於要提交到TaskScheduler的TaskSet中的每一個task,該task優先位置與其對應的partition對應的優先位置一致。
TaskScheduler接收到了TaskSet后,TaskSchedulerImpl 會為每個 TaskSet 創建一個 TaskSetManager 對象,該對象包含taskSet 所有 tasks,並管理這些 tasks 的執行,其中就包括計算 TaskSetManager 中的 tasks 都有哪些locality levels,以便在調度和延遲調度 tasks 時發揮作用。
4.Spark中的數據本地化流程圖
即某個 task 計算節點與其輸入數據的位置關系,下面將要挖掘Spark 的調度系統如何產生這個結果,這一過程涉及 RDD、DAGScheduler、TaskScheduler,搞懂了這一過程也就基本搞懂了 Spark 的 PreferredLocations(位置優先策略)

第一步:PROCESS_LOCAL-->TaskScheduler首先根據數據所在的節點發送task,
如果task在Worker1的Executor1中等待了3s(這個3s是spark的默認等待時間,通過spark.locality.wait來設置,可以在SparkConf()中修改),重試了5次,還是無法執行
TaskScheduler會降低數據本地化的級別,從PROCESS_LOCAL降到NODE_LOCAL
第二步:NODE_LOCAL-->TaskScheduler重新發送task到Worker1中的Executor2中執行,
如果task在Worker1的Executor2中等待了3s,重試了5次,還是無法執行
TaskScheduler會降低數據本地化的級別,從
NODE_LOCAL降到
RACK_LOCAL
第三步:
RACK_LOCAL
-->TaskScheduler重新發送task到Worker2中的Executor1中執行。
第四步:當task分配完成之后,task會通過所在Worker的Executor中的BlockManager來獲取數據,如果BlockManager發現自己沒有數據,那么它會調用getRemote()方法,通過ConnectionManager與原task所在節點的BlockManager中的ConnectionManager先建立連接,然后通過
TransferService(網絡傳輸組件)
獲取數據,通過網絡傳輸回task所在節點(這時候性能大幅下降,大量的網絡IO占用資源),計算后的結果返回給Driver。
總結:
TaskScheduler在發送task的時候,會根據數據所在的節點發送task,這時候的數據本地化的級別是最高的,如果這個task在這個Executor中等待了三秒,重試發射了5次還是依然無法執行,那么TaskScheduler就會認為這個Executor的計算資源滿了,TaskScheduler會降低一級數據本地化的級別,重新發送task到其他的Executor中執行,如果還是依然無法執行,那么繼續降低數據本地化的級別...
現在想讓每一個task都能拿到最好的數據本地化級別,那么調優點就是等待時間加長。注意!如果過度調大等待時間,雖然為每一個task都拿到了最好的數據本地化級別,但是我們job執行的時間也會隨之延長
spark.locality.wait 3s//相當於是全局的,下面默認以3s為准,手動設置了,以手動的為准
spark.locality.wait.process
spark.locality.wait.node
spark.locality.wait.rack
newSparkConf.set("spark.locality.wait","100")