Executor Nodes
Similar to Impala's architecture, a SQL statement submitted by a client to Doris is first parsed by the FE (frontend), which generates a number of fragments; these are then assigned and shipped to the BEs (backends) for execution.
Viewing the execution plan
Here we can use explain to see what the concrete execution plan of a query looks like.
explain select sum(table1.pv) from table1 join table2 on table1.siteid=table2.siteid group by table1.siteid;
The output is:
+-------------------------------------------------------------------------+
| Explain String |
+-------------------------------------------------------------------------+
| PLAN FRAGMENT 0 |
| OUTPUT EXPRS:<slot 4> sum(`table1`.`pv`) |
| PARTITION: HASH_PARTITIONED: `default_cluster:test`.`table1`.`siteid` |
| |
| RESULT SINK |
| |
| 3:AGGREGATE (update finalize) |
| | output: sum(`table1`.`pv`) |
| | group by: `table1`.`siteid` |
| | cardinality=-1 |
| | |
| 2:HASH JOIN |
| | join op: INNER JOIN (BROADCAST) |
| | hash predicates: |
| | colocate: false, reason: Tables are not in the same group |
| | equal join conjunct: `table1`.`siteid` = `table2`.`siteid` |
| | runtime filters: RF000[in] <- `table2`.`siteid` |
| | cardinality=0 |
| | |
| |----4:EXCHANGE |
| | |
| 0:OlapScanNode |
| TABLE: table1 |
| PREAGGREGATION: ON |
| runtime filters: RF000[in] -> `table1`.`siteid` |
| partitions=0/1 |
| rollup: null |
| tabletRatio=0/0 |
| tabletList= |
| cardinality=0 |
| avgRowSize=12.0 |
| numNodes=1 |
| |
| PLAN FRAGMENT 1 |
| OUTPUT EXPRS: |
| PARTITION: RANDOM |
| |
| STREAM DATA SINK |
| EXCHANGE ID: 04 |
| UNPARTITIONED |
| |
| 1:OlapScanNode |
| TABLE: table2 |
| PREAGGREGATION: OFF. Reason: null |
| partitions=0/3 |
| rollup: null |
| tabletRatio=0/0 |
| tabletList= |
| cardinality=0 |
| avgRowSize=4.0 |
| numNodes=1 |
+-------------------------------------------------------------------------+
With explain graph we get a graphical, tree-shaped view of the execution plan.
explain graph select sum(table1.pv) from table1 join table2 on table1.siteid=table2.siteid group by table1.siteid;
+------------------------------------------------------------------------------------------------------------+
| Explain String |
+------------------------------------------------------------------------------------------------------------+
| ┌───────────────┐ |
| │[3: ResultSink]│ |
| │[Fragment: 0] │ |
| │RESULT SINK │ |
| └───────────────┘ |
| │ |
| │ |
| ┌────────────────────────────────┐ |
| │[3: AGGREGATE (update finalize)]│ |
| │[Fragment: 0] │ |
| └────────────────────────────────┘ |
| │ |
| │ |
| ┌───────────────────────────────┐ |
| │[2: HASH JOIN] │ |
| │[Fragment: 0] │ |
| │join op: INNER JOIN (BROADCAST)│ |
| └───────────────────────────────┘ |
| ┌──────────┴─────────┐ |
| │ │ |
| ┌─────────────────┐ ┌─────────────┐ |
| │[0: OlapScanNode]│ │[4: EXCHANGE]│ |
| │[Fragment: 0] │ │[Fragment: 0]│ |
| │TABLE: table1 │ └─────────────┘ |
| └─────────────────┘ │ |
| │ |
| ┌───────────────────┐ |
| │[4: DataStreamSink]│ |
| │[Fragment: 1] │ |
| │STREAM DATA SINK │ |
| │ EXCHANGE ID: 04 │ |
| │ UNPARTITIONED │ |
| └───────────────────┘ |
| │ |
| │ |
| ┌─────────────────┐ |
| │[1: OlapScanNode]│ |
| │[Fragment: 1] │ |
| │TABLE: table2 │ |
| └─────────────────┘ |
+------------------------------------------------------------------------------------------------------------+
The execution plan tree is made up of exec nodes (operators). Taking the query above as an example, data flows from the leaf nodes of the tree up to the root, passing through a series of operations until the query result is produced. Next, let's look at what role each node in this plan tree plays.
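Conceptually, this kind of plan tree runs on a pull-based (Volcano-style) model: every exec node exposes a get_next() that pulls rows from its children, so data naturally flows from the leaves to the root. Below is a minimal Python sketch of the idea; the classes are made up for illustration and are not Doris's actual operator interfaces.

class ScanNode:
    def __init__(self, rows):
        self.rows = rows
    def get_next(self):
        yield from self.rows                       # leaf: emits stored rows

class FilterNode:
    def __init__(self, child, predicate):
        self.child = child
        self.predicate = predicate
    def get_next(self):
        for row in self.child.get_next():          # pull from the child,
            if self.predicate(row):                # pass matching rows upward
                yield row

root = FilterNode(ScanNode([1, 2, 3, 4]), lambda r: r % 2 == 0)
print(list(root.get_next()))                       # [2, 4]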
First, this query is planned into two fragments, \(f_{0}\) and \(f_{1}\).
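The cut points between fragments are the exchange boundaries: the ExchangeNode stays in the parent fragment, while the subtree below it becomes a child fragment. Here is a toy sketch of that splitting rule, assuming made-up node classes rather than Doris's actual planner code.

from dataclasses import dataclass, field

@dataclass
class PlanNode:
    name: str
    children: list = field(default_factory=list)
    is_exchange: bool = False

def split_fragments(root):
    fragments = [[]]
    def walk(node, frag):
        frag.append(node.name)
        for child in node.children:
            if child.is_exchange:
                frag.append(child.name)        # exchange stays in this fragment
                child_frag = []                # the subtree below it becomes
                fragments.append(child_frag)   # a separate child fragment
                for grandchild in child.children:
                    walk(grandchild, child_frag)
            else:
                walk(child, frag)
    walk(root, fragments[0])
    return fragments

plan = PlanNode("AggregateNode", [PlanNode("HashJoinNode", [
    PlanNode("OlapScanNode(table1)"),
    PlanNode("ExchangeNode", [PlanNode("OlapScanNode(table2)")], is_exchange=True)])])
print(split_fragments(plan))
# [['AggregateNode', 'HashJoinNode', 'OlapScanNode(table1)', 'ExchangeNode'],
#  ['OlapScanNode(table2)']]

Running this on a tree shaped like the plan above yields exactly the two fragments the planner produced: \(f_{0}\) ending at the ExchangeNode, and \(f_{1}\) containing the scan of table2.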
\(f_{1}\) is a subtree of \(f_{0}\): its bottom-level OlapScanNode reads the data of table2 from the storage layer and passes it through a DataStreamSink, which sends it over to \(f_{0}\). Inside \(f_{0}\), the subtree represented by \(f_{1}\) appears as an ExchangeNode, which receives the data from \(f_{1}\).
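The DataStreamSink and ExchangeNode behave like the producer and consumer ends of a data channel. The sketch below models this with an in-process queue; in Doris the batches actually travel between BEs over RPC, so this is only an analogy for the data flow.

import queue

channel = queue.Queue()
SENTINEL = object()                    # marks end-of-stream

def data_stream_sink(batches):
    # f1 side: push each row batch to the receiving fragment, then close.
    for batch in batches:
        channel.put(batch)
    channel.put(SENTINEL)

def exchange_node():
    # f0 side: yield batches as they arrive from f1 until the stream closes.
    while True:
        batch = channel.get()
        if batch is SENTINEL:
            break
        yield batch

data_stream_sink([[{"siteid": 1}], [{"siteid": 2}]])
print(list(exchange_node()))           # [[{'siteid': 1}], [{'siteid': 2}]]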
\(f_{0}\) itself also reads the data of table1 through its own OlapScanNode. table1 and table2 are then combined by the join operator at the HashJoinNode.
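The plan output above also shows a runtime filter, RF000[in], built from table2.siteid on the join's build side and pushed down to table1's scan, so probe rows that cannot possibly match are dropped before they ever reach the join. The following is a toy version of the broadcast hash join plus IN-filter, with made-up data.

table2_rows = [{"siteid": 1}, {"siteid": 3}]            # build side (broadcast)
table1_rows = [{"siteid": 1, "pv": 10},
               {"siteid": 2, "pv": 20},
               {"siteid": 3, "pv": 30}]                 # probe side

# Build phase: hash table on table2.siteid; its key set doubles as the
# IN runtime filter shipped to table1's scan.
build = {}
for row in table2_rows:
    build.setdefault(row["siteid"], []).append(row)
runtime_filter = set(build)                             # RF000[in]

# Probe phase: the scan applies the filter, then surviving rows probe
# the hash table.
joined = [(left, right)
          for left in table1_rows
          if left["siteid"] in runtime_filter           # RF000 -> table1.siteid
          for right in build.get(left["siteid"], [])]
print(joined)                                           # siteid 1 and 3 match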
The joined rows are passed upwards to the AggregateNode (the aggregation operator for the sum function), and the final result is returned to the FE through the ResultSink.
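The "(update finalize)" tag on the AGGREGATE node indicates that this single node both updates the per-group aggregate state as input rows stream in and finalizes those states into output rows; multi-phase plans would split these steps across fragments instead. A sketch of that behaviour for sum(pv) grouped by siteid, with made-up input:

from collections import defaultdict

def aggregate_update_finalize(rows):
    states = defaultdict(int)                  # one sum state per siteid group
    for row in rows:                           # update: fold each input row
        states[row["siteid"]] += row["pv"]     # into its group's state
    for siteid, total in states.items():       # finalize: turn every state
        yield {"siteid": siteid, "sum_pv": total}  # into an output row

rows = [{"siteid": 1, "pv": 10}, {"siteid": 1, "pv": 5}, {"siteid": 3, "pv": 7}]
print(list(aggregate_update_finalize(rows)))
# [{'siteid': 1, 'sum_pv': 15}, {'siteid': 3, 'sum_pv': 7}]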
We can see that query planning achieves shared-nothing distributed execution by cutting the execution plan tree into several subtrees.
To be continued.