通過TpchQueryRunner可以跑起來一個測試服務
仍然使用‘SELECT SUPPKEY, sum(QUANTITY) from lineitem where QUANTITY > 5 group by SUPPKEY limit 10’
Mac M1, Java CLI有bug,可以用python替代
conn = trino.dbapi.connect( host='localhost', port=8080, user='test', catalog='tpch', schema='tiny', request_timeout=30000 ) cur = conn.cursor() cur.execute('SELECT SUPPKEY, sum(QUANTITY) from lineitem where QUANTITY > 5 group by SUPPKEY limit 10') rows = cur.fetchmany()
SqlQueryExecution
前面的流程忽略,直接到SqlQueryExecution
start的核心邏輯,
planQuery -> doPlanQuery
plan,優化並生成plan
Presto的優化器不太具有參考價值,簡單看下數據的變化,
analysis是AST,語法樹,
轉化成初始的邏輯計划,已經從語法樹變成布爾的邏輯算子,
PlanNode root = planStatement(analysis, analysis.getStatement())
優化完的結果,最大的不同是加上ExchangeNode
createSubPlans
首先是,用Fragmenter,Visit整個plan,根據ExchangeNode生成fragment,
產生的效果如下,SimplePlanRewriter.rewriteWith
RemoteSourceNode,被替換成,Fragment“1”,
在FragmentProperties可以找到所有的Fragments,這里生成出兩個fragments,
buildRootFragment
將上面的fragments,封裝成SubPlan,
這里會將root封裝成fragment“0”,代表OutputNode
planDistribution
將Fragment封裝成StageExecutionPlan,
doPlan
封裝StageExecutionPlan,這里除了Fragment,
還多出3個東西,SplitSource,dependencies,tables;
其中SplitSource和tables,只有包含tableScan的Stage會有,這里就是Fragment2
dependences包含當前stage所依賴的stages,比如對於Fragment1,
splitSources
獲取存儲輸入的splits信息,依賴於存儲的實現,這里是tpch
獲取邏輯參考調用棧,
對於3個Fragment遞歸調用doPlan,在visit中,只有TableScan算子會觸發getSplits,其他的算子都是傳遞visit
Tpch的getSplits,只是根據節點數,每個節點splits數目,創建一堆TpchSplit
這里TpchSplit的組成很簡單,僅僅是partNum,node地址
最終doPlan得到的結果,
scheduler.start()
產生Scheduler,
SqlQueryExecution.start -> SqlQueryExecution.schedule
可以看到這里schedule是異步調用的,
對於每個stage,調動schedule
Schedule的過程,首先會選取一個Scheduler,
可以看到stage0和1,沒有source,所以選的是FixedCountScheduler,
對於不同Scheduler的區別,詳細參考,https://github.com/prestodb/presto/wiki/Stage-and-Source-Scheduler-and-Grouped-Execution
這個調度邏輯就是,對於每個node調度一個task
對於stage2,選擇FixedSourcePartitionedScheduler
邏輯是先類似FixedCountScheduler去創建task,然后再調用SourcePartitionedScheduler的邏輯(SourcePartitionedScheduler會為一批splits動態調度一個新的task,而FixedSourcePartitionedScheduler是使用先前調度好的task)
SourcePartitionedScheduler
調用棧,
SourcePartitionedScheduler,把splits分配到各個node上,
SqlStageExecution,把對應的splits,加入到task中,這里如果沒有事先生成的Task,會動態的生成一個新的task
最終Scheduler生成的調度結果是ScheduleResult
對於Fragment2對應的stage,生成了3個Task,平均分配了包含的24個splits
scheduleTask
在scheduleTask中,創建RemoteTask,並且start
繼續調用到,HttpRemoteTask的scheduleUpdate,
用線程池去調用,executor.execute(this::sendUpdate)
最終,通過HttpClient,將json化的task request發出到worker。