這篇文章主要介紹從命令行到任務在Driver端運行的過程
通過flink run 命令提交jar包運行程序
以yarn 模式提交任務命令類似於: flink run -m yarn-cluster XXX.jar
先來看一下腳本中的調用類
在flink.sh腳本中可以看到提交的命令走到了這樣一個外觀類上,用於提交job解析用戶命令行參數
在其main方法中
先會解析對應需要的flink參數包括flink-conf-dir等,接着
1處會根據是否有hadoop權限安全控制走對應的doas(),具體的執行邏輯為2處解析對應的用戶參數
拿到參數后會先將參數中的第一個先取出來作為action
這里我們只看job提交的,解析出來也就是run,然后將剩余的參數用於job運行
在job運行前會先解析剩余的參數,比如運行的jar文件地址,運行的主類名(沒有后面回去Manifest里面找)作為entryPoint入口,並行度等參數
接着
就用得到的這些參數構建program了,這里其實就是拿到了入口運行類的全額限定名,然后通過類加載器加載運行主類
接着,會根據運行時用戶的主類是否為Program的實現類(用戶可以直接返回plan)來設置對應的packageProgram的屬性program是否為空
那我們常規的提交main方法主類的這里就是空的,如果是主類實現progarm的就反射實例化了一個以后賦給它
接着,就是運行並且提交任務了
這里比較重要,yarn模式提交的話這里會調度整個集群,提交常見的異常
Couldn't deploy Yarn session cluster
就是從這個方法里面拋出的,與yarn有關
這里只看yarn的調度集群,因為standalone模式的話Jobmanager和TaskManager是已經啟動好的了不需要這里
其中走到了這個方法deployInternal()
可以看到這里就是申請AppMaster並且傳入了yarn模式啟動集群的類的全額限定名,其實就是這個類
用於啟動jobmanager,和standalone 的入口類
org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
功能差不多,但是還有有區別,當這個yarnsourceManager類申請到contain的時候就會
就會去起對應的taskManager了
回到最開始,當集群調度完以后
運行用戶程序
其實就是調用了用戶的main方法,結束
后面就是job往jobmanager提交了,前面的文章有
總結:
通過一個外觀類解析用戶參數,拿到類名
調度集群啟動申請AppMaster,Contaion起JM,TM
然后類名通過類加載器加載類,然后反射實例調用用戶的main方法啟動Job