join 是sql語句中的常用操作,良好的表結構能夠將數據分散在不同的表中,使其符合某種范式,減少表冗余,更新容錯等。而建立表和表之間關系的最佳方式就是Join操作。
sparksql作為大數據領域的sql實現,自然也對join操作做了不少優化,今天主要看一下在spark sql中對於join,常見的3種實現。
sparksql的3種join實現
1、Broadcast Join (小表對大表)
在數據庫的常見模型中(比如星型模型或者雪花模型),表一般分為兩種:事實表和維度表。
維度表一般指固定的、變動較少的表,例如聯系人、物品種類等,一般數據有限。
事實表一般記錄流水,比如銷售清單等,通常隨着時間的增長不斷膨脹。
因為Join 操作是對兩個表中key值相同的記錄進行連接,在SparkSQL中,對兩個表做join最直接的方式是先根據key分區,再在每個分區中把key值相同的記錄拿出來做
連接操作。但這樣就不可避免地涉及到shuffle,而shuffle在spark中比較耗時的操作,我們應該盡可能的設計Spark應用使其避免大量的shuffle。
當維度表和事實表進行join操作時,為了避免shuffle,我們可以將大小有限的維度表的全部數據分發到每個節點上,供事實表使用。executor存儲維度表的全部數據,一定程度上犧牲了
空間,換取shuffle操作大量的耗時,這在SparkSQL中稱作 Broadcast Join。
Table B 是較小的表,黑色表示將其廣播到每個executor節點上,Table A 的每個partition 會通過 block manager取到Table A的數據。根據每條記錄的 Join Key 取到
Table B中相對應的記錄,根據 Join Type進行操作。這個過程比較簡單,不做贅述。
Broadcast Join 的條件有以下幾個:
(1)被廣播的表需要小於 spark.sql.autoBroadcastJoinThreshold所配置的值,默認是10M(或者加了 broadcast join的 hint)
(2)基表不能被廣播,比如 left outer join時,只能廣播右表。
看起來廣播是一個比較理想的方案,但它有沒有缺點呢?也很明顯。這個方案只能用於廣播較小的表,否則數據的冗余傳輸就遠大於shuffle的開銷;
另外,廣播時需要將被廣播的表collect 到driver端,然后由driver端將數據分發到其他executor,當頻繁有廣播出現時,對driver的內存也是一個考驗。
2、Shuffle Hash Join
當一側的表比較小時,我們選擇將其廣播出去以避免shuffle,提高性能。但因為被廣播的表首先被collect到driver端,然后被冗余分發到每個executor上,所以當表比較大時,
采用 broadcast join 會對driver端和executor端造成較大的壓力。
但由於Spark 是一個分布式的計算引擎,可以通過分區的形式將大批量的數據划分成n份較小的數據集進行並行計算。這種思想應用到Join上便是 Shuffle Hash Join 了。
利用key相同必然分區相同的這個原理,Spark SQL將較大表的 join 分而治之,先將表划分成 n 個分區,再對兩個表中相對應分區的數據分別進行 Hash Join,這樣即在
一定程度上減少了driver廣播一側表的壓力,也減少了executor端取整張被廣播表的內存消耗。
Shuffle Hash Join 分為兩步:
1、對兩張表分別按照 join keys進行重分區,即shuffle,目的就是為了讓有相同 join keys值的記錄分到對應的分區中。
2、對對應分區中的數據進行 join,此處先將小表分區構造為一張hash 表,然后根據大表分區中記錄的join keys值拿出來進行匹配。
Shuffle Hash Join 的條件有以下幾個:
1、分區的平均大小不超過 spark.sql.autoBroadcastJoinThreshold 所配置的值,默認是 10M。
2、基表不能被廣播,比如 left outer join 時,只能廣播右表。
3、一側的表要明顯小於另外一側,小的一側將被廣播(明顯小於的定義為3倍小)
我們可以看到,在一定大小的表中,SparkSQL從時空結合的角度來看,將兩個表進行重新分區,並且對小表中的分區進行hash化,從而完成join。
在保持一定復雜度的基礎上,盡量減少driver和executor的內存壓力,提升了計算時的穩定性。
Sort Merge Join (大表對大表)
上面介紹的兩種實現對於一定大小的表表適用,但當兩個表都非常大時,顯然無論用哪種都會對計算內存造成很大壓力。這是因為join 時兩者采取的都是 hash join,
是將一側的數據完全加載到內存中,使用 hash code取 join keys值相等的記錄進行連接。
當兩個表都非常大時,SparkSQL 采用了一種全新的方案來對標進行 join,即 Sort Merge Join 。這種實現方式不用將一側數據全部加載后再進行 hash join,但需要在
join 前將數據排序。
可以看到,首先將兩張表按照 join keys 進行了重新shuffle,保證 join keys值相同的記錄會被分在相應的分區。分區后對每個分區內的數據進行排序,排序后
再對相應的分區內的記錄進行連接。
因為兩個序列都是有序的,從頭遍歷,碰到 key 相同的就輸出,如果不同,左邊小就繼續取左邊,反之取右邊。
可以看出,無論分區有多大,Sort Merge Join 都不用把某一側的數據全部加載到內存中,而是即用即丟,從而大大提升了大數量下 sql join 的穩定性。