PrestoSQL(trinodb)源碼分析 - 執行(下)


TaskExecutor

那么都准備好了,就要開始真正的執行了

初始化的時候

增加TaskRunner線程

 

  

TaskRunner

核心就是不斷的從waitingSplits中獲取split,然后process

 

到這會創建driver,

 

CreateDriver

 

先使用之前的operatorFactory,創建出Operator對象,然后創建Driver

這里看下,ExchangeOperator的例子

  

對於ScanFilterAndProjectOperator

 

需要封裝成,WorkProcessorSourceOperatorAdapter

這里會生成sourceOperator,ScanFilterAndProjectOperator

這里pages是workProcessor,如果要獲取page,需要調用process接口實際獲取

 

processFor

調用driver的processFor

 

 

 

processInternal

核心就是數據在pipeline中move的過程,可以看出在pipeline中數據是以push的方式進行的,在pipeline之間是pull的方式

 

Current.getOutput

 

pages的定義,

 

 所以按照這個反向執行,直到SplitToPages,

 

process,

先是生成source,這里就是訪問TPCH的鏈接,從這里可以get到cursor,這是要作為參數用於get數據的

然后調用processColumnSource,

 

 processColumnSource

返回一個processor,RecordCursorToPages,

 

RecordCursorToPages被調用時,process

這里就用到前面生成的cursorProcessor來獲取數據

cursorProcessor是CodeGen動態生成的,

 

代碼生成

在生成PhysicalOperation的同時,做代碼生成,

 

Visit,不停的迭代source,直到Filter

 

調用到visitScanFilterAndProject,幾個參數需要注意

sourceNode, 是TableScanNode

FilterExpression,代表過濾條件

outputSymbols,代表projects

 

 這里邏輯,將filter和project生成,RowExpression

 

分別通過Complier生成CursorProcessor和PageProcessor類

這兩個應該是對等的,只會使用一個

 

 

對於PageProcessor

會把filter,project,CodeGen成class,傳入PageProcessor

 

所有processor都是通過process來執行,

這個processor的功能,執行filter得到selectedPosition,然后生成新的processor

 

這個ProjectSelectedPositions,再被process

調用,ProcessBatchResult result = processBatch(batchSize);

邏輯就是對於filter后的結果,執行project,最終返回ProcessResult

 

所以可以看出,PageProcess的目的就是對於page進行filter和project的操作,由於這里的filter和project是CodeGen的,所以整個部分都需要codeGen出來

 

對於CursorProcessor,更徹底,整個class都是動態生成的

expressionCompiler.compileCursorProcessor

 

 

 

這里核心邏輯是產生method,

可以看到主要生成,3中method,核心就是process,其他的filter和projec都是在process中需要調用到的函數

 

Process,

 

 createProjectIfStatement,

 

 

Blocked

每個split執行時,如果上游數據不ready,會怎么處理?

在TaskExecutor.TaskRunner中,會根據返回的blocked來判斷

 

那么這個blocked怎么來的呢?

在Driver.processInternal中,如果沒有movedPage,即沒有數據被處理

那么就會從operator中獲取blocked

 

getBlockedFuture 

這里取名有問題,isBlocked,不應該得到一個future,應該是一個bool

 

實現isBlocked的operator都是可能會出現block case的,基本都是和IO相關,大部分operator是不會block的

比如對於,WorkProcessorSourceOperatorAdapter

 

這里的firstFinishedFuture的挺有意思,

如果一個driver中有多個blocked點,那么需要找到最先完成的,那我怎么知道誰先完成了?

答案是,不知道;所以這里用SettableFuture,對於每個blocked加上listener,這樣完成的時候就會把自己set到result,從而返回。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM