都知道Flink中的角色分為Jobmanager,TaskManger
在啟動腳本里面已經找到了jobmanager的啟動類org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint(local模式更簡單直接在Driver端的env.exection()直接啟動了,有興趣可以研究一下)
來看一下StandaloneSessionClusterEntrypoint啟動類的一些重要的方法
在main方法中會調用抽象類 ClusterEntrypoint.java的
在這里啟動了集群
在這個方法runCluster()中比較重要
其中1處初始化了一些ClusterEntrypoint.java中的一些服務像什么HA,blob,heartbeat,metricRegistry這些
還是主要看一下2處create()方法中
其中包括了一些創建以及啟動ResourceManager(有用於請求solt的RPC,初始化所有solt到resourceManager的soltManager的RPC(這個會在jobmanager接收到jobGraph后調用),TM心跳等),啟動web服務
來看一下ResourceManager的初始化
1處創建的services里面會包含一個soltManager但是里面並沒有solt,solt並沒有初始化
主要看下2處,創建了一個resourceManager對象這個抽象類實現了接口
這個接口下的幾個重要的RPC方法具體實現
這里可以看到是具體向resourceManager請求solt的RPC,另外一個
這個RPC會發送信息到resourceManager包括像taskManagaer有多少可分配的solt,哪些已分配的solt,solt的狀態等
然后在create()方法中就將這個resourceManager的PRC服務啟動起來了
起來以后
主要看一下這里
創建了一個Dispatcher調度對象
看下Dispatcher是用來干嘛的(StandaloneDispatcher都是調用了父類的初始化方法super()創建一個Dispatcher.java對象)
來看一下Dispatcher實現了什么接口(ResourceManager同理)
看一下實現的這個接口,當然還有一些其他的
具體實現
看到這里就應該很熟悉了
他實現了submitJob()接口用於啟動一個RPC,接受參數可以看到接受到一個JobGraph,這就意味着這和job任務啟動有關,后面隨緣更新到job啟動Graph轉換會提到
回到前面的Dispatcher.start()將傳入的rpcService啟動起來了,等待接受來自Driver端提交上來的JobGraph差不多啟動完成了
這里jobmanager其實還不完整,負責一些服務沒有起來,要等到Driver端的Jobgraph提交以后才會起來,像Coordinator后面隨緣到job提交會詳細的說一下