Spark數據本地化-->如何達到性能調優的目的


Spark數據本地化-->如何達到性能調優的目的

1.Spark數據的本地化:移動計算,而不是移動數據

2.Spark中的數據本地化級別:

TaskSetManager 的 Locality Levels 分為以下五個級別:
PROCESS_LOCAL
 NODE_LOCAL
NO_PREF
   RACK_LOCAL
ANY
    
PROCESS_LOCAL    進程本地化:task要計算的數據在同一個Executor中
 
 
    NODE_LOCAL     節點本地化:速度比 PROCESS_LOCAL 稍慢,因為數據需要在不同進程之間傳遞或從文件中讀取
                                        情況一:task要計算的數據是在同一個Worker的不同Executor進程中
                                        情況二:task要計算的數據是在同一個Worker的 磁盤上,或在 HDFS 上,恰好有 block 在同一個節點上。
                                  Spark計算數據來源於HDFS,那么最好的數據本地化級別就是NODE_LOCAL
 
 
     NODE_PREF    沒有最佳 位置這一說,數據從哪里訪問都一樣快,不需要位置優先。比如說SparkSQL讀取MySql中的數據
 
    RACK_LOCAL  機架本地化,數據在同一機架的不同節點上。需要通過網絡傳輸數據及文件 IO,比 NODE_LOCAL 慢
                                         情況一:task計算的數據在Worker2的Executor中
                                         情況二:task計算的數據在Worker2的磁盤上
 
    ANY   跨機架,數據在非同一機架的網絡上,速度最慢
            

3.Spark中的數據本地化由誰負責?

         DAGScheduler,TaskScheduler
 
            val rdd1 = rdd1.cache
            rdd1.map.filter.count()
             Driver(TaskScheduler)在發送task之前,首先應該拿到RDD1緩存在哪一些節點上(node1,node2)--> 這一步就是由DAGScheduler通過cacheManager對象調用getPreferredLocations()來拿到RDD1緩存在哪些節點上, TaskScheduler根據這些節點來發送task。
 
            val rdd1 = sc.textFile("hdfs://...")    //rdd1中封裝了是這個文件所對應的block的位置,getPreferredLocation()-->TaskScheduler調用拿到partition所對應的數據的位置
            rdd1.map.filter.count()
             Driver(TaskScheduler)在發送task之前,首先應該拿到rdd1數據所在的位置(node1,node2)-->RDD1封裝了這個文件所對應的block的位置, TaskScheduler通過調用getPreferredLocations()拿到partition所對應的數據的位置, TaskScheduler根據這些位置來發送相應的task
 
    總的來說:
       Spark中的數據本地化由DAGScheduler和TaskScheduler共同負責。
       DAGScheduler切割Job,划分Stage, 通過調用submitStage來提交一個Stage對應的tasks,submitStage會調用submitMissingTasks, submitMissingTasks 確定每個需要計算的 task 的preferredLocations,通過調用getPreferrdeLocations()得到partition 的優先位置,就是這個 partition 對應的 task 的優先位置,對於要提交到TaskScheduler的TaskSet中的每一個task,該task優先位置與其對應的partition對應的優先位置一致。
      TaskScheduler接收到了TaskSet后TaskSchedulerImpl 會為每個 TaskSet 創建一個 TaskSetManager 對象,該對象包含taskSet 所有 tasks,並管理這些 tasks 的執行,其中就包括計算 TaskSetManager 中的 tasks 都有哪些locality levels,以便在調度和延遲調度 tasks 時發揮作用。

4.Spark中的數據本地化流程圖

某個 task 計算節點與其輸入數據的位置關系,下面將要挖掘Spark 的調度系統如何產生這個結果,這一過程涉及 RDD、DAGScheduler、TaskScheduler,搞懂了這一過程也就基本搞懂了 Spark 的 PreferredLocations(位置優先策略)
 
    第一步:PROCESS_LOCAL-->TaskScheduler首先根據數據所在的節點發送task,
如果task在Worker1的Executor1中等待了3s(這個3s是spark的默認等待時間,通過spark.locality.wait來設置,可以在SparkConf()中修改),重試了5次,還是無法執行
 
    TaskScheduler會降低數據本地化的級別,從PROCESS_LOCAL降到NODE_LOCAL
 
    第二步:NODE_LOCAL-->TaskScheduler重新發送task到Worker1中的Executor2中執行,
如果task在Worker1的Executor2中等待了3s,重試了5次,還是無法執行
 
    TaskScheduler會降低數據本地化的級別,從 NODE_LOCAL降到 RACK_LOCAL 
 
    第三步: RACK_LOCAL  -->TaskScheduler重新發送task到Worker2中的Executor1中執行。
 
   第四步:當task分配完成之后,task會通過所在Worker的Executor中的BlockManager來獲取數據,如果BlockManager發現自己沒有數據,那么它會調用getRemote()方法,通過ConnectionManager與原task所在節點的BlockManager中的ConnectionManager先建立連接,然后通過 TransferService(網絡傳輸組件) 獲取數據,通過網絡傳輸回task所在節點(這時候性能大幅下降,大量的網絡IO占用資源),計算后的結果返回給Driver。
 
總結:
   TaskScheduler在發送task的時候,會根據數據所在的節點發送task,這時候的數據本地化的級別是最高的,如果這個task在這個Executor中等待了三秒,重試發射了5次還是依然無法執行,那么TaskScheduler就會認為這個Executor的計算資源滿了,TaskScheduler會降低一級數據本地化的級別,重新發送task到其他的Executor中執行,如果還是依然無法執行,那么繼續降低數據本地化的級別...
 
     現在想讓每一個task都能拿到最好的數據本地化級別,那么調優點就是等待時間加長。注意!如果過度調大等待時間,雖然為每一個task都拿到了最好的數據本地化級別,但是我們job執行的時間也會隨之延長
  1. spark.locality.wait 3s//相當於是全局的,下面默認以3s為准,手動設置了,以手動的為准
  2. spark.locality.wait.process
  3. spark.locality.wait.node
  4. spark.locality.wait.rack
  5. newSparkConf.set("spark.locality.wait","100")

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM