Impala 源碼分析-FE


 

Impala 源代碼目錄結構

SQL 解析

Impala 的 SQL 解析與執行計划生成部分是由 impala-frontend(Java)實現的,監聽端口是 21000。用戶通過
Beeswax 接口 BeeswaxService.query() 提交一個請求,在 impalad 端的處理邏輯是由
void ImpalaServer::query(QueryHandle& query_handle, const Query& query) 這個函數(
ImpalaServer.h)完成的。

1 void ImpalaServer::query(QueryHandle& query_handle, const Query& query) {
2   VLOG_QUERY << "query(): query=" << query.query;
3   ScopedSessionState session_handle(this);
4   shared_ptr<SessionState> session;
5   RAISE_IF_ERROR(// 為當前連接返回唯一標識,標記會話為使用中並保存
6       session_handle.WithSession(ThriftServer::GetThreadConnectionId(), &session),
7       SQLSTATE_GENERAL_ERROR);
8   TQueryCtx query_ctx;
9   // 將 Query 轉化為 TQueryCtx
10   // raise general error for request conversion error;
11   RAISE_IF_ERROR(QueryToTQueryContext(query, &query_ctx), SQLSTATE_GENERAL_ERROR);
12  
13   // raise Syntax error or access violation; it's likely to be syntax/analysis error
14   // TODO: that may not be true; fix this
15   shared_ptr<QueryExecState> exec_state;
16     // 開始異步執行查詢,內部調用 ImpalaServer::Execute() 函數
17     // 將 TQueryCtx 轉換為 QueryExecState,注冊並調用 Coordinator::Execute()
18   RAISE_IF_ERROR(Execute(&query_ctx, session, &exec_state),
19       SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION);
20  
21   exec_state->UpdateQueryState(QueryState::RUNNING);
22   // start thread to wait for results to become available, which will allow
23   // us to advance query state to FINISHED or EXCEPTION
24   exec_state->WaitAsync();
25   // Once the query is running do a final check for session closure and add it to the
26   // set of in-flight queries.
27   Status status = SetQueryInflight(session, exec_state);
28   if (!status.ok()) {
29     UnregisterQuery(exec_state->query_id(), false, &status);
30     RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);
31   }
32   TUniqueIdToQueryHandle(exec_state->query_id(), &query_handle);
33 }

其中 QueryToTQueryContext(query, &query_ctx) 將 Query 裝換為 TQueryCtx。具體代碼實現如下:
(ImpalaServer.h)

1 Status ImpalaServer::QueryToTQueryContext(const Query& query,
2     TQueryCtx* query_ctx) {
3   query_ctx->request.stmt = query.query;
4   VLOG_QUERY << "query: " << ThriftDebugString(query);
5   {
6     shared_ptr<SessionState> session;
7     const TUniqueId& session_id = ThriftServer::GetThreadConnectionId();
8     RETURN_IF_ERROR(GetSessionState(session_id, &session));
9     DCHECK(session != NULL);
10     {
11       // The session is created when the client connects. Depending on the underlying
12       // transport, the username may be known at that time. If the username hasn't been
13       // set yet, set it now.
14       lock_guard<mutex> l(session->lock);
15       if (session->connected_user.empty()) session->connected_user = query.hadoop_user;
16       query_ctx->request.query_options = session->default_query_options;
17     }
18     // 構建該 SessionState 的 Thrift 表示用於序列化到 frontend
19     session->ToThrift(session_id, &query_ctx->session);
20   }
21  
22   // Override default query options with Query.Configuration
23   if (query.__isset.configuration) {
24     BOOST_FOREACH(const string& option, query.configuration) {
25       RETURN_IF_ERROR(ParseQueryOptions(option, &query_ctx->request.query_options));
26     }
27     VLOG_QUERY << "TClientRequest.queryOptions: "
28                << ThriftDebugString(query_ctx->request.query_options);
29   }
30  
31   return Status::OK();
32 }

內部調用 ImpalaServer::Execute()
(ImpalaServer.h)
函數將 TQueryCtx 轉換為 TExecRequest,具體邏輯通過調用 ImpalaServer::ExecuteInternal() 實現。代碼如下:

1 Status ImpalaServer::Execute(TQueryCtx* query_ctx,
2     shared_ptr<SessionState> session_state,
3     shared_ptr<QueryExecState>* exec_state) {
4   PrepareQueryContext(query_ctx);
5   bool registered_exec_state;
6   ImpaladMetrics::IMPALA_SERVER_NUM_QUERIES->Increment(1L);
7  
8   // Redact the SQL stmt and update the query context
9   string stmt = replace_all_copy(query_ctx->request.stmt, "\n"" ");
10   Redact(&stmt);
11   query_ctx->request.__set_redacted_stmt((const string) stmt);
12     // 實現 Execute() 邏輯,出錯時不取消注冊查詢
13   Status status = ExecuteInternal(*query_ctx, session_state, &registered_exec_state,
14       exec_state);
15   if (!status.ok() && registered_exec_state) {
16     UnregisterQuery((*exec_state)->query_id(), false, &status);
17   }
18   return status;
19 }

上面的函數調用 ImpalaServer::ExecuteInternal()
(ImpalaServer.h)
在這個函數里通過 JNI 接口調用 frontend.createExecRequest() 生成 TExecRequest,具體代碼如下:

1 Status ImpalaServer::ExecuteInternal(
2     const TQueryCtx& query_ctx,
3     shared_ptr<SessionState> session_state,
4     bool* registered_exec_state,
5     shared_ptr<QueryExecState>* exec_state) {
6   DCHECK(session_state != NULL);
7   *registered_exec_state = false;
8   if (IsOffline()) {
9     return Status("This Impala server is offline. Please retry your query later.");
10   }
11   exec_state->reset(new QueryExecState(query_ctx, exec_env_, exec_env_->frontend(),
12       this, session_state));
13  
14   (*exec_state)->query_events()->MarkEvent("Start execution");
15  
16   TExecRequest result;
17   {
18     // Keep a lock on exec_state so that registration and setting
19     // result_metadata are atomic.
20     //
21     // Note: this acquires the exec_state lock *before* the
22     // query_exec_state_map_ lock. This is the opposite of
23     // GetQueryExecState(..., true), and therefore looks like a
24     // candidate for deadlock. The reason this works here is that
25     // GetQueryExecState cannot find exec_state (under the exec state
26     // map lock) and take it's lock until RegisterQuery has
27     // finished. By that point, the exec state map lock will have been
28     // given up, so the classic deadlock interleaving is not possible.
29     lock_guard<mutex> l(*(*exec_state)->lock());
30  
31     // register exec state as early as possible so that queries that
32     // take a long time to plan show up, and to handle incoming status
33     // reports before execution starts.
34     RETURN_IF_ERROR(RegisterQuery(session_state, *exec_state));
35     *registered_exec_state = true;
36  
37     RETURN_IF_ERROR((*exec_state)->UpdateQueryStatus(
38     // 通過 JNI 接口調用 frontend.createExecRequest() 生成 TExecRequest
39         exec_env_->frontend()->GetExecRequest(query_ctx, &result)));
40     (*exec_state)->query_events()->MarkEvent("Planning finished");
41     (*exec_state)->summary_profile()->AddEventSequence(
42         result.timeline.name, result.timeline);
43     if (result.__isset.result_set_metadata) {
44       (*exec_state)->set_result_metadata(result.result_set_metadata);
45     }
46   }
47   VLOG(2) << "Execution request: " << ThriftDebugString(result);
48  
49   // start execution of query; also starts fragment status reports
50   RETURN_IF_ERROR((*exec_state)->Exec(&result));
51   if (result.stmt_type == TStmtType::DDL) {
52     Status status = UpdateCatalogMetrics();
53     if (!status.ok()) {
54       VLOG_QUERY << "Couldn't update catalog metrics: " << status.GetDetail();
55     }
56   }
57  
58   if ((*exec_state)->coord() != NULL) {
59     const unordered_set<TNetworkAddress>& unique_hosts =
60         (*exec_state)->schedule()->unique_hosts();
61     if (!unique_hosts.empty()) {
62       lock_guard<mutex> l(query_locations_lock_);
63       BOOST_FOREACH(const TNetworkAddress& port, unique_hosts) {
64         query_locations_[port].insert((*exec_state)->query_id());
65       }
66     }
67   }
68   return Status::OK();
69 }

Frontend::GetExecRequest()
(Frontend.h)
通過 JNI 接口調用 frontend.createExecRequest() 生成 TExecRequest。具體實現代碼如下:

1 Status Frontend::GetExecRequest(
2     const TQueryCtx& query_ctx, TExecRequest* result) {
3   return JniUtil::CallJniMethod(fe_, create_exec_request_id_, query_ctx, result);
4 }

JniUtil::CallJniMethod()
(jni-util.h)
的具體實現代碼如下:

1 /// Utility methods to avoid repeating lots of the JNI call boilerplate. It seems these
2 /// must be defined in the header to compile properly.
3 template <typename T>
4 static Status CallJniMethod(const jobject& obj, const jmethodID& method, const T& arg) {
5   JNIEnv* jni_env = getJNIEnv();
6   jbyteArray request_bytes;
7   JniLocalFrame jni_frame;
8   RETURN_IF_ERROR(jni_frame.push(jni_env));
9   RETURN_IF_ERROR(SerializeThriftMsg(jni_env, &arg, &request_bytes));
10   jni_env->CallObjectMethod(obj, method, request_bytes);
11   RETURN_ERROR_IF_EXC(jni_env);
12   return Status::OK();
13 }

至此,將通過 Thrift 轉到 Java Frontend 生成執行計划樹。
public TExecRequest createExecRequest(TQueryCtx queryCtx, StringBuilder explainString)
(Frontend.java)
是最重要的方法,它根據提供的 TQueryCtx 創建 TExecRequest。具體代碼(分析部分)如下:

 

1 /**
2  * Create a populated TExecRequest corresponding to the supplied TQueryCtx.
3  */
4 public TExecRequest createExecRequest(TQueryCtx queryCtx, StringBuilder explainString)
5     throws ImpalaException {
6   // Analyzes the SQL statement included in queryCtx and returns the AnalysisResult.
7   AnalysisContext.AnalysisResult analysisResult = analyzeStmt(queryCtx);
8   EventSequence timeline = analysisResult.getAnalyzer().getTimeline();
9   timeline.markEvent("Analysis finished");
10   .
11   .
12   .
13   .
14 }

首先通過調用 analyzeStmt()
(Frontend.java)
方法分析提交的 SQL 語句。analyzeStmt() 的具體實現代碼如下:

1 /**
2    * Analyzes the SQL statement included in queryCtx and returns the AnalysisResult.
3    */
4   private AnalysisContext.AnalysisResult analyzeStmt(TQueryCtx queryCtx)
5       throws AnalysisException, InternalException, AuthorizationException {
6     AnalysisContext analysisCtx = new AnalysisContext(dsqldCatalog_, queryCtx,
7         authzConfig_);
8     LOG.debug("analyze query " + queryCtx.request.stmt);
9  
10     // 循環分析直到出現以下某種情形:
11     // 1) 分析成功完成
12     // 2) 由於缺失表分析失敗並拋出 AnalysisException 異常
13     // 3) 分析失敗並拋出 AuthorizationException 異常
14     try {
15       while (true) {
16         try {
17           // 通過調用 AnalyzeContex.analyze() 實現具體的分析邏輯
18           analysisCtx.analyze(queryCtx.request.stmt);
19           Preconditions.checkState(analysisCtx.getAnalyzer().getMissingTbls().isEmpty());
20           return analysisCtx.getAnalysisResult();
21         catch (AnalysisException e) {
22           Set<TableName> missingTbls = analysisCtx.getAnalyzer().getMissingTbls();
23           // Only re-throw the AnalysisException if there were no missing tables.
24           if (missingTbls.isEmpty()) throw e;
25  
26           // Some tables/views were missing, request and wait for them to load.
27           if (!requestTblLoadAndWait(missingTbls, MISSING_TBL_LOAD_WAIT_TIMEOUT_MS)) {
28             LOG.info(String.format("Missing tables were not received in %dms. Load " +
29                 "request will be retried.", MISSING_TBL_LOAD_WAIT_TIMEOUT_MS));
30           }
31         }
32       }
33     finally {
34       // Authorize all accesses.
35       // AuthorizationExceptions must take precedence over any AnalysisException
36       // that has been thrown, so perform the authorization first.
37       analysisCtx.getAnalyzer().authorize(getAuthzChecker());
38     }
39   }

AnalyzerContext.AnalyzeResult.Analyzer 對象是個存放這個 SQL 所涉及到的所有信息
(包含Table, conjunct, slot,slotRefMap, eqJoinConjuncts等)的知識庫,所有跟這個
SQL 有關的東西都會存到 Analyzer對象里面。該類的定義可以查看
Analyzer.java
AnalyzerContex.analyze()
(AnalyzeContext.java)
的具體實現代碼如下:

1 /**
2  * Parse and analyze 'stmt'. If 'stmt' is a nested query (i.e. query that
3  * contains subqueries), it is also rewritten by performing subquery unnesting.
4  * The transformed stmt is then re-analyzed in a new analysis context.
5  */
6 public void analyze(String stmt) throws AnalysisException {
7   Analyzer analyzer = new Analyzer(catalog_, queryCtx_, authzConfig_);
8   analyze(stmt, analyzer);
9 }

上面的 analyze() 函數通過調用同名的重載函數 analyze(String stmt, Analyzer analyzer)
(AnalyzeContext.java)
實現具體的分析,代碼如下:

1 /**
2  * Parse and analyze 'stmt' using a specified Analyzer.
3  */
4 public void analyze(String stmt, Analyzer analyzer) throws AnalysisException {
5   SqlScanner input = new SqlScanner(new StringReader(stmt));
6   SqlParser parser = new SqlParser(input);
7   try {
8     analysisResult_ = new AnalysisResult();
9     analysisResult_.analyzer_ = analyzer;
10     if (analysisResult_.analyzer_ == null) {
11       analysisResult_.analyzer_ = new Analyzer(catalog_, queryCtx_, authzConfig_);
12     }
13     analysisResult_.stmt_ = (StatementBase) parser.parse().value;
14     if (analysisResult_.stmt_ == null)
15       return;
16  
17     // For CTAS(Create Table As Select), we copy the create statement
18     // in case we have to create a new CTAS statement after a query rewrite.
19     if (analysisResult_.stmt_ instanceof CreateTableAsSelectStmt) {
20       analysisResult_.tmpCreateTableStmt_ =
21           ((CreateTableAsSelectStmt) analysisResult_.stmt_).getCreateStmt().clone();
22     }
23  
24     analysisResult_.stmt_.analyze(analysisResult_.analyzer_);
25     boolean isExplain = analysisResult_.isExplainStmt();
26  
27     // Check if we need to rewrite the statement.
28     if (analysisResult_.requiresRewrite()) {
29       StatementBase rewrittenStmt = StmtRewriter.rewrite(analysisResult_);
30       // Re-analyze the rewritten statement.
31       Preconditions.checkNotNull(rewrittenStmt);
32       analysisResult_ = new AnalysisResult();
33       analysisResult_.analyzer_ = new Analyzer(catalog_, queryCtx_, authzConfig_);
34       analysisResult_.stmt_ = rewrittenStmt;
35       analysisResult_.stmt_.analyze(analysisResult_.analyzer_);
36       LOG.trace("rewrittenStmt: " + rewrittenStmt.toSql());
37       if (isExplain)
38         analysisResult_.stmt_.setIsExplain();
39     }
40   catch (AnalysisException e) {
41     // Don't wrap AnalysisExceptions in another AnalysisException
42     throw e;
43   catch (Exception e) {
44     throw new AnalysisException(parser.getErrorMsg(stmt), e);
45   }
46 }

上面的函數通過調用 SqlScanner 和 SqlParser 類實現具體的分析。可以查看
sql-scanner.flex

sql-parser.y

分析 SQL 語句的大概流程如下:

  1. 處理這個 SQL 所涉及到的 Table(即TableRefs),這些 Table 是在 from 從句中提取出來的(包含關鍵字
    from, join, on/using)。注意 JOIN 操作以及 on/using 條件是存儲在參與 JOIN 操作的右邊的表的 TableRef
    中並分析的。依次 analyze() 每個 TableRef,向 Analyzer 注冊 registerBaseTableRef(填充TupleDescriptor)。
    如果對應的 TableRef 涉及到 JOIN 操作,還要 analyzeJoin()。在 analyzeJoin() 時會向 Analyzer registerConjunct()
    填充 Analyzer 的一些成員變量:conjuncts,tuplePredicates(TupleId 與 conjunct 的映射),slotPredicates(SlotId
    與 conjunct 的映射),eqJoinConjuncts。
  2. 處理 select 從句(包含關鍵字 select, MAX(), AVG()等聚集函數):分析這個 SQL 都 select 了哪幾項,每一項都是個
    Expr 類型的子類對象,把這幾項填入 resultExprs 數組和 colLabels。然后把 resultExprs 里面的 Expr 都遞歸 analyze
    一下,要分析到樹的最底層,向 Analyzer 注冊 SlotRef 等。
  3. 分析 where 從句(關鍵字 where),首先遞歸 Analyze 從句中 Expr 組成的樹,然后向 Analyzer registerConjunct()
    填充 Analyzer 的一些成員變量(同1,此外還要填充 whereClauseConjuncts) 。
  4. 處理 sort 相關信息(關鍵字 order by)。先是解析 aliases 和 ordinals,然后從 order by 后面的從句中提取 Expr 填入
    orderingExprs,接着遞歸 Analyze 從句中 Expr 組成的樹,最后創建 SortInfo 對象。
  5. 處理 aggregation 相關信息(關鍵字 group by, having, avg, max 等)。首先遞歸分析 group by 從句里的 Expr,然后如果有
    having 從句就像 where 從句一樣,先是 analyze having 從句中 Expr 組成的樹,然后向 Analyzer registerConjunct()等。
  6. 處理 InlineView。

至此,詞法分析和語法分析都完成了,回到 frontend.createExecRequest()
(Frontend.java)
函數,開始填充 TExecRequest 內的成員變量。代碼如下(部分):

1 /**
2  * Create a populated TExecRequest corresponding to the supplied TQueryCtx.
3  */
4 public TExecRequest createExecRequest(TQueryCtx queryCtx, StringBuilder explainString)
5     throws ImpalaException {
6   // Analyzes the SQL statement included in queryCtx and returns the AnalysisResult.
7   AnalysisContext.AnalysisResult analysisResult = analyzeStmt(queryCtx);
8   EventSequence timeline = analysisResult.getAnalyzer().getTimeline();
9   timeline.markEvent("Analysis finished");
10    
11   // 開始填充 TExecRequest
12   Preconditions.checkNotNull(analysisResult.getStmt());
13   TExecRequest result = new TExecRequest();
14   result.setQuery_options(queryCtx.request.getQuery_options());
15   result.setAccess_events(analysisResult.getAccessEvents());
16   result.analysis_warnings = analysisResult.getAnalyzer().getWarnings();
17  
18   if (analysisResult.isCatalogOp()) {
19     result.stmt_type = TStmtType.DDL;
20     createCatalogOpRequest(analysisResult, result);
21     String jsonLineageGraph = analysisResult.getJsonLineageGraph();
22     if (jsonLineageGraph != null && !jsonLineageGraph.isEmpty()) {
23       result.catalog_op_request.setLineage_graph(jsonLineageGraph);
24     }
25     // All DDL operations except for CTAS are done with analysis at this point.
26     if (!analysisResult.isCreateTableAsSelectStmt()) return result;
27   else if (analysisResult.isLoadDataStmt()) {
28     result.stmt_type = TStmtType.LOAD;
29     result.setResult_set_metadata(new TResultSetMetadata(Arrays.asList(
30         new TColumn("summary", Type.STRING.toThrift()))));
31     result.setLoad_data_request(analysisResult.getLoadDataStmt().toThrift());
32     return result;
33   else if (analysisResult.isSetStmt()) {
34     result.stmt_type = TStmtType.SET;
35     result.setResult_set_metadata(new TResultSetMetadata(Arrays.asList(
36         new TColumn("option", Type.STRING.toThrift()),
37         new TColumn("value", Type.STRING.toThrift()))));
38     result.setSet_query_option_request(analysisResult.getSetStmt().toThrift());
39     return result;
40   }
41   .
42   .
43   .
44   .
45    
46 }

如果是 DDL 命令(use, show tables, show databases, describe),那么調用 createCatalogOpRequest()。
如果是 Load Data 或者 Set 語句,就調用相應的 setmetadata 並轉換為 Thrift。

執行計划生成

另外一種情況就是 Query 或者 DML 命令,那么就得創建和填充 TQueryExecRequest 了。該部分代碼如下:

1 /**
2    * Create a populated TExecRequest corresponding to the supplied TQueryCtx.
3    */
4   public TExecRequest createExecRequest(TQueryCtx queryCtx, StringBuilder explainString)
5       throws DsqlException {
6     .
7     .
8     .
9     .
10     .
11     // create TQueryExecRequest 如果是 Query、DML、或 CTAS 語句
12     Preconditions.checkState(analysisResult.isQueryStmt() || analysisResult.isDmlStmt()
13         || analysisResult.isCreateTableAsSelectStmt());
14  
15     TQueryExecRequest queryExecRequest = new TQueryExecRequest();
16     // create plan
17     LOG.debug("create plan");
18     Planner planner = new Planner(analysisResult, queryCtx);
19     // 根據 SQL 語法樹生成執行計划(PlanNode 和 PlanFragment)
20     // 用 Planner 把 SQL 解析出的語法樹轉換成 Plan fragments,后者能在各個 backend 被執行。
21     ArrayList<PlanFragment> fragments = planner.createPlan();
22  
23     List<ScanNode> scanNodes = Lists.newArrayList();
24     // 建立 queryExecRequest.fragments 中 fragment 到它索引的映射;
25     // queryExecRequest.dest_fragment_idx 需要這些映射
26     Map<PlanFragment, Integer> fragmentIdx = Maps.newHashMap();
27  
28     for (int fragmentId = 0; fragmentId < fragments.size(); ++fragmentId) {
29       PlanFragment fragment = fragments.get(fragmentId);
30       Preconditions.checkNotNull(fragment.getPlanRoot());
31       fragment.getPlanRoot().collect(Predicates.instanceOf(ScanNode.class), scanNodes);
32       fragmentIdx.put(fragment, fragmentId);
33     }
34     .
35     .
36     .
37     .
38     .
39   }

上面的 createPlan() 函數是 frontend 最重要的函數:根據 SQL 解析的結果和 client 傳入的 query options,
生成執行計划。執行計划是用 PlanFragment 的數組表示的,最后會序列化到 TQueryExecRequest.fragments
然后傳給 backend 的 coordinator 去調度執行。現在讓我們來看看 createPlan()
(Planner.java)
的具體實現:

1 /**
2  * Returns a list of plan fragments for executing an analyzed parse tree.
3  * May return a single-node or distributed executable plan.
4  */
5 public ArrayList<PlanFragment> createPlan() throws ImpalaException {
6   SingleNodePlanner singleNodePlanner = new SingleNodePlanner(ctx_);
7   DistributedPlanner distributedPlanner = new DistributedPlanner(ctx_);
8   // 首先生成 SingleNodePlan,單節點執行計划樹
9   PlanNode singleNodePlan = singleNodePlanner.createSingleNodePlan();
10   ctx_.getRootAnalyzer().getTimeline().markEvent("Single node plan created");
11   ArrayList<PlanFragment> fragments = null;
12  
13   // Determine the maximum number of rows processed by any node in the plan tree
14   MaxRowsProcessedVisitor visitor = new MaxRowsProcessedVisitor();
15   singleNodePlan.accept(visitor);
16   long maxRowsProcessed = visitor.get() == -1 ? Long.MAX_VALUE : visitor.get();
17   boolean isSmallQuery =
18       maxRowsProcessed < ctx_.getQueryOptions().exec_single_node_rows_threshold;
19   if (isSmallQuery) {
20     // Execute on a single node and disable codegen for small results
21     ctx_.getQueryOptions().setNum_nodes(1);
22     ctx_.getQueryOptions().setDisable_codegen(true);
23     if (maxRowsProcessed < ctx_.getQueryOptions().batch_size ||
24         maxRowsProcessed < 1024 && ctx_.getQueryOptions().batch_size == 0) {
25       // Only one scanner thread for small queries
26       ctx_.getQueryOptions().setNum_scanner_threads(1);
27     }
28   }
29  
30   if (ctx_.isSingleNodeExec()) {// 如果是單節點執行計划樹
31     // 創建保護整個單點計划樹的片段
32     fragments = Lists.newArrayList(new PlanFragment(
33         ctx_.getNextFragmentId(), singleNodePlan, DataPartition.UNPARTITIONED));
34   else {// 分布式執行計划樹
35     // create distributed plan
36     fragments = distributedPlanner.createPlanFragments(singleNodePlan);
37   }
38       // 最后一個 Fragment 是根 fragment
39   PlanFragment rootFragment = fragments.get(fragments.size() - 1);
40   if (ctx_.isInsertOrCtas()) {
41     InsertStmt insertStmt = ctx_.getAnalysisResult().getInsertStmt();
42     if (!ctx_.isSingleNodeExec()) {
43       // repartition on partition keys
44       rootFragment = distributedPlanner.createInsertFragment(
45           rootFragment, insertStmt, ctx_.getRootAnalyzer(), fragments);
46     }
47     // set up table sink for root fragment
48     rootFragment.setSink(insertStmt.createDataSink());
49   }
50  
51   ColumnLineageGraph graph = ctx_.getRootAnalyzer().getColumnLineageGraph();
52   List<Expr> resultExprs = null;
53   Table targetTable = null;
54   if (ctx_.isInsertOrCtas()) {
55     InsertStmt insertStmt = ctx_.getAnalysisResult().getInsertStmt();
56     resultExprs = insertStmt.getResultExprs();
57     targetTable = insertStmt.getTargetTable();
58     graph.addTargetColumnLabels(targetTable);
59   else {
60     resultExprs = ctx_.getQueryStmt().getResultExprs();
61     graph.addTargetColumnLabels(ctx_.getQueryStmt().getColLabels());
62   }
63   resultExprs = Expr.substituteList(resultExprs,
64       rootFragment.getPlanRoot().getOutputSmap(), ctx_.getRootAnalyzer(), true);
65   rootFragment.setOutputExprs(resultExprs);
66   LOG.debug("desctbl: " + ctx_.getRootAnalyzer().getDescTbl().debugString());
67   LOG.debug("resultexprs: " + Expr.debugString(rootFragment.getOutputExprs()));
68   LOG.debug("finalize plan fragments");
69   for (PlanFragment fragment: fragments) {
70     fragment.finalize(ctx_.getRootAnalyzer());
71   }
72  
73   Collections.reverse(fragments);
74   ctx_.getRootAnalyzer().getTimeline().markEvent("Distributed plan created");
75  
76   if (RuntimeEnv.INSTANCE.computeLineage() || RuntimeEnv.INSTANCE.isTestEnv()) {
77     // Compute the column lineage graph
78     if (ctx_.isInsertOrCtas()) {
79       Preconditions.checkNotNull(targetTable);
80       List<Expr> exprs = Lists.newArrayList();
81       if (targetTable instanceof HBaseTable) {
82         exprs.addAll(resultExprs);
83       else {
84         exprs.addAll(ctx_.getAnalysisResult().getInsertStmt().getPartitionKeyExprs());
85         exprs.addAll(resultExprs.subList(0,
86             targetTable.getNonClusteringColumns().size()));
87       }
88       graph.computeLineageGraph(exprs, ctx_.getRootAnalyzer());
89     else {
90       graph.computeLineageGraph(resultExprs, ctx_.getRootAnalyzer());
91     }
92     LOG.trace("lineage: " + graph.debugString());
93     ctx_.getRootAnalyzer().getTimeline().markEvent("Lineage info computed");
94   }
95  
96   return fragments;
97 }

createPlan 包括createSingleNodePlan 和 createPlanFragments
兩個主要部分。其中第一個是單節點計划樹,所有片段只能在一個節點 corrd 上執行,第二個是分布式執行計划樹,片段可以分配到不同的節點中運行。我們先來看看 SingleNodePlanner.createSingleNodePlan()
(SingleNodePlanner.java)
該方法根據 Planner Context 中分析的語法樹創建單節點執行計划樹並返回根節點。計划遞歸處理語法樹並執行以下操作,自上而下處理查詢語句:

  1. materialize the slots required for evaluating expressions of that statement
  2. migrate conjuncts from parent blocks into inline views and union operands In the bottom-up phase generate the plan tree for every query statement:
  3. perform join-order optimization when generating the plan of the FROM clause of a select statement; requires that all materialized slots are known for an accurate estimate of row sizes needed for cost-based join ordering
  4. assign conjuncts that can be evaluated at that node and compute the stats of that node (cardinality, etc.)
  5. apply combined expression substitution map of child plan nodes; if a plan node re-maps its input, set a substitution map to be applied by parents

具體代碼如下:

1 /**
2  * Generates and returns the root of the single-node plan for the analyzed parse tree
3  * in the planner context.
4  */
5 public PlanNode createSingleNodePlan() throws ImpalaException {
6   QueryStmt queryStmt = ctx_.getQueryStmt();
7   // Use the stmt's analyzer which is not necessarily the root analyzer
8   // to detect empty result sets.
9   Analyzer analyzer = queryStmt.getAnalyzer();
10   analyzer.computeEquivClasses();
11   analyzer.getTimeline().markEvent("Equivalence classes computed");
12  
13   // Mark slots referenced by output exprs as materialized, prior to generating the
14   // plan tree.
15   // We need to mark the result exprs of the topmost select block as materialized, so
16   // that PlanNode.init() can compute the final mem layout of materialized tuples
17   // (the byte size of tuples is needed for cost computations).
18   // TODO: instead of materializing everything produced by the plan root, derive
19   // referenced slots from destination fragment and add a materialization node
20   // if not all output is needed by destination fragment
21   // TODO 2: should the materialization decision be cost-based?
22   if (queryStmt.getBaseTblResultExprs() != null) {
23     analyzer.materializeSlots(queryStmt.getBaseTblResultExprs());
24   }
25  
26   LOG.trace("desctbl: " + analyzer.getDescTbl().debugString());
27   PlanNode singleNodePlan = createQueryPlan(queryStmt, analyzer,
28       ctx_.getQueryOptions().isDisable_outermost_topn());
29   Preconditions.checkNotNull(singleNodePlan);
30   return singleNodePlan;
31 }

上面的函數通過調用私有的 createQueryPlan()
(SingleNodePlanner.java)
函數實現。該函數為單節點執行創建計划樹。為查詢語句中的
Select/Project/Join/Union [All]/Group by/Having/Order by
生成 PlanNode。具體實現代碼如下:

1 /**
2  * Create plan tree for single-node execution. Generates PlanNodes for the
3  * Select/Project/Join/Union [All]/Group by/Having/Order by clauses of the query stmt.
4  */
5 private PlanNode createQueryPlan(QueryStmt stmt, Analyzer analyzer, booleandisableTopN)
6     throws ImpalaException {
7     // Analyzer 檢測結果集是否為空,如果是的話直接返回空節點
8   if (analyzer.hasEmptyResultSet()) return createEmptyNode(stmt, analyzer);
9  
10   PlanNode root;
11   if (stmt instanceof SelectStmt) {// 如果是 select 語句
12     SelectStmt selectStmt = (SelectStmt) stmt;
13     // 創建 SelectPlan
14     root = createSelectPlan(selectStmt, analyzer);
15  
16     // insert possible AnalyticEvalNode before SortNode
17     if (((SelectStmt) stmt).getAnalyticInfo() != null) {
18       AnalyticInfo analyticInfo = selectStmt.getAnalyticInfo();
19       ArrayList<TupleId> stmtTupleIds = Lists.newArrayList();
20       stmt.getMaterializedTupleIds(stmtTupleIds);
21       AnalyticPlanner analyticPlanner =
22           new AnalyticPlanner(stmtTupleIds, analyticInfo, analyzer, ctx_);
23       List<Expr> inputPartitionExprs = Lists.newArrayList();
24       AggregateInfo aggInfo = selectStmt.getAggInfo();
25       root = analyticPlanner.createSingleNodePlan(root,
26           aggInfo != null ? aggInfo.getGroupingExprs() : null, inputPartitionExprs);
27       if (aggInfo != null && !inputPartitionExprs.isEmpty()) {
28         // analytic computation will benefit from a partition on inputPartitionExprs
29         aggInfo.setPartitionExprs(inputPartitionExprs);
30       }
31     }
32   else {// 否則,創建 UnionPlan
33     Preconditions.checkState(stmt instanceof UnionStmt);
34     root = createUnionPlan((UnionStmt) stmt, analyzer);
35   }
36  
37   // 如果 sort 元組有沒有物化的槽,避免添加 sort node,
38   boolean sortHasMaterializedSlots = false;
39   if (stmt.evaluateOrderBy()) {
40     for (SlotDescriptor sortSlotDesc:
41       stmt.getSortInfo().getSortTupleDescriptor().getSlots()) {
42       if (sortSlotDesc.isMaterialized()) {
43         sortHasMaterializedSlots = true;
44         break;
45       }
46     }
47   }
48  
49   if (stmt.evaluateOrderBy() && sortHasMaterializedSlots) {
50     long limit = stmt.getLimit();
51     // TODO: External sort could be used for very large limits
52     // not just unlimited order-by
53     boolean useTopN = stmt.hasLimit() && !disableTopN;
54     // 創建 sort node
55     root = new SortNode(ctx_.getNextNodeId(), root, stmt.getSortInfo(),
56         useTopN, stmt.getOffset());
57     Preconditions.checkState(root.hasValidStats());
58     root.setLimit(limit);
59     root.init(analyzer);
60   else {
61     root.setLimit(stmt.getLimit());
62     root.computeStats(analyzer);
63   }
64  
65   return root;
66 }

SingleNodePlanner.createSelectPlan()
(SingleNodePlanner.java)
函數創建實現 select 查詢語句塊中
Select/Project/Join/Group by/Having 等從句的 PlanNode 樹。具體實現代碼如下:

1 /**
2  * Create tree of PlanNodes that implements the Select/Project/Join/Group by/Having
3  * of the selectStmt query block.
4  */
5 private PlanNode createSelectPlan(SelectStmt selectStmt, Analyzer analyzer)
6     throws ImpalaException {
7   // no from clause -> materialize the select's exprs with a UnionNode
8   // 如果 select 語句沒有引用任何 table,創建 ConstantSelectPlan
9   if (selectStmt.getTableRefs().isEmpty()) {
10     return createConstantSelectPlan(selectStmt, analyzer);
11   }
12  
13   // Slot materialization:
14   // We need to mark all slots as materialized that are needed during the execution
15   // of selectStmt, and we need to do that prior to creating plans for the TableRefs
16   // (because createTableRefNode() might end up calling computeMemLayout() on one or
17   // more TupleDescriptors, at which point all referenced slots need to be marked).
18   //
19   // For non-join predicates, slots are marked as follows:
20   // - for base table scan predicates, this is done directly by ScanNode.init(), which
21   //   can do a better job because it doesn't need to materialize slots that are only
22   //   referenced for partition pruning, for instance
23   // - for inline views, non-join predicates are pushed down, at which point the
24   //   process repeats itself.
25   selectStmt.materializeRequiredSlots(analyzer);
26  
27   ArrayList<TupleId> rowTuples = Lists.newArrayList();
28   // collect output tuples of subtrees
29   for (TableRef tblRef: selectStmt.getTableRefs()) {
30     rowTuples.addAll(tblRef.getMaterializedTupleIds());
31   }
32  
33   // 如果 select 語句中的 select、project、join 部分返回空結果集
34   // 用空集創建滿足 select 語句的 AggregationPlan
35   // Make sure the slots of the aggregation exprs and the tuples that they reference
36   // are materialized (see IMPALA-1960).
37   if (analyzer.hasEmptySpjResultSet()) {
38     PlanNode emptySetNode = new EmptySetNode(ctx_.getNextNodeId(), rowTuples);
39     emptySetNode.init(analyzer);
40     emptySetNode.setOutputSmap(selectStmt.getBaseTblSmap());
41     return createAggregationPlan(selectStmt, analyzer, emptySetNode);
42   }
43  
44   // 為 table refs 創建 Plan;這里使用 list 而不是 map 是為了保證生成 join plan
45   // 時遍歷 TableRefs 有一個確定的順序
46   List<Pair<TableRef, PlanNode>> refPlans = Lists.newArrayList();
47   for (TableRef ref: selectStmt.getTableRefs()) {
48     PlanNode plan = createTableRefNode(analyzer, ref);
49     Preconditions.checkState(plan != null);
50     refPlans.add(new Pair(ref, plan));
51   }
52   // save state of conjunct assignment; needed for join plan generation
53   for (Pair<TableRef, PlanNode> entry: refPlans) {
54     entry.second.setAssignedConjuncts(analyzer.getAssignedConjuncts());
55   }
56  
57   PlanNode root = null;
58   // 如果有足夠的統計數據,例如 join 操作各個 table 的大小,創建開銷最小的 JoinPlan
59   if (!selectStmt.getSelectList().isStraightJoin()) {
60     Set<ExprId> assignedConjuncts = analyzer.getAssignedConjuncts();
61     root = createCheapestJoinPlan(analyzer, refPlans);
62     if (root == null) analyzer.setAssignedConjuncts(assignedConjuncts);
63   }
64   // 否則,根據 from 從句中 table 順序創建 JoinPlan
65   if (selectStmt.getSelectList().isStraightJoin() || root == null) {
66     // we didn't have enough stats to do a cost-based join plan, or the STRAIGHT_JOIN
67     // keyword was in the select list: use the FROM clause order instead
68     root = createFromClauseJoinPlan(analyzer, refPlans);
69     Preconditions.checkNotNull(root);
70   }
71  
72   // 如果有聚集操作,創建 AggregationPlan
73   if (selectStmt.getAggInfo() != null) {
74     root = createAggregationPlan(selectStmt, analyzer, root);
75   }
76  
77   // All the conjuncts_ should be assigned at this point.
78   // TODO: Re-enable this check here and/or elswehere.
79   //Preconditions.checkState(!analyzer.hasUnassignedConjuncts());
80   return root;
81 }

上面函數中調用的主要私有方法有:
createTableRefNode()、createCheapestJoinPlan()、 createFromClauseJoinPlan()、 createAggregationPlan(),各個函數的具體實現如下:

createTableRefNode()

1 /**
2  * Create a tree of PlanNodes for the given tblRef, which can be a BaseTableRef,
3  * CollectionTableRef or an InlineViewRef.
4  */
5 private PlanNode createTableRefNode(Analyzer analyzer, TableRef tblRef)
6     throws ImpalaException {
7   if (tblRef instanceof BaseTableRef || tblRef instanceof CollectionTableRef) {
8   // 創建 ScanNode
9     return createScanNode(analyzer, tblRef);
10   else if (tblRef instanceof InlineViewRef) {
11   // 創建 InlineViewPlan
12     return createInlineViewPlan(analyzer, (InlineViewRef) tblRef);
13   }
14   throw new InternalException(
15       "Unknown TableRef node: " + tblRef.getClass().getSimpleName());
16 }

createCheapestJoinPlan()

1 /**
2  * 返回物化 join refPlans 中所有 TblRefs 開銷最小的 plan
3  * 假設 refPlans 中的順序和查詢中的原始順序相同
4  * For this plan:
5  * - the plan is executable, ie, all non-cross joins have equi-join predicates
6  * - the leftmost scan is over the largest of the inputs for which we can still
7  *   construct an executable plan(左邊的是最大表)
8  * - all rhs's(right hand side?) are in decreasing order of selectiveness (percentage of rows they
9  *   eliminate)
10  * - outer/cross/semi joins: rhs serialized size is < lhs serialized size;(右邊的表比左邊的小)
11  *   enforced via join inversion, if necessary(否則通過 join 反轉實現)
12  * Returns null if we can't create an executable plan.
13  */
14 private PlanNode createCheapestJoinPlan(
15     Analyzer analyzer, List<Pair<TableRef, PlanNode>> refPlans)
16     throws ImpalaException {
17   LOG.trace("createCheapestJoinPlan");
18   if (refPlans.size() == 1return refPlans.get(0).second;
19  
20   // collect eligible candidates for the leftmost input; list contains
21   // (plan, materialized size)
22   ArrayList<Pair<TableRef, Long>> candidates = Lists.newArrayList();
23   for (Pair<TableRef, PlanNode> entry: refPlans) {
24     TableRef ref = entry.first;
25     JoinOperator joinOp = ref.getJoinOp();
26  
27     // The rhs table of an outer/semi join can appear as the left-most input if we
28     // invert the lhs/rhs and the join op. However, we may only consider this inversion
29     // for the very first join in refPlans, otherwise we could reorder tables/joins
30     // across outer/semi joins which is generally incorrect. The null-aware
31     // left anti-join operator is never considered for inversion because we can't
32     // execute the null-aware right anti-join efficiently.
33     // TODO: Allow the rhs of any cross join as the leftmost table. This needs careful
34     // consideration of the joinOps that result from such a re-ordering (IMPALA-1281).
35     if (((joinOp.isOuterJoin() || joinOp.isSemiJoin() || joinOp.isCrossJoin()) &&
36         ref != refPlans.get(1).first) || joinOp.isNullAwareLeftAntiJoin()) {
37       // ref cannot appear as the leftmost input
38       continue;
39     }
40  
41     PlanNode plan = entry.second;
42     if (plan.getCardinality() == -1) {
43       // use 0 for the size to avoid it becoming the leftmost input
44       // TODO: Consider raw size of scanned partitions in the absence of stats.
45       candidates.add(new Pair(ref, new Long(0)));
46       LOG.trace("candidate " + ref.getUniqueAlias() + ": 0");
47       continue;
48     }
49     Preconditions.checkNotNull(ref.getDesc());
50     long materializedSize =
51         (long) Math.ceil(plan.getAvgRowSize() * (double) plan.getCardinality());
52     candidates.add(new Pair(ref, new Long(materializedSize)));
53     LOG.trace("candidate " + ref.getUniqueAlias() + ": " + Long.toString(materializedSize));
54   }
55   if (candidates.isEmpty()) return null;
56  
57   // order candidates by descending materialized size; we want to minimize the memory
58   // consumption of the materialized hash tables required for the join sequence
59   Collections.sort(candidates,
60       new Comparator<Pair<TableRef, Long>>() {
61         public int compare(Pair<TableRef, Long> a, Pair<TableRef, Long> b) {
62           long diff = b.second - a.second;
63           return (diff < 0 ? -1 : (diff > 0 1 0));
64         }
65       });
66  
67       // 根據已經按照大小排序的 table 創建 JoinPlan
68   for (Pair<TableRef, Long> candidate: candidates) {
69     PlanNode result = createJoinPlan(analyzer, candidate.first, refPlans);
70     if (result != nullreturn result;
71   }
72   return null;
73 }

createFromClauseJoinPlan()

1 /**
2  * 返回按照 from 語句順序的 JoinPlan
3  */
4 private PlanNode createFromClauseJoinPlan(
5     Analyzer analyzer, List<Pair<TableRef, PlanNode>> refPlans)
6     throws ImpalaException {
7   // create left-deep sequence of binary hash joins; assign node ids as we go along
8   Preconditions.checkState(!refPlans.isEmpty());
9   PlanNode root = refPlans.get(0).second;
10   for (int i = 1; i < refPlans.size(); ++i) {
11     TableRef innerRef = refPlans.get(i).first;
12     PlanNode innerPlan = refPlans.get(i).second;
13     root = createJoinNode(analyzer, root, innerPlan, null, innerRef);
14     root.setId(ctx_.getNextNodeId());
15   }
16   return root;
17 }

createAggregationPlan()

1 /**
2  * Returns a new AggregationNode that materializes the aggregation of the given stmt.
3  * Assigns conjuncts from the Having clause to the returned node.
4  */
5 private PlanNode createAggregationPlan(SelectStmt selectStmt, Analyzer analyzer,
6     PlanNode root) throws InternalException {
7   Preconditions.checkState(selectStmt.getAggInfo() != null);
8   // add aggregation, if required
9   AggregateInfo aggInfo = selectStmt.getAggInfo();
10   root = new AggregationNode(ctx_.getNextNodeId(), root, aggInfo);
11   root.init(analyzer);
12   Preconditions.checkState(root.hasValidStats());
13   // if we're computing DISTINCT agg fns, the analyzer already created the
14   // 2nd phase agginfo
15   if (aggInfo.isDistinctAgg()) {
16     ((AggregationNode)root).unsetNeedsFinalize();
17     // The output of the 1st phase agg is the 1st phase intermediate.
18     ((AggregationNode)root).setIntermediateTuple();
19     root = new AggregationNode(ctx_.getNextNodeId(), root,
20         aggInfo.getSecondPhaseDistinctAggInfo());
21     root.init(analyzer);
22     Preconditions.checkState(root.hasValidStats());
23   }
24   // add Having clause
25   root.assignConjuncts(analyzer);
26   return root;
27 }

上面的 createCheapestJoinPlan() 和 createFromClauseJoinPlan()
方法調用了 createJoinNode() 和 createJoinPlan() 兩個方法。它們的具體實現如下:

createJoinNode()

1 /**
2  * 創建 join outer 和 inner 的 node。兩者其中之一可能是一個根據 table ref 創建的 plan
3  * 但不能同時都是 plan。對應的 outer/inner tableRef 不能為空
4  */
5 private PlanNode createJoinNode(
6     Analyzer analyzer, PlanNode outer, PlanNode inner, TableRef outerRef,
7     TableRef innerRef) throws ImpalaException {
8   Preconditions.checkState(innerRef != null ^ outerRef != null);
9   TableRef tblRef = (innerRef != null) ? innerRef : outerRef;
10  
11   List<BinaryPredicate> eqJoinConjuncts = Lists.newArrayList();
12   List<Expr> eqJoinPredicates = Lists.newArrayList();
13   // get eq join predicates for the TableRefs' ids (not the PlanNodes' ids, which
14   // are materialized)
15   if (innerRef != null) {
16     getHashLookupJoinConjuncts(
17         analyzer, outer.getTblRefIds(), innerRef, eqJoinConjuncts, eqJoinPredicates);
18     // Outer joins should only use On-clause predicates as eqJoinConjuncts.
19     if (!innerRef.getJoinOp().isOuterJoin()) {
20       analyzer.createEquivConjuncts(outer.getTblRefIds(), innerRef.getId(),
21           eqJoinConjuncts);
22     }
23   else {
24     getHashLookupJoinConjuncts(
25         analyzer, inner.getTblRefIds(), outerRef, eqJoinConjuncts, eqJoinPredicates);
26     // Outer joins should only use On-clause predicates as eqJoinConjuncts.
27     if (!outerRef.getJoinOp().isOuterJoin()) {
28       analyzer.createEquivConjuncts(inner.getTblRefIds(), outerRef.getId(),
29           eqJoinConjuncts);
30     }
31     // Reverse the lhs/rhs of the join conjuncts.
32     for (BinaryPredicate eqJoinConjunct: eqJoinConjuncts) {
33       Expr swapTmp = eqJoinConjunct.getChild(0);
34       eqJoinConjunct.setChild(0, eqJoinConjunct.getChild(1));
35       eqJoinConjunct.setChild(1, swapTmp);
36     }
37   }
38  
39   // 處理隱含交叉 join
40   if (eqJoinConjuncts.isEmpty()) {
41     // Since our only implementation of semi and outer joins is hash-based, and we do
42     // not re-order semi and outer joins, we must have eqJoinConjuncts here to execute
43     // this query.
44     // TODO: Revisit when we add more semi/join implementations. Pick up and pass in
45     // the otherJoinConjuncts.
46     if (tblRef.getJoinOp().isOuterJoin() ||
47         tblRef.getJoinOp().isSemiJoin()) {
48       throw new NotImplementedException(
49           String.format("%s join with '%s' without equi-join " +
50           "conjuncts is not supported.",
51           tblRef.getJoinOp().isOuterJoin() ? "Outer" "Semi",
52           innerRef.getUniqueAlias()));
53     }
54     CrossJoinNode result =
55         new CrossJoinNode(outer, inner, tblRef, Collections.<Expr>emptyList());
56     result.init(analyzer);
57     return result;
58   }
59  
60   // 處理顯式交叉 join
61   if (tblRef.getJoinOp() == JoinOperator.CROSS_JOIN) {
62     tblRef.setJoinOp(JoinOperator.INNER_JOIN);
63   }
64  
65   analyzer.markConjunctsAssigned(eqJoinPredicates);
66  
67   List<Expr> otherJoinConjuncts = Lists.newArrayList();
68   if (tblRef.getJoinOp().isOuterJoin()) {// 外連接
69     // Also assign conjuncts from On clause. All remaining unassigned conjuncts
70     // that can be evaluated by this join are assigned in createSelectPlan().
71     otherJoinConjuncts = analyzer.getUnassignedOjConjuncts(tblRef);
72   else if (tblRef.getJoinOp().isSemiJoin()) {// 半連接
73     // Unassigned conjuncts bound by the invisible tuple id of a semi join must have
74     // come from the join's On-clause, and therefore, must be added to the other join
75     // conjuncts to produce correct results.
76     otherJoinConjuncts =
77         analyzer.getUnassignedConjuncts(tblRef.getAllTupleIds(), false);
78     if (tblRef.getJoinOp().isNullAwareLeftAntiJoin()) {// 對空值敏感的反連接
79       boolean hasNullMatchingEqOperator = false;
80       // Keep only the null-matching eq conjunct in the eqJoinConjuncts and move
81       // all the others in otherJoinConjuncts. The BE relies on this
82       // separation for correct execution of the null-aware left anti join.
83       Iterator<BinaryPredicate> it = eqJoinConjuncts.iterator();
84       while (it.hasNext()) {
85         BinaryPredicate conjunct = it.next();
86         if (!conjunct.isNullMatchingEq()) {
87           otherJoinConjuncts.add(conjunct);
88           it.remove();
89         else {
90           // Only one null-matching eq conjunct is allowed
91           Preconditions.checkState(!hasNullMatchingEqOperator);
92           hasNullMatchingEqOperator = true;
93         }
94       }
95       Preconditions.checkState(hasNullMatchingEqOperator);
96     }
97   }
98   analyzer.markConjunctsAssigned(otherJoinConjuncts);
99  
100   HashJoinNode result =
101       new HashJoinNode(outer, inner, tblRef, eqJoinConjuncts, otherJoinConjuncts);
102   result.init(analyzer);
103   return result;
104 }

createJoinPlan()

1 /**
2  * Returns a plan with leftmostRef's plan as its leftmost input; the joins
3  * are in decreasing order of selectiveness (percentage of rows they eliminate).
4  * The leftmostRef's join will be inverted if it is an outer/semi/cross join.
5  */
6 private PlanNode createJoinPlan(
7     Analyzer analyzer, TableRef leftmostRef, List<Pair<TableRef, PlanNode>> refPlans)
8     throws ImpalaException {
9  
10   LOG.trace("createJoinPlan: " + leftmostRef.getUniqueAlias());
11   // 等待 join 的 tableref
12   List<Pair<TableRef, PlanNode>> remainingRefs = Lists.newArrayList();
13   PlanNode root = null;  // root of accumulated join plan
14   for (Pair<TableRef, PlanNode> entry: refPlans) {
15     if (entry.first == leftmostRef) {
16       root = entry.second;
17     else {
18       remainingRefs.add(entry);
19     }
20   }
21   Preconditions.checkNotNull(root);
22   // 已經 join 的 refs;joinedRefs 和 remainingRefs 中 refs 的 union 就是所有 table refs
23   Set<TableRef> joinedRefs = Sets.newHashSet();
24   joinedRefs.add(leftmostRef);
25  
26   // 如果最左邊的 TblRef 是 outer/semi/cross join,反轉
27   boolean planHasInvertedJoin = false;
28   if (leftmostRef.getJoinOp().isOuterJoin()
29       || leftmostRef.getJoinOp().isSemiJoin()
30       || leftmostRef.getJoinOp().isCrossJoin()) {
31     // TODO: Revisit the interaction of join inversion here and the analysis state
32     // that is changed in analyzer.invertOuterJoin(). Changing the analysis state
33     // should not be necessary because the semantics of an inverted outer join do
34     // not change.
35     leftmostRef.invertJoin(refPlans, analyzer);
36     planHasInvertedJoin = true;
37   }
38  
39   long numOps = 0;
40   int i = 0;
41   while (!remainingRefs.isEmpty()) {
42     // Join 鏈中的每一步都最小化結果數目,從而最小化 hash table 查找
43     PlanNode newRoot = null;
44     Pair<TableRef, PlanNode> minEntry = null;
45     for (Pair<TableRef, PlanNode> entry: remainingRefs) {
46       TableRef ref = entry.first;
47       LOG.trace(Integer.toString(i) + " considering ref " + ref.getUniqueAlias());
48  
49       // Determine whether we can or must consider this join at this point in the plan.
50       // Place outer/semi joins at a fixed position in the plan tree (IMPALA-860),
51       // s.t. all the tables appearing to the left/right of an outer/semi join in
52       // the original query still remain to the left/right after join ordering. This
53       // prevents join re-ordering across outer/semi joins which is generally wrong.
54       // The checks below relies on remainingRefs being in the order as they originally
55       // appeared in the query.
56       JoinOperator joinOp = ref.getJoinOp();
57       if (joinOp.isOuterJoin() || joinOp.isSemiJoin()) {
58         List<TupleId> currentTids = Lists.newArrayList(root.getTblRefIds());
59         currentTids.add(ref.getId());
60         // Place outer/semi joins at a fixed position in the plan tree. We know that
61         // the join resulting from 'ref' must become the new root if the current
62         // root materializes exactly those tuple ids corresponding to TableRefs
63         // appearing to the left of 'ref' in the original query.
64         List<TupleId> tableRefTupleIds = ref.getAllTupleIds();
65         if (!currentTids.containsAll(tableRefTupleIds) ||
66             !tableRefTupleIds.containsAll(currentTids)) {
67           // Do not consider the remaining table refs to prevent incorrect re-ordering
68           // of tables across outer/semi/anti joins.
69           break;
70         }
71       else if (ref.getJoinOp().isCrossJoin()) {
72         if (!joinedRefs.contains(ref.getLeftTblRef())) continue;
73       }
74  
75       PlanNode rhsPlan = entry.second;
76       analyzer.setAssignedConjuncts(root.getAssignedConjuncts());
77  
78       boolean invertJoin = false;
79       if (joinOp.isOuterJoin() || joinOp.isSemiJoin() || joinOp.isCrossJoin()) {
80         // Invert the join if doing so reduces the size of build-side hash table
81         // (may also reduce network costs depending on the join strategy).
82         // Only consider this optimization if both the lhs/rhs cardinalities are known.
83         // The null-aware left anti-join operator is never considered for inversion
84         // because we can't execute the null-aware right anti-join efficiently.
85         long lhsCard = root.getCardinality();
86         long rhsCard = rhsPlan.getCardinality();
87         if (lhsCard != -1 && rhsCard != -1 &&
88             lhsCard * root.getAvgRowSize() < rhsCard * rhsPlan.getAvgRowSize() &&
89             !joinOp.isNullAwareLeftAntiJoin()) {
90           invertJoin = true;
91         }
92       }
93       PlanNode candidate = null;
94       if (invertJoin) {
95         ref.setJoinOp(ref.getJoinOp().invert());
96         candidate = createJoinNode(analyzer, rhsPlan, root, ref, null);
97         planHasInvertedJoin = true;
98       else {
99         candidate = createJoinNode(analyzer, root, rhsPlan, null, ref);
100       }
101       if (candidate == nullcontinue;
102       LOG.trace("cardinality=" + Long.toString(candidate.getCardinality()));
103  
104       // Use 'candidate' as the new root; don't consider any other table refs at this
105       // position in the plan.
106       if (joinOp.isOuterJoin() || joinOp.isSemiJoin()) {
107         newRoot = candidate;
108         minEntry = entry;
109         break;
110       }
111  
112       // 優先選擇 Hash Join 而不是 Cross Join, due to limited costing infrastructure
113       if (newRoot == null
114           || (candidate.getClass().equals(newRoot.getClass())
115               && candidate.getCardinality() < newRoot.getCardinality())
116           || (candidate instanceof HashJoinNode && newRoot instanceofCrossJoinNode)) {
117         newRoot = candidate;
118         minEntry = entry;
119       }
120     }
121     if (newRoot == null) {
122       // Currently, it should not be possible to invert a join for a plan that turns
123       // out to be non-executable because (1) the joins we consider for inversion are
124       // barriers in the join order, and (2) the caller of this function only considers
125       // other leftmost table refs if a plan turns out to be non-executable.
126       // TODO: This preconditions check will need to be changed to undo the in-place
127       // modifications made to table refs for join inversion, if the caller decides to
128       // explore more leftmost table refs.
129       Preconditions.checkState(!planHasInvertedJoin);
130       return null;
131     }
132  
133     // we need to insert every rhs row into the hash table and then look up
134     // every lhs row
135     long lhsCardinality = root.getCardinality();
136     long rhsCardinality = minEntry.second.getCardinality();
137     numOps += lhsCardinality + rhsCardinality;
138     LOG.debug(Integer.toString(i) + " chose " + minEntry.first.getUniqueAlias()
139         " #lhs=" + Long.toString(lhsCardinality)
140         " #rhs=" + Long.toString(rhsCardinality)
141         " #ops=" + Long.toString(numOps));
142     remainingRefs.remove(minEntry);
143     joinedRefs.add(minEntry.first);
144     root = newRoot;
145     // assign id_ after running through the possible choices in order to end up
146     // with a dense sequence of node ids
147     root.setId(ctx_.getNextNodeId());
148     analyzer.setAssignedConjuncts(root.getAssignedConjuncts());
149     ++i;
150   }
151  
152   return root;
153 }

至此我們已經大概介紹了 createSingleNodePlan 的過程。
現在讓我們回到 createPlan() 函數,來看看創建分布式執行計划樹,即 createPlanFrangments 過程。

 

DistributedPlanner.createPlanFragments()
(Planner.java)
方法為單點計划樹生成多個片段。具體代碼如下:

1 /**
2  * 根據一些執行選項為單點計划樹創建多個片段
3  * 片段通過 list 返回,list 中位置 i 的片段只能使用片段 j 的輸出(j > i)。
4  *
5  * TODO: 考慮計划片段中的數據分片; 尤其是要比 createQueryPlan() 更加注重協調
6  * 聚集操作中 hash partitioning 以及分析計算中的 hash partitioning。
7  * (只有在相同 select 塊中進行聚集和分析計算時才會發生協調)
8  */
9 public ArrayList<PlanFragment> createPlanFragments(
10     PlanNode singleNodePlan) throws ImpalaException {
11   Preconditions.checkState(!ctx_.isSingleNodeExec());
12   AnalysisContext.AnalysisResult analysisResult = ctx_.getAnalysisResult();
13   QueryStmt queryStmt = ctx_.getQueryStmt();
14   ArrayList<PlanFragment> fragments = Lists.newArrayList();
15   // 對於 insert 或 CTAS,除非有 limit 限制才保持根片段 partitioned
16   // 否則,合並所有為一個單獨的 coordinator fragment 以便傳回到客戶端
17   boolean isPartitioned = false;
18   if ((analysisResult.isInsertStmt() || analysisResult.isCreateTableAsSelectStmt())
19       && !singleNodePlan.hasLimit()) {
20     Preconditions.checkState(!queryStmt.hasOffset());
21     isPartitioned = true;
22   }
23   LOG.debug("create plan fragments");
24   long perNodeMemLimit = ctx_.getQueryOptions().mem_limit;
25   LOG.debug("memlimit=" + Long.toString(perNodeMemLimit));
26   // 調用私有方法
27   createPlanFragments(singleNodePlan, isPartitioned, perNodeMemLimit, fragments);
28   return fragments;
29 }

上面的方法調用私有成員方法 DistributedPlanner.createPlanFragments()
DistributedPlanner.java
該方法返回生成 root 結果的 fragments。具體代碼如下:

1   /**
2  * 返回生成 'root' 結果的 fragments; 遞歸創建所有 input fragments 到返回的 fragment
3  * 如果創建了一個新的 fragment,會被追加到 ‘fragments’,這樣 fragment 就會在所有需要
4  * 它們的輸出的 fragments 前面。
5  * 如果 'isPartitioned' 為否,,那么返回的 fragment 就是 unpartitioned;
6  * 否則就可能是 partitioned, 取決於它的輸入是否 partitioned;
7  * the partition function is derived from the inputs.
8  */
9 private PlanFragment createPlanFragments(
10     PlanNode root, boolean isPartitioned,
11     long perNodeMemLimit, ArrayList<PlanFragment> fragments)
12     throws InternalException, NotImplementedException {
13   ArrayList<PlanFragment> childFragments = Lists.newArrayList();
14   for (PlanNode child: root.getChildren()) {
15     // 允許子 fragments 是 partition 的,除非它們保護 limit 從句。
16     // (因為包含 limit 限制的結果集需要集中計算);
17     // 如果需要的話在后面合並
18     boolean childIsPartitioned = !child.hasLimit();
19     // 遞歸調用 createPlanFragments,將 child 創建的 PlanFragments 添加到 childFragments
20     childFragments.add(
21         createPlanFragments(
22           child, childIsPartitioned, perNodeMemLimit, fragments));
23   }
24       // 根據 root 的不同 Node 類型創建不同的 Fragment
25   PlanFragment result = null;
26   if (root instanceof ScanNode) {
27     result = createScanFragment(root);
28     fragments.add(result);
29   else if (root instanceof HashJoinNode) {
30     Preconditions.checkState(childFragments.size() == 2);
31     result = createHashJoinFragment(
32         (HashJoinNode) root, childFragments.get(1), childFragments.get(0),
33         perNodeMemLimit, fragments);
34   else if (root instanceof CrossJoinNode) {
35     Preconditions.checkState(childFragments.size() == 2);
36     result = createCrossJoinFragment(
37         (CrossJoinNode) root, childFragments.get(1), childFragments.get(0),
38         perNodeMemLimit, fragments);
39   else if (root instanceof SelectNode) {
40     result = createSelectNodeFragment((SelectNode) root, childFragments);
41   else if (root instanceof UnionNode) {
42     result = createUnionNodeFragment((UnionNode) root, childFragments, fragments);
43   else if (root instanceof AggregationNode) {
44     result = createAggregationFragment(
45         (AggregationNode) root, childFragments.get(0), fragments);
46   else if (root instanceof SortNode) {
47     if (((SortNode) root).isAnalyticSort()) {
48       // don't parallelize this like a regular SortNode
49       result = createAnalyticFragment(
50           (SortNode) root, childFragments.get(0), fragments);
51     else {
52       result = createOrderByFragment(
53           (SortNode) root, childFragments.get(0), fragments);
54     }
55   else if (root instanceof AnalyticEvalNode) {
56     result = createAnalyticFragment(root, childFragments.get(0), fragments);
57   else if (root instanceof EmptySetNode) {
58     result = new PlanFragment(
59         ctx_.getNextFragmentId(), root, DataPartition.UNPARTITIONED);
60   else {
61     throw new InternalException(
62         "Cannot create plan fragment for this node type: " + root.getExplainString());
63   }
64   // move 'result' to end, it depends on all of its children
65   fragments.remove(result);
66   fragments.add(result);
67       // 如果已經分區,還需要創建 MergeFragment
68   if (!isPartitioned && result.isPartitioned()) {
69     result = createMergeFragment(result);
70     fragments.add(result);
71   }
72  
73   return result;
74 }

上面的方法調用了大量的 create*Fragment() 私有成員方法。這些成員方法的具體實現可以查看源文件:
DistributedPlanner.java

這些成員方法都返回了 PlanFragment 實例,關於該類的具體實現可以查看源代碼:
PlanFragment.java

至此,我們大概介紹了 createPlanFragments 的過程。

 

由於 createSingleNodePlan 和 createPlanFragments 兩個 createPlan 最重要的部分都已經介紹了,
createPlan 也就介紹到這里。現在讓我們回到 frontend.createExecRequest()
繼續來看剩下的內容。frontend.createExecRequest() 其余代碼如下:

1   /**
2  * Create a populated TExecRequest corresponding to the supplied TQueryCtx.
3  */
4 public TExecRequest createExecRequest(TQueryCtx queryCtx, StringBuilder explainString)
5     throws ImpalaException {
6   .
7   .
8   .
9   .
10   .
11  
12   // 設置 fragment 的目的地
13   for (int i = 1; i < fragments.size(); ++i) {
14     PlanFragment dest = fragments.get(i).getDestFragment();
15     Integer idx = fragmentIdx.get(dest);
16     Preconditions.checkState(idx != null);
17     queryExecRequest.addToDest_fragment_idx(idx.intValue());
18   }
19  
20   // 為 Scan node 設置 scan 范圍/位置
21   // Also assemble list of tables names missing stats for assembling a warning message.
22   LOG.debug("get scan range locations");
23   Set<TTableName> tablesMissingStats = Sets.newTreeSet();
24   for (ScanNode scanNode: scanNodes) {
25     queryExecRequest.putToPer_node_scan_ranges(
26         scanNode.getId().asInt(),
27         scanNode.getScanRangeLocations());
28     if (scanNode.isTableMissingStats()) {
29       tablesMissingStats.add(scanNode.getTupleDesc().getTableName().toThrift());
30     }
31   }
32   // 設置主機列表
33   queryExecRequest.setHost_list(analysisResult.getAnalyzer().getHostIndex().getList());
34   for (TTableName tableName: tablesMissingStats) {
35     queryCtx.addToTables_missing_stats(tableName);
36   }
37  
38   // Optionally disable spilling in the backend. Allow spilling if there are plan hints
39   // or if all tables have stats.
40   if (queryCtx.request.query_options.isDisable_unsafe_spills()
41       && !tablesMissingStats.isEmpty()
42       && !analysisResult.getAnalyzer().hasPlanHints()) {
43     queryCtx.setDisable_spilling(true);
44   }
45  
46   // 計算資源需求,因為 scan node 的開銷估計取決於這些
47   try {
48     planner.computeResourceReqs(fragments, true, queryExecRequest);
49   catch (Exception e) {
50     // 將異常轉換為警告,以便查詢能繼續執行
51     LOG.error("Failed to compute resource requirements for query\n" +
52         queryCtx.request.getStmt(), e);
53   }
54  
55   // 到了這里 fragment 所有信息都設置好了,序列化到 Thrift
56   for (PlanFragment fragment: fragments) {
57     TPlanFragment thriftFragment = fragment.toThrift();
58     queryExecRequest.addToFragments(thriftFragment);
59   }
60  
61   // Use VERBOSE by default for all non-explain statements.
62   TExplainLevel explainLevel = TExplainLevel.VERBOSE;
63   // Use the query option for explain stmts and tests (e.g., planner tests).
64   if (analysisResult.isExplainStmt() || RuntimeEnv.INSTANCE.isTestEnv()) {
65     explainLevel = queryCtx.request.query_options.getExplain_level();
66   }
67  
68   // Global query parameters to be set in each TPlanExecRequest.
69   queryExecRequest.setQuery_ctx(queryCtx);
70  
71   explainString.append(
72       planner.getExplainString(fragments, queryExecRequest, explainLevel));
73   queryExecRequest.setQuery_plan(explainString.toString());
74   queryExecRequest.setDesc_tbl(analysisResult.getAnalyzer().getDescTbl().toThrift());
75  
76   String jsonLineageGraph = analysisResult.getJsonLineageGraph();
77   if (jsonLineageGraph != null && !jsonLineageGraph.isEmpty()) {
78     queryExecRequest.setLineage_graph(jsonLineageGraph);
79   }
80  
81   if (analysisResult.isExplainStmt()) {
82     // Return the EXPLAIN request
83     createExplainRequest(explainString.toString(), result);
84     return result;
85   }
86  
87   result.setQuery_exec_request(queryExecRequest);
88  
89   if (analysisResult.isQueryStmt()) {
90     // 填充元數據
91     LOG.debug("create result set metadata");
92     result.stmt_type = TStmtType.QUERY;
93     result.query_exec_request.stmt_type = result.stmt_type;
94     TResultSetMetadata metadata = new TResultSetMetadata();
95     QueryStmt queryStmt = analysisResult.getQueryStmt();
96     int colCnt = queryStmt.getColLabels().size();
97     for (int i = 0; i < colCnt; ++i) {
98       TColumn colDesc = new TColumn();
99       colDesc.columnName = queryStmt.getColLabels().get(i);
100       colDesc.columnType = queryStmt.getResultExprs().get(i).getType().toThrift();
101       metadata.addToColumns(colDesc);
102     }
103     result.setResult_set_metadata(metadata);
104   else {
105     Preconditions.checkState(analysisResult.isInsertStmt() ||
106         analysisResult.isCreateTableAsSelectStmt());
107  
108     // For CTAS the overall TExecRequest statement type is DDL, but the
109     // query_exec_request should be DML
110     result.stmt_type =
111         analysisResult.isCreateTableAsSelectStmt() ? TStmtType.DDL : TStmtType.DML;
112     result.query_exec_request.stmt_type = TStmtType.DML;
113  
114     // create finalization params of insert stmt
115     InsertStmt insertStmt = analysisResult.getInsertStmt();
116     if (insertStmt.getTargetTable() instanceof HdfsTable) {
117       TFinalizeParams finalizeParams = new TFinalizeParams();
118       finalizeParams.setIs_overwrite(insertStmt.isOverwrite());
119       finalizeParams.setTable_name(insertStmt.getTargetTableName().getTbl());
120       finalizeParams.setTable_id(insertStmt.getTargetTable().getId().asInt());
121       String db = insertStmt.getTargetTableName().getDb();
122       finalizeParams.setTable_db(db == null ? queryCtx.session.database : db);
123       HdfsTable hdfsTable = (HdfsTable) insertStmt.getTargetTable();
124       finalizeParams.setHdfs_base_dir(hdfsTable.getHdfsBaseDir());
125       finalizeParams.setStaging_dir(
126           hdfsTable.getHdfsBaseDir() + "/_impala_insert_staging");
127       queryExecRequest.setFinalize_params(finalizeParams);
128     }
129   }
130  
131   validateTableIds(analysisResult.getAnalyzer(), result);
132  
133   timeline.markEvent("Planning finished");
134   result.setTimeline(analysisResult.getAnalyzer().getTimeline().toThrift());
135   return result;
136 }

至此,FE 結束,返回 TExecRequest 型的對象給 backend 執行。

由於筆者剛開始接觸 Impala,分析可能存在某些謬誤,有任何疑問或建議都歡迎討論。



免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM