PrestoSQL(trinodb)源碼分析 -優化和調度


通過TpchQueryRunner可以跑起來一個測試服務

仍然使用‘SELECT SUPPKEY, sum(QUANTITY) from lineitem where QUANTITY > 5 group by SUPPKEY limit 10’

Mac M1, Java CLI有bug,可以用python替代

conn = trino.dbapi.connect(
    host='localhost',
    port=8080,
    user='test',
    catalog='tpch',
    schema='tiny',
    request_timeout=30000
)
cur = conn.cursor()
cur.execute('SELECT SUPPKEY, sum(QUANTITY) from lineitem where QUANTITY > 5 group by SUPPKEY limit 10')
rows = cur.fetchmany()

 

SqlQueryExecution

前面的流程忽略,直接到SqlQueryExecution

start的核心邏輯,

 

planQuery -> doPlanQuery

 

plan,優化並生成plan

 

Presto的優化器不太具有參考價值,簡單看下數據的變化,

analysis是AST,語法樹,

 

 轉化成初始的邏輯計划,已經從語法樹變成布爾的邏輯算子,

PlanNode root = planStatement(analysis, analysis.getStatement())

 

 

優化完的結果,最大的不同是加上ExchangeNode

 

createSubPlans

首先是,用Fragmenter,Visit整個plan,根據ExchangeNode生成fragment,

 

產生的效果如下,SimplePlanRewriter.rewriteWith

 

RemoteSourceNode,被替換成,Fragment“1”,

在FragmentProperties可以找到所有的Fragments,這里生成出兩個fragments,

 

 

buildRootFragment

將上面的fragments,封裝成SubPlan,

這里會將root封裝成fragment“0”,代表OutputNode

 

planDistribution

將Fragment封裝成StageExecutionPlan,

doPlan

封裝StageExecutionPlan,這里除了Fragment,

還多出3個東西,SplitSource,dependencies,tables;

其中SplitSource和tables,只有包含tableScan的Stage會有,這里就是Fragment2 

 

dependences包含當前stage所依賴的stages,比如對於Fragment1,

 

splitSources

獲取存儲輸入的splits信息,依賴於存儲的實現,這里是tpch

獲取邏輯參考調用棧,

對於3個Fragment遞歸調用doPlan,在visit中,只有TableScan算子會觸發getSplits,其他的算子都是傳遞visit

 

Tpch的getSplits,只是根據節點數,每個節點splits數目,創建一堆TpchSplit

 

這里TpchSplit的組成很簡單,僅僅是partNum,node地址

 

最終doPlan得到的結果,

 

scheduler.start()

產生Scheduler,

SqlQueryExecution.start -> SqlQueryExecution.schedule

可以看到這里schedule是異步調用的,

 

對於每個stage,調動schedule

 

Schedule的過程,首先會選取一個Scheduler,

 

可以看到stage0和1,沒有source,所以選的是FixedCountScheduler,

對於不同Scheduler的區別,詳細參考,https://github.com/prestodb/presto/wiki/Stage-and-Source-Scheduler-and-Grouped-Execution

這個調度邏輯就是,對於每個node調度一個task

 

對於stage2,選擇FixedSourcePartitionedScheduler

邏輯是先類似FixedCountScheduler去創建task,然后再調用SourcePartitionedScheduler的邏輯(SourcePartitionedScheduler會為一批splits動態調度一個新的task,而FixedSourcePartitionedScheduler是使用先前調度好的task)

 

SourcePartitionedScheduler

調用棧,

 

SourcePartitionedScheduler,把splits分配到各個node上,

 

 

SqlStageExecution,把對應的splits,加入到task中,這里如果沒有事先生成的Task,會動態的生成一個新的task

最終Scheduler生成的調度結果是ScheduleResult

對於Fragment2對應的stage,生成了3個Task,平均分配了包含的24個splits

 

  

scheduleTask

在scheduleTask中,創建RemoteTask,並且start

繼續調用到,HttpRemoteTask的scheduleUpdate,

用線程池去調用,executor.execute(this::sendUpdate)

最終,通過HttpClient,將json化的task request發出到worker。

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM