1. 概述
當向Flink集群提交用戶作業時,從用戶角度看,只需要作業處理邏輯正確,輸出正確的結果即可;而不用關心作業何時被調度的,作業申請的資源又是如何被分配的以及作業何時會結束;但是了解作業在運行時的具體行為對於我們深入了解Flink原理有非常大的幫助,並且對我們如何編寫更合理的作業邏輯有指導意義,因此本文詳細分析作業的調度及資源分配以及作業的生命周期。
2. 流程分析
基於社區master主線(1.11-SNAPSHOT),commit: 12f7873db54cfbc5bf853d66ccd4093f9b749c9a ,HA基於ZK實現分析
上圖概括了Flink作業從Client端提交到到Flink集群的提交的基本流程[1]。
當運行./flink run
腳本提交用戶作業至Dispathcer后,Dispatcher會拉起JobManagerRunner,而后JobManagerRunner會向Zookeeper注冊競爭Leader。對於之前流程有興趣可以參考 深入理解Flink-On-Yarn模式
當JobManagerRunner
競爭成為Leader時,會調用JobManagerRunnerImpl#grantLeadership
,此時便開始處理作業,會通過如下的代碼調用路徑啟動JobMaster。
- JobManagerRunnerImpl#grantLeadership
- JobManagerRunnerImpl#verifyJobSchedulingStatusAndStartJobManager
- JobManagerRunnerImpl#startJobMaster。
startJobMaster方法會首先將該作業的ID寫入對應的ZK目錄並置為RUNNING狀態,寫入該目錄可用於在Dispathcer接收作業時,判斷該作業是否重復提交或恢復作業時使用;在JobManagerRunner調度作業時也在從ZK上拉取作業信息來判斷作業狀態,若為DONE狀態,則無需調度。啟動JobMaster時會先啟動其RPC Endpoint,以便與其他組件進行RPC調用,之后JobMaster便通過JobMaster#startJobExecution開始執行作業,執行作業前會有些前置校驗,如必須確保運行在主線程中;啟動JobMaster上的一些服務(組件),如TaskManager和ResourceManager的心跳管理;啟動SlotPool、Scheduler;重連至ResourceManager,並且在ZK中注冊監聽ResourceManager Leader的變化的Retriever等。
當初始化完JobMaster上相應服務(組件)后,便開始調度,會有如下代碼調用路徑 - JobMaster#start
- JobMaster#startJobExecution
- JobMaster#resetAndStartScheduler
- JobMaster#startScheduling
- SchedulerBase#startScheduling。
我們知道用戶編寫的作業是以JobGraph提交到Dispatcher,但是在實際調度時會將JobGraph轉化為ExecutionGraph,JobGraph生成ExecutionGraph是在SchedulerBase對象初始化的時候完成轉化,如下圖所示表示了典型的轉化過程(JobVertex與ExecutionJobVertex一一對應),而具體的轉化邏輯實現可參考如何生成ExecutionGraph及物理執行圖
在SchedulerBase初始化時生成ExecutionGraph后,之后便基於ExecutionGraph調度,而調度基類SchedulerBase默認實現為DefaultScheduler,會繼續通過DefaultScheduler#startSchedulingInternal調度作業,此時會將作業(ExecutionGraph)的狀態從CREATED狀態變更為RUNNING狀態,此時在Flink web界面查看任務的狀態便已經為RUNNING,但注意此時作業(各頂點)實際並未開始調度,頂點還是處於CREATED狀態,任作業狀態與頂點狀態不完全相關聯,有其各自的演化生命周期,具體可參考Flink作業調度[2];然后根據不同的策略EagerSchedulingStrategy(主要用於流式作業,所有頂點(ExecutionVertex)同時開始調度)和LazyFromSourcesSchedulingStrategy(主要用於批作業,從Source開始開始調度,其他頂點延遲調度)調度。
當提交流式作業時,會有如下代碼調用路徑:
- EagerSchedulingStrategy#startScheduling
- EagerSchedulingStrategy#allocateSlotsAndDeploy,在部署之前會根據待部署的ExecutionVertex生成對應的ExecutionVertexDeploymentOption,然后調用DefaultScheduler#allocateSlotsAndDeploy開始部署。同樣,在部署之前也需要進行一些前置校驗(ExecutionVertex對應的Execution的狀態必須為CREATED),接着將待部署的ExecutionVertex對應的Execution狀態變更為SCHEDULED,然后開始為ExecutionVertex分配Slot。會有如下的調用代碼路徑:
- DefaultScheduler#allocateSlots(該過程會ExecutionVertex轉化為ExecutionVertexSchedulingRequirements,會封裝包含一些location信息、sharing信息、資源信息等)
- DefaultExecutionSlotAllocator#allocateSlotsFor,該方法會開始逐一異步部署各ExecutionVertex,部署也是根據不同的Slot提供策略來分配,接着會經過如下代碼調用路徑層層轉發,SlotProviderStrategy#allocateSlot -> SlotProvider#allocateSlot(SlotProvider默認實現為SchedulerImpl) -> SchedulerImpl#allocateSlotInternal -> SchedulerImpl#internalAllocateSlot(該方法會根據vertex是否共享slot來分配singleSlot/SharedSlot),以singleSlot為例說明。
在分配slot時,首先會在JobMaster中SlotPool中進行分配,具體是先SlotPool中獲取所有slot,然后嘗試選擇一個最合適的slot進行分配,這里的選擇有兩種策略,即按照位置優先和按照之前已分配的slot優先;若從SlotPool無法分配,則通過RPC請求向ResourceManager請求slot,若此時並未連接上ResourceManager,則會將請求緩存起來,待連接上ResourceManager后再申請。
當ResourceManager收到申請slot請求時,若發現該JobManager未注冊,則直接拋出異常;否則將請求轉發給SlotManager處理,SlotManager中維護了集群所有空閑的slot(TaskManager會向ResourceManager上報自己的信息,在ResourceManager中由SlotManager保存Slot和TaskManager對應關系),並從其中找出符合條件的slot,然后向TaskManager發送RPC請求申請對應的slot。
等待所有的slot申請完成后,然后會將ExecutionVertex對應的Execution分配給對應的Slot,即從Slot中分配對應的資源給Execution,完成分配后可開始部署作業。
部署作業代碼調用路徑如下:
- DefaultScheduler#waitForAllSlotsAndDeploy
- DefaultScheduler#deployAll
- DefaultScheduler#deployOrHandleError
- DefaultScheduler#deployTaskSafe
- DefaultExecutionVertexOperations#deploy
- ExecutionVertex#deploy
- Execution#deploy(每次調度ExecutionVertex,都會有一個Execute,在此階段會將Execution的狀態變更為DEPLOYING狀態,並且為該ExecutionVertex生成對應的部署描述信息,然后從對應的slot中獲取對應的TaskManagerGateway,以便向對應的TaskManager提交Task)
- RpcTaskManagerGateway#submitTask(此時便將Task通過RPC提交給了TaskManager)。
TaskManager(TaskExecutor)在接收到提交Task的請求后,會經過一些初始化(如從BlobServer拉取文件,反序列化作業和Task信息、LibaryCacheManager等),然后這些初始化的信息會用於生成Task(Runnable對象),然后啟動該Task,其代碼調用路徑如下 Task#startTaskThread(啟動Task線程)-> Task#run(將ExecutionVertex狀態變更為RUNNING狀態,此時在FLINK web前台查看頂點狀態會變更為RUNNING狀態,另外還會生成了一個AbstractInvokable對象,該對象是FLINK銜接執行用戶代碼的關鍵,而后會經過如下調用
- AbstractInvokable#invoke(AbstractInvokable有幾個關鍵的子類實現, BatchTask/BoundedStreamTask/DataSinkTask/DataSourceTask/StreamTask/SourceStreamTask。對於streaming類型的Source,會調用StreamTask#invoke)
- StreamTask#invoke
- StreamTask#beforeInvoke
- StreamTask#initializeStateAndOpen(初始化狀態和進行初始化,這里會調用用戶的open方法(如自定義實現的source))-> StreamTask#runMailboxLoop,便開始處理Source端消費的數據,並流入下游算子處理。
至此作業從提交到資源分配及調度運行整體流程就已經分析完畢,對於流式作業而言,正常情況下其會一直運行,不會結束。
3. 總結
對於作業的運行,會先提交至Dispatcher,由Dispatcher拉起JobManagerRunner,在JobManagerRunner成為Leader后,便開始處理作業,首先會根據JobGraph生成對應的ExecutionGraph,然后開始調度,作業的狀態首先會變更為RUNNING,然后對各ExecutionVertex申請slot,申請slot會涉及JM與RM、TM之間的通信,當在TM上分配完slot后,便可將Task提交至TaskManager,然后TaskManager會為每個提交的Task生成一個單獨的線程處理。