【Flink】Flink作業調度流程分析


1. 概述

當向Flink集群提交用戶作業時,從用戶角度看,只需要作業處理邏輯正確,輸出正確的結果即可;而不用關心作業何時被調度的,作業申請的資源又是如何被分配的以及作業何時會結束;但是了解作業在運行時的具體行為對於我們深入了解Flink原理有非常大的幫助,並且對我們如何編寫更合理的作業邏輯有指導意義,因此本文詳細分析作業的調度及資源分配以及作業的生命周期。

2. 流程分析

基於社區master主線(1.11-SNAPSHOT),commit: 12f7873db54cfbc5bf853d66ccd4093f9b749c9a ,HA基於ZK實現分析

Flink作業申請流程圖
上圖概括了Flink作業從Client端提交到到Flink集群的提交的基本流程[1]。

當運行./flink run腳本提交用戶作業至Dispathcer后,Dispatcher會拉起JobManagerRunner,而后JobManagerRunner會向Zookeeper注冊競爭Leader。對於之前流程有興趣可以參考 深入理解Flink-On-Yarn模式

JobManagerRunner競爭成為Leader時,會調用JobManagerRunnerImpl#grantLeadership,此時便開始處理作業,會通過如下的代碼調用路徑啟動JobMaster。

  • JobManagerRunnerImpl#grantLeadership
  • JobManagerRunnerImpl#verifyJobSchedulingStatusAndStartJobManager
  • JobManagerRunnerImpl#startJobMaster。
    startJobMaster方法會首先將該作業的ID寫入對應的ZK目錄並置為RUNNING狀態,寫入該目錄可用於在Dispathcer接收作業時,判斷該作業是否重復提交或恢復作業時使用;在JobManagerRunner調度作業時也在從ZK上拉取作業信息來判斷作業狀態,若為DONE狀態,則無需調度。啟動JobMaster時會先啟動其RPC Endpoint,以便與其他組件進行RPC調用,之后JobMaster便通過JobMaster#startJobExecution開始執行作業,執行作業前會有些前置校驗,如必須確保運行在主線程中;啟動JobMaster上的一些服務(組件),如TaskManager和ResourceManager的心跳管理;啟動SlotPool、Scheduler;重連至ResourceManager,並且在ZK中注冊監聽ResourceManager Leader的變化的Retriever等。
    當初始化完JobMaster上相應服務(組件)后,便開始調度,會有如下代碼調用路徑
  • JobMaster#start
  • JobMaster#startJobExecution
  • JobMaster#resetAndStartScheduler
  • JobMaster#startScheduling
  • SchedulerBase#startScheduling。

我們知道用戶編寫的作業是以JobGraph提交到Dispatcher,但是在實際調度時會將JobGraph轉化為ExecutionGraph,JobGraph生成ExecutionGraph是在SchedulerBase對象初始化的時候完成轉化,如下圖所示表示了典型的轉化過程(JobVertex與ExecutionJobVertex一一對應),而具體的轉化邏輯實現可參考如何生成ExecutionGraph及物理執行圖

JobGraph->ExecutionGraph

在SchedulerBase初始化時生成ExecutionGraph后,之后便基於ExecutionGraph調度,而調度基類SchedulerBase默認實現為DefaultScheduler,會繼續通過DefaultScheduler#startSchedulingInternal調度作業,此時會將作業(ExecutionGraph)的狀態從CREATED狀態變更為RUNNING狀態,此時在Flink web界面查看任務的狀態便已經為RUNNING,但注意此時作業(各頂點)實際並未開始調度,頂點還是處於CREATED狀態,任作業狀態與頂點狀態不完全相關聯,有其各自的演化生命周期,具體可參考Flink作業調度[2];然后根據不同的策略EagerSchedulingStrategy(主要用於流式作業,所有頂點(ExecutionVertex)同時開始調度)和LazyFromSourcesSchedulingStrategy(主要用於批作業,從Source開始開始調度,其他頂點延遲調度)調度。

當提交流式作業時,會有如下代碼調用路徑:

  • EagerSchedulingStrategy#startScheduling
  • EagerSchedulingStrategy#allocateSlotsAndDeploy,在部署之前會根據待部署的ExecutionVertex生成對應的ExecutionVertexDeploymentOption,然后調用DefaultScheduler#allocateSlotsAndDeploy開始部署。同樣,在部署之前也需要進行一些前置校驗(ExecutionVertex對應的Execution的狀態必須為CREATED),接着將待部署的ExecutionVertex對應的Execution狀態變更為SCHEDULED,然后開始為ExecutionVertex分配Slot。會有如下的調用代碼路徑:
  • DefaultScheduler#allocateSlots(該過程會ExecutionVertex轉化為ExecutionVertexSchedulingRequirements,會封裝包含一些location信息、sharing信息、資源信息等)
  • DefaultExecutionSlotAllocator#allocateSlotsFor,該方法會開始逐一異步部署各ExecutionVertex,部署也是根據不同的Slot提供策略來分配,接着會經過如下代碼調用路徑層層轉發,SlotProviderStrategy#allocateSlot -> SlotProvider#allocateSlot(SlotProvider默認實現為SchedulerImpl) -> SchedulerImpl#allocateSlotInternal -> SchedulerImpl#internalAllocateSlot(該方法會根據vertex是否共享slot來分配singleSlot/SharedSlot),以singleSlot為例說明。
    在分配slot時,首先會在JobMaster中SlotPool中進行分配,具體是先SlotPool中獲取所有slot,然后嘗試選擇一個最合適的slot進行分配,這里的選擇有兩種策略,即按照位置優先和按照之前已分配的slot優先;若從SlotPool無法分配,則通過RPC請求向ResourceManager請求slot,若此時並未連接上ResourceManager,則會將請求緩存起來,待連接上ResourceManager后再申請。

當ResourceManager收到申請slot請求時,若發現該JobManager未注冊,則直接拋出異常;否則將請求轉發給SlotManager處理,SlotManager中維護了集群所有空閑的slot(TaskManager會向ResourceManager上報自己的信息,在ResourceManager中由SlotManager保存Slot和TaskManager對應關系),並從其中找出符合條件的slot,然后向TaskManager發送RPC請求申請對應的slot。

等待所有的slot申請完成后,然后會將ExecutionVertex對應的Execution分配給對應的Slot,即從Slot中分配對應的資源給Execution,完成分配后可開始部署作業。
部署作業代碼調用路徑如下:

  • DefaultScheduler#waitForAllSlotsAndDeploy
  • DefaultScheduler#deployAll
  • DefaultScheduler#deployOrHandleError
  • DefaultScheduler#deployTaskSafe
  • DefaultExecutionVertexOperations#deploy
  • ExecutionVertex#deploy
  • Execution#deploy(每次調度ExecutionVertex,都會有一個Execute,在此階段會將Execution的狀態變更為DEPLOYING狀態,並且為該ExecutionVertex生成對應的部署描述信息,然后從對應的slot中獲取對應的TaskManagerGateway,以便向對應的TaskManager提交Task)
  • RpcTaskManagerGateway#submitTask(此時便將Task通過RPC提交給了TaskManager)。

TaskManager(TaskExecutor)在接收到提交Task的請求后,會經過一些初始化(如從BlobServer拉取文件,反序列化作業和Task信息、LibaryCacheManager等),然后這些初始化的信息會用於生成Task(Runnable對象),然后啟動該Task,其代碼調用路徑如下 Task#startTaskThread(啟動Task線程)-> Task#run(將ExecutionVertex狀態變更為RUNNING狀態,此時在FLINK web前台查看頂點狀態會變更為RUNNING狀態,另外還會生成了一個AbstractInvokable對象,該對象是FLINK銜接執行用戶代碼的關鍵,而后會經過如下調用

  • AbstractInvokable#invoke(AbstractInvokable有幾個關鍵的子類實現, BatchTask/BoundedStreamTask/DataSinkTask/DataSourceTask/StreamTask/SourceStreamTask。對於streaming類型的Source,會調用StreamTask#invoke)
  • StreamTask#invoke
  • StreamTask#beforeInvoke
  • StreamTask#initializeStateAndOpen(初始化狀態和進行初始化,這里會調用用戶的open方法(如自定義實現的source))-> StreamTask#runMailboxLoop,便開始處理Source端消費的數據,並流入下游算子處理。

至此作業從提交到資源分配及調度運行整體流程就已經分析完畢,對於流式作業而言,正常情況下其會一直運行,不會結束。

3. 總結

對於作業的運行,會先提交至Dispatcher,由Dispatcher拉起JobManagerRunner,在JobManagerRunner成為Leader后,便開始處理作業,首先會根據JobGraph生成對應的ExecutionGraph,然后開始調度,作業的狀態首先會變更為RUNNING,然后對各ExecutionVertex申請slot,申請slot會涉及JM與RM、TM之間的通信,當在TM上分配完slot后,便可將Task提交至TaskManager,然后TaskManager會為每個提交的Task生成一個單獨的線程處理。

參考

  1. https://www.infoq.cn/article/RWTM9o0SHHV3Xr8o8giT
  2. https://flink.sojb.cn/internals/job_scheduling.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM