上一篇說到的是Hive是如何對sql進行解析,生成ASTNode
那么Hive拿到ASTNode之后,就會觸發:BaseSemanticAnalyzer.analyze這個方法;
這個方法非常的重要,從AST到task的生成這一系列的操作,都會在這個調用棧下進行的;
如下圖:
按照:基於calcite做傻瓜式的sql優化給出的sql示例,我們提前看下,經過hive各階段優化后,會改變什么樣子
sql:
select * from ( select Sname, Sex, Sage, Sdept, count(1) as num from student_ext group by Sname, Sex, Sage, Sdept ) t1 left join student_ext t2 on t1.Sname = t2.Sname where t1.Sage > 10 and t2.Sdept = 'MA';

##########################Gen Calcite Plan############################################## HiveProject(sname=[$0], sex=[$1], sage=[$2], sdept=[$3], num=[$4], sno=[$5], sname1=[$6], sex1=[$7], sage1=[$8], sdept1=[$9]) HiveFilter(condition=[AND(>($2, 10), =($9, 'MA'))]) HiveJoin(condition=[=($0, $6)], joinType=[left], algorithm=[none], cost=[not available]) HiveProject(sname=[$0], sex=[$1], sage=[$2], sdept=[$3], num=[$4]) HiveAggregate(group=[{0, 1, 2, 3}], agg#0=[count($4)]) HiveProject($f0=[$1], $f1=[$2], $f2=[$3], $f3=[$4], $f4=[1]) HiveTableScan(table=[[default.student_ext]]) HiveTableScan(table=[[default.student_ext]]) ##########################applyPreJoinOrderingTransforms-0############################################## HiveProject(sname=[$0], sex=[$1], sage=[$2], sdept=[$3], num=[$4], sno=[$5], sname1=[$6], sex1=[$7], sage1=[$8], sdept1=[$9]) HiveFilter(condition=[AND(>($2, 10), =($9, 'MA'))]) HiveJoin(condition=[=($0, $6)], joinType=[left], algorithm=[none], cost=[not available]) HiveProject(sname=[$0], sex=[$1], sage=[$2], sdept=[$3], num=[$4]) HiveAggregate(group=[{0, 1, 2, 3}], agg#0=[count($4)]) HiveProject($f0=[$1], $f1=[$2], $f2=[$3], $f3=[$4], $f4=[1]) HiveTableScan(table=[[default.student_ext]]) HiveTableScan(table=[[default.student_ext]]) ##########################Push Down Semi Joins############################################## HiveProject(sname=[$0], sex=[$1], sage=[$2], sdept=[$3], num=[$4], sno=[$5], sname1=[$6], sex1=[$7], sage1=[$8], sdept1=[$9]) HiveFilter(condition=[AND(>($2, 10), =($9, 'MA'))]) HiveJoin(condition=[=($0, $6)], joinType=[left], algorithm=[none], cost=[not available]) HiveProject(sname=[$0], sex=[$1], sage=[$2], sdept=[$3], num=[$4]) HiveAggregate(group=[{0, 1, 2, 3}], agg#0=[count($4)]) HiveProject($f0=[$1], $f1=[$2], $f2=[$3], $f3=[$4], $f4=[1]) HiveTableScan(table=[[default.student_ext]]) HiveTableScan(table=[[default.student_ext]]) ##########################JOIN Add not null filters############################################## HiveProject(sname=[$0], sex=[$1], sage=[$2], sdept=[$3], num=[$4], sno=[$5], sname1=[$6], sex1=[$7], sage1=[$8], sdept1=[$9]) HiveFilter(condition=[AND(>($2, 10), =($9, 'MA'))]) HiveJoin(condition=[=($0, $6)], joinType=[left], algorithm=[none], cost=[not available]) HiveProject(sname=[$0], sex=[$1], sage=[$2], sdept=[$3], num=[$4]) HiveAggregate(group=[{0, 1, 2, 3}], agg#0=[count($4)]) HiveProject($f0=[$1], $f1=[$2], $f2=[$3], $f3=[$4], $f4=[1]) HiveTableScan(table=[[default.student_ext]]) HiveTableScan(table=[[default.student_ext]]) ##########################Constant propagation, common filter extraction, and PPD############################################## HiveProject(sname=[$0], sex=[$1], sage=[$2], sdept=[$3], num=[$4], sno=[$5], sname1=[$6], sex1=[$7], sage1=[$8], sdept1=[$9]) HiveFilter(condition=[AND(>($2, 10), =($9, 'MA'))]) HiveJoin(condition=[=($0, $6)], joinType=[left], algorithm=[none], cost=[not available]) HiveProject(sname=[$0], sex=[$1], sage=[$2], sdept=[$3], num=[$4]) HiveAggregate(group=[{0, 1, 2, 3}], agg#0=[count($4)]) HiveProject($f0=[$1], $f1=[$2], $f2=[$3], $f3=[$4], $f4=[1]) HiveTableScan(table=[[default.student_ext]]) HiveTableScan(table=[[default.student_ext]]) ##########################basePlan############################################## HiveProject(sname=[$0], sex=[$1], sage=[$2], sdept=[$3], num=[$4], sno=[$5], sname1=[$6], sex1=[$7], sage1=[$8], sdept1=[$9]) HiveFilter(condition=[AND(>($2, 10), =($9, 'MA'))]) HiveJoin(condition=[=($0, $6)], joinType=[left], algorithm=[none], cost=[not available]) HiveProject(sname=[$0], sex=[$1], sage=[$2], sdept=[$3], num=[$4]) HiveAggregate(group=[{0, 1, 2, 3}], agg#0=[count($4)]) HiveProject($f0=[$1], $f1=[$2], $f2=[$3], $f3=[$4], $f4=[1]) HiveTableScan(table=[[default.student_ext]]) HiveTableScan(table=[[default.student_ext]]) ##########################Projection Pruning############################################## HiveFilter(condition=[AND(>($2, 10), =($9, 'MA'))]) HiveJoin(condition=[=($0, $6)], joinType=[left], algorithm=[none], cost=[not available]) HiveAggregate(group=[{0, 1, 2, 3}], agg#0=[count($4)]) HiveProject($f0=[$0], $f1=[$1], $f2=[$2], $f3=[$3], $f4=[1]) HiveProject(sname=[$1], sex=[$2], sage=[$3], sdept=[$4]) HiveTableScan(table=[[default.student_ext]]) HiveProject(sno=[$0], sname=[$1], sex=[$2], sage=[$3], sdept=[$4]) HiveTableScan(table=[[default.student_ext]]) ##########################Apply Pre Join Order optimizations############################################## HiveFilter(condition=[AND(>($2, 10), =($9, 'MA'))]) HiveJoin(condition=[=($0, $6)], joinType=[left], algorithm=[none], cost=[not available]) HiveAggregate(group=[{0, 1, 2, 3}], agg#0=[count($4)]) HiveProject($f0=[$0], $f1=[$1], $f2=[$2], $f3=[$3], $f4=[1]) HiveProject(sname=[$1], sex=[$2], sage=[$3], sdept=[$4]) HiveTableScan(table=[[default.student_ext]]) HiveProject(sno=[$0], sname=[$1], sex=[$2], sage=[$3], sdept=[$4]) HiveTableScan(table=[[default.student_ext]]) ##########################優化后的執行計划############################################## HiveFilter(condition=[AND(>($2, 10), =($9, 'MA'))]) HiveJoin(condition=[=($0, $6)], joinType=[left], algorithm=[none], cost=[not available]) HiveAggregate(group=[{0, 1, 2, 3}], agg#0=[count($4)]) HiveProject($f0=[$0], $f1=[$1], $f2=[$2], $f3=[$3], $f4=[1]) HiveProject(sname=[$1], sex=[$2], sage=[$3], sdept=[$4]) HiveTableScan(table=[[default.student_ext]]) HiveProject(sno=[$0], sname=[$1], sex=[$2], sage=[$3], sdept=[$4]) HiveTableScan(table=[[default.student_ext]])
上面就是在各階段優化,產生的執行計划
-
1. 生成Calcite的執行計划:RelNode
持續跟進,最終會調用到一個抽象方法:
public abstract void analyzeInternal(ASTNode ast) throws SemanticException;
上文提到過,在Hive中,使用Calcite來進行核心優化,它將AST Node轉換成QB,又將QB轉換成Calcite的RelNode,在Calcite優化完成后,又會將RelNode轉換成Operator Tree,說起來很簡單,但這又是一條很長的調用鏈。
簡答來說:Hive是基於antlr做的詞法和語法解析后生成的語法樹,然后基於Calcite對語法樹做深度優先遍歷,在遍歷過程中通過匹配規則來剪掉部分Operator或者合並Operattor等,這樣就大大減小了shuffle數據量(其實就是RBO和CBO);
因此程序走到這個抽象方法后,就會跳到hive的優化實現類:CalcitePlanner類上
這樣程序就進入:CalcitePlanner.analyzeInternal; 然后判斷,是否需要進行CBO優化;
當然不管執行的是CBO還是RBO,其實最終走的都是:analyzeInternal(ASTNode ast, PlannerContext plannerCtx),如下圖:
接下來就會走到非常重要的代碼:
//TODO 2. Gen OP Tree from resolved Parse Tree rbo優化的地方 Operator sinkOp = genOPTree(ast, plannerCtx);
上面這段代碼,就會基於Calcite對ast進行各種規則的優化,然后返回Operator
所以跟進genOPTree方法:
-
2. 對Join操作進行規則優化
直接跳到他的實現方法上:CalcitePlanner.genOPTree(ASTNode ast, PlannerContext plannerCtx)
上面這段代碼意思非常明確,是否需要進行CBO優化,如果不需要的話,會直接執行最下面的代碼,返回未經優化的Operator
if (skipCalcitePlan) { sinkOp = super.genOPTree(ast, plannerCtx); }
如果需要進行CBO優化,代碼既執行else內部的邏輯
其中優化核心代碼是:
ASTNode newAST = getOptimizedAST();
這段代碼產生的hive優化流程:
1. 生成Calcite的執行計划:RelNode 2. 對Join操作進行規則優化 2.1、聚合去重 2.2、Semi Joins的下推 2.3、Add not null filters 2.4、Join的謂詞下推,投影提取、常量合並等工作 2.5、謂詞推送到下游並進行分區修剪 2.6、投影修剪 2.7、列剪枝 3. Appy Join Order Optimizations using Hep Planner (MST Algorithm)
那么接下來看下,hive是如何完成上述操作的:
Calcite優化的主要類是CalcitePlanner
,更加細節點,是在CalcitePlannerAction.apply()
這個方法,如下圖:
進入CalcitePlannerAction這個內部類,優化的重點就在CalcitePlannerAction.apply()
這個方法:
這個apply方法由三個重要的局部變量:
//calcite基於QB生成一個初始化的RelNode RelNode calciteGenPlan = null; //執行CBO優化后生成的RelNode RelNode calcitePreCboPlan = null; //經過一些列規則優化之后,返回的結果RelNode RelNode calciteOptimizedPlan = null;
1、calciteGenPlan
hive會根據事先生成好的QB,來轉化為初始化的RelNode,而calcite對sql優化,其實就是針對RelNode進行優化
其中代碼:
calciteGenPlan = genLogicalPlan(getQB(), true);
就是QB中獲取成員變量的值,然后將這些值重組成RelNode(類似)
這樣當代碼拿到基於QB重組成的RelNode之后(calciteGenPlan),然后就開始進行CBO的規則優化
-
2.1、聚合去重
程序進入applyPreJoinOrderingTransforms方法后:首先做一個計算引擎的判斷:
如果當前支持tez計算引擎,並且支持優化去重重寫操作,那么Hive會進行一次HiveExpandDistinctAggregatesRule的優化:
HiveExpandDistinctAggregatesRule
那么問題來了,HiveExpandDistinctAggregatesRule是什么優化?
這種優化簡單來說就是將:count dintict進行擴展為聚合的優化方式
聽起來很別扭是吧?
舉個栗子:
count(distinct colA)就是將colA中所有出現過的不同值取出來,相信只要接觸過數據庫的同學都能明白什么意思!
count(distinct colA)的操作也可以用group by的方式完成,具體代碼如下:
select count(distinct colA) from table1; select count(1) from (select colA from table1 group by colA)alias_1;
這兩者最后得出的結果是一致的! , 但是具體的實現方式,有什么不同呢?
上面兩種方式本質就是時間與空間的權衡。
distinct需要將colA中的所有內容都加載到內存中,大致可以理解為一個hash結構,key自然就是colA的所有值。因為是hash結構,那運算速度自然就快。最后計算hash中有多少key就是最終的結果。
那么問題來了,在現在的海量數據環境下,需要將所有不同的值都存起來,這個內存消耗,是可想而知的。所以如果數據量特別大,可能會out of memory。。。
group by的實現方式是先將colA排序。排序大家都不陌生,拿最見得快排來說,時間復雜度為O(nlogn),而空間復雜度只有O(1)。這樣一來,即使數據量再大一些,group by基本也能hold住。但是因為需要做一次O(nlogn) 的排序,時間自然會稍微慢點
雖然時間慢了,但是在海量數據,比如:10T大小的表情況下,相比count dintict肯定優先選擇group by
-
2.2、Semi Joins的下推
接下來就會走到常規的第一次優化,代碼如下:
// 1. Push Down Semi Joins basePlan = hepPlan(basePlan, true, mdProvider, SemiJoinJoinTransposeRule.INSTANCE, SemiJoinFilterTransposeRule.INSTANCE, SemiJoinProjectTransposeRule.INSTANCE);
這里面涉及到的優化規則是:
SemiJoinJoinTransposeRule
SemiJoinFilterTransposeRule
SemiJoinProjectTransposeRule
這三個規則都是關於SemiJoin的優化
簡單介紹下semi join的作用:
常規聯接中,結果可能會出現重復值,而子查詢可以獲得無重復的結果。
比如需要找出有人口大於 2000萬的城市的國家,如果用普通聯接,則可能出現重復結果:
select country.* from country join city on country.code=city.country_code \ and population>20000000; +---------+----------+ | code | name | +---------+----------+ | 1 | china | | 1 | china | +---------+----------+ 2 rows in set (0.00 sec)
出現這種情況,一般會使用子查詢來解決,比如:
select * from country where code in (select country_code from city where population>20000000); +------+---------+ | code | name | +------+---------+ | 1 | china | +------+---------+ 1 row in set (0.00 sec)
但是,仔細觀察sql會發現,這種子查詢的性能很糟糕,因為where后面的子查詢每掃描一條數據,Where子查詢都會被重新執行一遍,這樣效率就會很低如果父表數據很多帶來什么問題?那么就有了將子查詢的結果提升到FROM中,不需要再父表中每個符合條件的數據都要去把子查詢執行一輪了;所以為了完成同樣目標,我們可以選擇semi join來做優化。
比如:
select country.* from country semi join city on country.code = city.country_code where population > 20000000;
現在我們在拿這個例子semi join來做優化,經過:SemiJoinJoinTransposeRule,SemiJoinFilterTransposeRule,SemiJoinProjectTransposeRule這些規則處理后
最后生成的sql就是:
select country.* from counttry semi join (select country_code from city where population > 20000000) a on country.code = a.country_code
經過Push Down Semi Joins流程后,代碼接下來執行到如圖所示的地方:
-
2.3、Add not null filters
這里面涉及到一個規則:HiveJoinAddNotNullRule
此優化規則Rule主要功能是將SQL語句中Inner Join關聯時,出現在關聯條件中的字段存在為null可能的字段,都加上相應字段 is not null條件限制(因為hive在做關聯的時候,並不會對null = null這樣的條件進行關聯)
貼一個連接,將HiveJoinAddNotNullRule講的非常透徹,建議深入看一下:連接
比如執行這樣一個sql:
select * from (select Sname , Sex , Sage , Sdept , count(1) as num from student_ext group by Sname , Sex , Sage , Sdept) t1 inner join student_ext t2 on t1.Sname = t2.Sname where t1.Sage>10 and t2.Sdept = 'MA';
在HiveJoinAddNotNullRule規則優化前后對比的執行計划如下:
在經過HiveJoinAddNotNullRule優化前:
HiveProject(sname=[$0], sex=[$1], sage=[$2], sdept=[$3], num=[$4], sno=[$5], sname1=[$6], sex1=[$7], sage1=[$8], sdept1=[$9]) HiveFilter(condition=[AND(>($2, 10), =($9, 'MA'))]) HiveJoin(condition=[=($0, $6)], joinType=[inner], algorithm=[none], cost=[not available]) HiveProject(sname=[$0], sex=[$1], sage=[$2], sdept=[$3], num=[$4]) HiveAggregate(group=[{0, 1, 2, 3}], agg#0=[count($4)]) HiveProject($f0=[$1], $f1=[$2], $f2=[$3], $f3=[$4], $f4=[1]) HiveTableScan(table=[[default.student_ext]]) HiveTableScan(table=[[default.student_ext]])
在經過HiveJoinAddNotNullRule優化后:
HiveProject(sname=[$0], sex=[$1], sage=[$2], sdept=[$3], num=[$4], sno=[$5], sname1=[$6], sex1=[$7], sage1=[$8], sdept1=[$9]) HiveFilter(condition=[AND(>($2, 10), =($9, 'MA'))]) HiveJoin(condition=[=($0, $6)], joinType=[inner], algorithm=[none], cost=[not available]) HiveFilter(condition=[isnotnull($0)]) HiveProject(sname=[$0], sex=[$1], sage=[$2], sdept=[$3], num=[$4]) HiveAggregate(group=[{0, 1, 2, 3}], agg#0=[count($4)]) HiveProject($f0=[$1], $f1=[$2], $f2=[$3], $f3=[$4], $f4=[1]) HiveTableScan(table=[[default.student_ext]]) HiveFilter(condition=[isnotnull($1)]) HiveTableScan(table=[[default.student_ext]])
仔細觀察,多了 HiveFilter(condition=[isnotnull($1)]) ,這個就是HiveJoinAddNotNullRule規則的作用,出現在關聯條件中的字段存在為null可能的字段,都加上相應字段 is not null條件限制
2.4、Constant propagation, common filter extraction, and PPD
繼續跟進debug,程序走到 3. Constant propagation, common filter extraction, and PPD
第一個規則:ReduceExpressionsRule.PROJECT_INSTANCE 叫做常量折疊 , 比如我們寫這樣一個sql:
select 1+2 , a.name , a.age ,b.money from a left join b on a.id=b.id where a.name='張三' and b.department='it'
在沒有進行常量折疊優化之前,如果不進行常量折疊,那么每行數據都需要進行計算,顯然會增大sql的CPU使用情況
然后是下面的三個規則要放在一起:
ReduceExpressionsRule.FILTER_INSTANCE,
ReduceExpressionsRule.JOIN_INSTANCE
HivePreFilteringRule.INSTANCE
就是在join的過程中幫我們進行謂詞下推操作;
那么Constant propagation, common filter extraction, and PPD這個優化規則組合起來,用一張圖來說明一下,依然是sql:
select 1+2 , a.name , a.age ,b.money from a left join b on a.id=b.id where a.name='張三' and b.department='it'
圖:
從上圖的優化前后對比可以看到,當需要查詢的表數據量很大是后續,這種優化能極大優化:
1、join過程中的數據量 2、CPU計算次數
未完待續.............