Join 背景介紹
Join 是數據庫查詢永遠繞不開的話題,傳統查詢 SQL 技術總體可以分為簡單操作(過濾操作、排序操作 等),聚合操作-groupby 以及 Join 操作等。其中 Join 操作是最復雜、代價最大的操作類型,也是 OLAP 場景中使用相對較多的操作。
另外,從業務層面來講,用戶在數倉建設的時候也會涉及 Join 使用的問題。通常情況下,數據倉庫中的表一般會分為“低層次表”和“高層次表”。
- 所謂“低層次表”:就是數據源導入數倉之后直接生成的表,單表列值較少,一般可以明顯歸為維度表或事實表,表和表之間大多存在外健依賴,所以查詢起來會遇到大量 Join 運算,查詢效率很差。
- “高層次表”:是在“低層次表”的基礎上加工轉換而來,通常做法是使用 SQL 語句將需要 Join 的表預先進行合並形成“寬表”,在寬表上的查詢不需要執行大量 Join,效率很高。但寬表缺點是數據會有大量冗余,且相對生成較滯后,查詢結果可能並不及時。
為了獲得時效性更高的查詢結果,大多數場景都需要進行復雜的 Join 操作。Join 操作之所以復雜,主要是通常情況下其時間空間復雜度高,且有很多算法,在不同場景下需要選擇特定算法才能獲得最好的優化效果。
本文將介紹 SparkSQL 所支持的幾種常見的 Join 算法及其適用場景。
基本實現機制
shuffle hash join、broadcast hash join 兩者歸根到底都屬於 hash join,只不過在 hash join 之前需要先 shuffle 還是先 broadcast。其實,hash join 算法來自於傳統數據庫,而 shuffle 和 broadcast 是大數據的皮(分布式),兩者一結合就成了大數據的算法了。因此可以說,大數據的根就是傳統數據庫。既然 hash join 是“內核”,那就刨出來看看,看完把“皮”再分析一下。
1、hash join
先來看看這樣一條 SQL 語句:
- select * from order,item where item.id = order.i_id
很簡單一個 Join 節點,參與 join 的兩張表是 item 和 order,join key 分別是 item.id 以及 order.i_id。現在假設這個 Join 采用的是 hash join 算法,整個過程會經歷三步:
分步解釋:
- 1. 確定 Build Table 以及 Probe Table:這個概念比較重要,Build Table 使用 join key 構建 Hash Table,而 Probe Table 使用 join key 進行探測,探測成功就可以 join 在一起。通常情況下,小表會作為 Build Table,大表作為 Probe Table。此事例中 item 為 Build Table,order 為 Probe Table。
- 2. 構建 Hash Table:依次讀取 Build Table(item)的數據,對於每一行數據根據 join key(item.id)進行 hash,hash 到對應的 Bucket,生成 hash table 中的一條記錄。數據緩存在內存中,如果內存放不下需要 dump 到外存。
- 3. 探測:再依次掃描 Probe Table(order)的數據,使用相同的 hash 函數映射 Hash Table 中的記錄,映射成功之后再檢查 join 條件(item.id = order.i_id),如果匹配成功就可以將兩者 join 在一起。
基本流程可以參考上圖,這里有兩個小問題需要關注:
1. hash join 性能如何?
- 很顯然,hash join 基本都只掃描兩表一次,可以認為 o(a+b),較之最極端的笛卡爾集運算 a*b,不知甩了多少條街。
2. 為什么 Build Table 選擇小表?
- 道理很簡單,因為構建的 Hash Table 最好能全部加載在內存,效率最高;這也決定了 hash join 算法只適合至少一個小表的 join 場景,對於兩個大表的 join 場景並不適用。
上文說過,hash join 是傳統數據庫中的單機 join 算法,在分布式環境下需要經過一定的分布式改造,就是盡可能利用分布式計算資源進行並行化計算,提高總體效率。hash join 分布式改造一般有兩種經典方案:
1.broadcast hash join
- 將其中一張小表廣播分發到另一張大表所在的分區節點上,分別並發地與其上的分區記錄進行 hash join。broadcast 適用於小表很小,可以直接廣播的場景。
2. shuffler hash join
- 一旦小表數據量較大,此時就不再適合進行廣播分發。這種情況下,可以根據 join key 相同必然分區相同的原理,將兩張表分別按照 join key 進行重新組織分區,這樣就可以將 join 分而治之,划分為很多小 join,充分利用集群資源並行化。
broadcast hash join
如下圖所示,broadcast hash join 可以分為兩步:
分步解釋:
1.broadcast 階段:
- 將小表廣播分發到大表所在的所有主機。廣播算法可以有很多,最簡單的是先發給 driver,driver 再統一分發給所有 executor;要不就是基於 BitTorrent 的 TorrentBroadcast。
2. hash join 階段:
- 在每個 executor 上執行單機版 hash join,小表映射,大表試探。
SparkSQL 規定 broadcast hash join 執行的基本條件為被廣播小表必須小於參數 spark.sql.autoBroadcastJoinThreshold,默認為 10M。
shuffle hash join
在大數據條件下如果一張表很小,執行 join 操作最優的選擇無疑是 broadcast hash join,效率最高。但是一旦小表數據量增大,廣播所需內存、帶寬等資源必然就會太大,broadcast hash join 就不再是最優方案。此時可以按照 join key 進行分區,根據 key 相同必然分區相同的原理,就可以將大表 join 分而治之,划分為很多小表的 join,充分利用集群資源並行化。如下圖所示
shuffle hash join 也可以分為兩步:
1.shuffle 階段:
- 分別將兩個表按照 join key 進行分區,將相同 join key 的記錄重分布到同一節點,兩張表的數據會被重分布到集群中所有節點。這個過程稱為 shuffle。
2. hash join 階段:
- 每個分區節點上的數據單獨執行單機 hash join 算法。
看到這里,可以初步總結出來如果兩張小表 join 可以直接使用單機版 hash join;如果一張大表 join 一張極小表,可以選擇 broadcast hash join 算法;而如果是一張大表 join 一張小表,則可以選擇 shuffle hash join 算法;那如果是兩張大表進行 join 呢?
sort merge join
SparkSQL 對兩張大表 join 采用了全新的算法-sort-merge join,如下圖所示
整個過程分為三個步驟:
1. shuffle 階段:
- 將兩張大表根據 join key 進行重新分區,兩張表數據會分布到整個集群,以便分布式並行處理。
2. sort 階段:
- 對單個分區節點的兩表數據,分別進行排序。
3. merge 階段:
- 對排好序的兩張分區表數據執行 join 操作。join 操作很簡單,分別遍歷兩個有序序列,碰到相同 join key 就 merge 輸出,否則取更小一邊。如下圖所示:
經過上文的分析,很明顯可以得出來這幾種 Join 的代價關系:cost(broadcast hash join) < cost(shuffle hash join) < cost(sort merge join),數據倉庫設計時最好避免大表與大表的 join 查詢,SparkSQL 也可以根據內存資源、帶寬資源適量將參數 spark.sql.autoBroadcastJoinThreshold 調大,讓更多 join 實際執行為 broadcast hash join。
參考資料