以Flink源碼中自帶的WordCount為例,執行的入口從用戶程序的execute()函數入手,execute()的源碼如下:
1 public JobExecutionResult execute(String jobName) throws Exception { 2 StreamGraph streamGraph = getStreamGraph(); 3 streamGraph.setJobName(jobName); 4 JobGraph jobGraph = streamGraph.getJobGraph(); 5 . . . . . . . 6 LocalFlinkMiniCluster exec = new LocalFlinkMiniCluster(configuration, true); 7 try { 8 exec.start(); 9 return exec.submitJobAndWait(jobGraph, getConfig().isSysoutLoggingEnabled()); 10 } 11 finally { 12 transformations.clear(); 13 exec.stop(); 14 } 15 }
函數內部主要有getStreamGraph()、getJobGraph()、exec.start()、exec.submitJobAndWait()等。getStreamGraph()的作用是生成StreamGraph圖,getJobGraph()的作用是生成JobGraph的圖,exec.start()的作用是建立Client、JobManager、TaskManager三者之間通信初始化,exec.submitJobAndWait()的作用提交job並且等待job執行后的結果,該函數提供了任務執行調度執行的入口,進入Client類中,首先執行createUserCodeClassLoader()函數,創建用戶代碼的加載器,然后執行jobClient.SubmitJobAndWait(),進入JobClient類,在函數內部會執行submit函數,從該函數開始進入AKKA通信階段,首先會進入JobClientActor,會創建一個jobclientActor來對JobManager和client進行通信,當通信對象創建之后,會執行akka機制的ask函數,該函數的作用是發出一個消息,然后要求收到方給予回復。當消息發出之后,OnReceive()函數會收到actor發出的消息請求,然后調用handleMessage()方法來處理消息請求,該函數內部有connectToJobManager()方法,此方法內部的tryToSubmitJob()函數是正式提交任務的操作,主要做的工作就是uploadUserJars()上傳用戶程序的jar文件,接着會jobManager.tell()向JobManager發出一個submit消息請求。
當JobManager收到Client發送的消息之后,會執行JobManager內部的submitJob方法,
1 case SubmitJob(jobGraph, listeningBehaviour) => 2 val client = sender() 3 4 val jobInfo = new JobInfo(client, listeningBehaviour, System.currentTimeMillis(), 5 jobGraph.getSessionTimeout) 6 log.info("liuzf---開始執行JobManager的submitJob()") 7 submitJob(jobGraph, jobInfo)
首先會把由client收到的job信息封裝在jobinfo中,然后把jobinfo以及job的任務圖jobGraph一起發送給submit()去執行,在JobManager的submit函數中處理的函數邏輯比較復雜,比較重要的函數執行過程如下:
1 private def submitJob(jobGraph: JobGraph, jobInfo: JobInfo, isRecovery: Boolean = false): Unit = { 2 try { 3 libraryCacheManager.registerJob(jobGraph.getJobID, jobGraph.getUserJarBlobKeys, 4 jobGraph.getClasspaths) 5 } 6 val userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID) 7 8 } 9 executionGraph = ExecutionGraphBuilder.buildGraph() 10 try { 11 submittedJobGraphs.putJobGraph(new SubmittedJobGraph(jobGraph, jobInfo)) 12 jobInfo.notifyClients( 13 decorateMessage(JobSubmitSuccess(jobGraph.getJobID))) 14 log.info(s"開始調度 job $jobId ($jobName).") 15 executionGraph.scheduleForExecution()
首先執行libraryCacheManager.registerJob(),向CacheManager進行注冊,請求緩存,然后執行getClassLoader()來加載用戶的代碼加載器,接下來會調用ExecutionGraph中的buildGraph()構造ExecutionGraph的並行化版本的執行圖,當邏輯執行圖構造完畢之后,這時候可以通知Client任務已經成功提交,並且提交過程結束。接下來會調用sheduleForExecution()來會整體的資源進行調度分配,主要是每個TaskManager中的slot的分配,並且當slot分配完成之后,所有的task的任務狀態發生改變,由CREATEDàSCHEDULED。接下分配完之后,接下來執行depolyToSlot()函數,就要進入部署狀態,同樣會執行transitionState()函數,將SCHEDULED狀態變為DEPOLYING狀態,接着的重要函數是shumitTask()函數,該函數會通過AKKA機制,向TaskManager發出一個submitTask的消息請求,TaskManager收到消息請求后,會執行submitTask()方法,該函數的重要執行過程如下:
1 public submitTask(){ 2 val task = new Task(. . . .) 3 log.info(s"Received task ${task.getTaskInfo.getTaskNameWithSubtasks()}") 4 val execId = tdd.getExecutionAttemptId 5 val prevTask = runningTasks.put(execId, task) 6 if (prevTask != null) { 7 runningTasks.put(execId, prevTask) 8 throw new IllegalStateException("TaskM}anager already contains a task for id " + execId) 9 } 10 task.startTaskThread() 11 sender ! decorateMessage(Acknowledge.get()) 12 }
首先執行Task的構造函數,生成具體物理執行的相關組件,比如ResultPartition等,最后創建執行Task的線程,然后調用startTaskThread()來啟動具體的執行線程,Task線程內部的run()方法承載了被執行的核心邏輯,該方法具體的內容為:
1 public void run() { 2 while (true) { 3 ExecutionState current = this.executionState; 4 if (current == ExecutionState.CREATED) { 5 if (transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING)) { 6 break; 7 } 8 } 9 invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass); 10 network.registerTask(this); 11 Environment env = new RuntimeEnvironment(. . . . ); 12 invokable.setEnvironment(env); 13 // ---------------------------------------------------------------- 14 // actual task core work 15 if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) { 16 } 17 // notify everyone that we switched to running 18 notifyObservers(ExecutionState.RUNNING, null); 19 executingThread.setContextClassLoader(userCodeClassLoader); 20 // run the invokable 21 invokable.invoke(); 22 23 if (transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED)) { 24 notifyObservers(ExecutionState.FINISHED, null); 25 } 26 Finally{ 27 // free the network resources 28 network.unregisterTask(this); 29 // free memory resources 30 if (invokable != null) { 31 memoryManager.releaseAll(invokable); 32 } 33 libraryCache.unregisterTask(jobId, executionId); 34 removeCachedFiles(distributedCacheEntries, fileCache);
首先執行transitionState()函數將TaskManager的狀態由CREATED轉變為DEPOLYING狀態,然后調用loadAndTrantiateInvokable()對用戶代碼打包成jar包,並且生成用戶代碼加載器,然后執行network.registerTask(),執行該函數之前,會執行NetworkEnvironment的構造函數,該類是TaskManager通信的主對象,主要用於跟蹤中間結果並負責所有的數據交換,在該類中會創建協助通信的關鍵部件,比如網絡緩沖池,連接管理器,結果分區管理器,結果分區可消費通知器等。當網絡對象准備完成后,創建一個運行環境,然后執行invoke.setEnvironment(env),將各種配置打包到運行環境中。
當運行環境准備之后,接下來到了具體分析任務執行的時候,首先會調用transitionState()函數將任務狀態由DEPOLYING改為RUNNING狀態,然后會調用notifyObservers()通知所有的task觀察者也改變狀態,然后執行setContextClassLoader()將執行的類加載器設置為用戶執行的加載器,然后執行invokable.invoke(),該函數是分界點,執行前用戶邏輯沒有被觸發,執行之后說明用戶邏輯已完成。當執行完成之后,調用transitionState()函數執行的RUNNING狀態改成FINISHED狀態。同樣調用notifyObservers()來通知其他觀察者改變狀態,最后,釋放資源。
總體的函數執行圖如下:因圖片太大------>>>>>
鏈接: https://pan.baidu.com/s/15F1rBAmTSmNrkC8I4GmKnA 密碼: du23