Apache Doris Study Notes: Executing Fragments on the Backend


Executor Nodes

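As in Impala's architecture, a SQL statement that a client submits to Doris is first parsed by the FE (frontend), which generates a number of fragments
and then distributes them to the BEs (backends) for execution.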

Viewing the execution plan

You can use explain to see exactly what the execution plan of a query looks like.

explain select sum(table1.pv) from table1 join table2 on table1.siteid=table2.siteid group by table1.siteid;

The output is:

+-------------------------------------------------------------------------+
| Explain String                                                          |
+-------------------------------------------------------------------------+
| PLAN FRAGMENT 0                                                         |
|  OUTPUT EXPRS:<slot 4> sum(`table1`.`pv`)                               |
|   PARTITION: HASH_PARTITIONED: `default_cluster:test`.`table1`.`siteid` |
|                                                                         |
|   RESULT SINK                                                           |
|                                                                         |
|   3:AGGREGATE (update finalize)                                         |
|   |  output: sum(`table1`.`pv`)                                         |
|   |  group by: `table1`.`siteid`                                        |
|   |  cardinality=-1                                                     |
|   |                                                                     |
|   2:HASH JOIN                                                           |
|   |  join op: INNER JOIN (BROADCAST)                                    |
|   |  hash predicates:                                                   |
|   |  colocate: false, reason: Tables are not in the same group          |
|   |  equal join conjunct: `table1`.`siteid` = `table2`.`siteid`         |
|   |  runtime filters: RF000[in] <- `table2`.`siteid`                    |
|   |  cardinality=0                                                      |
|   |                                                                     |
|   |----4:EXCHANGE                                                       |
|   |                                                                     |
|   0:OlapScanNode                                                        |
|      TABLE: table1                                                      |
|      PREAGGREGATION: ON                                                 |
|      runtime filters: RF000[in] -> `table1`.`siteid`                    |
|      partitions=0/1                                                     |
|      rollup: null                                                       |
|      tabletRatio=0/0                                                    |
|      tabletList=                                                        |
|      cardinality=0                                                      |
|      avgRowSize=12.0                                                    |
|      numNodes=1                                                         |
|                                                                         |
| PLAN FRAGMENT 1                                                         |
|  OUTPUT EXPRS:                                                          |
|   PARTITION: RANDOM                                                     |
|                                                                         |
|   STREAM DATA SINK                                                      |
|     EXCHANGE ID: 04                                                     |
|     UNPARTITIONED                                                       |
|                                                                         |
|   1:OlapScanNode                                                        |
|      TABLE: table2                                                      |
|      PREAGGREGATION: OFF. Reason: null                                  |
|      partitions=0/3                                                     |
|      rollup: null                                                       |
|      tabletRatio=0/0                                                    |
|      tabletList=                                                        |
|      cardinality=0                                                      |
|      avgRowSize=4.0                                                     |
|      numNodes=1                                                         |
+-------------------------------------------------------------------------+

explain graph shows the same execution plan drawn as a tree.

explain graph select sum(table1.pv) from table1 join table2 on table1.siteid=table2.siteid group by table1.siteid;

+------------------------------------------------------------------------------------------------------------+
| Explain String                                                                                             |
+------------------------------------------------------------------------------------------------------------+
|             ┌───────────────┐                                                                              |
|             │[3: ResultSink]│                                                                              |
|             │[Fragment: 0]  │                                                                              |
|             │RESULT SINK    │                                                                              |
|             └───────────────┘                                                                              |
|                     │                                                                                      |
|                     │                                                                                      |
|    ┌────────────────────────────────┐                                                                      |
|    │[3: AGGREGATE (update finalize)]│                                                                      |
|    │[Fragment: 0]                   │                                                                      |
|    └────────────────────────────────┘                                                                      |
|                     │                                                                                      |
|                     │                                                                                      |
|     ┌───────────────────────────────┐                                                                      |
|     │[2: HASH JOIN]                 │                                                                      |
|     │[Fragment: 0]                  │                                                                      |
|     │join op: INNER JOIN (BROADCAST)│                                                                      |
|     └───────────────────────────────┘                                                                      |
|          ┌──────────┴─────────┐                                                                            |
|          │                    │                                                                            |
| ┌─────────────────┐    ┌─────────────┐                                                                     |
| │[0: OlapScanNode]│    │[4: EXCHANGE]│                                                                     |
| │[Fragment: 0]    │    │[Fragment: 0]│                                                                     |
| │TABLE: table1    │    └─────────────┘                                                                     |
| └─────────────────┘           │                                                                            |
|                               │                                                                            |
|                     ┌───────────────────┐                                                                  |
|                     │[4: DataStreamSink]│                                                                  |
|                     │[Fragment: 1]      │                                                                  |
|                     │STREAM DATA SINK   │                                                                  |
|                     │  EXCHANGE ID: 04  │                                                                  |
|                     │  UNPARTITIONED    │                                                                  |
|                     └───────────────────┘                                                                  |
|                               │                                                                            |
|                               │                                                                            |
|                      ┌─────────────────┐                                                                   |
|                      │[1: OlapScanNode]│                                                                   |
|                      │[Fragment: 1]    │                                                                   |
|                      │TABLE: table2    │                                                                   |
|                      └─────────────────┘                                                                   |
+------------------------------------------------------------------------------------------------------------+

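The execution plan tree is made up of exec nodes (operators). Taking the query above as an example, data flows from the leaf nodes of the tree up to the root, passing through a series of operations that produce the final query result.

Next, let's look at what each node in this plan tree does.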
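The leaf-to-root data flow can be illustrated with a toy operator tree. This is only a minimal sketch in Python, not how the Doris BE (written in C++) is implemented: the class names `ScanNode` and `SumAggregateNode`, the `execute` method, and the sample rows are all assumptions made for illustration.

```python
from collections import defaultdict


class ScanNode:
    """Leaf operator: stands in for reading rows from the storage layer."""

    def __init__(self, rows):
        self._rows = rows

    def execute(self):
        yield from self._rows


class SumAggregateNode:
    """Inner operator: consumes its child's rows and emits one sum per group."""

    def __init__(self, child, group_col, sum_col):
        self.child = child
        self.group_col = group_col
        self.sum_col = sum_col

    def execute(self):
        totals = defaultdict(int)
        for row in self.child.execute():
            totals[row[self.group_col]] += row[self.sum_col]
        # Sorted only to make this sketch deterministic.
        for key in sorted(totals):
            yield {self.group_col: key, "sum": totals[key]}


def run_plan(root):
    """Pulling rows from the root drives every node below it."""
    return list(root.execute())


# Hypothetical sample data; the root is the aggregate, the leaf is the scan.
plan = SumAggregateNode(
    ScanNode([
        {"siteid": 1, "pv": 10},
        {"siteid": 2, "pv": 5},
        {"siteid": 1, "pv": 7},
    ]),
    group_col="siteid",
    sum_col="pv",
)
result = run_plan(plan)
```

Each node only knows its child, so adding another operator means wrapping the tree in one more node without touching the ones below it.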

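First, this query is planned into 2 fragments, \(f_{0}\) and \(f_{1}\).
\(f_{1}\) is a subtree of \(f_{0}\). The OlapScanNode at its bottom reads the data of table2 from the storage layer and passes it through the DataStreamSinkNode (shown as STREAM DATA SINK in the plan) to \(f_{0}\).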

\(f_{0}\) represents the subtree that \(f_{1}\) stands for as a single ExchangeNode, which receives data from \(f_{1}\).

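\(f_{0}\) also reads the data of table1 through an OlapScanNode of its own. The rows of table1 and table2 are combined by the join operator, HashJoinNode,
then passed up to AggregateNode (the aggregation operator for the sum function), and the final result is returned to the FE through ResultSinkNode.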
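To make the hand-off between the two fragments concrete, here is a small Python simulation of the same flow. It is a sketch under stated assumptions: the function names, the `deque` standing in for the exchange, and the sample rows are hypothetical, and table2 is treated as the build side because the runtime filter in the plan flows from table2 to table1.

```python
from collections import defaultdict, deque


def run_fragment_1(table2_rows, exchange):
    """f1: scan table2 and push every row into the exchange (the stream sink)."""
    for row in table2_rows:
        exchange.append(row)


def run_fragment_0(table1_rows, exchange):
    """f0: build from the exchange, probe with table1, then sum pv per siteid."""
    # Build phase: count how many table2 rows carry each siteid.
    build = defaultdict(int)
    while exchange:
        build[exchange.popleft()["siteid"]] += 1

    # Probe + aggregate: an inner join repeats a table1 row once per match.
    totals = defaultdict(int)
    for row in table1_rows:
        matches = build.get(row["siteid"], 0)
        if matches:
            totals[row["siteid"]] += row["pv"] * matches

    # The query outputs only sum(pv); sorting by siteid keeps the sketch stable.
    return [totals[siteid] for siteid in sorted(totals)]


# Hypothetical sample data.
table1 = [
    {"siteid": 1, "pv": 10},
    {"siteid": 2, "pv": 5},
    {"siteid": 1, "pv": 7},
    {"siteid": 3, "pv": 1},
]
table2 = [{"siteid": 1}, {"siteid": 1}, {"siteid": 2}]

exchange = deque()
run_fragment_1(table2, exchange)
sums = run_fragment_0(table1, exchange)
```

In a real cluster the two fragments run on different BE processes and the exchange is a network transfer. The in-memory queue here only mirrors the direction of the data flow.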

We can see that query planning achieves Shared Nothing distributed execution by splitting the execution plan tree into a number of subtrees.
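The splitting step itself can be sketched as a tree rewrite: wherever a subtree has to run elsewhere, cut it out into its own fragment and leave an exchange placeholder behind. The Python below is an illustrative assumption, not the Doris FE planner. In particular, the `remote` flag is a hypothetical stand-in for whatever the real planner uses to decide where an exchange is needed.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class PlanNode:
    name: str
    children: List["PlanNode"] = field(default_factory=list)
    remote: bool = False  # hypothetical flag: subtree runs in its own fragment


def split_into_fragments(root):
    """Return fragment roots; fragments[0] is the one that returns the result."""
    fragments = [None]  # slot 0 is reserved for the root fragment

    def visit(node):
        new_children = []
        for child in node.children:
            rewritten = visit(child)
            if child.remote:
                # Cut the subtree out and cap it with a sink.
                fragments.append(PlanNode("DataStreamSink", [rewritten]))
                frag_id = len(fragments) - 1
                # Leave a placeholder that receives the sink's output.
                new_children.append(PlanNode(f"Exchange(from fragment {frag_id})"))
            else:
                new_children.append(rewritten)
        return PlanNode(node.name, new_children)

    fragments[0] = PlanNode("ResultSink", [visit(root)])
    return fragments


def render(node, depth=0):
    """Indented text form of a fragment, one line per node."""
    lines = ["  " * depth + node.name]
    for child in node.children:
        lines.extend(render(child, depth + 1))
    return lines


# The single-node plan for the example query, with the table2 scan marked remote.
plan = PlanNode("Aggregate", [
    PlanNode("HashJoin", [
        PlanNode("OlapScan(table1)"),
        PlanNode("OlapScan(table2)", remote=True),
    ]),
])
fragments = split_into_fragments(plan)
```

With this input, `fragments[0]` has the same shape as PLAN FRAGMENT 0 above (result sink, aggregate, hash join, table1 scan, exchange) and `fragments[1]` has the shape of PLAN FRAGMENT 1 (stream sink over the table2 scan).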


To be continued.

