hive------ Group by、join、distinct等實現原理


1. Hive 的 distribute by

       Order by 能夠預期產生完全排序的結果,但是它是通過只用一個reduce來做到這點的。所以對於大規模的數據集它的效率非常低。在很多情況下,並不需要全局排序,此時可以換成Hive的非標准擴展sort by。Sort by為每個reducer產生一個排序文件。在有些情況下,你需要控制某個特定行應該到哪個reducer,通常是為了進行后續的聚集操作。Hive的distribute by 子句可以做這件事。

// 根據年份和氣溫對氣象數據進行排序,以確保所有具有相同年份的行最終都在一個reducer分區中  

from record2  

select year, temperature  

distribute by year  

sort by year asc, temperature desc;  

2. Distinct 的實現

准備數據

語句

SELECT COUNT, COUNT(DISTINCT uid) FROM logs GROUP BY COUNT;
hive> SELECT * FROM logs; OK a 蘋果 3 a 橙子 3 a 燒雞 1 b 燒雞 3 hive> SELECT COUNT, COUNT(DISTINCT uid) FROM logs GROUP BY COUNT;

根據count分組,計算獨立用戶數。

計算過程

hive-distinct-cal

1. 第一步先在mapper計算部分值,會以count和uid作為key,如果是distinct並且之前已經出現過,則忽略這條計算。第一步是以組合為key,第二步是以count為key.
2. ReduceSink是在mapper.close()時才執行的,在GroupByOperator.close()時,把結果輸出。注意這里雖然key是count和uid,但是在reduce時分區是按count來的!
3. 第一步的distinct計算的值沒用,要留到reduce計算的才准確。這里只是減少了key組合相同的行。不過如果是普通的count,后面是會合並起來的。
4. distinct通過比較lastInvoke判斷要不要+1(因為在reduce是排序過了的,所以判斷distict的字段變了沒有,如果沒變,則不+1)

Operator

hive-distinct-op

Explain

hive> explain select count, count(distinct uid) from logs group by count; OK ABSTRACT SYNTAX TREE: (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME logs))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL count)) (TOK_SELEXPR (TOK_FUNCTIONDI count (TOK_TABLE_OR_COL uid)))) (TOK_GROUPBY (TOK_TABLE_OR_COL count)))) STAGE DEPENDENCIES: Stage-1 is a root stage Stage-0 is a root stage STAGE PLANS: Stage: Stage-1 Map Reduce Alias -> Map Operator Tree: logs TableScan //表掃描 alias: logs Select Operator//列裁剪,取出uid,count字段就夠了 expressions: expr: count type: int expr: uid type: string outputColumnNames: count, uid Group By Operator //先來map聚集 aggregations: expr: count(DISTINCT uid) //聚集表達式 bucketGroup: false keys: expr: count type: int expr: uid type: string mode: hash //hash方式 outputColumnNames: _col0, _col1, _col2 Reduce Output Operator key expressions: //輸出的鍵 expr: _col0 //count type: int expr: _col1 //uid type: string sort order: ++ Map-reduce partition columns: //這里是按group by的字段分區的 expr: _col0 //這里表示count type: int tag: -1 value expressions: expr: _col2 type: bigint Reduce Operator Tree: Group By Operator //第二次聚集 aggregations: expr: count(DISTINCT KEY._col1:0._col0) //uid:count bucketGroup: false keys: expr: KEY._col0 //count type: int mode: mergepartial //合並 outputColumnNames: _col0, _col1 Select Operator //列裁剪 expressions: expr: _col0 type: int expr: _col1 type: bigint outputColumnNames: _col0, _col1 File Output Operator //輸出結果到文件 compressed: false GlobalTableId: 0 table: input format: org.apache.hadoop.mapred.TextInputFormat output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat Stage: Stage-0 Fetch Operator limit: -1

3.
Group By 的實現
數據准備
SELECT uid, SUM(COUNT) FROM logs GROUP BY uid;
hive> SELECT * FROM logs; a 蘋果 5 a 橙子 3 a 蘋果 2 b 燒雞 1 hive> SELECT uid, SUM(COUNT) FROM logs GROUP BY uid; a 10 b 1

計算過程

hive-groupby-cal
默認設置了hive.map.aggr=true,所以會在mapper端先group by一次,最后再把結果merge起來,為了減少reducer處理的數據量。注意看explain的mode是不一樣的。mapper是hash,reducer是mergepartial。如果把hive.map.aggr=false,那將groupby放到reducer才做,他的mode是complete.

Operator

hive-groupby-op

Explain

hive> explain SELECT uid, sum(count) FROM logs group by uid; OK ABSTRACT SYNTAX TREE: (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME logs))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL uid)) (TOK_SELEXPR (TOK_FUNCTION sum (TOK_TABLE_OR_COL count)))) (TOK_GROUPBY (TOK_TABLE_OR_COL uid)))) STAGE DEPENDENCIES: Stage-1 is a root stage Stage-0 is a root stage STAGE PLANS: Stage: Stage-1 Map Reduce Alias -> Map Operator Tree: logs TableScan // 掃描表 alias: logs Select Operator //選擇字段 expressions: expr: uid type: string expr: count type: int outputColumnNames: uid, count Group By Operator //這里是因為默認設置了hive.map.aggr=true,會在mapper先做一次聚合,減少reduce需要處理的數據 aggregations: expr: sum(count) //聚集函數 bucketGroup: false keys: //鍵 expr: uid type: string mode: hash //hash方式,processHashAggr() outputColumnNames: _col0, _col1 Reduce Output Operator //輸出key,value給reducer key expressions: expr: _col0 type: string sort order: + Map-reduce partition columns: expr: _col0 type: string tag: -1 value expressions: expr: _col1 type: bigint Reduce Operator Tree: Group By Operator aggregations: expr: sum(VALUE._col0) //聚合 bucketGroup: false keys: expr: KEY._col0 type: string mode: mergepartial //合並值 outputColumnNames: _col0, _col1 Select Operator //選擇字段 expressions: expr: _col0 type: string expr: _col1 type: bigint outputColumnNames: _col0, _col1 File Output Operator //輸出到文件 compressed: false GlobalTableId: 0 table: input format: org.apache.hadoop.mapred.TextInputFormat output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat Stage: Stage-0 Fetch Operator limit: -1
4. join原理 

准備數據

語句
SELECT a.uid,a.name,b.age FROM logs a JOIN users b ON (a.uid=b.uid);
我們希望的結果是把users表join進來獲取age字段。

hive> SELECT * FROM logs; OK a 蘋果 5 a 橙子 3 b 燒雞 1 hive> SELECT * FROM users; OK a 23 b 21 hive> SELECT a.uid,a.name,b.age FROM logs a JOIN users b ON (a.uid=b.uid); a 蘋果 23 a 橙子 23 b 燒雞 21

計算過程

hive-join-cal

  1. key這里后面的數字是tag,后面在reduce階段用來區分來自於那個表的數據。tag是附屬在key后面的。那為什么會把a(0)和a(1)匯集在一起了呢,是因為對先對a求了hashcode,設在了HiveKey上,所以同一個key還是在一起的。
  2. Map階段只是拆分key和value。
  3. reduce階段主要看它是如何把它合並起來了,從圖上可以直觀的看到,其實就是把tag=1的內容,都加到tag=0的后面,就是這么簡單。
  4. 代碼實現上,就是先臨時用個變量把值存儲起來在storage里面, storage(0) = [{a, 蘋果}, {a, 橙子}] storage(1) = [{23}],當key變化(如a變為b)或全部結束時,會調用endGroup()方法,把內容合並起來。變成[{a,蘋果,23}, {a, 橙子,23}]

Operator

hive-join-op

Explain

hive> explain SELECT a.uid,a.name,b.age FROM logs a JOIN users b ON (a.uid=b.uid); OK //語法樹 ABSTRACT SYNTAX TREE: (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME logs) a) (TOK_TABREF (TOK_TABNAME users) b) (= (. (TOK_TABLE_OR_COL a) uid) (. (TOK_TABLE_OR_COL b) uid)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (. (TOK_TABLE_OR_COL a) uid)) (TOK_SELEXPR (. (TOK_TABLE_OR_COL a) name)) (TOK_SELEXPR (. (TOK_TABLE_OR_COL b) age))))) //階段 STAGE DEPENDENCIES: Stage-1 is a root stage Stage-0 is a root stage STAGE PLANS: Stage: Stage-1 Map Reduce Alias -> Map Operator Tree: //mapper階段 a TableScan //掃描表, 就只是一行一行的傳遞下去而已 alias: a Reduce Output Operator //輸出給reduce的內容 key expressions: // key啦,這里的key是uid,就是我們寫在ON子句那個,你可以試試加多幾個條件 expr: uid type: string sort order: + //排序 Map-reduce partition columns://分區字段,貌似是和key一樣的 expr: uid type: string tag: 0 //用來區分這個key是來自哪個表的 value expressions: //reduce用到的value字段 expr: uid type: string expr: name type: string b TableScan //掃描表, 就只是一行一行的傳遞下去而已 alias: b Reduce Output Operator //輸出給reduce的內容 key expressions: //key expr: uid type: string sort order: + Map-reduce partition columns: //分區字段 expr: uid type: string tag: 1 //用來區分這個key是來自哪個表的 value expressions: //值 expr: age type: int Reduce Operator Tree: // reduce階段 Join Operator // JOIN的Operator condition map: Inner Join 0 to 1 // 內連接0和1表 condition expressions: // 第0個表有兩個字段,分別是uid和name, 第1個表有一個字段age {VALUE._col0} {VALUE._col1} {VALUE._col1} handleSkewJoin: false //是否處理傾斜join,如果是,會分為兩個MR任務 outputColumnNames: _col0, _col1, _col6 //輸出字段 Select Operator //列裁剪(我們sql寫的select字段) expressions: expr: _col0 type: string expr: _col1 type: string expr: _col6 type: int outputColumnNames: _col0, _col1, _col2 File Output Operator //把結果輸出到文件 compressed: false GlobalTableId: 0 table: input format: org.apache.hadoop.mapred.TextInputFormat output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat Stage: Stage-0 Fetch Operator limit: -1

可以看到里面都是一個個Operator順序的執行下來

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM