Join大致包括三個要素:Join方式、Join條件以及過濾條件。其中過濾條件也可以通過AND語句放在Join條件中。
二、Hive/MR中的Join可分為Common Join(Reduce階段完成join)和Map Join(Map階段完成join)。介紹兩種join的原理和機制。
(1)Common Join:如果不指定MapJoin或者不符合MapJoin的條件,那么Hive解析器會將Join操作轉換成Common Join,即:在Reduce階段完成join.整個過程包含Map、Shuffle、Reduce階段。
1、Map階段:讀取源表的數據,Map輸出時候以Join on條件中的列為key,如果Join有多個關聯鍵,則以這些關聯鍵的組合作為key;Map輸出的value為join之后所關心的列即(select或者where中需要用到的);同時在value中還會包含表的Tag信息,用於標明此value對應哪個表;
2、shuffle階段:根據key的值進行hash,並將key/value按照hash值推送至不同的reduce中,這樣確保兩個表中相同的key位於同一個reduce中。
3、reduce階段:通過Tag來判斷每一個value是來自table1還是table2,在內部分成2組,做集合笛卡爾乘積。
缺點:shuffle的網絡傳輸和排序性能很低,reduce 端對2個集合做乘積計算,很耗內存,容易導致OOM。
4、實例:SELECT a.id,a.dept,b.age FROM a join b ON (a.id = b.id);如下圖所示;
(2)Map join:MapJoin通常用於一個很小的表和一個大表進行join的場景,具體小表有多小,由參數hive.mapjoin.smalltable.filesize來決定,該參數表示小表的總大小,默認值為25000000字節,即25M。
Hive0.7之前,需要使用hint提示 /*+ mapjoin(table) */才會執行MapJoin,否則執行Common Join,但在0.7版本之后,默認自動會轉Map Join,由參數hive.auto.convert.join來控制,默認為true.以0.7之后的HQL來解析,假設a表為一張大表,b為小表,並且hive.auto.convert.join=true,那么Hive在執行時候會自動轉化為MapJoin。
在map 端進行join,其原理是broadcast join,即把小表作為一個完整的驅動表來進行join操作。通常情況下,要連接的各個表里面的數據會分布在不同的Map中進行處理。即同一個Key對應的Value可能存在不同的Map中。這樣就必須等到 Reduce中去連接。要使MapJoin能夠順利進行,那就必須滿足這樣的條件:除了一份表的數據分布在不同的Map中外,其他連接的表的數據必須在每 個Map中有完整的拷貝。MAPJION會把小表全部讀入內存中,在map階段直接拿另外一個表的數據和內存中表數據做匹配,由於在map是進行了join操作,省去了reduce運行的效率也會高很多。
1、執行流程:
1)如圖中的流程,首先是Task A,它是一個Local Task(在客戶端本地執行的Task),負責掃描小表b的數據,將其轉換成一個HashTable的數據結構,並寫入本地的文件中,之后將該文件加載到DistributeCache中(使用靜態方法DistributedCache.addCacheFile()指定要復制的文件,它的參數是文件的URI)。
2)接下來是Task B,該任務是一個沒有Reduce的MR,啟動MapTasks掃描大表a,在Map階段,根據a的每一條記錄去和DistributeCache中b表對應的HashTable關聯,並直接輸出結果。
3)相關參數
a、小表自動選擇Mapjoin:set hive.auto.convert.join=true;默認值:false。該參數為true時,Hive自動對左邊的表統計量,若是小表就加入內存,即對小表使用Map join
b、小表閥值:set hive.mapjoin.smalltable.filesize=25M;
二、Spark的join
兩種方式使用SparkSQL,一種是直接寫sql語句,這個需要有元數據庫支持,例如Hive等,另一種是通過Dataset/DataFrame編寫Spark應用程序。Join的物理執行解析如下。
(1)join的基本實現過程
Spark將參與Join的兩張表抽象為流式遍歷表(streamIter)和查找表(buildIter),通常streamIter為大表,buildIter為小表。Spark根據join語句自動區分大小表。
(2)spark提供了三種join實現:hash join,sort merge join以及broadcast join。
1、hash join:通過分區的形式將大批量的數據通過hash划分成n份較小的數據集進行並行計算。
1)對兩張表分別按照join keys進行重分區,即shuffle,目的是為了讓有相同join keys值的記錄分到對應的分區中
2)對對應分區中的數據進行join,此處先將小表分區構造為一張hash表,然后根據大表分區中記錄的join keys值拿出來進行匹配
總結:要將來自buildIter的記錄放到hash表中,那么每個分區來自buildIter的記錄不能太大,否則就存不下,默認情況下hash join的實現是關閉狀態。buildIter總體估計大小以及分區后的大小要超過spark.sql.autoBroadcastJoinThreshold設定的值,即不滿足broadcast join條件
2、sort join:hash join對於實現大小表比較合適,但是兩個表都非常大時,對內存計算造成很大的壓力。
1)實現方式:不需要將一側數據全部加載后再進行hash join,但需要在join前將數據排序
在shuffle read階段,分別對streamIter和buildIter進行merge sort,在遍歷streamIter時,對於每條記錄,都采用順序查找的方式從buildIter查找對應的記錄,由於兩個表都是排序的,每次處理完streamIter的一條記錄后,對於streamIter的下一條記錄,只需從buildIter中上一次查找結束的位置開始查找,所以說每次在buildIter中查找不必重頭開始。
3、broadCast join:Broadcast不會內存溢出,因為數據保存級別StoreageLevel是MEMORY_AND_DISK模式
1)設計思想:避免大量的shuffle。若buildIter是一個非常小的表,其實沒必要做shuffle了,直接將buildIter廣播到每個計算節點,然后將buildIter放到hash表中。
2)步驟:
a、broadcast階段:將小表廣播分發到大表所在的所有主機。分發方式可以有driver分發。
b、在每個executor上執行單機版hash join,小表映射,大表試探。