Spark筆記之數據本地性(data locality)


 

一、什么是數據本地性(data locality)

大數據中有一個很有名的概念就是“移動數據不如移動計算”,之所以有數據本地性就是因為數據在網絡中傳輸會有不小的I/O消耗,如果能夠想辦法盡量減少這個I/O消耗就能夠提升效率。那么如何減少I/O消耗呢,當然是盡量不讓數據在網絡上傳輸,即使無法避免數據在網絡上傳輸,也要盡量縮短傳輸距離,這個數據需要傳輸多遠的距離(實際意味着數據傳輸的代價)就是數據本地性,數據本地性根據傳輸距離分為幾個級別,不在網絡上傳輸肯定是最好的級別,其它級別划分依據傳輸距離越遠級別越低,Spark在分配任務的時候會考慮到數據本地性,優先將任務分配給數據本地性最好的Executor執行。

在執行任務時查看Task的執行情況,經常能夠看到Task的狀態中有這么一列: image

這一列就是在說這個Task任務讀取數據的本地性是哪個級別,數據本地性共分為五個級別:

PROCESS_LOCAL:顧名思義,要處理的數據就在同一個本地進程中,即數據和Task在同一個Executor JVM中,這種情況就是RDD的數據在之前就已經被緩存過了,因為BlockManager是以Executor為單位的,所以只要Task所需要的Block在所屬的Executor的BlockManager上已經被緩存,這個數據本地性就是PROCESS_LOCAL,這種是最好的locality,這種情況下數據不需要在網絡中傳輸。

NODE_LOCAL:數據在同一台節點上,但是並不不在同一個jvm中,比如數據在同一台節點上的另外一個Executor上,速度要比PROCESS_LOCAL略慢。還有一種情況是讀取HDFS的塊就在當前節點上,數據本地性也是NODE_LOCAL。

NO_PREF:數據從哪里訪問都一樣,表示數據本地性無意義,看起來很奇怪,其實指的是從MySQL、MongoDB之類的數據源讀取數據。

RACK_LOCAL:數據在同一機架上的其它節點,需要經過網絡傳輸,速度要比NODE_LOCAL慢。

ANY:數據在其它更遠的網絡上,甚至都不在同一個機架上,比RACK_LOCAL更慢,一般情況下不會出現這種級別,萬一出現了可能是有什么異常需要排查下原因。

使用一張圖來表示五個傳輸級別:

image

 

二、延遲調度策略(Delay Scheduler)

Spark在調度程序的時候並不一定總是能按照計算出的數據本地性執行,因為即使計算出在某個Executor上執行時數據本地性最好,但是Executor的core也是有限的,有可能計算出TaskFoo在ExecutorBar上執行數據本地性最好,但是發現ExecutorBar的所有core都一直被用着騰不出資源來執行新來的TaskFoo,所以當TaskFoo等待一段時間之后發現仍然等不到資源的話就嘗試降低數據本地性級別讓其它的Executor去執行。

比如當前有一個RDD,有四個分區,稱為A、B、C、D,當前Stage中這個RDD的每個分區對應的Task分別稱為TaskA、TaskB、TaskC、TaskD,在之前的Stage中將這個RDD cache在了一台機器上的兩個Executor上,稱為ExecutorA、ExecutorB,每個Executor的core是2,ExecutorA上緩存了RDD的A、B、C分區,ExecutorB上緩存了RDD的D分區,然后分配Task的時候會把TaskA、TaskB、TaskC分配給ExecutorA,TaskD分配給ExecutorB,但是因為每個Executor只有兩個core,只能同時執行兩個Task,所以ExecutorA能夠執行TaskA和TaskB,但是TaskC就只能等着,盡管它在ExecutorA上執行的數據本地性是PROCESS_LOCAL,但是人家沒有資源啊,於是TaskC就等啊等,但是等了一會兒它發現不太對勁,搞這個數據本地性不就是為了加快Task的執行速度以提高Stage的整體執行速度嗎,我擱這里干等着可不能加快Stage的整體速度,我要看下邊上有沒有其它的Executor是閑着的,假設我在ExecutorA需要再排隊10秒才能拿到core資源執行,拿到資源之后我需要執行30秒,那么我只需要找到一個其它的Executor,即使因為數據本地性不好但是如果我能夠在40秒內執行完的話還是要比在這邊繼續傻等要快的,所以TaskC就給自己設定了一個時間,當超過n毫秒之后還等不到就放棄PROCESS_LOCAL級別,轉而嘗試NODE_LOCAL級別的Executor,然后它看到了ExecutorB,ExecutorB和ExecutorA在同一台機器上,只是兩個不同的jvm,所以在ExecutorB上執行需要從ExecutorA上拉取數據,通過BlockManager的getRemote,底層通過BlockTransferService去把數據拉取過來,因為是在同一台機器上的兩個進程之間使用socket數據傳輸,走的應該是回環地址,速度會非常快,所以對於這種數據存儲在同一台機器上的不同Executor上因為降級導致的NODE_LOCAL的情況,理論上並不會比PROCESS_LOCAL慢多少,TaskC在ExecutorB上執行並不會比ExecutorA上執行慢多少。但是對於比如HDFS塊存儲在此節點所以將Task分配到此節點的情況導致的NODE_LOCAL,因為要跟HDFS交互,還要讀取磁盤文件,涉及到了一些I/O操作,這種情況就會耗費較長時間,相比較於PROCESS_LOCAL級別就慢上不少了。

上面舉的例子中提到了TaskC會等待一段時間,根據數據本地性不同,等待的時間間隔也不一致,不同數據本地性的等待時間設置參數:

spark.locality.wait:設置所有級別的數據本地性,默認是3000毫秒

spark.locality.wait.process:多長時間等不到PROCESS_LOCAL就降級,默認為${spark.locality.wait}

spark.locality.wait.node:多長時間等不到NODE_LOCAL就降級,默認為${spark.locality.wait}

spark.locality.wait.rack:多長時間等不到RACK_LOCAL就降級,默認為${spark.locality.wait}

總結一下數據延遲調度策略:當使用當前的數據本地性級別等待一段時間之后仍然沒有資源執行時,嘗試降低數據本地性級別使用更低的數據本地性對應的Executor執行,這個就是Task的延遲調度策略。

 

最后探討一下什么樣的Task可以針對數據本地性延遲調度的等待時間做優化?

如果Task的輸入數據比較大,那么耗費在數據讀取上的時間會比較長,一個好的數據本地性能夠節省很長時間,所以這種情況下最好還是將延遲調度的降級等待時間調長一些。而對於輸入數據比較小的,即使數據本地性不好也只是多花一點點時間,那么便不必在延遲調度上耗費太長時間。總結一下就是如果數據本地性對任務的執行時間影響較大的話就稍稍調高延遲調度的降級等待時間。

 

 

相關資料:

1. spark on yarn 中的延遲調度(delay scheduler)

2. 談談spark 的計算本地性

 

.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM