TaskExecutor
那么都准備好了,就要開始真正的執行了
初始化的時候
增加TaskRunner線程
TaskRunner
核心就是不斷的從waitingSplits中獲取split,然后process
到這會創建driver,
CreateDriver
先使用之前的operatorFactory,創建出Operator對象,然后創建Driver
這里看下,ExchangeOperator的例子
對於ScanFilterAndProjectOperator
需要封裝成,WorkProcessorSourceOperatorAdapter
這里會生成sourceOperator,ScanFilterAndProjectOperator
這里pages是workProcessor,如果要獲取page,需要調用process接口實際獲取
processFor
調用driver的processFor
processInternal
核心就是數據在pipeline中move的過程,可以看出在pipeline中數據是以push的方式進行的,在pipeline之間是pull的方式
Current.getOutput
pages的定義,
所以按照這個反向執行,直到SplitToPages,
process,
先是生成source,這里就是訪問TPCH的鏈接,從這里可以get到cursor,這是要作為參數用於get數據的
然后調用processColumnSource,
processColumnSource
返回一個processor,RecordCursorToPages,
RecordCursorToPages被調用時,process
這里就用到前面生成的cursorProcessor來獲取數據
cursorProcessor是CodeGen動態生成的,
代碼生成
在生成PhysicalOperation的同時,做代碼生成,
Visit,不停的迭代source,直到Filter
調用到visitScanFilterAndProject,幾個參數需要注意
sourceNode, 是TableScanNode
FilterExpression,代表過濾條件
outputSymbols,代表projects
這里邏輯,將filter和project生成,RowExpression
分別通過Complier生成CursorProcessor和PageProcessor類
這兩個應該是對等的,只會使用一個
對於PageProcessor
會把filter,project,CodeGen成class,傳入PageProcessor
所有processor都是通過process來執行,
這個processor的功能,執行filter得到selectedPosition,然后生成新的processor
這個ProjectSelectedPositions,再被process
調用,ProcessBatchResult result = processBatch(batchSize);
邏輯就是對於filter后的結果,執行project,最終返回ProcessResult
所以可以看出,PageProcess的目的就是對於page進行filter和project的操作,由於這里的filter和project是CodeGen的,所以整個部分都需要codeGen出來
對於CursorProcessor,更徹底,整個class都是動態生成的
expressionCompiler.compileCursorProcessor
這里核心邏輯是產生method,
可以看到主要生成,3中method,核心就是process,其他的filter和projec都是在process中需要調用到的函數
Process,
createProjectIfStatement,
Blocked
每個split執行時,如果上游數據不ready,會怎么處理?
在TaskExecutor.TaskRunner中,會根據返回的blocked來判斷
那么這個blocked怎么來的呢?
在Driver.processInternal中,如果沒有movedPage,即沒有數據被處理
那么就會從operator中獲取blocked
getBlockedFuture
這里取名有問題,isBlocked,不應該得到一個future,應該是一個bool
實現isBlocked的operator都是可能會出現block case的,基本都是和IO相關,大部分operator是不會block的
比如對於,WorkProcessorSourceOperatorAdapter
這里的firstFinishedFuture的挺有意思,
如果一個driver中有多個blocked點,那么需要找到最先完成的,那我怎么知道誰先完成了?
答案是,不知道;所以這里用SettableFuture,對於每個blocked加上listener,這樣完成的時候就會把自己set到result,從而返回。