一.spark 分區 partition的理解:
spark中是以vcore級別調度task的。
如果讀取的是hdfs,那么有多少個block,就有多少個partition
舉例來說:sparksql 要讀表T, 如果表T有1w個小文件,那么就有1w個partition
這時候讀取效率會較低。假設設置資源為 --executor-memory 2g --executor-cores 2 --num-executors 5。
步驟是拿出1-10號10個小文件(也就是10個partition) 分別給5個executor讀取(spark調度會以vcore為單位,實際就是5個executor,10個task讀10個partition)
如果5個executor執行速度相同,再拿11-20號文件 依次給這5個executor讀取
而實際執行速度不會完全相同,那就是哪個task先執行完,哪個task領取下一個partition讀取執行,以此類推。這樣往往讀取文件的調度時間大於讀取文件本身,而且會頻繁打開關閉文件句柄,浪費較為寶貴的io資源,執行效率也大大降低。
二.coalesce 與 repartition的區別(我們下面說的coalesce都默認shuffle參數為false的情況)
repartition(numPartitions:Int):RDD[T]和coalesce(numPartitions:Int,shuffle:Boolean=false):RDD[T]
repartition只是coalesce接口中shuffle為true的實現
我們還拿上面的例子說:
有1w的小文件,資源也為--executor-memory 2g --executor-cores 2 --num-executors 5。
repartition(4):產生shuffle。這時會啟動5個executor像之前介紹的那樣依次讀取1w個分區的文件,然后按照某個規則%4,寫到4個文件中,這樣分區的4個文件基本毫無規律,比較均勻。
coalesce(4):這個coalesce不會產生shuffle。那啟動5個executor在不發生shuffle的時候是如何生成4個文件呢,其實會有1個或2個或3個甚至更多的executor在空跑(具體幾個executor空跑與spark調度有關,與數據本地性有關,與spark集群負載有關),他並沒有讀取任何數據!
PS:
1.如果結果產生的文件數要比源RDD partition少,用coalesce是實現不了的,例如有4個小文件(4個partition),你要生成5個文件用coalesce實現不了,也就是說不產生shuffle,無法實現文件數變多。
2.如果你只有1個executor(1個core),源RDD partition有5個,你要用coalesce產生2個文件。那么他是預分partition到executor上的,例如0-2號分區在先executor上執行完畢,3-4號分區再次在同一個executor執行。其實都是同一個executor但是前后要串行讀不同數據。與用repartition(2)在讀partition上有較大不同(串行依次讀0-4號partition 做%2處理)。
三.實例:
T表有10G數據 有100個partition 資源也為--executor-memory 2g --executor-cores 2 --num-executors 5。我們想要結果文件只有一個
1. 如果用coalesce:sql(select * from T).coalesce(1)
5個executor 有4個在空跑,只有1個在真正讀取數據執行,這時候效率是極低的。所以coalesce要慎用,而且它還用產出oom問題,這個我們以后再說。
2. 如果用repartition:sql(select * from T).repartition(1)
這樣效率就會高很多,並行5個executor在跑(10個task),然后shuffle到同一節點,最后寫到一個文件中
那么如果我不想產生一個文件了,我想產生10個文件會怎樣,是不是coalesce 又變得比 repartition高效了呢。(因為coalesce無shuffle,相當於每個executor的 task認領 10個 partition)
那么如果我又不想產生10個文件呢?其實一旦要產生的文件數大於executor x vcore數,coalesce效率就更高(一般是這樣,不絕對)。
四.總結:
我們常認為coalesce不產生shuffle會比repartition 產生shuffle效率高,而實際情況往往要根據具體問題具體分析,coalesce效率不一定高,有時還有大坑,大家要慎用。
coalesce 與 repartition 他們兩個都是RDD的分區進行重新划分,repartition只是coalesce接口中shuffle為true的實現(假設源RDD有N個分區,需要重新划分成M個分區)
1)如果N<M。一般情況下N個分區有數據分布不均勻的狀況,利用HashPartitioner函數將數據重新分區為M個,這時需要將shuffle設置為true(repartition實現,coalesce也實現不了)。
2)如果N>M並且N和M相差不多,(假如N是1000,M是100)那么就可以將N個分區中的若干個分區合並成一個新的分區,最終合並為M個分區,這時可以將shuff設置為false(coalesce實現),如果M>N時,coalesce是無效的,不進行shuffle過程,父RDD和子RDD之間是窄依賴關系,無法使文件數(partiton)變多。
總之如果shuffle為false時,如果傳入的參數大於現有的分區數目,RDD的分區數不變,也就是說不經過shuffle,是無法將RDD的分區數變多的
3)如果N>M並且兩者相差懸殊,這時你要看executor數與要生成的partition關系,如果executor數 <= 要生成partition數,coalesce效率高,反之如果用coalesce會導致(executor數-要生成partiton數)個excutor空跑從而降低效率。如果在M為1的時候,為了使coalesce之前的操作有更好的並行度,可以將shuffle設置為true。
本人水平有限,如果錯誤,請大家批評指正,謝謝大家。