Hive是Hadoop的子項目,它提供了對數據的結構化管理和類SQL語言的查詢功能。SQL的交互方式極大程度地降低了Hadoop生態環境中數據處理的門檻,用戶不需要編寫程序,通過SQL語句就可以對數據進行分析和處理。目前很多計算需求都可以由Hive來完成,極大程度地降低了開發成本。
目前,Hive底層使用MapReduce作為實際計算框架,SQL的交互方式隱藏了大部分MapReduce的細節。這種細節的隱藏在帶來便利性的同時,也對計算作業的調優帶來了一定的難度。未經優化的SQL語句轉化后的MapReduce作業,它的運行效率可能大大低於用戶的預期。本文我們就來分析一個簡單語句的優化過程。
日常統計場景中,我們經常會對一段時期內的字段進行消重並統計數量,SQL語句類似於
SELECT COUNT( DISTINCT id ) FROM TABLE_NAME WHERE ...;
這條語句是從一個表的符合WHERE條件的記錄中統計不重復的id的總數。
該語句轉化為MapReduce作業后執行示意圖如下,圖中還列出了我們實驗作業中Reduce階段的數據規模:
由於引入了DISTINCT,因此在Map階段無法利用combine對輸出結果消重,必須將id作為Key輸出,在Reduce階段再對來自於不同Map Task、相同Key的結果進行消重,計入最終統計值。
我們看到作業運行時的Reduce Task個數為1,對於統計大數據量時,這會導致最終Map的全部輸出由單個的ReduceTask處理。這唯一的Reduce Task需要Shuffle大量的數據,並且進行排序聚合等處理,這使得它成為整個作業的IO和運算瓶頸。
經過上述分析后,我們嘗試顯式地增大Reduce Task個數來提高Reduce階段的並發,使每一個Reduce Task的數據處理量控制在2G左右。具體設置如下:
set mapred.reduce.tasks=100 |
調整后我們發現這一參數並沒有影響實際Reduce Task個數,Hive運行時輸出“Number of reduce tasks determined at compile time: 1”。原來Hive在處理COUNT這種“全聚合(full aggregates)”計算時,它會忽略用戶指定的Reduce Task數,而強制使用1。我們只能采用變通的方法來繞過這一限制。我們利用Hive對嵌套語句的支持,將原來一個MapReduce作業轉換為兩個作業,在第一階段選出全部的非重復id,在第二階段再對這些已消重的id進行計數。這樣在第一階段我們可以通過增大Reduce的並發數,並發處理Map輸出。在第二階段,由於id已經消重,因此COUNT(*)操作在Map階段不需要輸出原id數據,只輸出一個合並后的計數即可。這樣即使第二階段Hive強制指定一個Reduce Task,極少量的Map輸出數據也不會使單一的Reduce Task成為瓶頸。改進后的SQL語句如下:
SELECT COUNT(*) FROM (SELECT DISTINCT id FROM TABLE_NAME WHERE … ) t;
實際運行時,我們發現Hive還對這兩階段的作業做了額外的優化。它將第二個MapReduce作業Map中的Count過程移到了第一個作業的Reduce階段。這樣在第一階Reduce就可以輸出計數值,而不是消重的全部id。這一優化大幅地減少了第一個作業的Reduce輸出IO以及第二個作業Map的輸入數據量。最終在同樣的運行環境下優化后的語句執行只需要原語句20%左右的時間。優化后的MapReduce作業流如下:
從上述優化過程我們可以看出,一個簡單的統計需求,如果不理解Hive和MapReduce的工作原理,它可能會比優化后的執行過程多四、五倍的時間。我們在利用Hive簡化開發的同時,也要盡可能優化SQL語句,提升計算作業的執行效率。
注:文中測試環境Hive版本為0.9
refer to :http://bigdata-blog.net/2013/11/08/hive-sql%E4%BC%98%E5%8C%96%E4%B9%8B-count-distinct/