SQL 中 left join 的底層原理


轉載自:https://www.cnblogs.com/jmcui/archive/2020/12/10/14117113.html#_label1

好好學習數據結構吧,畢業也幾年了,很多東西都稀里糊塗,不能再這樣下去了。

突然想起來了高中物理老師的一句話,做了十道題,不如弄懂一道題。

01. 前言

寫過或者學過 SQL 的人應該都知道 left join,知道 left join 的實現的效果,就是保留左表的全部信息,然后把右表往左表上拼接,如果拼不上就是 null。除了 left join 以外,還有 inner join、outer join、right join,這些不同的 join 能達到的什么樣的效果,大家應該都了解了,如果不了解的可以看看網上的帖子或者隨便一本 SQL 書都有講的。今天我們不講這些 join 能達到什么效果,我們主要講這些 join 的底層原理是怎么實現的,也就是具體的效果是怎么呈現出來的。

join 主要有 Nested Loop、Hash Join、Merge Join 這三種方式,我們這里只講最普遍的,也是最好的理解的 Nested Loop,Nested Loop 翻譯過來就是嵌套循環的意思,那什么又是嵌套循環呢?嵌套大家應該都能理解,就是一層套一層;那循環呢,你可以理解成是 for 循環。

Nested Loop 里面又有三種細分的連接方式,分別是 Simple Nested-Loop Join、Index Nested-Loop Join、Block Nested-Loop Join,接下來我們就分別去看一下這三種細分的連接方式。

在正式開始之前,先介紹兩個概念:驅動表(也叫外表)和被驅動表(也叫非驅動表,還可以叫匹配表,亦可叫內表),簡單來說,驅動表就是主表,left join 中的左表就是驅動表,right join 中的右表是驅動表。一個是驅動表,那另一個就只能是非驅動表了,在 join 的過程中,其實就是從驅動表里面依次(注意理解這里面的依次)取出每一個值,然后去非驅動表里面進行匹配,那具體是怎么匹配的呢?這就是我們接下來講的這三種連接方式。

02.Simple Nested-Loop Join

Simple Nested-Loop Join 是這三種方法里面最簡單,最好理解,也是最符合大家認知的一種連接方式,現在有兩張表 table A 和 table B,我們讓 table A left join table B,如果是用第一種連接方式去實現的話,會是怎么去匹配的呢?直接上圖:

上面的 left join 會從驅動表 table A 中依次取出每一個值,然后去非驅動表 table B 中從上往下依次匹配,然后把匹配到的值進行返回,最后把所有返回值進行合並,這樣我們就查找到了 table A left join table B 的結果。是不是和你的認知是一樣的呢?利用這種方法,如果 table A 有10行,table B 有10行,總共需要執行 10 x 10 = 100 次查詢。

這種暴力匹配的方式在數據庫中一般不使用。

03.Index Nested-Loop Join

Index Nested-Loop Join 這種方法中,我們看到了 Index,大家應該都知道這個就是索引的意思,這個 Index 是要求非驅動表上要有索引,有了索引以后可以減少匹配次數,匹配次數減少了就可以提高查詢的效率了。

為什么會有了索引以后可以減少查詢的次數呢?這個其實就涉及到數據結構里面的一些知識了,給大家舉個例子就清楚了。

上圖中左邊就是普通列的存儲方式,右邊是樹結構索引,什么是樹結構呢?就是數據分布的像樹這樣一層一層的,樹結構有一個特點就是左邊的數據小於頂點的數,右邊的數大於頂點的數,你看右圖中,左邊的數3是不是小於頂點6,右邊的數7是不是大於頂點6;左邊的數1是不是小於頂點3,右邊的數4是不是大於頂點3。

假如我們現在要匹配數值9,如果是左邊這種數據存儲方式的話,我們需要從第一行依次匹配到最后一行才能找到數值9,總共需要匹配7次;但是如果我們是用右邊這種樹結構索引的話,我們先拿9和最上層頂點6去匹配,發現9比6大,我們就去頂點的右邊去找,再去和7匹配,發現9仍然比7大,再去7的右邊找,就找到了9,這樣我們只匹配了3次就把我們想要的9找到了。是不是相比匹配7次節省了很多時間。

數據庫中的索引一般用 B+ 樹,為了讓大家更好的理解,我上面畫的圖只是最簡單的一種樹結構,而非真實的 B+ 樹,但是原理是一樣的。

如果索引是主鍵的話,效率會更高,因為主鍵必須是唯一的,所以如果被驅動表是用主鍵去連接,只會出現多對一或者一對一的情況,而不會出現多對多和一對多的情況。

 

04.Block Nested-Loop Join

理想情況下,用索引匹配是最高效的一種方式,但是在現實工作中,並不是所有的列都是索引列,這個時候就需要用到 Block Nested-Loop Join 方法了,這種方法與第一種方法比較類似,唯一的區別就是會把驅動表中 left join 涉及到的所有列(不止是用來on的列,還有select部分的列)先取出來放到一個緩存區域,然后再去和非驅動表進行匹配,這種方法和第一種方法相比所需要的匹配次數是一樣的,差別就在於驅動表的列數不同,也就是數據量的多少不同。所以雖然匹配次數沒有減少,但是總體的查詢性能還是有提升的。

spark的三種jion

轉載自:https://zhuanlan.zhihu.com/p/91510137

引言Join是SQL語句中的常用操作,良好的表結構能夠將數據分散在不同的表中,使其符合某種范式,減少表冗余、更新容錯等。而建立表和表之間關系的最佳方式就是Join操作。

 

對於Spark來說有3中Join的實現,每種Join對應着不同的應用場景:

 

Broadcast Hash Join :適合一張較小的表和一張大表進行join

 

Shuffle Hash Join : 適合一張小表和一張大表進行join,或者是兩張小表之間的join

 

Sort Merge Join :適合兩張較大的表之間進行join

 

前兩者都基於的是Hash Join,只不過在hash join之前需要先shuffle還是先broadcast。下面將詳細的解釋一下這三種不同的join的具體原理。

 

 

Hash Join先來看看這樣一條SQL語句:

 

select * from order,item where item.id = order.i_id

 

  1. 確定Build Table以及Probe Table:這個概念比較重要,Build Table使用join key構建Hash Table,而Probe Table使用join key進行探測,探測成功就可以join在一起。通常情況下,小表會作為Build Table,大表作為Probe Table。此事例中item為Build Table,order為Probe Table;很簡單一個Join節點,參與join的兩張表是item和order,join key分別是item.id以及order.i_id。現在假設這個Join采用的是hash join算法,整個過程會經歷三步:

 

  1. 構建Hash Table:依次讀取Build Table(item)的數據,對於每一行數據根據join key(item.id)進行hash,hash到對應的Bucket,生成hash table中的一條記錄。數據緩存在內存中,如果內存放不下需要dump到外存;

 

  1. 探測:再依次掃描Probe Table(order)的數據,使用相同的hash函數映射Hash Table中的記錄,映射成功之后再檢查join條件(item.id = order.i_id),如果匹配成功就可以將兩者join在一起。

 

 

 

 

 

基本流程可以參考上圖,這里有兩個小問題需要關注:

 

  1. hash join性能如何?很顯然,hash join基本都只掃描兩表一次,可以認為o(a+b),較之最極端的笛卡爾集運算a*b,不知甩了多少條街;

 

  1. 為什么Build Table選擇小表?道理很簡單,因為構建的Hash Table最好能全部加載在內存,效率最高;這也決定了hash join算法只適合至少一個小表的join場景,對於兩個大表的join場景並不適用。

 

 

上文說過,hash join是傳統數據庫中的單機join算法,在分布式環境下需要經過一定的分布式改造,說到底就是盡可能利用分布式計算資源進行並行化計算,提高總體效率。hash join分布式改造一般有兩種經典方案:

 

  1. broadcast hash join:將其中一張小表廣播分發到另一張大表所在的分區節點上,分別並發地與其上的分區記錄進行hash join。broadcast適用於小表很小,可以直接廣播的場景;

 

  1. shuffler hash join:一旦小表數據量較大,此時就不再適合進行廣播分發。這種情況下,可以根據join key相同必然分區相同的原理,將兩張表分別按照join key進行重新組織分區,這樣就可以將join分而治之,划分為很多小join,充分利用集群資源並行化。

 

 

Broadcast Hash Join大家知道,在數據庫的常見模型中(比如星型模型或者雪花模型),表一般分為兩種:事實表和維度表。維度表一般指固定的、變動較少的表,例如聯系人、物品種類等,一般數據有限。而事實表一般記錄流水,比如銷售清單等,通常隨着時間的增長不斷膨脹。

 

因為Join操作是對兩個表中key值相同的記錄進行連接,在SparkSQL中,對兩個表做Join最直接的方式是先根據key分區,再在每個分區中把key值相同的記錄拿出來做連接操作。但這樣就不可避免地涉及到shuffle,而shuffle在Spark中是比較耗時的操作,我們應該盡可能的設計Spark應用使其避免大量的shuffle。

 

當維度表和事實表進行Join操作時,為了避免shuffle,我們可以將大小有限的維度表的全部數據分發到每個節點上,供事實表使用。executor存儲維度表的全部數據,一定程度上犧牲了空間,換取shuffle操作大量的耗時,這在SparkSQL中稱作Broadcast Join,如下圖所示:

 

 

 

 

Table B是較小的表,黑色表示將其廣播到每個executor節點上,Table A的每個partition會通過block manager取到Table A的數據。根據每條記錄的Join Key取到Table B中相對應的記錄,根據Join Type進行操作。這個過程比較簡單,不做贅述。

 

Broadcast Join的條件有以下幾個:

 

  1. 被廣播的表需要小於spark.sql.autoBroadcastJoinThreshold所配置的值,默認是10M (或者加了broadcast join的hint)

 

  1. 基表不能被廣播,比如left outer join時,只能廣播右表

 

 

看起來廣播是一個比較理想的方案,但它有沒有缺點呢?也很明顯。這個方案只能用於廣播較小的表,否則數據的冗余傳輸就遠大於shuffle的開銷;另外,廣播時需要將被廣播的表現collect到driver端,當頻繁有廣播出現時,對driver的內存也是一個考驗。

 

 

如下圖所示,broadcast hash join可以分為兩步:

 

  1. broadcast階段:將小表廣播分發到大表所在的所有主機。廣播算法可以有很多,最簡單的是先發給driver,driver再統一分發給所有executor;要不就是基於bittorrete的p2p思路;

 

  1. hash join階段:在每個executor上執行單機版hash join,小表映射,大表試探;

 

 

 

 

 

SparkSQL規定broadcast hash join執行的基本條件為被廣播小表必須小於參數spark.sql.autoBroadcastJoinThreshold,默認為10M。

 

 

Shuffle Hash Join當一側的表比較小時,我們選擇將其廣播出去以避免shuffle,提高性能。但因為被廣播的表首先被collect到driver段,然后被冗余分發到每個executor上,所以當表比較大時,采用broadcast join會對driver端和executor端造成較大的壓力。

 

但由於Spark是一個分布式的計算引擎,可以通過分區的形式將大批量的數據划分成n份較小的數據集進行並行計算。這種思想應用到Join上便是Shuffle Hash Join了。利用key相同必然分區相同的這個原理,兩個表中,key相同的行都會被shuffle到同一個分區中,SparkSQL將較大表的join分而治之,先將表划分成n個分區,再對兩個表中相對應分區的數據分別進行Hash Join,這樣即在一定程度上減少了driver廣播一側表的壓力,也減少了executor端取整張被廣播表的內存消耗。其原理如下圖:

 

 

Shuffle Hash Join分為兩步:

 

  1. 對兩張表分別按照join keys進行重分區,即shuffle,目的是為了讓有相同join keys值的記錄分到對應的分區中

 

  1. 對對應分區中的數據進行join,此處先將小表分區構造為一張hash表,然后根據大表分區中記錄的join keys值拿出來進行匹配

 

 

Shuffle Hash Join的條件有以下幾個:

 

  1. 分區的平均大小不超過spark.sql.autoBroadcastJoinThreshold所配置的值,默認是10M

 

  1. 基表不能被廣播,比如left outer join時,只能廣播右表

 

  1. 一側的表要明顯小於另外一側,小的一側將被廣播(明顯小於的定義為3倍小,此處為經驗值)

 

 

我們可以看到,在一定大小的表中,SparkSQL從時空結合的角度來看,將兩個表進行重新分區,並且對小表中的分區進行hash化,從而完成join。在保持一定復雜度的基礎上,盡量減少driver和executor的內存壓力,提升了計算時的穩定性。

 

 

在大數據條件下如果一張表很小,執行join操作最優的選擇無疑是broadcast hash join,效率最高。但是一旦小表數據量增大,廣播所需內存、帶寬等資源必然就會太大,broadcast hash join就不再是最優方案。此時可以按照join key進行分區,根據key相同必然分區相同的原理,就可以將大表join分而治之,划分為很多小表的join,充分利用集群資源並行化。如下圖所示,shuffle hash join也可以分為兩步:

 

  1. shuffle階段:分別將兩個表按照join key進行分區,將相同join key的記錄重分布到同一節點,兩張表的數據會被重分布到集群中所有節點。這個過程稱為shuffle

 

  1. hash join階段:每個分區節點上的數據單獨執行單機hash join算法。

 

 

 

 

 

看到這里,可以初步總結出來如果兩張小表join可以直接使用單機版hash join;如果一張大表join一張極小表,可以選擇broadcast hash join算法;而如果是一張大表join一張小表,則可以選擇shuffle hash join算法;那如果是兩張大表進行join呢?

 

 

Sort Merge Join上面介紹的兩種實現對於一定大小的表比較適用,但當兩個表都非常大時,顯然無論適用哪種都會對計算內存造成很大壓力。這是因為join時兩者采取的都是hash join,是將一側的數據完全加載到內存中,使用hash code取join keys值相等的記錄進行連接。

 

當兩個表都非常大時,SparkSQL采用了一種全新的方案來對表進行Join,即Sort Merge Join。這種實現方式不用將一側數據全部加載后再進星hash join,但需要在join前將數據排序,如下圖所示:

 

 

 

 

可以看到,首先將兩張表按照join keys進行了重新shuffle,保證join keys值相同的記錄會被分在相應的分區。分區后對每個分區內的數據進行排序,排序后再對相應的分區內的記錄進行連接,如下圖示:

 

 

 

 

看着很眼熟吧?也很簡單,因為兩個序列都是有序的,從頭遍歷,碰到key相同的就輸出;如果不同,左邊小就繼續取左邊,反之取右邊。

 

可以看出,無論分區有多大,Sort Merge Join都不用把某一側的數據全部加載到內存中,而是即用即取即丟,從而大大提升了大數據量下sql join的穩定性。

 

 

SparkSQL對兩張大表join采用了全新的算法-sort-merge join,如下圖所示,整個過程分為三個步驟:

 

 

 

 

 

  1. shuffle階段:將兩張大表根據join key進行重新分區,兩張表數據會分布到整個集群,以便分布式並行處理;

 

  1. sort階段:對單個分區節點的兩表數據,分別進行排序;

 

  1. merge階段:對排好序的兩張分區表數據執行join操作。join操作很簡單,分別遍歷兩個有序序列,碰到相同join key就merge輸出,否則取更小一邊,見下圖示意:

 

 

 

 

 

經過上文的分析,可以明確每種Join算法都有自己的適用場景,數據倉庫設計時最好避免大表與大表的join查詢,SparkSQL也可以根據內存資源、帶寬資源適量將參數spark.sql.autoBroadcastJoinThreshold調大,讓更多join實際執行為broadcast hash join。

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM