Flink命令行提交job (源碼分析)


這篇文章主要介紹從命令行到任務在Driver端運行的過程

通過flink run 命令提交jar包運行程序

以yarn 模式提交任務命令類似於: flink run -m yarn-cluster XXX.jar

先來看一下腳本中的調用類

 在flink.sh腳本中可以看到提交的命令走到了這樣一個外觀類上,用於提交job解析用戶命令行參數

在其main方法中

 先會解析對應需要的flink參數包括flink-conf-dir等,接着

 1處會根據是否有hadoop權限安全控制走對應的doas(),具體的執行邏輯為2處解析對應的用戶參數

 拿到參數后會先將參數中的第一個先取出來作為action

這里我們只看job提交的,解析出來也就是run,然后將剩余的參數用於job運行

 在job運行前會先解析剩余的參數,比如運行的jar文件地址,運行的主類名(沒有后面回去Manifest里面找)作為entryPoint入口,並行度等參數

接着

 就用得到的這些參數構建program了,這里其實就是拿到了入口運行類的全額限定名,然后通過類加載器加載運行主類

 接着,會根據運行時用戶的主類是否為Program的實現類(用戶可以直接返回plan)來設置對應的packageProgram的屬性program是否為空

 那我們常規的提交main方法主類的這里就是空的,如果是主類實現progarm的就反射實例化了一個以后賦給它

接着,就是運行並且提交任務了

 

 這里比較重要,yarn模式提交的話這里會調度整個集群,提交常見的異常

Couldn't deploy Yarn session cluster

就是從這個方法里面拋出的,與yarn有關

這里只看yarn的調度集群,因為standalone模式的話Jobmanager和TaskManager是已經啟動好的了不需要這里

其中走到了這個方法deployInternal()

 可以看到這里就是申請AppMaster並且傳入了yarn模式啟動集群的類的全額限定名,其實就是這個類

 用於啟動jobmanager,和standalone 的入口類 

org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint  

功能差不多,但是還有有區別,當這個yarnsourceManager類申請到contain的時候就會

 就會去起對應的taskManager了

回到最開始,當集群調度完以后

 運行用戶程序

其實就是調用了用戶的main方法,結束

后面就是job往jobmanager提交了,前面的文章有

 

總結:

  通過一個外觀類解析用戶參數,拿到類名

  調度集群啟動申請AppMaster,Contaion起JM,TM

  然后類名通過類加載器加載類,然后反射實例調用用戶的main方法啟動Job

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM